* gnu/java/nio/SelectorImpl.java

(selectThreadMutex): New field.
	(selectThread): New field.
	(unhandledWakeup): New field.
	(implCloseSelector): Added skeleton code which
	synchronizes as per Sun JRE JavaDoc.
	(keys): Throw ClosedSelectorException if selector
	is closed.
	(selectNow): Added comment that we're faking out
	an immediate select with a one-microsecond-timeout one.
	(select): Use 0 instead of -1 for infinite timeout.
	(implSelect): Changed comment in declaration.
	(select): Added synchronized to method declaration.
	Added synchronization and wakeup support as per Sun
	JRE JavaDoc.
	(selectedKeys): Throw ClosedSelectorException if selector
	is closed.
	(wakeup): Implemented.
	(deregisterCancelledKeys): Synchronize on cancelled key
	set before deregistering.
	(register): Synchronize on key set before registering.
	* java/nio/channels/spi/AbstractSelector.java
	Added import for java.nio.channels.ClosedSelectorException.
	(close): Added synchronized to method declaration.
	(cancelledKeys): Throw ClosedSelectorException if selector
	is closed.
	(cancelKey): Synchronize on cancelled key set before key.

From-SVN: r74879
This commit is contained in:
Mohan Embar 2003-12-20 15:33:24 +00:00 committed by Mohan Embar
parent 59687e1890
commit 677f99cce5
3 changed files with 248 additions and 82 deletions

View file

@ -1,3 +1,33 @@
2003-12-20 Mohan Embar <gnustuff@thisiscool.com>
* gnu/java/nio/SelectorImpl.java
(selectThreadMutex): New field.
(selectThread): New field.
(unhandledWakeup): New field.
(implCloseSelector): Added skeleton code which
synchronizes as per Sun JRE JavaDoc.
(keys): Throw ClosedSelectorException if selector
is closed.
(selectNow): Added comment that we're faking out
an immediate select with a one-microsecond-timeout one.
(select): Use 0 instead of -1 for infinite timeout.
(implSelect): Changed comment in declaration.
(select): Added synchronized to method declaration.
Added synchronization and wakeup support as per Sun
JRE JavaDoc.
(selectedKeys): Throw ClosedSelectorException if selector
is closed.
(wakeup): Implemented.
(deregisterCancelledKeys): Synchronize on cancelled key
set before deregistering.
(register): Synchronize on key set before registering.
* java/nio/channels/spi/AbstractSelector.java
Added import for java.nio.channels.ClosedSelectorException.
(close): Added synchronized to method declaration.
(cancelledKeys): Throw ClosedSelectorException if selector
is closed.
(cancelKey): Synchronize on cancelled key set before key.
2003-12-20 Michael Koch <konqueror@gmx.de> 2003-12-20 Michael Koch <konqueror@gmx.de>
* Makefile.am (ordinary_java_source_files): * Makefile.am (ordinary_java_source_files):

View file

