dev@grizzly.java.net

Re: AsyncWriter proposal

From: Jeanfrancois Arcand <Jeanfrancois.Arcand_at_Sun.COM>
Date: Mon, 05 Nov 2007 11:12:05 -0500

Salut,

Oleksiy Stashok wrote:
> Hello,
>
> Here is the 2nd try for AsyncQueueWriter. This time I have it integrated
> and tested.
> Attached AsyncWriteQueue.zip has implementation of AsyncWriteQueue;
> AsyncWriteQueue-complete.diff has complete svn diff for implementation
> and integration code.
>
> In diff file you can see refactoring I'm proposing for Context class.
> Currently context class has some logic, which is executed by
> Pipeline(ThreadPool): callback handler, protocol chain execution. I
> propose to move this logic to ContextTask interface implementations,
> this will simplify Context class. Because currently we have 2
> possibilities for Context: execute callback handler, protocol chain.
> After adding AsyncWriteQueue - 3 possibilities... Think it's too much to
> hold in Context?

If the functionality is the same but better designed, I'm +1 :-)

>
> I was thinking about async reading, looks like we can apply it for UDP
> protocol. (Discussion with Radim on users_at_grizzly mailing list). This
> way we can try to simulate some features, which TCP protocol has, for
> the server side.

Not only TCP/UDP: last year Charlie also presented (I'm not sure it was
implemented) an API where you can register a key/ByteBuffer/size and get
notified only when all the requested bytes are available (similar to
what AIO/NIO.2 will do). I think we should keep that idea in mind when
designing that interface.
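
To make the "register a key/ByteBuffer/size" idea concrete, here is a minimal sketch. It is not Charlie's API and not part of the attached patch: `ReadCompletionHandler` and `PendingRead` are hypothetical names, and the selector wiring (calling `onRead` on every OP_READ) is assumed rather than shown.

```java
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;

// Hypothetical callback; not part of Grizzly or of the attached patch.
interface ReadCompletionHandler {
    void onReadCompleted(ByteBuffer buffer);
}

// Hypothetical record for one "notify me once `size` bytes have arrived" request.
final class PendingRead {
    private final ByteBuffer buffer;
    private final int targetPosition;
    private final ReadCompletionHandler handler;
    private boolean completed;

    PendingRead(ByteBuffer buffer, int size, ReadCompletionHandler handler) {
        if (size < 0 || size > buffer.remaining()) {
            throw new IllegalArgumentException("size does not fit into buffer");
        }
        this.buffer = buffer;
        this.targetPosition = buffer.position() + size;
        this.handler = handler;
    }

    // Assumed to be called each time the channel is reported readable.
    // Returns true once all requested bytes are in the buffer.
    boolean onRead(ReadableByteChannel channel) throws IOException {
        if (completed) {
            return true;
        }
        int oldLimit = buffer.limit();
        buffer.limit(targetPosition); // never read past the requested size
        try {
            if (buffer.hasRemaining() && channel.read(buffer) < 0) {
                throw new EOFException("channel closed before request completed");
            }
        } finally {
            buffer.limit(oldLimit);
        }
        if (buffer.position() == targetPosition) {
            completed = true;
            handler.onReadCompleted(buffer); // fired only when all bytes are there
        }
        return completed;
    }
}
```

The caller is notified exactly once, and partial reads stay invisible to it, which is the property the write-side callback handler already has.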

Comments inline ;-) (almost to the end of the mail :-))

I'm +1 for almost everything (great work!). The only -1 is the
switch from Context.execute() to Context.execute(ContextTask). We need to
keep backward compatibility with the current implementation because
otherwise Sailfin and (I suspect) the ORB will no longer work. I'm sure we
can find a way to implement your idea and still support that method.
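
One possible way to do that, as a rough sketch only: keep the no-argument method and let it delegate to the new one with a default task. The classes below (`SketchContext`, `SketchContextTask`, `ProtocolChainTask`) are simplified stand-ins I made up for illustration, not the real com.sun.grizzly classes, and the real code would hand the task to the Pipeline instead of calling it inline.

```java
import java.util.concurrent.Callable;

// Simplified stand-in for the proposed ContextTask (hypothetical).
abstract class SketchContextTask implements Callable<Object> {
    protected SketchContext context;

    void setContext(SketchContext context) {
        this.context = context;
    }

    public abstract Object call();
}

// Stand-in for the default behaviour: execute the protocol chain.
class ProtocolChainTask extends SketchContextTask {
    public Object call() {
        context.log.append("protocol-chain");
        return null;
    }
}

// Simplified stand-in for Context (hypothetical).
class SketchContext {
    final StringBuilder log = new StringBuilder();

    // Legacy entry point: existing callers keep compiling and keep
    // getting the old behaviour.
    public void execute() {
        execute(new ProtocolChainTask());
    }

    // New entry point proposed in the patch.
    public void execute(SketchContextTask task) {
        task.setContext(this);
        task.call(); // the real code would submit the task to the Pipeline
    }
}
```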

Thanks

--Jeanfrancois


>
> Thanks.
>
> WBR,
> Alexey.
>
# This patch file was generated by NetBeans IDE
# Following Index: paths are relative to: C:\Projects\Grizzly\trunk\modules\grizzly\src\main\java\com\sun\grizzly
# This patch can be applied using context Tools: Patch action on respective folder.
# It uses platform neutral UTF-8 encoding and \n newlines.
# Above lines and this line are ignored by the patching process.
Index: async/AsyncQueueWritable.java
--- async/AsyncQueueWritable.java Locally New
+++ async/AsyncQueueWritable.java Locally New
@@ -0,0 +1,183 @@
+/*
+ * The contents of this file are subject to the terms
+ * of the Common Development and Distribution License
+ * (the License). You may not use this file except in
+ * compliance with the License.
+ *
+ * You can obtain a copy of the license at
+ * https://glassfish.dev.java.net/public/CDDLv1.0.html or
+ * glassfish/bootstrap/legal/CDDLv1.0.txt.
+ * See the License for the specific language governing
+ * permissions and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL
+ * Header Notice in each file and include the License file
+ * at glassfish/bootstrap/legal/CDDLv1.0.txt.
+ * If applicable, add the following below the CDDL Header,
+ * with the fields enclosed by brackets [] replaced by
+ * you own identifying information:
+ * "Portions Copyrighted [year] [name of copyright owner]"
+ *
+ * Copyright 2007 Sun Microsystems, Inc. All rights reserved.
+ */
+
+
+package com.sun.grizzly.async;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+
+/**
+ * Object, which is able to send <code>ByteBuffer</code> data asynchronously,
+ * using queue.
+ *
+ * @author Alexey Stashok
+ */
+public interface AsyncQueueWritable {
+ /**
+ * Method writes <code>ByteBuffer</code> using async write queue.
+ * First, if write queue is empty - it tries to write <code>ByteBuffer</code>
+ * directly (without putting to the queue).
+ * If associated write queue is not empty or after direct writing
+ * <code>ByteBuffer</code> still has ready data to be written -
+ * <code>ByteBuffer</code> will be added to <code>AsyncWriteQueue</code>.
+ * If an exception occurs, during direct writing - it will be propogated
+ * to the caller directly, otherwise it will be just logged by
+ * Grizzly framework.
+ *
+ * @param buffer <code>ByteBuffer</code>
+ * @return true, if <code>ByteBuffer</code> was written completely, false if write operation was put to queue
+ * @throws java.io.IOException
+ */
+ public boolean writeToAsyncQueue(ByteBuffer buffer) throws IOException;
+
+ /**
+ * Method writes <code>ByteBuffer</code> using async write queue.
+ * First, if write queue is empty - it tries to write <code>ByteBuffer</code>
+ * directly (without putting to the queue).
+ * If associated write queue is not empty or after direct writing
+ * <code>ByteBuffer</code> still has ready data to be written -
+ * <code>ByteBuffer</code> will be added to <code>AsyncWriteQueue</code>.
+ * If an exception occurs, during direct writing - it will be propogated
+ * to the caller directly, otherwise, if the <code>ByteBuffer</code> is
+ * added to a writing queue - exception notification will come via
+ * <code>AsyncWriteCallbackHandler</code>
+ *
+ * @param buffer <code>ByteBuffer</code>
+ * @param callbackHandler <code>AsyncWriteCallbackHandler</code>,
+ * which will get notified, when
+ * <code>ByteBuffer</code> will be completely written
+ * @return true, if <code>ByteBuffer</code> was written completely,
+ * false if write operation was put to queue
+ * @throws java.io.IOException
+ */
+ public boolean writeToAsyncQueue(ByteBuffer buffer,
+ AsyncWriteCallbackHandler callbackHandler) throws IOException;
+
+ /**
+ * Method writes <code>ByteBuffer</code> using async write queue.
+ * First, if write queue is empty - it tries to write <code>ByteBuffer</code>
+ * directly (without putting to the queue).
+ * If associated write queue is not empty or after direct writing
+ * <code>ByteBuffer</code> still has ready data to be written -
+ * <code>ByteBuffer</code> will be added to <code>AsyncWriteQueue</code>.
+ * If an exception occurs, during direct writing - it will be propogated
+ * to the caller directly, otherwise, if the <code>ByteBuffer</code> is
+ * added to a writing queue - exception notification will come via
+ * <code>AsyncWriteCallbackHandler</code>
+ *
+ * @param buffer <code>ByteBuffer</code>
+ * @param callbackHandler <code>AsyncWriteCallbackHandler</code>,
+ * which will get notified, when
+ * <code>ByteBuffer</code> will be completely written
+ * @param isCloneByteBuffer if true - <code>AsyncQueueWriter</code>
+ * will clone given
+ * <code>ByteBuffer</code> before puting it to the
+ * <code>AsyncWriteQueue</code>
+ * @return true, if <code>ByteBuffer</code> was written completely,
+ * false if write operation was put to queue
+ * @throws java.io.IOException
+ */
+ public boolean writeToAsyncQueue(ByteBuffer buffer,
+ AsyncWriteCallbackHandler callbackHandler,
+ boolean isCloneByteBuffer) throws IOException;
+
+ /**
+ * Method sends <code>ByteBuffer</code> using async write queue.
+ * First, if write queue is empty - it tries to send <code>ByteBuffer</code>
+ * to the given <code>SocketAddress</code> directly
+ * (without putting to the queue).
+ * If associated write queue is not empty or after direct sending
+ * <code>ByteBuffer</code> still has ready data to be written -
+ * <code>ByteBuffer</code> will be added to <code>AsyncWriteQueue</code>.
+ * If an exception occurs, during direct writing - it will be propogated
+ * to the caller directly, otherwise it will be just logged by
+ * Grizzly framework.
+ *
+ * @param dstAddress destination <code>SocketAddress</code> data will
+ * be sent to
+ * @param buffer <code>ByteBuffer</code>
+ * @return true, if <code>ByteBuffer</code> was written completely, false if write operation was put to queue
+ * @throws java.io.IOException
+ */
+ public boolean writeToAsyncQueue(SocketAddress dstAddress, ByteBuffer buffer)
+ throws IOException;
+
+ /**
+ * Method sends <code>ByteBuffer</code> using async write queue.
+ * First, if write queue is empty - it tries to send <code>ByteBuffer</code>
+ * to the given <code>SocketAddress</code> directly
+ * (without putting to the queue).
+ * If associated write queue is not empty or after direct sending
+ * <code>ByteBuffer</code> still has ready data to be written -
+ * <code>ByteBuffer</code> will be added to <code>AsyncWriteQueue</code>.
+ * If an exception occurs, during direct writing - it will be propogated
+ * to the caller directly, otherwise, if the <code>ByteBuffer</code> is
+ * added to a writing queue - exception notification will come via
+ * <code>AsyncWriteCallbackHandler</code>
+ *
+ * @param dstAddress destination <code>SocketAddress</code> data will
+ * be sent to
+ * @param buffer <code>ByteBuffer</code>
+ * @param callbackHandler <code>AsyncWriteCallbackHandler</code>,
+ * which will get notified, when
+ * <code>ByteBuffer</code> will be completely written
+ * @return true, if <code>ByteBuffer</code> was written completely,
+ * false if write operation was put to queue
+ * @throws java.io.IOException
+ */
+ public boolean writeToAsyncQueue(SocketAddress dstAddress, ByteBuffer buffer,
+ AsyncWriteCallbackHandler callbackHandler) throws IOException;
+
+ /**
+ * Method sends <code>ByteBuffer</code> using async write queue.
+ * First, if write queue is empty - it tries to send <code>ByteBuffer</code>
+ * to the given <code>SocketAddress</code> directly
+ * (without putting to the queue).
+ * If associated write queue is not empty or after direct sending
+ * <code>ByteBuffer</code> still has ready data to be written -
+ * <code>ByteBuffer</code> will be added to <code>AsyncWriteQueue</code>.
+ * If an exception occurs, during direct writing - it will be propogated
+ * to the caller directly, otherwise, if the <code>ByteBuffer</code> is
+ * added to a writing queue - exception notification will come via
+ * <code>AsyncWriteCallbackHandler</code>
+ *
+ * @param dstAddress destination <code>SocketAddress</code> data will
+ * be sent to
+ * @param buffer <code>ByteBuffer</code>
+ * @param callbackHandler <code>AsyncWriteCallbackHandler</code>,
+ * which will get notified, when
+ * <code>ByteBuffer</code> will be completely written
+ * @param isCloneByteBuffer if true - <code>AsyncQueueWriter</code>
+ * will clone given
+ * <code>ByteBuffer</code> before puting it to the
+ * <code>AsyncWriteQueue</code>
+ * @return true, if <code>ByteBuffer</code> was written completely,
+ * false if write operation was put to queue
+ * @throws java.io.IOException
+ */
+ public boolean writeToAsyncQueue(SocketAddress dstAddress, ByteBuffer buffer,
+ AsyncWriteCallbackHandler callbackHandler,
+ boolean isCloneByteBuffer) throws IOException;
+}
Index: async/AsyncQueueWriter.java
--- async/AsyncQueueWriter.java Locally New
+++ async/AsyncQueueWriter.java Locally New
@@ -0,0 +1,247 @@
+/*
+ * The contents of this file are subject to the terms
+ * of the Common Development and Distribution License
+ * (the License). You may not use this file except in
+ * compliance with the License.
+ *
+ * You can obtain a copy of the license at
+ * https://glassfish.dev.java.net/public/CDDLv1.0.html or
+ * glassfish/bootstrap/legal/CDDLv1.0.txt.
+ * See the License for the specific language governing
+ * permissions and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL
+ * Header Notice in each file and include the License file
+ * at glassfish/bootstrap/legal/CDDLv1.0.txt.
+ * If applicable, add the following below the CDDL Header,
+ * with the fields enclosed by brackets [] replaced by
+ * you own identifying information:
+ * "Portions Copyrighted [year] [name of copyright owner]"
+ *
+ * Copyright 2007 Sun Microsystems, Inc. All rights reserved.
+ */
+
+package com.sun.grizzly.async;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+
+/**
+ * Common inteface to be implemented by protocol dependant asynchronous queue
+ * writers implementations
+ *
+ * @author Alexey Stashok
+ */
+public interface AsyncQueueWriter {
+ /**
+ * Method writes <code>ByteBuffer</code> to the <code>SelectableChannel</code>
+ * First, if <code>SelectableChannel</code> associated write queue is empty -
+ * it tries to write <code>ByteBuffer</code> to the given
+ * <code>SelectableChannel</code> directly (without putting to the queue).
+ * If associated write queue is not empty or after direct writing
+ * <code>ByteBuffer</code> still has ready data to be written -
+ * <code>ByteBuffer</code> will be added to <code>AsyncWriteQueue</code>
+ * and <code>SelectableChannel<code> will be registered on
+ * <code>SelectorHandler</code>, waiting for OP_WRITE event.
+ * If an exception occurs, during direct writing - it will be propogated
+ * to the caller directly, otherwise it will be just logged by
+ * Grizzly framework.
+ *
+ * @param key <code>SelectionKey</code> associated with
+ * <code>SelectableChannel</code> <code>ByteBuffer</code>
+ * should be written to
+ * @param buffer <code>ByteBuffer</code>
+ * @return true, if <code>ByteBuffer</code> was written completely, false if write operation was put to queue
+ * @throws java.io.IOException
+ */
+ public boolean write(SelectionKey key, ByteBuffer buffer) throws IOException;
+
+ /**
+ * Method writes <code>ByteBuffer</code> to the <code>SelectableChannel</code>
+ * First, if <code>SelectableChannel</code> associated write queue is empty -
+ * it tries to write <code>ByteBuffer</code> to the given
+ * <code>SelectableChannel</code> directly (without putting to the queue).
+ * If associated write queue is not empty or after direct writing
+ * <code>ByteBuffer</code> still has ready data to be written -
+ * <code>ByteBuffer</code> will be added to <code>AsyncWriteQueue</code>
+ * and <code>SelectableChannel<code> will be registered on
+ * <code>SelectorHandler</code>, waiting for OP_WRITE event.
+ * If an exception occurs, during direct writing - it will be propogated

Typo: should be "propagated".

+ * to the caller directly, otherwise, if the <code>ByteBuffer</code> is
+ * added to a writing queue - exception notification will come via
+ * <code>AsyncWriteCallbackHandler</code>

Add a reference to AsyncWriteCallbackHandler.onIOException() here.

+ *
+ * @param key <code>SelectionKey</code> associated with
+ * <code>SelectableChannel</code> <code>ByteBuffer</code>
+ * should be written to
+ * @param buffer <code>ByteBuffer</code>
+ * @param callbackHandler <code>AsyncWriteCallbackHandler</code>,
+ * which will get notified, when
+ * <code>ByteBuffer</code> will be completely written
+ * @return true, if <code>ByteBuffer</code> was written completely,
+ * false if write operation was put to queue
+ * @throws java.io.IOException
+ */
+ public boolean write(SelectionKey key, ByteBuffer buffer,
+ AsyncWriteCallbackHandler callbackHandler) throws IOException;
+
+ /**
+ * Method writes <code>ByteBuffer</code> to the <code>SelectableChannel</code>
+ * First, if <code>SelectableChannel</code> associated write queue is empty -
+ * it tries to write <code>ByteBuffer</code> to the given
+ * <code>SelectableChannel</code> directly (without putting to the queue).
+ * If associated write queue is not empty or after direct writing
+ * <code>ByteBuffer</code> still has ready data to be written -
+ * <code>ByteBuffer</code> will be added to <code>AsyncWriteQueue</code>
+ * and <code>SelectableChannel<code> will be registered on
+ * <code>SelectorHandler</code>, waiting for OP_WRITE event.
+ * If an exception occurs, during direct writing - it will be propogated
+ * to the caller directly, otherwise, if the <code>ByteBuffer</code> is
+ * added to a writing queue - exception notification will come via
+ * <code>AsyncWriteCallbackHandler</code>
+ *
+ * @param key <code>SelectionKey</code> associated with
+ * <code>SelectableChannel</code> <code>ByteBuffer</code>
+ * should be written to
+ * @param buffer <code>ByteBuffer</code>
+ * @param callbackHandler <code>AsyncWriteCallbackHandler</code>,
+ * which will get notified, when
+ * <code>ByteBuffer</code> will be completely written
+ * @param isCloneByteBuffer if true - <code>AsyncQueueWriter</code> will
+ * clone given
+ * <code>ByteBuffer</code> before puting it to the
+ * <code>AsyncWriteQueue</code>
+ * @return true, if <code>ByteBuffer</code> was written completely,
+ * false if write operation was put to queue
+ * @throws java.io.IOException
+ */
+ public boolean write(SelectionKey key, ByteBuffer buffer,
+ AsyncWriteCallbackHandler callbackHandler, boolean isCloneByteBuffer)
+ throws IOException;
+
+ /**
+ * Method sends <code>ByteBuffer</code> to the <code>SocketAddress</code>
+ * First, if <code>SelectableChannel</code> associated write queue is empty -
+ * it tries to write <code>ByteBuffer</code> to the given
+ * <code>SocketAddress</code> directly (without putting to the queue).
+ * If associated write queue is not empty or after direct writing
+ * <code>ByteBuffer</code> still has ready data to be written -
+ * <code>ByteBuffer</code> will be added to <code>AsyncWriteQueue</code>
+ * and <code>SelectableChannel<code> will be registered on
+ * <code>SelectorHandler</code>, waiting for OP_WRITE event.
+ * If an exception occurs, during direct writing - it will be propogated
+ * to the caller directly, otherwise it will be just logged by
+ * Grizzly framework.
+ *
+ * @param key <code>SelectionKey</code> associated with
+ * <code>SelectableChannel</code>, which will be used to
+ * send<code>ByteBuffer</code> to
+ * @param dstAddress destination address <code>ByteBuffer</code> will be sent to
+ * @param buffer <code>ByteBuffer</code>
+ * @return true, if <code>ByteBuffer</code> was written completely, false if write operation was put to queue
+ * @throws java.io.IOException
+ */
+ public boolean write(SelectionKey key, SocketAddress dstAddress,
+ ByteBuffer buffer) throws IOException;
+
+ /**
+ * Method sends <code>ByteBuffer</code> to the <code>SocketAddress</code>
+ * First, if <code>SelectableChannel</code> associated write queue is empty -
+ * it tries to write <code>ByteBuffer</code> to the given
+ * <code>SocketAddress</code> directly (without putting to the queue).
+ * If associated write queue is not empty or after direct writing
+ * <code>ByteBuffer</code> still has ready data to be written -
+ * <code>ByteBuffer</code> will be added to <code>AsyncWriteQueue</code>
+ * and <code>SelectableChannel<code> will be registered on
+ * <code>SelectorHandler</code>, waiting for OP_WRITE event.
+ * If an exception occurs, during direct writing - it will be propogated
+ * to the caller directly, otherwise, if the <code>ByteBuffer</code> is
+ * added to a writing queue - exception notification will come via
+ * <code>AsyncWriteCallbackHandler</code>
+ *
+ * @param key <code>SelectionKey</code> associated with
+ * <code>SelectableChannel</code> <code>ByteBuffer</code>
+ * should be written to
+ * @param dstAddress destination address <code>ByteBuffer</code> will be sent to
+ * @param buffer <code>ByteBuffer</code>
+ * @param callbackHandler <code>AsyncWriteCallbackHandler</code>,
+ * which will get notified, when
+ * <code>ByteBuffer</code> will be completely written
+ * @return true, if <code>ByteBuffer</code> was written completely,
+ * false if write operation was put to queue
+ * @throws java.io.IOException
+ */
+ public boolean write(SelectionKey key, SocketAddress dstAddress,
+ ByteBuffer buffer, AsyncWriteCallbackHandler callbackHandler)
+ throws IOException;
+
+ /**
+ * Method sends <code>ByteBuffer</code> to the <code>SocketAddress</code>
+ * First, if <code>SelectableChannel</code> associated write queue is empty -
+ * it tries to write <code>ByteBuffer</code> to the given
+ * <code>SocketAddress</code> directly (without putting to the queue).
+ * If associated write queue is not empty or after direct writing
+ * <code>ByteBuffer</code> still has ready data to be written -
+ * <code>ByteBuffer</code> will be added to <code>AsyncWriteQueue</code>
+ * and <code>SelectableChannel<code> will be registered on
+ * <code>SelectorHandler</code>, waiting for OP_WRITE event.
+ * If an exception occurs, during direct writing - it will be propogated
+ * to the caller directly, otherwise, if the <code>ByteBuffer</code> is
+ * added to a writing queue - exception notification will come via
+ * <code>AsyncWriteCallbackHandler</code>
+ *
+ * @param key <code>SelectionKey</code> associated with
+ * <code>SelectableChannel</code> <code>ByteBuffer</code>
+ * should be written to
+ * @param dstAddress destination address <code>ByteBuffer</code> will be sent to
+ * @param buffer <code>ByteBuffer</code>
+ * @param callbackHandler <code>AsyncWriteCallbackHandler</code>,
+ * which will get notified, when
+ * <code>ByteBuffer</code> will be completely written
+ * @param isCloneByteBuffer if true - <code>AsyncQueueWriter</code> will
+ * clone given
+ * <code>ByteBuffer</code> before puting it to the
+ * <code>AsyncWriteQueue</code>
+ * @return true, if <code>ByteBuffer</code> was written completely,
+ * false if write operation was put to queue
+ * @throws java.io.IOException
+ */
+ public boolean write(SelectionKey key, SocketAddress dstAddress,
+ ByteBuffer buffer, AsyncWriteCallbackHandler callbackHandler,
+ boolean isCloneByteBuffer) throws IOException;
+
+ /**
+ * Checks whether there is any data in <code>AsyncWriteQueue</code> ready
+ * for the given <code>SelectableChannel</code>
+ *
+ * @param key <code>SelectionKey</code> associated with <code>SelectableChannel</code>
+ * @return true, if there is ready data. False otherwise.
+ */
+ public boolean hasReadyAsyncWriteData(SelectionKey key);
+
+ /**
+ * Callback method, which should be called by <code>SelectorHandler</code> to
+ * notify, that given <code>SelectableChannel</code> is ready to transmit data.
+ *
+ * @param key <code>SelectionKey</code> associated with <code>SelectableChannel</code>
+ * @throws java.io.IOException
+ */
+ public void onWrite(SelectionKey key) throws IOException;
+
+ /**
+ * Callback method, which should be called by <code>SelectorHandler</code> to
+ * notify, that given <code>SelectableChannel</code> is going to be closed, so
+ * related <code>SelectableChannel</code> data could be released from
+ * <code>AsyncWriteQueue</code>
+ *
+ * @param <code>SelectableChannel</code>
+ * @throws java.io.IOException
+ */
+ public void onClose(SelectableChannel channel);
+
+ public void close();
+
+}
Index: async/AsyncQueueWriterContextTask.java
--- async/AsyncQueueWriterContextTask.java Locally New
+++ async/AsyncQueueWriterContextTask.java Locally New
@@ -0,0 +1,78 @@
+/*
+ * The contents of this file are subject to the terms
+ * of the Common Development and Distribution License
+ * (the License). You may not use this file except in
+ * compliance with the License.
+ *
+ * You can obtain a copy of the license at
+ * https://glassfish.dev.java.net/public/CDDLv1.0.html or
+ * glassfish/bootstrap/legal/CDDLv1.0.txt.
+ * See the License for the specific language governing
+ * permissions and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL
+ * Header Notice in each file and include the License file
+ * at glassfish/bootstrap/legal/CDDLv1.0.txt.
+ * If applicable, add the following below the CDDL Header,
+ * with the fields enclosed by brackets [] replaced by
+ * you own identifying information:
+ * "Portions Copyrighted [year] [name of copyright owner]"
+ *
+ * Copyright 2007 Sun Microsystems, Inc. All rights reserved.
+ */
+
+package com.sun.grizzly.async;
+
+import com.sun.grizzly.*;

Replace the '*' import by listing the classes explicitly (NetBeans, NetBeans, NetBeans ;-))

+
+/**
+ * <code>AsyncQueueWriter</code> task, which will be executed by
+ * <code>Context</code>, when Context.execute(<code>ContextTask</code>)
+ * is called.
+ *
+ * @author Alexey Stashok
+ */
+public class AsyncQueueWriterContextTask extends ContextTask {
+ private static final TaskPool<AsyncQueueWriterContextTask> taskPool =
+ new TaskPool<AsyncQueueWriterContextTask>() {
+ @Override
+ public AsyncQueueWriterContextTask newTask() {
+ return new AsyncQueueWriterContextTask();
+ }
+ };
+
+ private AsyncQueueWriter asyncQueueWriter;
+
+ public static AsyncQueueWriterContextTask poll() {
+ return taskPool.poll();
+ }
+
+ public static void offer(AsyncQueueWriterContextTask contextTask) {
+ contextTask.recycle();
+ taskPool.offer(contextTask);
+ }
+
+ public Object call() throws Exception {
+ try {
+ asyncQueueWriter.onWrite(context.getSelectionKey());

Can you make sure the getSelectionKey() is not null, and throw an
IllegalStateException in case the key is null? Just to make the code
robust, even if the key cannot be null.
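
Something along these lines would do, as a minimal sketch. The helper class `SelectionKeys` and its method name are hypothetical, only there to keep the example self-contained; the same check could just as well be written inline in call().

```java
import java.nio.channels.SelectionKey;

// Hypothetical helper, not part of the patch.
final class SelectionKeys {
    private SelectionKeys() {
    }

    // Fails fast with a clear message instead of a NullPointerException
    // somewhere deep inside onWrite().
    static SelectionKey requireKey(SelectionKey key) {
        if (key == null) {
            throw new IllegalStateException("Context has no SelectionKey");
        }
        return key;
    }
}
```

In call() this would read asyncQueueWriter.onWrite(SelectionKeys.requireKey(context.getSelectionKey())), with the existing finally block still returning the task to the pool.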


+ } finally {
+ offer(this);
+ }
+
+ return null;
+ }
+
+ public AsyncQueueWriter getAsyncQueueWriter() {
+ return asyncQueueWriter;
+ }
+
+ public void setAsyncQueueWriter(AsyncQueueWriter asyncWriter) {
+ this.asyncQueueWriter = asyncWriter;
+ }
+
+ @Override
+ public void recycle() {
+ asyncQueueWriter = null;
+ super.recycle();
+ }
+}
Index: async/AsyncWriteCallbackHandler.java
--- async/AsyncWriteCallbackHandler.java Locally New
+++ async/AsyncWriteCallbackHandler.java Locally New
@@ -0,0 +1,38 @@
+/*
+ * The contents of this file are subject to the terms
+ * of the Common Development and Distribution License
+ * (the License). You may not use this file except in
+ * compliance with the License.
+ *
+ * You can obtain a copy of the license at
+ * https://glassfish.dev.java.net/public/CDDLv1.0.html or
+ * glassfish/bootstrap/legal/CDDLv1.0.txt.
+ * See the License for the specific language governing
+ * permissions and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL
+ * Header Notice in each file and include the License file
+ * at glassfish/bootstrap/legal/CDDLv1.0.txt.
+ * If applicable, add the following below the CDDL Header,
+ * with the fields enclosed by brackets [] replaced by
+ * you own identifying information:
+ * "Portions Copyrighted [year] [name of copyright owner]"
+ *
+ * Copyright 2007 Sun Microsystems, Inc. All rights reserved.
+ */
+
+package com.sun.grizzly.async;
+
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+
+/**
+ * Callback handler interface, used by <code>AsyncQueueWriter</code> to notify
+ * custom code about completion of specific <code>ByteBuffer</code> writing.
+ *
+ * @author Alexey Stashok
+ */
+public interface AsyncWriteCallbackHandler {
+ public void onWriteCompleted(SelectionKey key, ByteBuffer buffer);
+ public void onIOException(SelectionKey key, ByteBuffer buffer);
+}

The API looks good, but I would add more docs here as this interface
will be implemented by the users of Grizzly, and since we are lacking in
terms of tutorials, it is better to have good docs :-)
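
As an illustration of the level of detail I mean, here is a sketch of the same two methods with Javadoc, plus a tiny sample implementation. The method signatures are the ones from the patch; the wording of the docs is only my reading of the AsyncQueueWriter Javadoc above, the remark about threads is an assumption to be confirmed, and `CountingWriteHandler` is a hypothetical example class. The interface is declared without `public` here only so the snippet compiles as one file.

```java
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Callback handler used by <code>AsyncQueueWriter</code> to notify custom
 * code about the outcome of a queued <code>ByteBuffer</code> write.
 * Assumption: callbacks may arrive on a different thread than the one that
 * called write(), so implementations should be thread safe.
 */
interface AsyncWriteCallbackHandler {
    /**
     * Called once the given <code>ByteBuffer</code> has been written
     * completely to the channel associated with the key.
     *
     * @param key    <code>SelectionKey</code> of the channel written to
     * @param buffer the <code>ByteBuffer</code> that was written
     */
    void onWriteCompleted(SelectionKey key, ByteBuffer buffer);

    /**
     * Called if an I/O error occurs while a queued
     * <code>ByteBuffer</code> is being written.
     *
     * @param key    <code>SelectionKey</code> of the channel written to
     * @param buffer the <code>ByteBuffer</code> whose write failed
     */
    void onIOException(SelectionKey key, ByteBuffer buffer);
}

// Hypothetical sample implementation: just counts outcomes.
final class CountingWriteHandler implements AsyncWriteCallbackHandler {
    final AtomicInteger completed = new AtomicInteger();
    final AtomicInteger failed = new AtomicInteger();

    public void onWriteCompleted(SelectionKey key, ByteBuffer buffer) {
        completed.incrementAndGet();
    }

    public void onIOException(SelectionKey key, ByteBuffer buffer) {
        failed.incrementAndGet();
    }
}
```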


Index: async/AsyncWriteQueue.java
--- async/AsyncWriteQueue.java Locally New
+++ async/AsyncWriteQueue.java Locally New
@@ -0,0 +1,144 @@
+/*
+ * The contents of this file are subject to the terms
+ * of the Common Development and Distribution License
+ * (the License). You may not use this file except in
+ * compliance with the License.
+ *
+ * You can obtain a copy of the license at
+ * https://glassfish.dev.java.net/public/CDDLv1.0.html or
+ * glassfish/bootstrap/legal/CDDLv1.0.txt.
+ * See the License for the specific language governing
+ * permissions and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL
+ * Header Notice in each file and include the License file
+ * at glassfish/bootstrap/legal/CDDLv1.0.txt.
+ * If applicable, add the following below the CDDL Header,
+ * with the fields enclosed by brackets [] replaced by
+ * you own identifying information:
+ * "Portions Copyrighted [year] [name of copyright owner]"
+ *
+ * Copyright 2007 Sun Microsystems, Inc. All rights reserved.
+ */
+
+package com.sun.grizzly.async;
+
+import java.nio.channels.SelectableChannel;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Class represents collection of
+ * <code>SelectableChannel</code>s and correspondent queue of
+ * <code>ByteBuffer</code>, which should be written asynchronously.
+ * This implementation is TCP protocol specific.

To follow our convention, should this class be called TCPAsyncWriteQueue?

+ *
+ * @author Alexey Stashok
+ */
+public class AsyncWriteQueue<E>{
+ private Map<SelectableChannel, ChannelAsyncWriteEntry> queueMap =
+ new ConcurrentHashMap<SelectableChannel, ChannelAsyncWriteEntry>();
+
+ /**
+ * Add data to the <code>AsyncWriteQueue</code>, corresponding to the given
+ * <code>SelectableChannel</code>
+ *
+ * @param channel <code>SelectableChannel</code>
+ * @param queueRecord write data unit
+ */
+ public void offer(SelectableChannel channel, E queueRecord) {
+ ChannelAsyncWriteEntry entry = obtainChannelAsyncWriteEntry(channel);
+ entry.queue.offer(queueRecord);
+ }
+
+ /**
+ * Get head element of <code>SelectableChannel</code> write queue.
+ * Element will not be removed from queue.
+ *
+ * @param channel <code>SelectableChannel</code>
+ *
+ * @return <code>AsyncQueueRecord</code> write data unit
+ */
+ public E peek(SelectableChannel channel) {
+ ChannelAsyncWriteEntry entry = queueMap.get(channel);
+ if (entry != null) {
+ return entry.queue.peek();
+ }
+
+ return null;
+ }
+
+ /**
+ * Get head element of <code>SelectableChannel</code> write queue.
+ * Element will be removed from queue.
+ *
+ * @param channel <code>SelectableChannel</code>
+ *
+ * @return <code>AsyncQueueRecord</code> write data unit
+ */
+ public E poll(SelectableChannel channel) {
+ ChannelAsyncWriteEntry entry = queueMap.get(channel);
+ if (entry != null) {
+ return entry.queue.poll();
+ }
+
+ return null;
+ }
+
+ /**
+ * Remove head element of <code>SelectableChannel</code> write queue.
+ *
+ * @param channel <code>SelectableChannel</code>
+ */
+ public void removeEntry(SelectableChannel channel) {
+ queueMap.remove(channel);
+ }
+
+ /**
+ * Get the size of <code>SelectableChannel</code> write queue.
+ *
+ * @param channel <code>SelectableChannel</code>
+ * @return size of <code>SelectableChannel</code> write queue.
+ */
+ public int size(SelectableChannel channel) {
+ ChannelAsyncWriteEntry entry = queueMap.get(channel);
+ return entry == null ? 0 : entry.queue.size();
+ }
+
+ public void clear() {
+ queueMap.clear();
+ }
+
+ protected ChannelAsyncWriteEntry obtainChannelAsyncWriteEntry(SelectableChannel channel) {
+ ChannelAsyncWriteEntry entry = queueMap.get(channel);
+ if (entry == null) {
+ synchronized(channel) {

Minor: A ReentrantReadWriteLock might perform better than a synchronized block here
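
A different option, not the ReentrantReadWriteLock mentioned above, is to drop the explicit lock and rely on ConcurrentMap.putIfAbsent() for the get-or-create step. The sketch below uses a hypothetical generic class `PerKeyQueues` rather than the patch's AsyncWriteQueue, and whether it is actually faster would have to be measured.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for the per-channel queue map.
final class PerKeyQueues<K, E> {
    private final ConcurrentMap<K, ConcurrentLinkedQueue<E>> queues =
            new ConcurrentHashMap<K, ConcurrentLinkedQueue<E>>();

    // Lock-free get-or-create: if two threads race, exactly one queue
    // wins and both threads get that same instance back.
    ConcurrentLinkedQueue<E> obtain(K key) {
        ConcurrentLinkedQueue<E> queue = queues.get(key);
        if (queue == null) {
            ConcurrentLinkedQueue<E> fresh = new ConcurrentLinkedQueue<E>();
            queue = queues.putIfAbsent(key, fresh);
            if (queue == null) {
                queue = fresh; // our queue was installed
            }
        }
        return queue;
    }
}
```

The trade-off is that a losing thread allocates one queue that is thrown away, which is usually cheaper than contending on a monitor.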

+ entry = queueMap.get(channel);
+ if (entry == null) {
+ entry = new ChannelAsyncWriteEntry();
+ queueMap.put(channel, entry);
+ }
+ }
+ }
+ return entry;
+ }
+
+ /**
+ * <code>AsyncWriteQueue</code> data unit
+ */
+ protected class ChannelAsyncWriteEntry {
+ public ConcurrentLinkedQueue<E> queue;
+ public ReentrantLock writeLock;
+ public ReentrantLock onWriteLock;
+ public ReentrantLock updateLock;
+
+ public ChannelAsyncWriteEntry() {
+ queue = new ConcurrentLinkedQueue<E>();
+ writeLock = new ReentrantLock();
+ onWriteLock = new ReentrantLock();
+ updateLock = new ReentrantLock();
+ }
+ }
+}
\ No newline at end of file
Index: async/TCPAsyncQueueWriter.java
--- async/TCPAsyncQueueWriter.java Locally New
+++ async/TCPAsyncQueueWriter.java Locally New
@@ -0,0 +1,255 @@
+/*
+ * The contents of this file are subject to the terms
+ * of the Common Development and Distribution License
+ * (the License). You may not use this file except in
+ * compliance with the License.
+ *
+ * You can obtain a copy of the license at
+ * https://glassfish.dev.java.net/public/CDDLv1.0.html or
+ * glassfish/bootstrap/legal/CDDLv1.0.txt.
+ * See the License for the specific language governing
+ * permissions and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL
+ * Header Notice in each file and include the License file
+ * at glassfish/bootstrap/legal/CDDLv1.0.txt.
+ * If applicable, add the following below the CDDL Header,
+ * with the fields enclosed by brackets [] replaced by
+ * you own identifying information:
+ * "Portions Copyrighted [year] [name of copyright owner]"
+ *
+ * Copyright 2007 Sun Microsystems, Inc. All rights reserved.
+ */
+
+package com.sun.grizzly.async;
+
+import com.sun.grizzly.*;
+import com.sun.grizzly.async.AsyncWriteQueue.ChannelAsyncWriteEntry;
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.WritableByteChannel;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * TCP implementation of <code>AsyncQueueWriter</code>
+ *
+ * @author Alexey Stashok
+ */
+public class TCPAsyncQueueWriter implements AsyncQueueWriter {
+ private SelectorHandler selectorHandler;
+ private AsyncWriteQueue<TCPAsyncQueueRecord> writeQueue;
+
+ public TCPAsyncQueueWriter(SelectorHandler selectorHandler) {
+ this.selectorHandler = selectorHandler;
+ writeQueue = new AsyncWriteQueue<TCPAsyncQueueRecord>();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean write(SelectionKey key, ByteBuffer buffer) throws IOException {
+ return write(key, buffer, null);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean write(SelectionKey key, ByteBuffer buffer,
+ AsyncWriteCallbackHandler callbackHandler) throws IOException {
+ return write(key, buffer, callbackHandler, false);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean write(SelectionKey key, ByteBuffer buffer,
+ AsyncWriteCallbackHandler callbackHandler, boolean isCloneByteBuffer) throws IOException {
+
+ SelectableChannel channel = key.channel();
+ // If AsyncWriteQueue is empty - try to write ByteBuffer here
+ ChannelAsyncWriteEntry channelEntry =
+ writeQueue.obtainChannelAsyncWriteEntry(channel);
+
+ ConcurrentLinkedQueue<TCPAsyncQueueRecord> queue = channelEntry.queue;
+
+ boolean hasWriteLock = false;
+
+ try {
+ if (queue.isEmpty() &&
+ (hasWriteLock = channelEntry.writeLock.tryLock())) {
+ ((WritableByteChannel) channel).write(buffer);
+ }
+
+ if (buffer.hasRemaining()) {
+ // clone ByteBuffer if required
+ if (isCloneByteBuffer) {
+ int size = buffer.remaining();
+ ByteBuffer newBuffer = buffer.isDirect() ?
+ ByteBuffer.allocateDirect(size) :
+ ByteBuffer.allocate(size);

Here we need to support VIEW and HEAP_ARRAY buffers, as the rest of the
framework allows. We don't need it for the first release, but it might be
good to file an RFE. I would think an application that uses
ByteBuffer.wrap() would want to use the same type of ByteBuffer when writing.
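
A rough sketch of what a type-preserving clone could look like, using only java.nio. BufferKind and BufferCloner are hypothetical names (the real code would presumably go through ByteBufferFactory instead), the 8192-byte backing stores are an arbitrary choice, and the class is not thread-safe.

```java
import java.nio.ByteBuffer;

// Hypothetical buffer kinds, named after the types mentioned above.
enum BufferKind { HEAP, DIRECT, HEAP_ARRAY, HEAP_VIEW, DIRECT_VIEW }

class BufferCloner {
    private static final int STORE_SIZE = 8192; // arbitrary

    // Larger buffers that view buffers are sliced from.
    private ByteBuffer heapStore = ByteBuffer.allocate(STORE_SIZE);
    private ByteBuffer directStore = ByteBuffer.allocateDirect(STORE_SIZE);

    // Copies the remaining bytes of src into a new buffer of the given kind.
    // Like the patch, this consumes src.
    ByteBuffer cloneRemaining(ByteBuffer src, BufferKind kind) {
        int size = src.remaining();
        ByteBuffer copy;
        switch (kind) {
            case DIRECT:      copy = ByteBuffer.allocateDirect(size); break;
            case HEAP_ARRAY:  copy = ByteBuffer.wrap(new byte[size]); break;
            case HEAP_VIEW:   copy = view(false, size); break;
            case DIRECT_VIEW: copy = view(true, size); break;
            default:          copy = ByteBuffer.allocate(size); break; // HEAP
        }
        copy.put(src);
        copy.flip(); // position 0, limit = number of bytes copied
        return copy;
    }

    // Slices a view of the requested size out of the matching backing store,
    // replacing the store when it has no room left.
    private ByteBuffer view(boolean direct, int size) {
        ByteBuffer store = direct ? directStore : heapStore;
        if (store.remaining() < size) {
            int capacity = Math.max(STORE_SIZE, size);
            store = direct ? ByteBuffer.allocateDirect(capacity)
                           : ByteBuffer.allocate(capacity);
            if (direct) {
                directStore = store;
            } else {
                heapStore = store;
            }
        }
        store.limit(store.position() + size);
        ByteBuffer view = store.slice();
        store.position(store.limit());
        store.limit(store.capacity());
        return view;
    }
}
```

The caller would pass the kind it allocated its own buffers with, so a ByteBuffer.wrap() user gets an array-backed copy back.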

+
+ newBuffer.put(buffer);
+ newBuffer.position(0);
+ buffer = newBuffer;
+ }
+
+ // add new element to the queue
+ channelEntry.updateLock.lock();
+ queue.offer(new TCPAsyncQueueRecord(buffer,
+ callbackHandler));

Should we cache TCPAsyncQueueRecord instances here instead of calling
'new' for every write? I would think so :-)
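
Something along these lines, modeled on the TaskPool you added to ContextTask. PooledRecord and RecordPool are hypothetical names, and the callback handler is typed as Object only to keep the sketch self-contained.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical reusable counterpart of TCPAsyncQueueRecord.
class PooledRecord {
    ByteBuffer byteBuffer;
    Object callbackHandler; // AsyncWriteCallbackHandler in the patch

    void set(ByteBuffer byteBuffer, Object callbackHandler) {
        this.byteBuffer = byteBuffer;
        this.callbackHandler = callbackHandler;
    }

    // Drop references so a pooled record does not pin buffers in memory.
    void recycle() {
        byteBuffer = null;
        callbackHandler = null;
    }
}

// Hypothetical pool: reuse a record when one is available, else create one.
class RecordPool {
    private final ConcurrentLinkedQueue<PooledRecord> pool =
            new ConcurrentLinkedQueue<PooledRecord>();

    PooledRecord poll(ByteBuffer byteBuffer, Object callbackHandler) {
        PooledRecord record = pool.poll();
        if (record == null) {
            record = new PooledRecord();
        }
        record.set(byteBuffer, callbackHandler);
        return record;
    }

    void offer(PooledRecord record) {
        record.recycle();
        pool.offer(record);
    }
}
```

onWrite() would call offer() once the record has been polled from the queue and its onWriteCompleted callback has returned.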

+
+ /*
+ * This check avoids registering the key when it is not required
+ */
+ if (!channelEntry.onWriteLock.isLocked()) {
+ registerForWriting(key);
+ }
+
+ channelEntry.updateLock.unlock();
+
+ return false;
+ }
+ } finally {
+ if (hasWriteLock) {
+ channelEntry.writeLock.unlock();
+ }
+ }
+
+ return true;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean hasReadyAsyncWriteData(SelectionKey key) {
+ return writeQueue.size(key.channel()) > 0;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void onWrite(SelectionKey key) throws IOException {
+ SelectableChannel channel = key.channel();
+
+ ChannelAsyncWriteEntry channelEntry =
+ writeQueue.obtainChannelAsyncWriteEntry(channel);
+
+ boolean hasUpdateLock = false;
+ boolean hasOnWriteLock = false;
+
+ if ((hasOnWriteLock = channelEntry.onWriteLock.tryLock())) {
+ try {
+ ConcurrentLinkedQueue<TCPAsyncQueueRecord> queue = channelEntry.queue;
+
+ while (queue.size() > 0) {
+ if (hasUpdateLock) {
+ channelEntry.updateLock.unlock();
+ hasUpdateLock = false;
+ }
+
+ TCPAsyncQueueRecord queueRecord = queue.peek();
+
+ ByteBuffer byteBuffer = queueRecord.byteBuffer;
+ try {
+ ((WritableByteChannel) channel).write(byteBuffer);
+ } catch (IOException e) {
+ if (queueRecord.callbackHandler != null) {
+ queueRecord.callbackHandler.onIOException(key,
+ byteBuffer);
+ }

else log the exception.
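
That is, roughly the following. WriteFailureReporter and FailureHandler are hypothetical names; the handler in the patch has a different signature (onIOException(key, byteBuffer)).

```java
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;

class WriteFailureReporter {
    // Hypothetical, simplified stand-in for AsyncWriteCallbackHandler.
    interface FailureHandler {
        void onIOException(IOException e);
    }

    private final Logger logger;

    WriteFailureReporter(Logger logger) {
        this.logger = logger;
    }

    // Returns true if a handler was notified, false if the failure was logged.
    boolean report(FailureHandler handler, IOException e) {
        if (handler != null) {
            handler.onIOException(e);
            return true;
        }
        // No handler registered: do not swallow the failure silently.
        logger.log(Level.WARNING, "Asynchronous write failed", e);
        return false;
    }
}
```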

+ }
+
+ if (!byteBuffer.hasRemaining()) {
+ TCPAsyncQueueRecord removedQueueRecord =
+ queue.poll();
+ assert removedQueueRecord == queueRecord;
+
+ if (removedQueueRecord.callbackHandler != null) {
+ removedQueueRecord.callbackHandler.onWriteCompleted(key, byteBuffer);
+ }
+ } else {
+ hasOnWriteLock = false;
+ channelEntry.onWriteLock.unlock();
+ registerForWriting(key);
+ break;
+ }
+
+ if (queue.size() == 0) {
+ /*
+ * If the queue is empty, the write method may still add
+ * an entry that would not be processed, because
+ * onWriteLock is currently locked
+ */
+ channelEntry.updateLock.lock();
+ hasUpdateLock = true;
+ }
+ }
+ } finally {
+ if (hasOnWriteLock) {
+ channelEntry.onWriteLock.unlock();
+ }
+
+ if (hasUpdateLock) {
+ channelEntry.updateLock.unlock();
+ }
+ }
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean write(SelectionKey key, SocketAddress dstAddress, ByteBuffer buffer) throws IOException {
+ throw new UnsupportedOperationException("Not supported for TCP transport.");
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean write(SelectionKey key, SocketAddress dstAddress,
+ ByteBuffer buffer, AsyncWriteCallbackHandler callbackHandler) throws IOException {
+ throw new UnsupportedOperationException("Not supported for TCP transport.");
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean write(SelectionKey key, SocketAddress dstAddress,
+ ByteBuffer buffer, AsyncWriteCallbackHandler callbackHandler,
+ boolean isCloneByteBuffer) throws IOException {
+ throw new UnsupportedOperationException("Not supported for TCP transport.");
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void onClose(SelectableChannel channel) {
+ writeQueue.removeEntry(channel);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void close() {
+ writeQueue.clear();
+ writeQueue = null;
+ }
+
+ private void registerForWriting(SelectionKey key) {
+ selectorHandler.register(key, SelectionKey.OP_WRITE);
+ }
+
+ /**
+ * <code>AsyncWriteQueue</code> data unit specific for TCP protocol
+ */
+ protected static class TCPAsyncQueueRecord {
+ public ByteBuffer byteBuffer;
+ public AsyncWriteCallbackHandler callbackHandler;
+
+ public TCPAsyncQueueRecord(ByteBuffer byteBuffer,
+ AsyncWriteCallbackHandler callbackHandler) {
+ this.byteBuffer = byteBuffer;
+ this.callbackHandler = callbackHandler;
+ }
+ }
+}
Index: async/UDPAsyncQueueWriter.java
--- async/UDPAsyncQueueWriter.java Locally New
+++ async/UDPAsyncQueueWriter.java Locally New
@@ -0,0 +1,271 @@
+/*
+ * The contents of this file are subject to the terms
+ * of the Common Development and Distribution License
+ * (the License). You may not use this file except in
+ * compliance with the License.
+ *
+ * You can obtain a copy of the license at
+ * https://glassfish.dev.java.net/public/CDDLv1.0.html or
+ * glassfish/bootstrap/legal/CDDLv1.0.txt.
+ * See the License for the specific language governing
+ * permissions and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL
+ * Header Notice in each file and include the License file
+ * at glassfish/bootstrap/legal/CDDLv1.0.txt.
+ * If applicable, add the following below the CDDL Header,
+ * with the fields enclosed by brackets [] replaced by
+ * you own identifying information:
+ * "Portions Copyrighted [year] [name of copyright owner]"
+ *
+ * Copyright 2007 Sun Microsystems, Inc. All rights reserved.
+ */
+
+package com.sun.grizzly.async;
+
+import com.sun.grizzly.*;
+import com.sun.grizzly.async.AsyncWriteQueue.ChannelAsyncWriteEntry;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.WritableByteChannel;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.net.SocketAddress;
+import java.nio.channels.DatagramChannel;
+
+/**
+ * UDP implementation of <code>AsyncQueueWriter</code>
+ *
+ * @author Alexey Stashok
+ */
+public class UDPAsyncQueueWriter implements AsyncQueueWriter {
+ private SelectorHandler selectorHandler;
+ private AsyncWriteQueue<UDPAsyncQueueRecord> writeQueue;
+
+ public UDPAsyncQueueWriter(SelectorHandler selectorHandler) {
+ this.selectorHandler = selectorHandler;
+ writeQueue = new AsyncWriteQueue<UDPAsyncQueueRecord>();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean write(SelectionKey key, ByteBuffer buffer) throws IOException {
+ return write(key, null, buffer, null);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean write(SelectionKey key, ByteBuffer buffer,
+ AsyncWriteCallbackHandler callbackHandler) throws IOException {
+ return write(key, null, buffer, callbackHandler, false);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean write(SelectionKey key, ByteBuffer buffer,
+ AsyncWriteCallbackHandler callbackHandler, boolean isCloneByteBuffer) throws IOException {
+ return write(key, null, buffer, callbackHandler, isCloneByteBuffer);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean write(SelectionKey key, SocketAddress dstAddress,
+ ByteBuffer buffer) throws IOException {
+ return write(key, dstAddress, buffer, null);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean write(SelectionKey key, SocketAddress dstAddress,
+ ByteBuffer buffer, AsyncWriteCallbackHandler callbackHandler)
+ throws IOException {
+ return write(key, dstAddress, buffer, callbackHandler, false);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean write(SelectionKey key, SocketAddress dstAddress,
+ ByteBuffer buffer, AsyncWriteCallbackHandler callbackHandler,
+ boolean isCloneByteBuffer) throws IOException {
+
+ SelectableChannel channel = key.channel();
+ // If AsyncWriteQueue is empty - try to write ByteBuffer here
+ ChannelAsyncWriteEntry channelEntry =
+ writeQueue.obtainChannelAsyncWriteEntry(channel);
+
+ ConcurrentLinkedQueue<UDPAsyncQueueRecord> queue = channelEntry.queue;
+
+ boolean hasWriteLock = false;
+
+ try {
+ if (queue.isEmpty() &&
+ (hasWriteLock = channelEntry.writeLock.tryLock())) {
+ doWrite((WritableByteChannel) channel, dstAddress, buffer);
+ }
+
+ if (buffer.hasRemaining()) {
+ // clone ByteBuffer if required
+ if (isCloneByteBuffer) {
+ int size = buffer.remaining();
+ ByteBuffer newBuffer = buffer.isDirect() ?
+ ByteBuffer.allocateDirect(size) :
+ ByteBuffer.allocate(size);
+
+ newBuffer.put(buffer);
+ newBuffer.position(0);
+ buffer = newBuffer;
+ }

Same as above. Support HEAP and VIEW_HEAP/DIRECT via ByteBufferFactory

+
+ // add new element to the queue
+ channelEntry.updateLock.lock();
+ queue.offer(new UDPAsyncQueueRecord(dstAddress, buffer,
+ callbackHandler));
+
+ /*
+ * This check avoids registering the key when it is not required
+ */
+ if (!channelEntry.onWriteLock.isLocked()) {
+ registerForWriting(key);
+ }
+
+ channelEntry.updateLock.unlock();
+
+ return false;
+ }
+ } finally {
+ if (hasWriteLock) {
+ channelEntry.writeLock.unlock();
+ }
+ }
+
+ return true;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean hasReadyAsyncWriteData(SelectionKey key) {
+ return writeQueue.size(key.channel()) > 0;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void onWrite(SelectionKey key) throws IOException {
+ SelectableChannel channel = key.channel();
+
+ ChannelAsyncWriteEntry channelEntry =
+ writeQueue.obtainChannelAsyncWriteEntry(channel);
+
+ boolean hasUpdateLock = false;
+ boolean hasOnWriteLock = false;
+
+ if ((hasOnWriteLock = channelEntry.onWriteLock.tryLock())) {
+ try {
+ ConcurrentLinkedQueue<UDPAsyncQueueRecord> queue =
+ channelEntry.queue;
+
+ while (queue.size() > 0) {
+ if (hasUpdateLock) {
+ channelEntry.updateLock.unlock();
+ hasUpdateLock = false;
+ }
+
+ UDPAsyncQueueRecord queueRecord = queue.peek();
+
+ ByteBuffer byteBuffer = queueRecord.byteBuffer;
+ try {
+ doWrite((WritableByteChannel) channel,
+ queueRecord.dstAddress, byteBuffer);
+ } catch (IOException e) {
+ if (queueRecord.callbackHandler != null) {
+ queueRecord.callbackHandler.onIOException(key,
+ byteBuffer);
+ }
+ }

else log the exception.

+
+ if (!byteBuffer.hasRemaining()) {
+ UDPAsyncQueueRecord removedQueueRecord = queue.poll();
+ assert removedQueueRecord == queueRecord;
+
+ if (removedQueueRecord.callbackHandler != null) {
+ removedQueueRecord.callbackHandler.onWriteCompleted(key, byteBuffer);
+ }
+ } else {
+ hasOnWriteLock = false;
+ channelEntry.onWriteLock.unlock();
+ registerForWriting(key);
+ break;
+ }
+
+ if (queue.size() == 0) {
+ /*
+ * If the queue is empty, the write method may still add
+ * an entry that would not be processed, because
+ * onWriteLock is currently locked
+ */
+ channelEntry.updateLock.lock();
+ hasUpdateLock = true;
+ }
+ }
+ } finally {
+ if (hasOnWriteLock) {
+ channelEntry.onWriteLock.unlock();
+ }
+
+ if (hasUpdateLock) {
+ channelEntry.updateLock.unlock();
+ }
+ }
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void onClose(SelectableChannel channel) {
+ writeQueue.removeEntry(channel);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void close() {
+ writeQueue.clear();
+ writeQueue = null;
+ }
+
+ private void doWrite(WritableByteChannel channel, SocketAddress dstAddress,
+ ByteBuffer byteBuffer) throws IOException {
+ if (dstAddress != null) {
+ ((DatagramChannel) channel).send(byteBuffer, dstAddress);
+ } else {
+ channel.write(byteBuffer);
+ }
+ }
+
+ private void registerForWriting(SelectionKey key) {
+ selectorHandler.register(key, SelectionKey.OP_WRITE);
+ }
+
+ /**
+ * <code>AsyncWriteQueue</code> data unit specific for UDP protocol
+ */
+ protected static class UDPAsyncQueueRecord extends
+ TCPAsyncQueueWriter.TCPAsyncQueueRecord {
+ public SocketAddress dstAddress;
+
+ public UDPAsyncQueueRecord(SocketAddress dstAddress, ByteBuffer byteBuffer,
+ AsyncWriteCallbackHandler callbackHandler) {
+ super(byteBuffer, callbackHandler);
+ this.dstAddress = dstAddress;
+ }
+ }
+}
Index: CallbackHandlerContextTask.java
--- CallbackHandlerContextTask.java Locally New
+++ CallbackHandlerContextTask.java Locally New
@@ -0,0 +1,98 @@
+/*
+ * The contents of this file are subject to the terms
+ * of the Common Development and Distribution License
+ * (the License). You may not use this file except in
+ * compliance with the License.
+ *
+ * You can obtain a copy of the license at
+ * https://glassfish.dev.java.net/public/CDDLv1.0.html or
+ * glassfish/bootstrap/legal/CDDLv1.0.txt.
+ * See the License for the specific language governing
+ * permissions and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL
+ * Header Notice in each file and include the License file
+ * at glassfish/bootstrap/legal/CDDLv1.0.txt.
+ * If applicable, add the following below the CDDL Header,
+ * with the fields enclosed by brackets [] replaced by
+ * you own identifying information:
+ * "Portions Copyrighted [year] [name of copyright owner]"
+ *
+ * Copyright 2007 Sun Microsystems, Inc. All rights reserved.
+ */
+
+package com.sun.grizzly;
+
+import com.sun.grizzly.Context.OpType;
+import com.sun.grizzly.ContextTask.TaskPool;
+
+/**
+ * <code>CallbackHandler</code> task, which will be executed by
+ * <code>Context</code>, when Context.execute(<code>ContextTask</code>)
+ * is called.
+ *
+ * @author Alexey Stashok
+ */
+public class CallbackHandlerContextTask extends ContextTask {
+ private static final TaskPool<CallbackHandlerContextTask> taskPool =
+ new TaskPool<CallbackHandlerContextTask>() {
+ @Override
+ public CallbackHandlerContextTask newTask() {
+ return new CallbackHandlerContextTask();
+ }
+ };
+
+ private CallbackHandler callBackHandler;
+
+ public static CallbackHandlerContextTask poll() {
+ return taskPool.poll();
+ }
+
+ public static void offer(CallbackHandlerContextTask contextTask) {
+ contextTask.recycle();
+ taskPool.offer(contextTask);
+ }
+
+ public Object call() throws Exception {
+ IOEvent ioEvent = context.getIOEvent();
+ OpType currentOpType = context.getCurrentOpType();
+
+ try {
+ if (currentOpType == OpType.OP_READ) {
+ callBackHandler.onRead(ioEvent);
+ } else if (currentOpType == OpType.OP_WRITE) {
+ callBackHandler.onWrite(ioEvent);
+ } else if (currentOpType == OpType.OP_CONNECT) {
+ callBackHandler.onConnect(ioEvent);
+ }
+ } finally {
+ if (ioEvent != null) {
+ // Prevent the CallbackHandler to re-use the context.
+ // TODO: This is still dangerous as the Context might have been
+ // cached by the CallbackHandler.
+ ioEvent.attach(null);
+ ioEvent = null;
+ }
+
+ offer(this);
+ }
+
+ return null;
+ }
+
+ public CallbackHandler getCallBackHandler() {
+ return callBackHandler;
+ }
+
+ public void setCallBackHandler(CallbackHandler callBackHandler) {
+ this.callBackHandler = callBackHandler;
+ }
+
+ @Override
+ public void recycle() {
+ callBackHandler = null;
+ super.recycle();
+ }
+
+
+}
Index: Context.java
--- Context.java Base (BASE)
+++ Context.java Locally Modified (Based On LOCAL)
@@ -23,9 +23,14 @@

  package com.sun.grizzly;

+import com.sun.grizzly.async.AsyncQueueWritable;
+import com.sun.grizzly.async.AsyncQueueWriter;
+import com.sun.grizzly.async.AsyncWriteCallbackHandler;
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
  import java.nio.channels.SelectionKey;
  import java.util.HashMap;
-import java.util.concurrent.Callable;

  /**
   * This Object is used to share information between the Grizzly Framework
@@ -33,7 +38,7 @@
   *
   * @author Jeanfrancois Arcand
   */
-public class Context implements Callable {
+public class Context {

      /**
       * A SelectionKey's registration state.
@@ -119,6 +124,16 @@


      /**
+ * <code>AsyncQueueWriter</code>
+ */
+ private AsyncQueueWriter asyncQueueWriter;
+
+ /**
+ * <code>AsyncQueueWritable</code>
+ */
+ private AsyncQueueWritable asyncQueueWritable;
+
+ /**
       * Constructor
       */
      public Context() {
@@ -210,6 +225,7 @@
          currentOpType = null;
          protocolChain = null;
          ioEvent = null;
+ asyncQueueWriter = null;
      }


@@ -233,47 +249,6 @@


      /**
- * Execute the <code>ProtocolChain</code>.
- * @throws java.lang.Exception Exception thrown by protocol chain
- */
- public Object call() throws Exception {
- // If a IOEvent has been defined, invoke it first and
- // let its associated CallbackHandler decide if the ProtocolChain
- // be invoked or not.
- Object attachment = key.attachment();
- if (ioEvent != null && (attachment instanceof CallbackHandler)){
- try{
- CallbackHandler callBackHandler = ((CallbackHandler)attachment);
- if (currentOpType == OpType.OP_READ){
- callBackHandler.onRead(ioEvent);
- } else if (currentOpType == OpType.OP_WRITE){
- callBackHandler.onWrite(ioEvent);
- } else if (currentOpType == OpType.OP_CONNECT){
- callBackHandler.onConnect(ioEvent);
- }
- } finally {
- if (ioEvent != null){
- // Prevent the CallbackHandler to re-use the context.
- // TODO: This is still dangerous as the Context might have been
- // cached by the CallbackHandler.
- ioEvent.attach(null);
- ioEvent = null;
- }
- }
- } else {
- SelectionKey currentKey = key;
- selectorHandler.getSelectionKeyHandler().process(currentKey);
- try {
- protocolChain.execute(this);
- } finally {
- selectorHandler.getSelectionKeyHandler().postProcess(currentKey);
- }
- }
- return null;
- }
-
-
- /**
       * Return <code>ProtocolChain</code> executed by this instance.
       * @return <code>ProtocolChain</code> instance
       */
@@ -313,9 +288,12 @@
       * Execute this Context using the Controller's Pipeline
       * @throws com.sun.grizzly.PipelineFullException
       */
- public void execute() throws PipelineFullException{


Here we must keep the execute() method available, as removing it will
break existing applications (like Sailfin).
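
One possible way to have both, sketched with hypothetical stand-ins: SketchContext and SketchTask are not the real classes, a java.util.concurrent.Executor plays the role of the Pipeline, and the task is a Runnable rather than a Callable to keep the example short. The old execute() stays and simply delegates to the new overload with a default task.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;

// Hypothetical stand-in for ContextTask.
abstract class SketchTask implements Runnable {
    protected SketchContext context;

    void setContext(SketchContext context) {
        this.context = context;
    }
}

// Hypothetical stand-in for Context.
class SketchContext {
    // Records which task ran; only here to make the behavior visible.
    final List<String> trace = new ArrayList<String>();

    private final Executor pipeline; // stand-in for Pipeline

    SketchContext(Executor pipeline) {
        this.pipeline = pipeline;
    }

    // Legacy entry point, kept so that existing callers keep working.
    public void execute() {
        execute(new SketchTask() {
            public void run() {
                // The real default would run the CallbackHandler or the
                // ProtocolChain, as Context.call() does today.
                context.trace.add("default");
            }
        });
    }

    // New entry point taking an explicit task.
    public void execute(SketchTask task) {
        if (task == null) {
            throw new IllegalStateException("ContextTask cannot be null");
        }
        task.setContext(this);
        pipeline.execute(task);
    }
}
```

Existing code calling context.execute() would then behave as before, while the Controller can pass ProtocolChainContextTask or CallbackHandlerContextTask explicitly.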

- getPipeline().execute(this);
+ public void execute(ContextTask contextTask) throws PipelineFullException {
+ if (contextTask != null) {
+ contextTask.setContext(this);
+ getPipeline().execute(contextTask);
      }

else throw IllegalStateException("ContextTask cannot be null");

+ }


      /**
@@ -392,4 +370,89 @@
      public void setSelectorHandler(SelectorHandler selectorHandler) {
          this.selectorHandler = selectorHandler;
      }
+
+
+ /**
+ * Returns <code>AsyncQueueWritable</code>, associated with the current
+ * <code>Context</code>. This method is not threadsafe.
+ *
+ * @return <code>AsyncQueueWritable</code>
+ */
+ public AsyncQueueWritable getAsyncQueueWritable() {
+ if (asyncQueueWritable == null) {
+ asyncQueueWritable = new AsyncQueueWritableContextWrapper();
  }
+
+ return asyncQueueWritable;
+ }
+
+ /**
+ * Return the <code>AsyncQueueWriter</code>
+ * @return the <code>AsyncQueueWriter</code>
+ */
+ protected AsyncQueueWriter getAsyncQueueWriter() {
+ return asyncQueueWriter;
+ }
+
+ /**
+ * Set the <code>AsyncQueueWriter</code>
+ * @param asyncQueueWriter <code>AsyncQueueWriter</code>
+ */
+ protected void setAsyncQueueWriter(AsyncQueueWriter asyncQueueWriter) {
+ this.asyncQueueWriter = asyncQueueWriter;
+ }
+
+ private class AsyncQueueWritableContextWrapper implements AsyncQueueWritable {
+ /**
+ * {@inheritDoc}
+ */
+ public boolean writeToAsyncQueue(ByteBuffer buffer) throws IOException {
+ return asyncQueueWriter.write(key, buffer);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean writeToAsyncQueue(ByteBuffer buffer,
+ AsyncWriteCallbackHandler callbackHandler) throws IOException {
+ return asyncQueueWriter.write(key, buffer, callbackHandler);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean writeToAsyncQueue(ByteBuffer buffer,
+ AsyncWriteCallbackHandler callbackHandler,
+ boolean isCloneByteBuffer) throws IOException {
+ return asyncQueueWriter.write(key, buffer, callbackHandler,
+ isCloneByteBuffer);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean writeToAsyncQueue(SocketAddress dstAddress, ByteBuffer buffer)
+ throws IOException {
+ return asyncQueueWriter.write(key, dstAddress, buffer);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean writeToAsyncQueue(SocketAddress dstAddress, ByteBuffer buffer,
+ AsyncWriteCallbackHandler callbackHandler) throws IOException {
+ return asyncQueueWriter.write(key, dstAddress, buffer,
+ callbackHandler);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean writeToAsyncQueue(SocketAddress dstAddress, ByteBuffer buffer,
+ AsyncWriteCallbackHandler callbackHandler, boolean isCloneByteBuffer)
+ throws IOException {
+ return asyncQueueWriter.write(key, dstAddress, buffer,
+ callbackHandler, isCloneByteBuffer);
+ }
+ }
+}
Index: ContextTask.java
--- ContextTask.java Locally New
+++ ContextTask.java Locally New
@@ -0,0 +1,70 @@
+/*
+ * The contents of this file are subject to the terms
+ * of the Common Development and Distribution License
+ * (the License). You may not use this file except in
+ * compliance with the License.
+ *
+ * You can obtain a copy of the license at
+ * https://glassfish.dev.java.net/public/CDDLv1.0.html or
+ * glassfish/bootstrap/legal/CDDLv1.0.txt.
+ * See the License for the specific language governing
+ * permissions and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL
+ * Header Notice in each file and include the License file
+ * at glassfish/bootstrap/legal/CDDLv1.0.txt.
+ * If applicable, add the following below the CDDL Header,
+ * with the fields enclosed by brackets [] replaced by
+ * you own identifying information:
+ * "Portions Copyrighted [year] [name of copyright owner]"
+ *
+ * Copyright 2007 Sun Microsystems, Inc. All rights reserved.
+ */
+
+package com.sun.grizzly;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * Task, which will be executed by <code>Context</code>, when
+ * Context.execute(<code>ContextTask</code>) is called.
+ *
+ * @author Alexey Stashok
+ */
+public abstract class ContextTask implements Callable {
+ protected Context context;
+
+ public Context getContext() {
+ return context;
+ }
+
+ public void setContext(Context context) {
+ this.context = context;
+ }
+
+ public void recycle() {
+ context = null;
+ }
+
+ protected abstract static class TaskPool<E extends ContextTask> {
+ private ConcurrentLinkedQueue<E> pool =
+ new ConcurrentLinkedQueue<E>();
+
+ public abstract E newTask();
+
+ public E poll() {
+ E task = pool.poll();
+ if (task == null) {
+ task = newTask();
+ }
+
+ return task;
+ }
+
+ public void offer(E task) {
+ task.recycle();
+ pool.offer(task);
+ }
+ }
+}
Index: Controller.java
--- Controller.java Base (BASE)
+++ Controller.java Locally Modified (Based On LOCAL)
@@ -298,6 +298,7 @@
                      key = iterator.next();
                      iterator.remove();
                      boolean skipOpWrite = false;
+ delegateToWorkerThread = false;
                      if (key.isValid()) {
                          if ((key.readyOps() & SelectionKey.OP_ACCEPT)
                                  == SelectionKey.OP_ACCEPT){
@@ -356,7 +357,7 @@
                              if (logger.isLoggable(Level.FINE)) {
                                  logger.log(Level.FINE, "OP_WRITE on " + key);
                              }
- delegateToWorkerThread = selectorHandler.
+ delegateToWorkerThread |= selectorHandler.
                                      onWriteInterest(key,serverCtx);
                          }

@@ -368,7 +369,9 @@
                              Context context = pollContext(key,protocolChain);
                              context.setSelectorHandler(selectorHandler);
                              context.setPipeline(selectorHandler.pipeline());
- context.execute();
+ context.setAsyncQueueWriter(
+ selectorHandler.getAsyncQueueWriter());
+ context.execute(ProtocolChainContextTask.poll());

This brings up an interesting point. Instead of calling
setAsyncQueueWriter on every request, should the Context instance be
pooled per Selector instead of per Controller? That way, once configured,
a Context can be pooled and never has to be reconfigured.
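
A sketch of the idea, with hypothetical names (PooledContext, SelectorContextPool) and Object standing in for AsyncQueueWriter and SelectionKey. The writer is fixed when the Context is created by its selector-level pool, and recycle() only clears the per-request state.

```java
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-in for Context.
class PooledContext {
    final Object asyncQueueWriter; // set once, per selector
    Object key;                    // per-request state

    PooledContext(Object asyncQueueWriter) {
        this.asyncQueueWriter = asyncQueueWriter;
    }

    // Clears per-request state only; the writer is intentionally kept.
    void recycle() {
        key = null;
    }
}

// Hypothetical pool owned by one SelectorHandler.
class SelectorContextPool {
    private final ConcurrentLinkedQueue<PooledContext> pool =
            new ConcurrentLinkedQueue<PooledContext>();
    private final Object asyncQueueWriter;

    SelectorContextPool(Object asyncQueueWriter) {
        this.asyncQueueWriter = asyncQueueWriter;
    }

    PooledContext poll(Object key) {
        PooledContext context = pool.poll();
        if (context == null) {
            // Configured once here, never again on the request path.
            context = new PooledContext(asyncQueueWriter);
        }
        context.key = key;
        return context;
    }

    void offer(PooledContext context) {
        context.recycle();
        pool.offer(context);
    }
}
```

With this layout, Context.recycle() would no longer reset asyncQueueWriter to null as the patch does.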



                          }
                      } else {
                          selectorHandler.getSelectionKeyHandler().cancel(key);
Index: filter/EchoAsyncWriteQueueFilter.java
--- filter/EchoAsyncWriteQueueFilter.java Locally New
+++ filter/EchoAsyncWriteQueueFilter.java Locally New
@@ -0,0 +1,81 @@
+/*
+ * The contents of this file are subject to the terms
+ * of the Common Development and Distribution License
+ * (the License). You may not use this file except in
+ * compliance with the License.
+ *
+ * You can obtain a copy of the license at
+ * https://glassfish.dev.java.net/public/CDDLv1.0.html or
+ * glassfish/bootstrap/legal/CDDLv1.0.txt.
+ * See the License for the specific language governing
+ * permissions and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL
+ * Header Notice in each file and include the License file
+ * at glassfish/bootstrap/legal/CDDLv1.0.txt.
+ * If applicable, add the following below the CDDL Header,
+ * with the fields enclosed by brackets [] replaced by
+ * you own identifying information:
+ * "Portions Copyrighted [year] [name of copyright owner]"
+ *
+ * Copyright 2006 Sun Microsystems, Inc. All rights reserved.
+ */
+
+package com.sun.grizzly.filter;
+
+import com.sun.grizzly.Context;
+import com.sun.grizzly.Controller;
+import com.sun.grizzly.ProtocolFilter;
+import com.sun.grizzly.util.OutputWriter;
+import com.sun.grizzly.util.WorkerThread;
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.SelectableChannel;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
+
+/**
+ * Simple echo filter
+ *
+ * @author Alexey Stashok
+ */
+public class EchoAsyncWriteQueueFilter implements ProtocolFilter {
+ AtomicInteger counter = new AtomicInteger(1);
+ public boolean execute(Context ctx) throws IOException {
+ final WorkerThread workerThread = ((WorkerThread)Thread.currentThread());
+ ByteBuffer buffer = workerThread.getByteBuffer();
+ buffer.flip();
+ if (buffer.hasRemaining()) {
+ // Depending on protocol perform echo
+ try {
+ if (ctx.getProtocol() == Controller.Protocol.TCP) { // TCP case
+ ctx.getAsyncQueueWritable().writeToAsyncQueue(buffer, null, true);
+ } else if (ctx.getProtocol() == Controller.Protocol.UDP) { // UDP case
+ SocketAddress address = (SocketAddress)
+ ctx.getAttribute(ReadFilter.UDP_SOCKETADDRESS);
+ ctx.getAsyncQueueWritable().writeToAsyncQueue(address,
+ buffer, null, true);
+ }
+ } catch (IOException ex) {
+ // Store incoming data in byte[]
+ byte[] data = new byte[buffer.remaining()];
+ int position = buffer.position();
+ buffer.get(data);
+ buffer.position(position);
+
+ Controller.logger().log(Level.WARNING,
+ "Exception. Echo \"" + new String(data) + "\"");
+ throw ex;
+ }
+ }
+
+ buffer.clear();
+ return false;
+ }
+
+ public boolean postExecute(Context ctx) throws IOException {
+ return true;
+ }
+}
Index: ProtocolChainContextTask.java
--- ProtocolChainContextTask.java Locally New
+++ ProtocolChainContextTask.java Locally New
@@ -0,0 +1,69 @@
+/*
+ * The contents of this file are subject to the terms
+ * of the Common Development and Distribution License
+ * (the License). You may not use this file except in
+ * compliance with the License.
+ *
+ * You can obtain a copy of the license at
+ * https://glassfish.dev.java.net/public/CDDLv1.0.html or
+ * glassfish/bootstrap/legal/CDDLv1.0.txt.
+ * See the License for the specific language governing
+ * permissions and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL
+ * Header Notice in each file and include the License file
+ * at glassfish/bootstrap/legal/CDDLv1.0.txt.
+ * If applicable, add the following below the CDDL Header,
+ * with the fields enclosed by brackets [] replaced by
+ * you own identifying information:
+ * "Portions Copyrighted [year] [name of copyright owner]"
+ *
+ * Copyright 2007 Sun Microsystems, Inc. All rights reserved.
+ */
+
+
+package com.sun.grizzly;
+
+import java.nio.channels.SelectionKey;
+
+/**
+ * <code>ProtocolChain</code> task, which will be executed by
+ * <code>Context</code>, when Context.execute(<code>ContextTask</code>)
+ * is called.
+ *
+ * @author Alexey Stashok
+ */
+public class ProtocolChainContextTask extends ContextTask {
+ private static final TaskPool<ProtocolChainContextTask> taskPool =
+ new TaskPool<ProtocolChainContextTask>() {
+ @Override
+ public ProtocolChainContextTask newTask() {
+ return new ProtocolChainContextTask();
+ }
+ };
+
+ public static ProtocolChainContextTask poll() {
+ return taskPool.poll();
+ }
+
+ public static void offer(ProtocolChainContextTask contextTask) {
+ contextTask.recycle();
+ taskPool.offer(contextTask);
+ }
+
+ public Object call() throws Exception {
+ SelectionKey currentKey = context.getSelectionKey();
+ SelectionKeyHandler selectionKeyHandler = context.
+ getSelectorHandler().getSelectionKeyHandler();
+
+ selectionKeyHandler.process(currentKey);
+ try {
+ context.getProtocolChain().execute(context);
+ } finally {
+ selectionKeyHandler.postProcess(currentKey);
+ offer(this);
+ }
+
+ return null;
+ }
+}
Index: SelectorHandler.java
--- SelectorHandler.java Base (BASE)
+++ SelectorHandler.java Locally Modified (Based On LOCAL)
@@ -22,6 +22,7 @@
   */
  package com.sun.grizzly;

+import com.sun.grizzly.async.AsyncQueueWriter;
  import com.sun.grizzly.util.Copyable;
  import java.io.IOException;
  import java.nio.channels.SelectableChannel;
@@ -203,6 +204,16 @@


      /**
+ * Returns <code>AsyncQueueWriter</code> associated with this
+ * <code>SelectorHandler</code>. Method will return null, if this
+ * <code>SelectorHandler</code> is not running.
+ *
+ * @return <code>AsyncQueueWriter</code>
+ */
+ public AsyncQueueWriter getAsyncQueueWriter();
+
+
+ /**
       * Return the Pipeline used to execute this SelectorHandler's
       * SelectionKey ops
       * @return The pipeline to use, or null if the Controller's Pipeline
Index: TCPConnectorHandler.java
--- TCPConnectorHandler.java Base (BASE)
+++ TCPConnectorHandler.java Locally Modified (Based On LOCAL)
@@ -23,6 +23,8 @@
  package com.sun.grizzly;

  import com.sun.grizzly.Controller.Protocol;
+import com.sun.grizzly.async.AsyncWriteCallbackHandler;
+import com.sun.grizzly.async.AsyncQueueWritable;
  import com.sun.grizzly.util.ByteBufferInputStream;
  import com.sun.grizzly.util.OutputWriter;
  import java.io.IOException;
@@ -70,7 +72,8 @@
   *
   * @author Jeanfrancois Arcand
   */
-public class TCPConnectorHandler implements ConnectorHandler<TCPSelectorHandler, CallbackHandler>{
+public class TCPConnectorHandler implements
+ ConnectorHandler<TCPSelectorHandler, CallbackHandler>, AsyncQueueWritable {

      /**
       * The underlying TCPSelectorHandler used to mange SelectionKeys.
@@ -399,6 +402,72 @@


      /**
+ * {@inheritDoc}
+ */
+ public boolean writeToAsyncQueue(ByteBuffer buffer) throws IOException {
+ return selectorHandler.getAsyncQueueWriter().write(
+ socketChannel.keyFor(selectorHandler.getSelector()), buffer);
+ }
+
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean writeToAsyncQueue(ByteBuffer buffer,
+ AsyncWriteCallbackHandler callbackHandler) throws IOException {
+ return selectorHandler.getAsyncQueueWriter().write(
+ socketChannel.keyFor(selectorHandler.getSelector()), buffer,
+ callbackHandler);
+ }
+
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean writeToAsyncQueue(ByteBuffer buffer,
+ AsyncWriteCallbackHandler callbackHandler,
+ boolean isCloneByteBuffer) throws IOException {
+ return selectorHandler.getAsyncQueueWriter().write(
+ socketChannel.keyFor(selectorHandler.getSelector()), buffer,
+ callbackHandler, isCloneByteBuffer);
+ }
+
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean writeToAsyncQueue(SocketAddress dstAddress, ByteBuffer buffer)
+ throws IOException {
+ return selectorHandler.getAsyncQueueWriter().write(
+ socketChannel.keyFor(selectorHandler.getSelector()), dstAddress,
+ buffer);
+ }
+
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean writeToAsyncQueue(SocketAddress dstAddress, ByteBuffer buffer,
+ AsyncWriteCallbackHandler callbackHandler) throws IOException {
+ return selectorHandler.getAsyncQueueWriter().write(
+ socketChannel.keyFor(selectorHandler.getSelector()), dstAddress,
+ buffer, callbackHandler);
+ }
+
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean writeToAsyncQueue(SocketAddress dstAddress, ByteBuffer buffer,
+ AsyncWriteCallbackHandler callbackHandler, boolean isCloneByteBuffer)
+ throws IOException {
+ return selectorHandler.getAsyncQueueWriter().write(
+ socketChannel.keyFor(selectorHandler.getSelector()), dstAddress,
+ buffer, callbackHandler, isCloneByteBuffer);
+ }
+
+
+ /**
       * Close the underlying connection.
       */
      public void close() throws IOException{
Index: TCPSelectorHandler.java
--- TCPSelectorHandler.java Base (BASE)
+++ TCPSelectorHandler.java Locally Modified (Based On LOCAL)
@@ -23,6 +23,9 @@

  package com.sun.grizzly;

+import com.sun.grizzly.async.AsyncQueueWriter;
+import com.sun.grizzly.async.TCPAsyncQueueWriter;
+import com.sun.grizzly.async.AsyncQueueWriterContextTask;
  import com.sun.grizzly.util.Cloner;
  import com.sun.grizzly.util.Copyable;
  import com.sun.grizzly.util.SelectionKeyOP;
@@ -171,6 +174,7 @@
       */
      protected ProtocolChainInstanceHandler instanceHandler;

+ protected AsyncQueueWriter asyncQueueWriter;

      public TCPSelectorHandler(){
      }
@@ -234,6 +238,10 @@
      public void preSelect(Context ctx) throws IOException {
          initOpRegistriesIfRequired();

+ if (asyncQueueWriter == null) {
+ asyncQueueWriter = new TCPAsyncQueueWriter(this);
+ }
+
          if (selector == null){
              try{
                  connectorInstanceHandler = new ConnectorInstanceHandler.
@@ -423,7 +431,7 @@
      /**
       * Shuntdown this instance by closing its Selector and associated channels.
       */
- public void shutdown(){
+ public void shutdown() {
          if (selector != null){
              for (SelectionKey selectionKey : selector.keys()) {

                  selectionKeyHandler.close(selectionKey);
@@ -454,7 +462,11 @@
                      "selector.close",ex);
          }

+ if (asyncQueueWriter != null) {
+ asyncQueueWriter.close();
+ asyncQueueWriter = null;
      }
+ }

      /**
       * {@inheritDoc}
@@ -494,10 +506,11 @@
      throws IOException{
          // disable OP_READ on key before doing anything else
          key.interestOps(key.interestOps() & (~SelectionKey.OP_READ));
- if (key.attachment() instanceof CallbackHandler){
- final Context context = ctx.getController().pollContext(key);
+ Object attach = key.attachment();
+ if (attach instanceof CallbackHandler){
+ final Context context = pollContext(ctx, key);
              context.setCurrentOpType(Context.OpType.OP_READ);
- invokeCallbackHandler(context);
+ invokeCallbackHandler((CallbackHandler) attach, context);
              return false;
          } else {
              return true;
@@ -515,12 +528,19 @@
      throws IOException{
          // disable OP_WRITE on key before doing anything else
          key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
- if (key.attachment() instanceof CallbackHandler){
- final Context context = ctx.getController().pollContext(key);
- context.setSelectionKey(key);
+
+ Object attach = null;
+
+ if (asyncQueueWriter.hasReadyAsyncWriteData(key)) {
+ final Context context = pollContext(ctx, key);
              context.setCurrentOpType(Context.OpType.OP_WRITE);
- invokeCallbackHandler(context);
+ invokeAsyncQueueWriter(context);
              return false;
+ } else if ((attach = key.attachment()) instanceof CallbackHandler){
+ final Context context = pollContext(ctx, key);
+ context.setCurrentOpType(Context.OpType.OP_WRITE);
+ invokeCallbackHandler((CallbackHandler) attach, context);
+ return false;
          } else {
              return true;
          }
@@ -541,11 +561,11 @@
          key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
          key.interestOps(key.interestOps() & (~SelectionKey.OP_READ));

- if (key.attachment() instanceof CallbackHandler){
- Context context = ctx.getController().pollContext(key);
- context.setSelectionKey(key);
+ Object attach = key.attachment();
+ if (attach instanceof CallbackHandler){
+ final Context context = pollContext(ctx, key);
              context.setCurrentOpType(Context.OpType.OP_CONNECT);
- invokeCallbackHandler(context);
+ invokeCallbackHandler((CallbackHandler) attach, context);
          }
          return false;
      }
@@ -556,13 +576,14 @@
       * @param context <code>Context</code>
       * @throws java.io.IOException
       */
- protected void invokeCallbackHandler(Context context) throws IOException{
- context.setSelectorHandler(this);
-
+ protected void invokeCallbackHandler(CallbackHandler callbackHandler,
+ Context context) throws IOException{
          IOEvent<Context>ioEvent = new IOEvent.DefaultIOEvent<Context>(context);
          context.setIOEvent(ioEvent);
          try {
- context.execute();
+ CallbackHandlerContextTask task = CallbackHandlerContextTask.poll();
+ task.setCallBackHandler(callbackHandler);
+ context.execute(task);
          } catch (PipelineFullException ex){
              throw new IOException(ex.getMessage());
          }
@@ -570,6 +591,22 @@


      /**
+ * Invoke an <code>AsyncQueueWriter</code>
+ * @param context <code>Context</code>
+ * @throws java.io.IOException
+ */
+ protected void invokeAsyncQueueWriter(Context context) throws IOException {
+ AsyncQueueWriterContextTask task = AsyncQueueWriterContextTask.poll();
+ task.setAsyncQueueWriter(asyncQueueWriter);
+ try {
+ context.execute(task);
+ } catch (PipelineFullException ex) {
+ throw new IOException(ex.getMessage());
+ }
+ }
+
+
+ /**
       * Return an instance of the default <code>ConnectorHandler</code>,
       * which is the <code>TCPConnectorHandler</code>
       * @return <code>ConnectorHandler</code>
@@ -654,6 +691,13 @@
          this.selector = selector;
      }

+ /**
+ * {@inheritDoc}
+ */
+ public AsyncQueueWriter getAsyncQueueWriter() {
+ return asyncQueueWriter;
+ }
+
      public long getSelectTimeout() {
          return selectTimeout;
      }
@@ -855,8 +899,19 @@
          } catch (IOException ex){
              ; // LOG ME
          }
+
+ asyncQueueWriter.onClose(channel);
      }

+ private Context pollContext(final Context serverContext,
+ final SelectionKey key) {
+ final Context context = serverContext.getController().pollContext(key);
+ context.setSelectionKey(key);
+ context.setSelectorHandler(this);
+ context.setAsyncQueueWriter(asyncQueueWriter);
+ return context;
+ }
+
      //--------------- ConnectorInstanceHandler -----------------------------
      /**
       * Return <Callable>factory<Callable> object, which knows how
Index: UDPConnectorHandler.java
--- UDPConnectorHandler.java Base (BASE)
+++ UDPConnectorHandler.java Locally Modified (Based On LOCAL)
@@ -23,6 +23,8 @@

  package com.sun.grizzly;

+import com.sun.grizzly.async.AsyncWriteCallbackHandler;
+import com.sun.grizzly.async.AsyncQueueWritable;
  import com.sun.grizzly.util.ByteBufferInputStream;
  import java.io.IOException;
  import java.net.SocketAddress;
@@ -47,7 +49,8 @@
   *
   * @author Jeanfrancois Arcand
   */
-public class UDPConnectorHandler implements ConnectorHandler<UDPSelectorHandler, CallbackHandler>{
+public class UDPConnectorHandler implements
+ ConnectorHandler<UDPSelectorHandler, CallbackHandler>, AsyncQueueWritable {

      /**
       * The underlying UDPSelectorHandler used to mange SelectionKeys.
@@ -339,6 +342,72 @@


      /**
+ * {@inheritDoc}
+ */
+ public boolean writeToAsyncQueue(ByteBuffer buffer) throws IOException {
+ return selectorHandler.getAsyncQueueWriter().write(
+ datagramChannel.keyFor(selectorHandler.getSelector()), buffer);
+ }
+
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean writeToAsyncQueue(ByteBuffer buffer,
+ AsyncWriteCallbackHandler callbackHandler) throws IOException {
+ return selectorHandler.getAsyncQueueWriter().write(
+ datagramChannel.keyFor(selectorHandler.getSelector()), buffer,
+ callbackHandler);
+ }
+
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean writeToAsyncQueue(ByteBuffer buffer,
+ AsyncWriteCallbackHandler callbackHandler,
+ boolean isCloneByteBuffer) throws IOException {
+ return selectorHandler.getAsyncQueueWriter().write(
+ datagramChannel.keyFor(selectorHandler.getSelector()), buffer,
+ callbackHandler, isCloneByteBuffer);
+ }
+
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean writeToAsyncQueue(SocketAddress dstAddress, ByteBuffer buffer)
+ throws IOException {
+ return selectorHandler.getAsyncQueueWriter().write(
+ datagramChannel.keyFor(selectorHandler.getSelector()), dstAddress,
+ buffer);
+ }
+
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean writeToAsyncQueue(SocketAddress dstAddress, ByteBuffer buffer,
+ AsyncWriteCallbackHandler callbackHandler) throws IOException {
+ return selectorHandler.getAsyncQueueWriter().write(
+ datagramChannel.keyFor(selectorHandler.getSelector()), dstAddress,
+ buffer, callbackHandler);
+ }
+
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean writeToAsyncQueue(SocketAddress dstAddress, ByteBuffer buffer,
+ AsyncWriteCallbackHandler callbackHandler, boolean isCloneByteBuffer)
+ throws IOException {
+ return selectorHandler.getAsyncQueueWriter().write(
+ datagramChannel.keyFor(selectorHandler.getSelector()), dstAddress,
+ buffer, callbackHandler, isCloneByteBuffer);
+ }
+
+
+ /**
       * Receive bytes.
       * @param byteBuffer The byteBuffer to store bytes.
       * @param socketAddress
@@ -469,5 +538,4 @@
      public UDPSelectorHandler getSelectorHandler() {
          return selectorHandler;
      }
-
  }
Index: UDPSelectorHandler.java
--- UDPSelectorHandler.java Base (BASE)
+++ UDPSelectorHandler.java Locally Modified (Based On LOCAL)
@@ -23,6 +23,7 @@

  package com.sun.grizzly;

+import com.sun.grizzly.async.UDPAsyncQueueWriter;
  import com.sun.grizzly.util.Copyable;
  import com.sun.grizzly.util.SelectionKeyOP;
  import java.io.IOException;
@@ -92,6 +93,10 @@
      public void preSelect(Context ctx) throws IOException {
          initOpRegistriesIfRequired();

+ if (asyncQueueWriter == null) {
+ asyncQueueWriter = new UDPAsyncQueueWriter(this);
+ }
+
          if (selector == null){
              try{
                  connectorInstanceHandler = new ConnectorInstanceHandler.
@@ -174,7 +179,12 @@
              Controller.logger().log(Level.SEVERE,
                      "closeSocketException",ex);
          }
+
+ if (asyncQueueWriter != null) {
+ asyncQueueWriter.close();
+ asyncQueueWriter = null;
      }
+ }


      /**
@@ -258,6 +268,8 @@
          } catch (IOException ex){
              ; // LOG ME
          }
+
+ asyncQueueWriter.onClose(channel);
      }

      //--------------- ConnectorInstanceHandler -----------------------------
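
For readers following the OP_WRITE changes above: the selector handler first asks the writer whether it has queued data for the key (hasReadyAsyncWriteData) and, if so, flushes it before falling back to the CallbackHandler. The AsyncQueueWriter implementation is not part of this section of the diff, so the following is only a rough sketch of the general idea (write directly when possible, queue the remainder, drain on write readiness). All names here are hypothetical, a single channel stands in for per-key queues, and the meaning of the boolean return value is an assumption, not the contract of the patch.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Queue;

public class AsyncWriteQueueSketch {

    // Buffers (or remainders of buffers) still waiting to be written.
    private final Queue<ByteBuffer> pending = new ArrayDeque<ByteBuffer>();

    /**
     * Tries to write directly; whatever the channel does not accept is copied
     * and queued. Assumption: true means "fully written, nothing queued".
     */
    public synchronized boolean write(WritableByteChannel channel, ByteBuffer buffer)
            throws IOException {
        if (pending.isEmpty()) {
            channel.write(buffer);
            if (!buffer.hasRemaining()) {
                return true;
            }
        }
        // Copy the remainder so the caller may reuse its buffer
        // (roughly what an isCloneByteBuffer-style flag would control).
        ByteBuffer copy = ByteBuffer.allocate(buffer.remaining());
        copy.put(buffer);
        copy.flip();
        pending.add(copy);
        return false;
    }

    /** Loosely analogous to hasReadyAsyncWriteData(key). */
    public synchronized boolean hasReadyData() {
        return !pending.isEmpty();
    }

    /** Called when the channel is writable again; returns true once the queue is drained. */
    public synchronized boolean flush(WritableByteChannel channel) throws IOException {
        while (!pending.isEmpty()) {
            ByteBuffer head = pending.peek();
            channel.write(head);
            if (head.hasRemaining()) {
                return false; // channel is full again, wait for the next OP_WRITE
            }
            pending.remove();
        }
        return true;
    }

    /** Test double: accepts at most 'limit' bytes per write call, like a full socket buffer. */
    public static class ThrottledChannel implements WritableByteChannel {
        private final int limit;
        public final ByteArrayOutputStream sink = new ByteArrayOutputStream();

        public ThrottledChannel(int limit) {
            this.limit = limit;
        }

        public int write(ByteBuffer src) {
            int n = Math.min(limit, src.remaining());
            for (int i = 0; i < n; i++) {
                sink.write(src.get());
            }
            return n;
        }

        public boolean isOpen() {
            return true;
        }

        public void close() {
        }
    }

    public static void main(String[] args) throws IOException {
        AsyncWriteQueueSketch queue = new AsyncWriteQueueSketch();
        ThrottledChannel channel = new ThrottledChannel(4);

        boolean direct = queue.write(channel,
                ByteBuffer.wrap("hello world".getBytes(StandardCharsets.US_ASCII)));
        System.out.println("written directly: " + direct);

        // Stand-in for repeated OP_WRITE notifications from the selector.
        while (queue.hasReadyData()) {
            queue.flush(channel);
        }
        System.out.println(new String(channel.sink.toByteArray(), StandardCharsets.US_ASCII));
    }
}
```

Skipping the direct write whenever something is already queued keeps the bytes in order, which is the main reason for checking the queue before doing anything else on OP_WRITE.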