@ -65,6 +65,28 @@ public class SelectorImpl extends AbstractSelector
private Set keys; private Set keys;
private Set selected; private Set selected;
/**
* A dummy object whose monitor regulates access to both our
* selectThread and unhandledWakeup fields.
*/
private Object selectThreadMutex = new Object ();
/**
* Any thread that's currently blocked in a select operation.
*/
private Thread selectThread;
/**
* Indicates whether we have an unhandled wakeup call. This can
* be due to either wakeup() triggering a thread interruption while
* a thread was blocked in a select operation (in which case we need
* to reset this thread's interrupt status after interrupting the
* select), or else that no thread was on a select operation at the
* time that wakeup() was called, in which case the following select()
* operation should return immediately with nothing selected.
*/
private boolean unhandledWakeup;
public SelectorImpl (SelectorProvider provider) public SelectorImpl (SelectorProvider provider)
{ {
super (provider); super (provider);
@ -81,28 +103,44 @@ public class SelectorImpl extends AbstractSelector
protected final void implCloseSelector() protected final void implCloseSelector()
throws IOException throws IOException
{ {
// FIXME: We surely need to do more here. // Cancel any pending select operation.
wakeup(); wakeup();
synchronized (keys)
{
synchronized (selected)
{
synchronized (cancelledKeys ())
{
// FIXME: Release resources here.
}
}
}
} }
public final Set keys() public final Set keys()
{ {
if (!isOpen())
throw new ClosedSelectorException();
return Collections.unmodifiableSet (keys); return Collections.unmodifiableSet (keys);
} }
public final int selectNow() public final int selectNow()
throws IOException throws IOException
{ {
// FIXME: We're simulating an immediate select
// via a select with a timeout of one millisecond.
return select (1); return select (1);
} }
public final int select() public final int select()
throws IOException throws IOException
{ {
return select (-1); return select (0);
} }
// A timeout value of -1 means block forever. // A timeout value of 0 means block forever.
private static native int implSelect (int[] read, int[] write, private static native int implSelect (int[] read, int[] write,
int[] except, long timeout) int[] except, long timeout)
throws IOException; throws IOException;
@ -144,29 +182,89 @@ public class SelectorImpl extends AbstractSelector
return result; return result;
} }
public int select (long timeout) public synchronized int select (long timeout)
throws IOException throws IOException
{ {
if (!isOpen()) if (!isOpen())
throw new ClosedSelectorException (); throw new ClosedSelectorException();
if (keys == null) synchronized (keys)
{
synchronized (selected)
{ {
return 0;
}
deregisterCancelledKeys(); deregisterCancelledKeys();
// Set only keys with the needed interest ops into the arrays. // Set only keys with the needed interest ops into the arrays.
int[] read = getFDsAsArray (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT); int[] read = getFDsAsArray (SelectionKey.OP_READ
int[] write = getFDsAsArray (SelectionKey.OP_WRITE | SelectionKey.OP_CONNECT); | SelectionKey.OP_ACCEPT);
int[] except = new int [0]; // FIXME: We dont need to check this yet int[] write = getFDsAsArray (SelectionKey.OP_WRITE
int anzahl = read.length + write.length + except.length; | SelectionKey.OP_CONNECT);
// FIXME: We dont need to check this yet
int[] except = new int [0];
// Test to see if we've got an unhandled wakeup call,
// in which case we return immediately. Otherwise,
// remember our current thread and jump into the select.
// The monitor for dummy object selectThreadMutex regulates
// access to these fields.
// FIXME: Not sure from the spec at what point we should
// return "immediately". Is it here or immediately upon
// entry to this function?
// NOTE: There's a possibility of another thread calling
// wakeup() immediately after our thread releases
// selectThreadMutex's monitor here, in which case we'll
// do the select anyway. Since calls to wakeup() and select()
// among different threads happen in non-deterministic order,
// I don't think this is an issue.
synchronized (selectThreadMutex)
{
if (unhandledWakeup)
{
unhandledWakeup = false;
return 0;
}
else
{
selectThread = Thread.currentThread ();
}
}
// Call the native select() on all file descriptors. // Call the native select() on all file descriptors.
int result = 0;
try
{
begin(); begin();
int result = implSelect (read, write, except, timeout); result = implSelect (read, write, except, timeout);
}
finally
{
end(); end();
}
// If our unhandled wakeup flag is set at this point,
// reset our thread's interrupt flag because we were
// awakened by wakeup() instead of an external thread
// interruption.
//
// NOTE: If we were blocked in a select() and one thread
// called Thread.interrupt() on the blocked thread followed
// by another thread calling Selector.wakeup(), then race
// conditions could make it so that the thread's interrupt
// flag is reset even though the Thread.interrupt() call
// "was there first". I don't think we need to care about
// this scenario.
synchronized (selectThreadMutex)
{
if (unhandledWakeup)
{
unhandledWakeup = false;
selectThread.interrupted ();
}
selectThread = null;
}
Iterator it = keys.iterator (); Iterator it = keys.iterator ();
@ -204,14 +302,14 @@ public class SelectorImpl extends AbstractSelector
{ {
ops = ops | SelectionKey.OP_WRITE; ops = ops | SelectionKey.OP_WRITE;
// if (key.channel ().isConnected ()) // if (key.channel ().isConnected ())
// { // {
// ops = ops | SelectionKey.OP_WRITE; // ops = ops | SelectionKey.OP_WRITE;
// } // }
// else // else
// { // {
// ops = ops | SelectionKey.OP_CONNECT; // ops = ops | SelectionKey.OP_CONNECT;
// } // }
} }
} }
@ -226,24 +324,50 @@ public class SelectorImpl extends AbstractSelector
// Set new ready ops // Set new ready ops
key.readyOps (key.interestOps () & ops); key.readyOps (key.interestOps () & ops);
} }
deregisterCancelledKeys(); deregisterCancelledKeys();
return result; return result;
} }
}
}
public final Set selectedKeys() public final Set selectedKeys()
{ {
if (!isOpen())
throw new ClosedSelectorException();
return selected; return selected;
} }
public final Selector wakeup() public final Selector wakeup()
{ {
return null; // IMPLEMENTATION NOTE: Whereas the specification says that
// thread interruption should trigger a call to wakeup, we
// do the reverse under the covers: wakeup triggers a thread
// interrupt followed by a subsequent reset of the thread's
// interrupt status within select().
// First, acquire the monitor of the object regulating
// access to our selectThread and unhandledWakeup fields.
synchronized (selectThreadMutex)
{
unhandledWakeup = true;
// Interrupt any thread which is currently blocked in
// a select operation.
if (selectThread != null)
selectThread.interrupt ();
}
return this;
} }
private final void deregisterCancelledKeys() private final void deregisterCancelledKeys()
{ {
Iterator it = cancelledKeys().iterator(); Set ckeys = cancelledKeys ();
synchronized (ckeys)
{
Iterator it = ckeys.iterator();
while (it.hasNext ()) while (it.hasNext ())
{ {
@ -251,6 +375,7 @@ public class SelectorImpl extends AbstractSelector
it.remove (); it.remove ();
} }
} }
}
protected SelectionKey register (SelectableChannel ch, int ops, Object att) protected SelectionKey register (SelectableChannel ch, int ops, Object att)
{ {
@ -282,7 +407,11 @@ public class SelectorImpl extends AbstractSelector
throw new InternalError ("No known channel type"); throw new InternalError ("No known channel type");
} }
synchronized (keys)
{
keys.add (result); keys.add (result);
}
result.interestOps (ops); result.interestOps (ops);
result.attach (att); result.attach (att);
return result; return result;

View file

@ -39,6 +39,7 @@ exception statement from your version. */
package java.nio.channels.spi; package java.nio.channels.spi;
import java.io.IOException; import java.io.IOException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
import java.nio.channels.Selector; import java.nio.channels.Selector;
import java.util.Set; import java.util.Set;
@ -64,7 +65,7 @@ public abstract class AbstractSelector extends Selector
* *
* @exception IOException If an error occurs * @exception IOException If an error occurs
*/ */
public final void close () throws IOException public final synchronized void close () throws IOException
{ {
if (closed) if (closed)
return; return;
@ -102,12 +103,18 @@ public abstract class AbstractSelector extends Selector
protected final Set cancelledKeys() protected final Set cancelledKeys()
{ {
if (!isOpen())
throw new ClosedSelectorException();
return cancelledKeys; return cancelledKeys;
} }
final void cancelKey (AbstractSelectionKey key) final void cancelKey (AbstractSelectionKey key)
{ {
cancelledKeys.remove (key); synchronized (cancelledKeys)
{
cancelledKeys.remove(key);
}
} }
/** /**