Wrt context.execute() & context.execute(ContextTask) ... would it make
sense to:
1.) Extend Context to an AsyncContext, or something like that where
you'd add an AsyncContext.execute(ContextTask) ?
2.) Or, possibly add Context.execute(ContextTask) in addition to
Context.execute() ?
I think the former might preserve compatibility, but I'm not sure
what the impact on the AsyncWriter would be.
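
To make option 2 a bit more concrete, here is a rough sketch of how the
no-argument execute() could stay around as a thin wrapper over
execute(ContextTask), so existing callers keep working. All class names
below (SketchContext, SketchContextTask, and the two task classes) are
hypothetical stand-ins, not the real Grizzly classes, and the tasks run
inline here instead of being handed to a Pipeline. Treat it as an
illustration of the idea only.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for ContextTask; NOT the real Grizzly class.
abstract class SketchContextTask {
    protected SketchContext context;

    void setContext(SketchContext context) {
        this.context = context;
    }

    // The work the context would normally hand to its thread pool.
    abstract Object call();
}

// Hypothetical stand-in for Context; NOT the real Grizzly class.
class SketchContext {
    // Records which tasks ran, only so the sketch can be checked.
    final List<String> log = new ArrayList<>();

    // Legacy entry point: existing callers keep calling execute()
    // and get the old default behaviour (assumed here to be the
    // protocol chain).
    void execute() {
        execute(new ProtocolChainSketchTask());
    }

    // New entry point: runs whatever task it is given.
    // Simplification: runs inline rather than on a Pipeline.
    void execute(SketchContextTask task) {
        task.setContext(this);
        task.call();
    }
}

// Stand-in for the existing protocol chain logic.
class ProtocolChainSketchTask extends SketchContextTask {
    @Override
    Object call() {
        context.log.add("protocol-chain");
        return null;
    }
}

// Stand-in for the new async write logic.
class AsyncWriteSketchTask extends SketchContextTask {
    @Override
    Object call() {
        context.log.add("async-write");
        return null;
    }
}
```

With something like this, old code would keep calling ctx.execute(),
while new code could call ctx.execute(new AsyncWriteSketchTask()).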
charlie ....
Jeanfrancois Arcand wrote:
>
> Salut,
>
> Oleksiy Stashok wrote:
>> Hello,
>>
>> Here is the 2nd try for AsyncQueueWriter. This time I have it
>> integrated and tested.
>> The attached AsyncWriteQueue.zip has the implementation of
>> AsyncWriteQueue; AsyncWriteQueue-complete.diff has the complete svn
>> diff for the implementation and integration code.
>>
>> In the diff file you can see the refactoring I'm proposing for the
>> Context class. Currently the Context class holds some logic that is
>> executed by the Pipeline (ThreadPool): callback handler execution and
>> protocol chain execution. I propose moving this logic into ContextTask
>> implementations, which will simplify the Context class. Right now
>> Context has 2 possibilities: execute the callback handler or the
>> protocol chain. After adding AsyncWriteQueue there would be 3... I
>> think that's too much to hold in Context?
>
> If the functionality is the same but better designed, I'm +1 :-)
>
>>
>> I was also thinking about async reading; it looks like we can apply it
>> to the UDP protocol (see the discussion with Radim on the
>> users_at_grizzly mailing list). That way we can try to simulate, on the
>> server side, some of the features the TCP protocol has.
>
> Not only TCP/UDP: last year Charlie also presented (not sure whether
> it was implemented) an API where you can register a
> key/byteBuffer/size and get notified only when all the requested bytes
> are available (similar to what AIO/NIO.2 will do). I think we should
> keep that idea in mind when designing that interface.
>
> Comments inline ;-) (almost until the end of the mails :-))
>
> I'm +1 for almost everything (great work!). The only -1 is the
> switch from Context.execute() to Context.execute(ContextTask). We need
> to keep backward compatibility with the current implementation,
> because otherwise Sailfin and (I suspect) the ORB will no longer work.
> I'm sure we can find a way to implement your idea and still support
> that method.
>
> Thanks
>
> --Jeanfrancois
>
>
>>
>> Thanks.
>>
>> WBR,
>> Alexey.
>>
> # This patch file was generated by NetBeans IDE
> # Following Index: paths are relative to:
> C:\Projects\Grizzly\trunk\modules\grizzly\src\main\java\com\sun\grizzly
> # This patch can be applied using context Tools: Patch action on
> respective folder.
> # It uses platform neutral UTF-8 encoding and \n newlines.
> # Above lines and this line are ignored by the patching process.
> Index: async/AsyncQueueWritable.java
> --- async/AsyncQueueWritable.java Locally New
> +++ async/AsyncQueueWritable.java Locally New
> @@ -0,0 +1,183 @@
> +/*
> + * The contents of this file are subject to the terms
> + * of the Common Development and Distribution License
> + * (the License). You may not use this file except in
> + * compliance with the License.
> + *
> + * You can obtain a copy of the license at
> + * https://glassfish.dev.java.net/public/CDDLv1.0.html or
> + * glassfish/bootstrap/legal/CDDLv1.0.txt.
> + * See the License for the specific language governing
> + * permissions and limitations under the License.
> + *
> + * When distributing Covered Code, include this CDDL
> + * Header Notice in each file and include the License file
> + * at glassfish/bootstrap/legal/CDDLv1.0.txt.
> + * If applicable, add the following below the CDDL Header,
> + * with the fields enclosed by brackets [] replaced by
> + * you own identifying information:
> + * "Portions Copyrighted [year] [name of copyright owner]"
> + *
> + * Copyright 2007 Sun Microsystems, Inc. All rights reserved.
> + */
> +
> +
> +package com.sun.grizzly.async;
> +
> +import java.io.IOException;
> +import java.net.SocketAddress;
> +import java.nio.ByteBuffer;
> +
> +/**
> + * Object, which is able to send <code>ByteBuffer</code> data
> asynchronously,
> + * using queue.
> + *
> + * @author Alexey Stashok
> + */
> +public interface AsyncQueueWritable {
> + /**
> + * Method writes <code>ByteBuffer</code> using async write queue.
> + * First, if write queue is empty - it tries to write
> <code>ByteBuffer</code>
> + * directly (without putting to the queue).
> + * If associated write queue is not empty or after direct writing
> + * <code>ByteBuffer</code> still has ready data to be written -
> + * <code>ByteBuffer</code> will be added to
> <code>AsyncWriteQueue</code>.
> + * If an exception occurs, during direct writing - it will be
> propagated
> + * to the caller directly, otherwise it will be just logged by
> + * Grizzly framework.
> + *
> + * @param buffer <code>ByteBuffer</code>
> + * @return true, if <code>ByteBuffer</code> was written
> completely, false if write operation was put to queue
> + * @throws java.io.IOException
> + */
> + public boolean writeToAsyncQueue(ByteBuffer buffer) throws
> IOException;
> +
> + /**
> + * Method writes <code>ByteBuffer</code> using async write queue.
> + * First, if write queue is empty - it tries to write
> <code>ByteBuffer</code>
> + * directly (without putting to the queue).
> + * If associated write queue is not empty or after direct writing
> + * <code>ByteBuffer</code> still has ready data to be written -
> + * <code>ByteBuffer</code> will be added to
> <code>AsyncWriteQueue</code>.
> + * If an exception occurs, during direct writing - it will be
> propagated
> + * to the caller directly, otherwise, if the
> <code>ByteBuffer</code> is
> + * added to a writing queue - exception notification will come via
> + * <code>AsyncWriteCallbackHandler</code>
> + *
> + * @param buffer <code>ByteBuffer</code>
> + * @param callbackHandler <code>AsyncWriteCallbackHandler</code>,
> + * which will get notified, when
> + * <code>ByteBuffer</code> will be
> completely written
> + * @return true, if <code>ByteBuffer</code> was written completely,
> + * false if write operation was put to queue
> + * @throws java.io.IOException
> + */
> + public boolean writeToAsyncQueue(ByteBuffer buffer,
> + AsyncWriteCallbackHandler callbackHandler) throws
> IOException;
> +
> + /**
> + * Method writes <code>ByteBuffer</code> using async write queue.
> + * First, if write queue is empty - it tries to write
> <code>ByteBuffer</code>
> + * directly (without putting to the queue).
> + * If associated write queue is not empty or after direct writing
> + * <code>ByteBuffer</code> still has ready data to be written -
> + * <code>ByteBuffer</code> will be added to
> <code>AsyncWriteQueue</code>.
> + * If an exception occurs, during direct writing - it will be
> propagated
> + * to the caller directly, otherwise, if the
> <code>ByteBuffer</code> is
> + * added to a writing queue - exception notification will come via
> + * <code>AsyncWriteCallbackHandler</code>
> + *
> + * @param buffer <code>ByteBuffer</code>
> + * @param callbackHandler <code>AsyncWriteCallbackHandler</code>,
> + * which will get notified, when
> + * <code>ByteBuffer</code> will be
> completely written
> + * @param isCloneByteBuffer if true - <code>AsyncQueueWriter</code>
> + * will clone given
> + * <code>ByteBuffer</code> before putting
> it to the
> + * <code>AsyncWriteQueue</code>
> + * @return true, if <code>ByteBuffer</code> was written completely,
> + * false if write operation was put to queue
> + * @throws java.io.IOException
> + */
> + public boolean writeToAsyncQueue(ByteBuffer buffer,
> + AsyncWriteCallbackHandler callbackHandler,
> + boolean isCloneByteBuffer) throws IOException;
> +
> + /**
> + * Method sends <code>ByteBuffer</code> using async write queue.
> + * First, if write queue is empty - it tries to send
> <code>ByteBuffer</code>
> + * to the given <code>SocketAddress</code> directly
> + * (without putting to the queue).
> + * If associated write queue is not empty or after direct sending
> + * <code>ByteBuffer</code> still has ready data to be written -
> + * <code>ByteBuffer</code> will be added to
> <code>AsyncWriteQueue</code>.
> + * If an exception occurs, during direct writing - it will be
> propagated
> + * to the caller directly, otherwise it will be just logged by
> + * Grizzly framework.
> + *
> + * @param dstAddress destination <code>SocketAddress</code> data
> will
> + * be sent to
> + * @param buffer <code>ByteBuffer</code>
> + * @return true, if <code>ByteBuffer</code> was written
> completely, false if write operation was put to queue
> + * @throws java.io.IOException
> + */
> + public boolean writeToAsyncQueue(SocketAddress dstAddress,
> ByteBuffer buffer)
> + throws IOException;
> +
> + /**
> + * Method sends <code>ByteBuffer</code> using async write queue.
> + * First, if write queue is empty - it tries to send
> <code>ByteBuffer</code>
> + * to the given <code>SocketAddress</code> directly
> + * (without putting to the queue).
> + * If associated write queue is not empty or after direct sending
> + * <code>ByteBuffer</code> still has ready data to be written -
> + * <code>ByteBuffer</code> will be added to
> <code>AsyncWriteQueue</code>.
> + * If an exception occurs, during direct writing - it will be
> propagated
> + * to the caller directly, otherwise, if the
> <code>ByteBuffer</code> is
> + * added to a writing queue - exception notification will come via
> + * <code>AsyncWriteCallbackHandler</code>
> + *
> + * @param dstAddress destination <code>SocketAddress</code> data
> will
> + * be sent to
> + * @param buffer <code>ByteBuffer</code>
> + * @param callbackHandler <code>AsyncWriteCallbackHandler</code>,
> + * which will get notified, when
> + * <code>ByteBuffer</code> will be
> completely written
> + * @return true, if <code>ByteBuffer</code> was written completely,
> + * false if write operation was put to queue
> + * @throws java.io.IOException
> + */
> + public boolean writeToAsyncQueue(SocketAddress dstAddress,
> ByteBuffer buffer,
> + AsyncWriteCallbackHandler callbackHandler) throws
> IOException;
> +
> + /**
> + * Method sends <code>ByteBuffer</code> using async write queue.
> + * First, if write queue is empty - it tries to send
> <code>ByteBuffer</code>
> + * to the given <code>SocketAddress</code> directly
> + * (without putting to the queue).
> + * If associated write queue is not empty or after direct sending
> + * <code>ByteBuffer</code> still has ready data to be written -
> + * <code>ByteBuffer</code> will be added to
> <code>AsyncWriteQueue</code>.
> + * If an exception occurs, during direct writing - it will be
> propagated
> + * to the caller directly, otherwise, if the
> <code>ByteBuffer</code> is
> + * added to a writing queue - exception notification will come via
> + * <code>AsyncWriteCallbackHandler</code>
> + *
> + * @param dstAddress destination <code>SocketAddress</code> data
> will
> + * be sent to
> + * @param buffer <code>ByteBuffer</code>
> + * @param callbackHandler <code>AsyncWriteCallbackHandler</code>,
> + * which will get notified, when
> + * <code>ByteBuffer</code> will be
> completely written
> + * @param isCloneByteBuffer if true - <code>AsyncQueueWriter</code>
> + * will clone given
> + * <code>ByteBuffer</code> before putting
> it to the
> + * <code>AsyncWriteQueue</code>
> + * @return true, if <code>ByteBuffer</code> was written completely,
> + * false if write operation was put to queue
> + * @throws java.io.IOException
> + */
> + public boolean writeToAsyncQueue(SocketAddress dstAddress,
> ByteBuffer buffer,
> + AsyncWriteCallbackHandler callbackHandler,
> + boolean isCloneByteBuffer) throws IOException;
> +}
> Index: async/AsyncQueueWriter.java
> --- async/AsyncQueueWriter.java Locally New
> +++ async/AsyncQueueWriter.java Locally New
> @@ -0,0 +1,247 @@
> +/*
> + * The contents of this file are subject to the terms
> + * of the Common Development and Distribution License
> + * (the License). You may not use this file except in
> + * compliance with the License.
> + *
> + * You can obtain a copy of the license at
> + * https://glassfish.dev.java.net/public/CDDLv1.0.html or
> + * glassfish/bootstrap/legal/CDDLv1.0.txt.
> + * See the License for the specific language governing
> + * permissions and limitations under the License.
> + *
> + * When distributing Covered Code, include this CDDL
> + * Header Notice in each file and include the License file
> + * at glassfish/bootstrap/legal/CDDLv1.0.txt.
> + * If applicable, add the following below the CDDL Header,
> + * with the fields enclosed by brackets [] replaced by
> + * you own identifying information:
> + * "Portions Copyrighted [year] [name of copyright owner]"
> + *
> + * Copyright 2007 Sun Microsystems, Inc. All rights reserved.
> + */
> +
> +package com.sun.grizzly.async;
> +
> +import java.io.IOException;
> +import java.net.SocketAddress;
> +import java.nio.ByteBuffer;
> +import java.nio.channels.SelectableChannel;
> +import java.nio.channels.SelectionKey;
> +
> +/**
> + * Common interface to be implemented by protocol-dependent
> asynchronous queue
> + * writer implementations
> + *
> + * @author Alexey Stashok
> + */
> +public interface AsyncQueueWriter {
> + /**
> + * Method writes <code>ByteBuffer</code> to the
> <code>SelectableChannel</code>
> + * First, if <code>SelectableChannel</code> associated write
> queue is empty -
> + * it tries to write <code>ByteBuffer</code> to the given
> + * <code>SelectableChannel</code> directly (without putting to
> the queue).
> + * If associated write queue is not empty or after direct writing
> + * <code>ByteBuffer</code> still has ready data to be written -
> + * <code>ByteBuffer</code> will be added to
> <code>AsyncWriteQueue</code>
> + * and <code>SelectableChannel</code> will be registered on
> + * <code>SelectorHandler</code>, waiting for OP_WRITE event.
> + * If an exception occurs, during direct writing - it will be
> propagated
> + * to the caller directly, otherwise it will be just logged by
> + * Grizzly framework.
> + *
> + * @param key <code>SelectionKey</code> associated with
> + * <code>SelectableChannel</code> <code>ByteBuffer</code>
> + * should be written to
> + * @param buffer <code>ByteBuffer</code>
> + * @return true, if <code>ByteBuffer</code> was written
> completely, false if write operation was put to queue
> + * @throws java.io.IOException
> + */
> + public boolean write(SelectionKey key, ByteBuffer buffer) throws
> IOException;
> +
> + /**
> + * Method writes <code>ByteBuffer</code> to the
> <code>SelectableChannel</code>
> + * First, if <code>SelectableChannel</code> associated write
> queue is empty -
> + * it tries to write <code>ByteBuffer</code> to the given
> + * <code>SelectableChannel</code> directly (without putting to
> the queue).
> + * If associated write queue is not empty or after direct writing
> + * <code>ByteBuffer</code> still has ready data to be written -
> + * <code>ByteBuffer</code> will be added to
> <code>AsyncWriteQueue</code>
> + * and <code>SelectableChannel</code> will be registered on
> + * <code>SelectorHandler</code>, waiting for OP_WRITE event.
> + * If an exception occurs, during direct writing - it will be
> propogated
>
> Typo: should be "propagated"
>
> + * to the caller directly, otherwise, if the
> <code>ByteBuffer</code> is
> + * added to a writing queue - exception notification will come via
> + * <code>AsyncWriteCallbackHandler</code>
>
> Mention AsyncWriteCallbackHandler.onIOException() here.
>
> + *
> + * @param key <code>SelectionKey</code> associated with
> + * <code>SelectableChannel</code> <code>ByteBuffer</code>
> + * should be written to
> + * @param buffer <code>ByteBuffer</code>
> + * @param callbackHandler <code>AsyncWriteCallbackHandler</code>,
> + * which will get notified, when
> + * <code>ByteBuffer</code> will be
> completely written
> + * @return true, if <code>ByteBuffer</code> was written completely,
> + * false if write operation was put to queue
> + * @throws java.io.IOException
> + */
> + public boolean write(SelectionKey key, ByteBuffer buffer,
> + AsyncWriteCallbackHandler callbackHandler) throws
> IOException;
> +
> + /**
> + * Method writes <code>ByteBuffer</code> to the
> <code>SelectableChannel</code>
> + * First, if <code>SelectableChannel</code> associated write
> queue is empty -
> + * it tries to write <code>ByteBuffer</code> to the given
> + * <code>SelectableChannel</code> directly (without putting to
> the queue).
> + * If associated write queue is not empty or after direct writing
> + * <code>ByteBuffer</code> still has ready data to be written -
> + * <code>ByteBuffer</code> will be added to
> <code>AsyncWriteQueue</code>
> + * and <code>SelectableChannel</code> will be registered on
> + * <code>SelectorHandler</code>, waiting for OP_WRITE event.
> + * If an exception occurs, during direct writing - it will be
> propagated
> + * to the caller directly, otherwise, if the
> <code>ByteBuffer</code> is
> + * added to a writing queue - exception notification will come via
> + * <code>AsyncWriteCallbackHandler</code>
> + *
> + * @param key <code>SelectionKey</code> associated with
> + * <code>SelectableChannel</code> <code>ByteBuffer</code>
> + * should be written to
> + * @param buffer <code>ByteBuffer</code>
> + * @param callbackHandler <code>AsyncWriteCallbackHandler</code>,
> + * which will get notified, when
> + * <code>ByteBuffer</code> will be
> completely written
> + * @param isCloneByteBuffer if true -
> <code>AsyncQueueWriter</code> will
> + * clone given
> + * <code>ByteBuffer</code> before putting
> it to the
> + * <code>AsyncWriteQueue</code>
> + * @return true, if <code>ByteBuffer</code> was written completely,
> + * false if write operation was put to queue
> + * @throws java.io.IOException
> + */
> + public boolean write(SelectionKey key, ByteBuffer buffer,
> + AsyncWriteCallbackHandler callbackHandler, boolean
> isCloneByteBuffer)
> + throws IOException;
> +
> + /**
> + * Method sends <code>ByteBuffer</code> to the
> <code>SocketAddress</code>
> + * First, if <code>SelectableChannel</code> associated write
> queue is empty -
> + * it tries to write <code>ByteBuffer</code> to the given
> + * <code>SocketAddress</code> directly (without putting to the
> queue).
> + * If associated write queue is not empty or after direct writing
> + * <code>ByteBuffer</code> still has ready data to be written -
> + * <code>ByteBuffer</code> will be added to
> <code>AsyncWriteQueue</code>
> + * and <code>SelectableChannel</code> will be registered on
> + * <code>SelectorHandler</code>, waiting for OP_WRITE event.
> + * If an exception occurs, during direct writing - it will be
> propagated
> + * to the caller directly, otherwise it will be just logged by
> + * Grizzly framework.
> + *
> + * @param key <code>SelectionKey</code> associated with
> + * <code>SelectableChannel</code>, which will be used to
> + * send <code>ByteBuffer</code> to
> + * @param dstAddress destination address <code>ByteBuffer</code>
> will be sent to
> + * @param buffer <code>ByteBuffer</code>
> + * @return true, if <code>ByteBuffer</code> was written
> completely, false if write operation was put to queue
> + * @throws java.io.IOException
> + */
> + public boolean write(SelectionKey key, SocketAddress dstAddress,
> + ByteBuffer buffer) throws IOException;
> +
> + /**
> + * Method sends <code>ByteBuffer</code> to the
> <code>SocketAddress</code>
> + * First, if <code>SelectableChannel</code> associated write
> queue is empty -
> + * it tries to write <code>ByteBuffer</code> to the given
> + * <code>SocketAddress</code> directly (without putting to the
> queue).
> + * If associated write queue is not empty or after direct writing
> + * <code>ByteBuffer</code> still has ready data to be written -
> + * <code>ByteBuffer</code> will be added to
> <code>AsyncWriteQueue</code>
> + * and <code>SelectableChannel</code> will be registered on
> + * <code>SelectorHandler</code>, waiting for OP_WRITE event.
> + * If an exception occurs, during direct writing - it will be
> propagated
> + * to the caller directly, otherwise, if the
> <code>ByteBuffer</code> is
> + * added to a writing queue - exception notification will come via
> + * <code>AsyncWriteCallbackHandler</code>
> + *
> + * @param key <code>SelectionKey</code> associated with
> + * <code>SelectableChannel</code> <code>ByteBuffer</code>
> + * should be written to
> + * @param dstAddress destination address <code>ByteBuffer</code>
> will be sent to
> + * @param buffer <code>ByteBuffer</code>
> + * @param callbackHandler <code>AsyncWriteCallbackHandler</code>,
> + * which will get notified, when
> + * <code>ByteBuffer</code> will be
> completely written
> + * @return true, if <code>ByteBuffer</code> was written completely,
> + * false if write operation was put to queue
> + * @throws java.io.IOException
> + */
> + public boolean write(SelectionKey key, SocketAddress dstAddress,
> + ByteBuffer buffer, AsyncWriteCallbackHandler
> callbackHandler)
> + throws IOException;
> +
> + /**
> + * Method sends <code>ByteBuffer</code> to the
> <code>SocketAddress</code>
> + * First, if <code>SelectableChannel</code> associated write
> queue is empty -
> + * it tries to write <code>ByteBuffer</code> to the given
> + * <code>SocketAddress</code> directly (without putting to the
> queue).
> + * If associated write queue is not empty or after direct writing
> + * <code>ByteBuffer</code> still has ready data to be written -
> + * <code>ByteBuffer</code> will be added to
> <code>AsyncWriteQueue</code>
> + * and <code>SelectableChannel</code> will be registered on
> + * <code>SelectorHandler</code>, waiting for OP_WRITE event.
> + * If an exception occurs, during direct writing - it will be
> propagated
> + * to the caller directly, otherwise, if the
> <code>ByteBuffer</code> is
> + * added to a writing queue - exception notification will come via
> + * <code>AsyncWriteCallbackHandler</code>
> + *
> + * @param key <code>SelectionKey</code> associated with
> + * <code>SelectableChannel</code> <code>ByteBuffer</code>
> + * should be written to
> + * @param dstAddress destination address <code>ByteBuffer</code>
> will be sent to
> + * @param buffer <code>ByteBuffer</code>
> + * @param callbackHandler <code>AsyncWriteCallbackHandler</code>,
> + * which will get notified, when
> + * <code>ByteBuffer</code> will be
> completely written
> + * @param isCloneByteBuffer if true -
> <code>AsyncQueueWriter</code> will
> + * clone given
> + * <code>ByteBuffer</code> before putting
> it to the
> + * <code>AsyncWriteQueue</code>
> + * @return true, if <code>ByteBuffer</code> was written completely,
> + * false if write operation was put to queue
> + * @throws java.io.IOException
> + */
> + public boolean write(SelectionKey key, SocketAddress dstAddress,
> + ByteBuffer buffer, AsyncWriteCallbackHandler
> callbackHandler,
> + boolean isCloneByteBuffer) throws IOException;
> +
> + /**
> + * Checks whether there is any data in
> <code>AsyncWriteQueue</code> ready
> + * for the given <code>SelectableChannel</code>
> + *
> + * @param key <code>SelectionKey</code> associated with
> <code>SelectableChannel</code>
> + * @return true, if there is ready data. False otherwise.
> + */
> + public boolean hasReadyAsyncWriteData(SelectionKey key);
> +
> + /**
> + * Callback method, which should be called by
> <code>SelectorHandler</code> to
> + * notify, that given <code>SelectableChannel</code> is ready to
> transmit data.
> + *
> + * @param key <code>SelectionKey</code> associated with
> <code>SelectableChannel</code>
> + * @throws java.io.IOException
> + */
> + public void onWrite(SelectionKey key) throws IOException;
> +
> + /**
> + * Callback method, which should be called by
> <code>SelectorHandler</code> to
> + * notify, that given <code>SelectableChannel</code> is going to
> be closed, so
> + * related <code>SelectableChannel</code> data could be released
> from
> + * <code>AsyncWriteQueue</code>
> + *
> + * @param channel <code>SelectableChannel</code>
> + * @throws java.io.IOException
> + */
> + public void onClose(SelectableChannel channel);
> +
> + public void close();
> +
> +}
> Index: async/AsyncQueueWriterContextTask.java
> --- async/AsyncQueueWriterContextTask.java Locally New
> +++ async/AsyncQueueWriterContextTask.java Locally New
> @@ -0,0 +1,78 @@
> +/*
> + * The contents of this file are subject to the terms
> + * of the Common Development and Distribution License
> + * (the License). You may not use this file except in
> + * compliance with the License.
> + *
> + * You can obtain a copy of the license at
> + * https://glassfish.dev.java.net/public/CDDLv1.0.html or
> + * glassfish/bootstrap/legal/CDDLv1.0.txt.
> + * See the License for the specific language governing
> + * permissions and limitations under the License.
> + *
> + * When distributing Covered Code, include this CDDL
> + * Header Notice in each file and include the License file
> + * at glassfish/bootstrap/legal/CDDLv1.0.txt.
> + * If applicable, add the following below the CDDL Header,
> + * with the fields enclosed by brackets [] replaced by
> + * you own identifying information:
> + * "Portions Copyrighted [year] [name of copyright owner]"
> + *
> + * Copyright 2007 Sun Microsystems, Inc. All rights reserved.
> + */
> +
> +package com.sun.grizzly.async;
> +
> +import com.sun.grizzly.*;
>
> Fix the '*' by listing the classes instead (Netbeans Netbeans Netbeans
> ;-))
>
> +
> +/**
> + * <code>AsyncQueueWriter</code> task, which will be executed by
> + * <code>Context</code>, when Context.execute(<code>ContextTask</code>)
> + * is called.
> + *
> + * @author Alexey Stashok
> + */
> +public class AsyncQueueWriterContextTask extends ContextTask {
> + private static final TaskPool<AsyncQueueWriterContextTask>
> taskPool =
> + new TaskPool<AsyncQueueWriterContextTask>() {
> + @Override
> + public AsyncQueueWriterContextTask newTask() {
> + return new AsyncQueueWriterContextTask();
> + }
> + };
> +
> + private AsyncQueueWriter asyncQueueWriter;
> +
> + public static AsyncQueueWriterContextTask poll() {
> + return taskPool.poll();
> + }
> +
> + public static void offer(AsyncQueueWriterContextTask contextTask) {
> + contextTask.recycle();
> + taskPool.offer(contextTask);
> + }
> +
> + public Object call() throws Exception {
> + try {
> + asyncQueueWriter.onWrite(context.getSelectionKey());
>
> Can you make sure getSelectionKey() is not null, and throw an
> IllegalStateException in case the key is null? Just to make the code
> robust, even if the key cannot be null.
>
>
> + } finally {
> + offer(this);
> + }
> +
> + return null;
> + }
> +
> + public AsyncQueueWriter getAsyncQueueWriter() {
> + return asyncQueueWriter;
> + }
> +
> + public void setAsyncQueueWriter(AsyncQueueWriter asyncWriter) {
> + this.asyncQueueWriter = asyncWriter;
> + }
> +
> + @Override
> + public void recycle() {
> + asyncQueueWriter = null;
> + super.recycle();
> + }
> +}
> Index: async/AsyncWriteCallbackHandler.java
> --- async/AsyncWriteCallbackHandler.java Locally New
> +++ async/AsyncWriteCallbackHandler.java Locally New
> @@ -0,0 +1,38 @@
> +/*
> + * The contents of this file are subject to the terms
> + * of the Common Development and Distribution License
> + * (the License). You may not use this file except in
> + * compliance with the License.
> + *
> + * You can obtain a copy of the license at
> + * https://glassfish.dev.java.net/public/CDDLv1.0.html or
> + * glassfish/bootstrap/legal/CDDLv1.0.txt.
> + * See the License for the specific language governing
> + * permissions and limitations under the License.
> + *
> + * When distributing Covered Code, include this CDDL
> + * Header Notice in each file and include the License file
> + * at glassfish/bootstrap/legal/CDDLv1.0.txt.
> + * If applicable, add the following below the CDDL Header,
> + * with the fields enclosed by brackets [] replaced by
> + * you own identifying information:
> + * "Portions Copyrighted [year] [name of copyright owner]"
> + *
> + * Copyright 2007 Sun Microsystems, Inc. All rights reserved.
> + */
> +
> +package com.sun.grizzly.async;
> +
> +import java.nio.ByteBuffer;
> +import java.nio.channels.SelectionKey;
> +
> +/**
> + * Callback handler interface, used by <code>AsyncQueueWriter</code>
> to notify
> + * custom code about completion of specific <code>ByteBuffer</code>
> writing.
> + *
> + * @author Alexey Stashok
> + */
> +public interface AsyncWriteCallbackHandler {
> + public void onWriteCompleted(SelectionKey key, ByteBuffer buffer);
> + public void onIOException(SelectionKey key, ByteBuffer buffer);
> +}
>
> The API looks good, but I would add more docs here, as this interface
> will be implemented by users of Grizzly, and since we are lacking
> in terms of tutorials, better to have good docs :-)
>
>
> Index: async/AsyncWriteQueue.java
> --- async/AsyncWriteQueue.java Locally New
> +++ async/AsyncWriteQueue.java Locally New
> @@ -0,0 +1,144 @@
> +/*
> + * The contents of this file are subject to the terms
> + * of the Common Development and Distribution License
> + * (the License). You may not use this file except in
> + * compliance with the License.
> + *
> + * You can obtain a copy of the license at
> + * https://glassfish.dev.java.net/public/CDDLv1.0.html or
> + * glassfish/bootstrap/legal/CDDLv1.0.txt.
> + * See the License for the specific language governing
> + * permissions and limitations under the License.
> + *
> + * When distributing Covered Code, include this CDDL
> + * Header Notice in each file and include the License file
> + * at glassfish/bootstrap/legal/CDDLv1.0.txt.
> + * If applicable, add the following below the CDDL Header,
> + * with the fields enclosed by brackets [] replaced by
> + * you own identifying information:
> + * "Portions Copyrighted [year] [name of copyright owner]"
> + *
> + * Copyright 2007 Sun Microsystems, Inc. All rights reserved.
> + */
> +
> +package com.sun.grizzly.async;
> +
> +import java.nio.channels.SelectableChannel;
> +import java.util.Map;
> +import java.util.concurrent.ConcurrentHashMap;
> +import java.util.concurrent.ConcurrentLinkedQueue;
> +import java.util.concurrent.locks.ReentrantLock;
> +
> +/**
> + * Class represents collection of
> + * <code>SelectableChannel</code>s and correspondent queue of
> + * <code>ByteBuffer</code>, which should be written asynchronously.
> + * This implementation is TCP protocol specific.
>
> To follow our convention, should this class be called TCPAsyncWriteQueue?
>
> + *
> + * @author Alexey Stashok
> + */
> +public class AsyncWriteQueue<E>{
> + private Map<SelectableChannel, ChannelAsyncWriteEntry> queueMap =
> + new ConcurrentHashMap<SelectableChannel,
> ChannelAsyncWriteEntry>();
> +
> + /**
> + * Add data to the <code>AsyncWriteQueue</code>, corresponding to
> the given
> + * <code>SelectableChannel</code>
> + *
> + * @param channel <code>SelectableChannel</code>
> + * @param queueRecord write data unit
> + */
> + public void offer(SelectableChannel channel, E queueRecord) {
> + ChannelAsyncWriteEntry entry =
> obtainChannelAsyncWriteEntry(channel);
> + entry.queue.offer(queueRecord);
> + }
> +
> + /**
> + * Get head element of <code>SelectableChannel</code> write queue.
> + * Element will not be removed from queue.
> + *
> + * @param channel <code>SelectableChannel</code>
> + *
> + * @return <code>AsyncQueueRecord</code> write data unit
> + */
> + public E peek(SelectableChannel channel) {
> + ChannelAsyncWriteEntry entry = queueMap.get(channel);
> + if (entry != null) {
> + return entry.queue.peek();
> + }
> +
> + return null;
> + }
> +
> + /**
> + * Get head element of <code>SelectableChannel</code> write queue.
> + * Element will be removed from queue.
> + *
> + * @param channel <code>SelectableChannel</code>
> + *
> + * @return <code>AsyncQueueRecord</code> write data unit
> + */
> + public E poll(SelectableChannel channel) {
> + ChannelAsyncWriteEntry entry = queueMap.get(channel);
> + if (entry != null) {
> + return entry.queue.poll();
> + }
> +
> + return null;
> + }
> +
> + /**
> + * Remove the whole write queue entry of the given
> + * <code>SelectableChannel</code>.
> + *
> + * @param channel <code>SelectableChannel</code>
> + */
> + public void removeEntry(SelectableChannel channel) {
> + queueMap.remove(channel);
> + }
> +
> + /**
> + * Get the size of <code>SelectableChannel</code> write queue.
> + *
> + * @param channel <code>SelectableChannel</code>
> + * @return size of <code>SelectableChannel</code> write queue.
> + */
> + public int size(SelectableChannel channel) {
> + ChannelAsyncWriteEntry entry = queueMap.get(channel);
> + return entry == null ? 0 : entry.queue.size();
> + }
> +
> + public void clear() {
> + queueMap.clear();
> + }
> +
> + protected ChannelAsyncWriteEntry
> obtainChannelAsyncWriteEntry(SelectableChannel channel) {
> + ChannelAsyncWriteEntry entry = queueMap.get(channel);
> + if (entry == null) {
> + synchronized(channel) {
>
> Minor: a ReentrantReadWriteLock might perform better than a synchronized block here.
>
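
On the locking here: since queueMap is already a ConcurrentHashMap, another option might be to drop the explicit lock and rely on putIfAbsent. A minimal sketch, assuming a simplified stand-in class (EntryMapSketch and Entry are hypothetical names, the three ReentrantLocks are left out, and the field would have to be declared as ConcurrentMap rather than Map):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical stand-in for AsyncWriteQueue; K plays the role of SelectableChannel. */
class EntryMapSketch<K, E> {
    /** Simplified entry: only the queue, the locks of the patch are omitted. */
    public static final class Entry<E> {
        public final ConcurrentLinkedQueue<E> queue = new ConcurrentLinkedQueue<E>();
    }

    private final ConcurrentMap<K, Entry<E>> queueMap =
            new ConcurrentHashMap<K, Entry<E>>();

    /** Returns the entry for the channel, creating it at most once per key. */
    public Entry<E> obtainEntry(K channel) {
        Entry<E> entry = queueMap.get(channel);
        if (entry == null) {
            Entry<E> created = new Entry<E>();
            // putIfAbsent returns the value already mapped, or null if ours was stored.
            Entry<E> raced = queueMap.putIfAbsent(channel, created);
            entry = (raced != null) ? raced : created;
        }
        return entry;
    }
}
```

The cost is an occasional throwaway Entry when two threads race on the same channel; whether that beats either lock would need measuring.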
> + entry = queueMap.get(channel);
> + if (entry == null) {
> + entry = new ChannelAsyncWriteEntry();
> + queueMap.put(channel, entry);
> + }
> + }
> + }
> + return entry;
> + }
> +
> + /**
> + * <code>AsyncWriteQueue</code> data unit
> + */
> + protected class ChannelAsyncWriteEntry {
> + public ConcurrentLinkedQueue<E> queue;
> + public ReentrantLock writeLock;
> + public ReentrantLock onWriteLock;
> + public ReentrantLock updateLock;
> +
> + public ChannelAsyncWriteEntry() {
> + queue = new ConcurrentLinkedQueue<E>();
> + writeLock = new ReentrantLock();
> + onWriteLock = new ReentrantLock();
> + updateLock = new ReentrantLock();
> + }
> + }
> +}
> \ No newline at end of file
> Index: async/TCPAsyncQueueWriter.java
> --- async/TCPAsyncQueueWriter.java Locally New
> +++ async/TCPAsyncQueueWriter.java Locally New
> @@ -0,0 +1,255 @@
> +/*
> + * The contents of this file are subject to the terms
> + * of the Common Development and Distribution License
> + * (the License). You may not use this file except in
> + * compliance with the License.
> + *
> + * You can obtain a copy of the license at
> + * https://glassfish.dev.java.net/public/CDDLv1.0.html or
> + * glassfish/bootstrap/legal/CDDLv1.0.txt.
> + * See the License for the specific language governing
> + * permissions and limitations under the License.
> + *
> + * When distributing Covered Code, include this CDDL
> + * Header Notice in each file and include the License file
> + * at glassfish/bootstrap/legal/CDDLv1.0.txt.
> + * If applicable, add the following below the CDDL Header,
> + * with the fields enclosed by brackets [] replaced by
> + * you own identifying information:
> + * "Portions Copyrighted [year] [name of copyright owner]"
> + *
> + * Copyright 2007 Sun Microsystems, Inc. All rights reserved.
> + */
> +
> +package com.sun.grizzly.async;
> +
> +import com.sun.grizzly.*;
> +import com.sun.grizzly.async.AsyncWriteQueue.ChannelAsyncWriteEntry;
> +import java.io.IOException;
> +import java.net.SocketAddress;
> +import java.nio.ByteBuffer;
> +import java.nio.channels.SelectableChannel;
> +import java.nio.channels.SelectionKey;
> +import java.nio.channels.WritableByteChannel;
> +import java.util.concurrent.ConcurrentLinkedQueue;
> +
> +/**
> + * TCP implementation of <code>AsyncQueueWriter</code>
> + *
> + * @author Alexey Stashok
> + */
> +public class TCPAsyncQueueWriter implements AsyncQueueWriter {
> + private SelectorHandler selectorHandler;
> + private AsyncWriteQueue<TCPAsyncQueueRecord> writeQueue;
> +
> + public TCPAsyncQueueWriter(SelectorHandler selectorHandler) {
> + this.selectorHandler = selectorHandler;
> + writeQueue = new AsyncWriteQueue<TCPAsyncQueueRecord>();
> + }
> +
> + /**
> + * {@inheritDoc}
> + */
> + public boolean write(SelectionKey key, ByteBuffer buffer) throws
> IOException {
> + return write(key, buffer, null);
> + }
> +
> + /**
> + * {@inheritDoc}
> + */
> + public boolean write(SelectionKey key, ByteBuffer buffer,
> + AsyncWriteCallbackHandler callbackHandler) throws
> IOException {
> + return write(key, buffer, callbackHandler, false);
> + }
> +
> + /**
> + * {@inheritDoc}
> + */
> + public boolean write(SelectionKey key, ByteBuffer buffer,
> + AsyncWriteCallbackHandler callbackHandler, boolean
> isCloneByteBuffer) throws IOException {
> +
> + SelectableChannel channel = key.channel();
> + // If AsyncWriteQueue is empty - try to write ByteBuffer here
> + ChannelAsyncWriteEntry channelEntry =
> + writeQueue.obtainChannelAsyncWriteEntry(channel);
> +
> + ConcurrentLinkedQueue<TCPAsyncQueueRecord> queue =
> channelEntry.queue;
> +
> + boolean hasWriteLock = false;
> +
> + try {
> + if (queue.isEmpty() &&
> + (hasWriteLock = channelEntry.writeLock.tryLock())) {
> + ((WritableByteChannel) channel).write(buffer);
> + }
> +
> + if (buffer.hasRemaining()) {
> + // clone ByteBuffer if required
> + if (isCloneByteBuffer) {
> + int size = buffer.remaining();
> + ByteBuffer newBuffer = buffer.isDirect() ?
> + ByteBuffer.allocateDirect(size) :
> + ByteBuffer.allocate(size);
>
> Here we need to support VIEW and HEAP_ARRAY buffers, as the rest of the
> framework allows. We don't need it for the first release, but it might be
> good to file an RFE. I would think an application that uses
> ByteBuffer.wrap() would like to use the same type of buffer when writing.
>
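
For the RFE, the clone step could take the desired buffer kind as a parameter instead of only checking isDirect(). A rough sketch, assuming a hypothetical Kind enum (not the framework's own type) and leaving the view kinds out:

```java
import java.nio.ByteBuffer;

/** Hypothetical helper; names are illustrative, not part of the patch. */
class BufferCloneSketch {
    /** Illustrative allocation kinds; the framework's own enum may differ. */
    public enum Kind { HEAP, DIRECT, HEAP_ARRAY }

    /** Copies the remaining bytes of source into a new buffer of the given kind. */
    public static ByteBuffer cloneRemaining(ByteBuffer source, Kind kind) {
        int size = source.remaining();
        ByteBuffer copy;
        switch (kind) {
            case DIRECT:
                copy = ByteBuffer.allocateDirect(size);
                break;
            case HEAP_ARRAY:
                copy = ByteBuffer.wrap(new byte[size]);
                break;
            default:
                copy = ByteBuffer.allocate(size);
                break;
        }
        // Like the patch, this drains the source buffer.
        copy.put(source);
        // flip() resets the position to 0 and sets the limit to the bytes written.
        copy.flip();
        return copy;
    }
}
```

A caller that built its buffer with ByteBuffer.wrap() would then pass Kind.HEAP_ARRAY and get an array-backed copy back.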
> +
> + newBuffer.put(buffer);
> + newBuffer.position(0);
> + buffer = newBuffer;
> + }
> +
> + // add new element to the queue
> + channelEntry.updateLock.lock();
> + queue.offer(new TCPAsyncQueueRecord(buffer,
> + callbackHandler));
>
> Should we try to cache TCPAsyncQueueRecord here instead of calling
> 'new' for every write? I would think yes :-)
>
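
The caching could probably follow the same poll/offer shape as the TaskPool further down in this diff. A small sketch, assuming a simplified record (RecordPoolSketch and Record are hypothetical names, and the callback handler field is omitted):

```java
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical record cache; mirrors the poll/offer pattern of TaskPool. */
class RecordPoolSketch {
    /** Simplified queue record holding only the buffer. */
    public static final class Record {
        public ByteBuffer byteBuffer;

        void recycle() {
            // Drop the reference so a pooled record does not pin the buffer.
            byteBuffer = null;
        }
    }

    private final ConcurrentLinkedQueue<Record> pool =
            new ConcurrentLinkedQueue<Record>();

    /** Takes a cached record if one exists, otherwise allocates a new one. */
    public Record poll(ByteBuffer buffer) {
        Record record = pool.poll();
        if (record == null) {
            record = new Record();
        }
        record.byteBuffer = buffer;
        return record;
    }

    /** Clears the record and returns it to the cache. */
    public void offer(Record record) {
        record.recycle();
        pool.offer(record);
    }
}
```

In onWrite the record would be offered back right after onWriteCompleted is called, so the callback must not hold on to it.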
> +
> + /*
> + * This check helps to avoid not required key
> registering
> + */
> + if (!channelEntry.onWriteLock.isLocked()) {
> + registerForWriting(key);
> + }
> +
> + channelEntry.updateLock.unlock();
> +
> + return false;
> + }
> + } finally {
> + if (hasWriteLock) {
> + channelEntry.writeLock.unlock();
> + }
> + }
> +
> + return true;
> + }
> +
> + /**
> + * {@inheritDoc}
> + */
> + public boolean hasReadyAsyncWriteData(SelectionKey key) {
> + return writeQueue.size(key.channel()) > 0;
> + }
> +
> + /**
> + * {@inheritDoc}
> + */
> + public void onWrite(SelectionKey key) throws IOException {
> + SelectableChannel channel = key.channel();
> +
> + ChannelAsyncWriteEntry channelEntry =
> + writeQueue.obtainChannelAsyncWriteEntry(channel);
> +
> + boolean hasUpdateLock = false;
> + boolean hasOnWriteLock = false;
> +
> + if ((hasOnWriteLock = channelEntry.onWriteLock.tryLock())) {
> + try {
> + ConcurrentLinkedQueue<TCPAsyncQueueRecord> queue =
> channelEntry.queue;
> +
> + while (queue.size() > 0) {
> + if (hasUpdateLock) {
> + channelEntry.updateLock.unlock();
> + hasUpdateLock = false;
> + }
> +
> + TCPAsyncQueueRecord queueRecord = queue.peek();
> +
> + ByteBuffer byteBuffer = queueRecord.byteBuffer;
> + try {
> + ((WritableByteChannel)
> channel).write(byteBuffer);
> + } catch (IOException e) {
> + if (queueRecord.callbackHandler != null) {
> +
> queueRecord.callbackHandler.onIOException(key,
> + byteBuffer);
> + }
>
> else log the exception.
>
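
Something along these lines might do for the logging branch. This is only a sketch: ErrorCallback is a hypothetical, reduced form of AsyncWriteCallbackHandler, and the logger name is made up (the real code would presumably use Controller.logger()):

```java
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical helper showing the "callback, else log" branch. */
class WriteErrorSketch {
    /** Reduced stand-in for the patch's callback handler. */
    public interface ErrorCallback {
        void onIOException(IOException e);
    }

    // Assumed logger name, for illustration only.
    private static final Logger LOGGER = Logger.getLogger("WriteErrorSketch");

    /** Returns true if a callback received the error, false if it was logged. */
    public static boolean report(IOException e, ErrorCallback callback) {
        if (callback != null) {
            callback.onIOException(e);
            return true;
        }
        LOGGER.log(Level.WARNING,
                "Async write failed and no callback handler is registered", e);
        return false;
    }
}
```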
> + }
> +
> + if (!byteBuffer.hasRemaining()) {
> + TCPAsyncQueueRecord removedQueueRecord =
> + queue.poll();
> + assert removedQueueRecord == queueRecord;
> +
> + if (removedQueueRecord.callbackHandler !=
> null) {
> + removedQueueRecord.callbackHandler.onWriteCompleted(key, byteBuffer);
> + }
> + } else {
> + hasOnWriteLock = false;
> + channelEntry.onWriteLock.unlock();
> + registerForWriting(key);
> + break;
> + }
> +
> + if (queue.size() == 0) {
> + /*
> + * If queue is empty - there is possibility
> that write method
> + * will add entry, but it will not be
> processed, because
> + * onWriteLock is currently locked
> + */
> + channelEntry.updateLock.lock();
> + hasUpdateLock = true;
> + }
> + }
> + } finally {
> + if (hasOnWriteLock) {
> + channelEntry.onWriteLock.unlock();
> + }
> +
> + if (hasUpdateLock) {
> + channelEntry.updateLock.unlock();
> + }
> + }
> + }
> + }
> +
> + /**
> + * {@inheritDoc}
> + */
> + public boolean write(SelectionKey key, SocketAddress dstAddress,
> ByteBuffer buffer) throws IOException {
> + throw new UnsupportedOperationException("Not supported for
> TCP transport.");
> + }
> +
> + /**
> + * {@inheritDoc}
> + */
> + public boolean write(SelectionKey key, SocketAddress dstAddress,
> ByteBuffer buffer, AsyncWriteCallbackHandler callbackHandler) throws
> IOException {
> + throw new UnsupportedOperationException("Not supported for
> TCP transport.");
> + }
> +
> + /**
> + * {@inheritDoc}
> + */
> + public boolean write(SelectionKey key, SocketAddress dstAddress,
> ByteBuffer buffer, AsyncWriteCallbackHandler callbackHandler, boolean
> isCloneByteBuffer) throws IOException {
> + throw new UnsupportedOperationException("Not supported for
> TCP transport.");
> + }
> +
> + /**
> + * {@inheritDoc}
> + */
> + public void onClose(SelectableChannel channel) {
> + writeQueue.removeEntry(channel);
> + }
> +
> + /**
> + * {@inheritDoc}
> + */
> + public void close() {
> + writeQueue.clear();
> + writeQueue = null;
> + }
> +
> + private void registerForWriting(SelectionKey key) {
> + selectorHandler.register(key, SelectionKey.OP_WRITE);
> + }
> +
> + /**
> + * <code>AsyncWriteQueue</code> data unit specific for TCP protocol
> + */
> + protected static class TCPAsyncQueueRecord {
> + public ByteBuffer byteBuffer;
> + public AsyncWriteCallbackHandler callbackHandler;
> +
> + public TCPAsyncQueueRecord(ByteBuffer byteBuffer,
> + AsyncWriteCallbackHandler callbackHandler) {
> + this.byteBuffer = byteBuffer;
> + this.callbackHandler = callbackHandler;
> + }
> + }
> +}
> Index: async/UDPAsyncQueueWriter.java
> --- async/UDPAsyncQueueWriter.java Locally New
> +++ async/UDPAsyncQueueWriter.java Locally New
> @@ -0,0 +1,271 @@
> +/*
> + * The contents of this file are subject to the terms
> + * of the Common Development and Distribution License
> + * (the License). You may not use this file except in
> + * compliance with the License.
> + *
> + * You can obtain a copy of the license at
> + * https://glassfish.dev.java.net/public/CDDLv1.0.html or
> + * glassfish/bootstrap/legal/CDDLv1.0.txt.
> + * See the License for the specific language governing
> + * permissions and limitations under the License.
> + *
> + * When distributing Covered Code, include this CDDL
> + * Header Notice in each file and include the License file
> + * at glassfish/bootstrap/legal/CDDLv1.0.txt.
> + * If applicable, add the following below the CDDL Header,
> + * with the fields enclosed by brackets [] replaced by
> + * you own identifying information:
> + * "Portions Copyrighted [year] [name of copyright owner]"
> + *
> + * Copyright 2007 Sun Microsystems, Inc. All rights reserved.
> + */
> +
> +package com.sun.grizzly.async;
> +
> +import com.sun.grizzly.*;
> +import com.sun.grizzly.async.AsyncWriteQueue.ChannelAsyncWriteEntry;
> +import java.io.IOException;
> +import java.nio.ByteBuffer;
> +import java.nio.channels.SelectableChannel;
> +import java.nio.channels.SelectionKey;
> +import java.nio.channels.WritableByteChannel;
> +import java.util.concurrent.ConcurrentLinkedQueue;
> +import java.net.SocketAddress;
> +import java.nio.channels.DatagramChannel;
> +
> +/**
> + * UDP implementation of <code>AsyncQueueWriter</code>
> + *
> + * @author Alexey Stashok
> + */
> +public class UDPAsyncQueueWriter implements AsyncQueueWriter {
> + private SelectorHandler selectorHandler;
> + private AsyncWriteQueue<UDPAsyncQueueRecord> writeQueue;
> +
> + public UDPAsyncQueueWriter(SelectorHandler selectorHandler) {
> + this.selectorHandler = selectorHandler;
> + writeQueue = new AsyncWriteQueue<UDPAsyncQueueRecord>();
> + }
> +
> + /**
> + * {@inheritDoc}
> + */
> + public boolean write(SelectionKey key, ByteBuffer buffer) throws
> IOException {
> + return write(key, null, buffer, null);
> + }
> +
> + /**
> + * {@inheritDoc}
> + */
> + public boolean write(SelectionKey key, ByteBuffer buffer,
> + AsyncWriteCallbackHandler callbackHandler) throws
> IOException {
> + return write(key, null, buffer, callbackHandler, false);
> + }
> +
> + /**
> + * {@inheritDoc}
> + */
> + public boolean write(SelectionKey key, ByteBuffer buffer,
> + AsyncWriteCallbackHandler callbackHandler, boolean
> isCloneByteBuffer) throws IOException {
> + return write(key, null, buffer, callbackHandler,
> isCloneByteBuffer);
> + }
> +
> + /**
> + * {@inheritDoc}
> + */
> + public boolean write(SelectionKey key, SocketAddress dstAddress,
> + ByteBuffer buffer) throws IOException {
> + return write(key, dstAddress, buffer, null);
> + }
> +
> + /**
> + * {@inheritDoc}
> + */
> + public boolean write(SelectionKey key, SocketAddress dstAddress,
> + ByteBuffer buffer, AsyncWriteCallbackHandler
> callbackHandler)
> + throws IOException {
> + return write(key, dstAddress, buffer, callbackHandler, false);
> + }
> +
> + /**
> + * {@inheritDoc}
> + */
> + public boolean write(SelectionKey key, SocketAddress dstAddress,
> + ByteBuffer buffer, AsyncWriteCallbackHandler
> callbackHandler,
> + boolean isCloneByteBuffer) throws IOException {
> +
> + SelectableChannel channel = key.channel();
> + // If AsyncWriteQueue is empty - try to write ByteBuffer here
> + ChannelAsyncWriteEntry channelEntry =
> + writeQueue.obtainChannelAsyncWriteEntry(channel);
> +
> + ConcurrentLinkedQueue<UDPAsyncQueueRecord> queue =
> channelEntry.queue;
> +
> + boolean hasWriteLock = false;
> +
> + try {
> + if (queue.isEmpty() &&
> + (hasWriteLock = channelEntry.writeLock.tryLock())) {
> + doWrite((WritableByteChannel) channel, dstAddress,
> buffer);
> + }
> +
> + if (buffer.hasRemaining()) {
> + // clone ByteBuffer if required
> + if (isCloneByteBuffer) {
> + int size = buffer.remaining();
> + ByteBuffer newBuffer = buffer.isDirect() ?
> + ByteBuffer.allocateDirect(size) :
> + ByteBuffer.allocate(size);
> +
> + newBuffer.put(buffer);
> + newBuffer.position(0);
> + buffer = newBuffer;
> + }
>
> Same as above: support HEAP and VIEW_HEAP/DIRECT via ByteBufferFactory.
>
> +
> + // add new element to the queue
> + channelEntry.updateLock.lock();
> + queue.offer(new UDPAsyncQueueRecord(dstAddress, buffer,
> + callbackHandler));
> +
> + /*
> + * This check helps to avoid not required key
> registering
> + */
> + if (!channelEntry.onWriteLock.isLocked()) {
> + registerForWriting(key);
> + }
> +
> + channelEntry.updateLock.unlock();
> +
> + return false;
> + }
> + } finally {
> + if (hasWriteLock) {
> + channelEntry.writeLock.unlock();
> + }
> + }
> +
> + return true;
> + }
> +
> + /**
> + * {@inheritDoc}
> + */
> + public boolean hasReadyAsyncWriteData(SelectionKey key) {
> + return writeQueue.size(key.channel()) > 0;
> + }
> +
> + /**
> + * {@inheritDoc}
> + */
> + public void onWrite(SelectionKey key) throws IOException {
> + SelectableChannel channel = key.channel();
> +
> + ChannelAsyncWriteEntry channelEntry =
> + writeQueue.obtainChannelAsyncWriteEntry(channel);
> +
> + boolean hasUpdateLock = false;
> + boolean hasOnWriteLock = false;
> +
> + if ((hasOnWriteLock = channelEntry.onWriteLock.tryLock())) {
> + try {
> + ConcurrentLinkedQueue<UDPAsyncQueueRecord> queue =
> + channelEntry.queue;
> +
> + while (queue.size() > 0) {
> + if (hasUpdateLock) {
> + channelEntry.updateLock.unlock();
> + hasUpdateLock = false;
> + }
> +
> + UDPAsyncQueueRecord queueRecord = queue.peek();
> +
> + ByteBuffer byteBuffer = queueRecord.byteBuffer;
> + try {
> + doWrite((WritableByteChannel) channel,
> + queueRecord.dstAddress, byteBuffer);
> + } catch (IOException e) {
> + if (queueRecord.callbackHandler != null) {
> +
> queueRecord.callbackHandler.onIOException(key,
> + byteBuffer);
> + }
> + }
>
> else log the exception.
>
> +
> + if (!byteBuffer.hasRemaining()) {
> + UDPAsyncQueueRecord removedQueueRecord =
> queue.poll();
> + assert removedQueueRecord == queueRecord;
> +
> + if (removedQueueRecord.callbackHandler !=
> null) {
> + removedQueueRecord.callbackHandler.onWriteCompleted(key, byteBuffer);
> + }
> + } else {
> + hasOnWriteLock = false;
> + channelEntry.onWriteLock.unlock();
> + registerForWriting(key);
> + break;
> + }
> +
> + if (queue.size() == 0) {
> + /*
> + * If queue is empty - there is possibility
> that write method
> + * will add entry, but it will not be
> processed, because
> + * onWriteLock is currently locked
> + */
> + channelEntry.updateLock.lock();
> + hasUpdateLock = true;
> + }
> + }
> + } finally {
> + if (hasOnWriteLock) {
> + channelEntry.onWriteLock.unlock();
> + }
> +
> + if (hasUpdateLock) {
> + channelEntry.updateLock.unlock();
> + }
> + }
> + }
> + }
> +
> + /**
> + * {@inheritDoc}
> + */
> + public void onClose(SelectableChannel channel) {
> + writeQueue.removeEntry(channel);
> + }
> +
> + /**
> + * {@inheritDoc}
> + */
> + public void close() {
> + writeQueue.clear();
> + writeQueue = null;
> + }
> +
> + private void doWrite(WritableByteChannel channel, SocketAddress
> dstAddress,
> + ByteBuffer byteBuffer) throws IOException {
> + if (dstAddress != null) {
> + ((DatagramChannel) channel).send(byteBuffer, dstAddress);
> + } else {
> + channel.write(byteBuffer);
> + }
> + }
> +
> + private void registerForWriting(SelectionKey key) {
> + selectorHandler.register(key, SelectionKey.OP_WRITE);
> + }
> +
> + /**
> + * <code>AsyncWriteQueue</code> data unit specific for UDP protocol
> + */
> + protected static class UDPAsyncQueueRecord extends
> + TCPAsyncQueueWriter.TCPAsyncQueueRecord {
> + public SocketAddress dstAddress;
> +
> + public UDPAsyncQueueRecord(SocketAddress dstAddress,
> ByteBuffer byteBuffer,
> + AsyncWriteCallbackHandler callbackHandler) {
> + super(byteBuffer, callbackHandler);
> + this.dstAddress = dstAddress;
> + }
> + }
> +}
> Index: CallbackHandlerContextTask.java
> --- CallbackHandlerContextTask.java Locally New
> +++ CallbackHandlerContextTask.java Locally New
> @@ -0,0 +1,98 @@
> +/*
> + * The contents of this file are subject to the terms
> + * of the Common Development and Distribution License
> + * (the License). You may not use this file except in
> + * compliance with the License.
> + *
> + * You can obtain a copy of the license at
> + * https://glassfish.dev.java.net/public/CDDLv1.0.html or
> + * glassfish/bootstrap/legal/CDDLv1.0.txt.
> + * See the License for the specific language governing
> + * permissions and limitations under the License.
> + *
> + * When distributing Covered Code, include this CDDL
> + * Header Notice in each file and include the License file
> + * at glassfish/bootstrap/legal/CDDLv1.0.txt.
> + * If applicable, add the following below the CDDL Header,
> + * with the fields enclosed by brackets [] replaced by
> + * you own identifying information:
> + * "Portions Copyrighted [year] [name of copyright owner]"
> + *
> + * Copyright 2007 Sun Microsystems, Inc. All rights reserved.
> + */
> +
> +package com.sun.grizzly;
> +
> +import com.sun.grizzly.Context.OpType;
> +import com.sun.grizzly.ContextTask.TaskPool;
> +
> +/**
> + * <code>CallbackHandler</code> task, which will be executed by
> + * <code>Context</code>, when Context.execute(<code>ContextTask</code>)
> + * is called.
> + *
> + * @author Alexey Stashok
> + */
> +public class CallbackHandlerContextTask extends ContextTask {
> + private static final TaskPool<CallbackHandlerContextTask> taskPool =
> + new TaskPool<CallbackHandlerContextTask>() {
> + @Override
> + public CallbackHandlerContextTask newTask() {
> + return new CallbackHandlerContextTask();
> + }
> + };
> +
> + private CallbackHandler callBackHandler;
> +
> + public static CallbackHandlerContextTask poll() {
> + return taskPool.poll();
> + }
> +
> + public static void offer(CallbackHandlerContextTask contextTask) {
> + contextTask.recycle();
> + taskPool.offer(contextTask);
> + }
> +
> + public Object call() throws Exception {
> + IOEvent ioEvent = context.getIOEvent();
> + OpType currentOpType = context.getCurrentOpType();
> +
> + try {
> + if (currentOpType == OpType.OP_READ) {
> + callBackHandler.onRead(ioEvent);
> + } else if (currentOpType == OpType.OP_WRITE) {
> + callBackHandler.onWrite(ioEvent);
> + } else if (currentOpType == OpType.OP_CONNECT) {
> + callBackHandler.onConnect(ioEvent);
> + }
> + } finally {
> + if (ioEvent != null) {
> + // Prevent the CallbackHandler to re-use the context.
> + // TODO: This is still dangerous as the Context
> might have been
> + // cached by the CallbackHandler.
> + ioEvent.attach(null);
> + ioEvent = null;
> + }
> +
> + offer(this);
> + }
> +
> + return null;
> + }
> +
> + public CallbackHandler getCallBackHandler() {
> + return callBackHandler;
> + }
> +
> + public void setCallBackHandler(CallbackHandler callBackHandler) {
> + this.callBackHandler = callBackHandler;
> + }
> +
> + @Override
> + public void recycle() {
> + callBackHandler = null;
> + super.recycle();
> + }
> +
> +
> +}
> Index: Context.java
> --- Context.java Base (BASE)
> +++ Context.java Locally Modified (Based On LOCAL)
> @@ -23,9 +23,14 @@
>
> package com.sun.grizzly;
>
> +import com.sun.grizzly.async.AsyncQueueWritable;
> +import com.sun.grizzly.async.AsyncQueueWriter;
> +import com.sun.grizzly.async.AsyncWriteCallbackHandler;
> +import java.io.IOException;
> +import java.net.SocketAddress;
> +import java.nio.ByteBuffer;
> import java.nio.channels.SelectionKey;
> import java.util.HashMap;
> -import java.util.concurrent.Callable;
>
> /**
> * This Object is used to share information between the Grizzly
> Framework
> @@ -33,7 +38,7 @@
> *
> * @author Jeanfrancois Arcand
> */
> -public class Context implements Callable {
> +public class Context {
>
> /**
> * A SelectionKey's registration state.
> @@ -119,6 +124,16 @@
>
>
> /**
> + * <code>AsyncQueueWriter</code>
> + */
> + private AsyncQueueWriter asyncQueueWriter;
> +
> + /**
> + * <code>AsyncQueueWritable</code>
> + */
> + private AsyncQueueWritable asyncQueueWritable;
> +
> + /**
> * Constructor
> */
> public Context() {
> @@ -210,6 +225,7 @@
> currentOpType = null;
> protocolChain = null;
> ioEvent = null;
> + asyncQueueWriter = null;
> }
>
>
> @@ -233,47 +249,6 @@
>
>
> /**
> - * Execute the <code>ProtocolChain</code>.
> - * @throws java.lang.Exception Exception thrown by protocol chain
> - */
> - public Object call() throws Exception {
> - // If a IOEvent has been defined, invoke it first and
> - // let its associated CallbackHandler decide if the
> ProtocolChain
> - // be invoked or not.
> - Object attachment = key.attachment();
> - if (ioEvent != null && (attachment instanceof CallbackHandler)){
> - try{
> - CallbackHandler callBackHandler =
> ((CallbackHandler)attachment);
> - if (currentOpType == OpType.OP_READ){
> - callBackHandler.onRead(ioEvent);
> - } else if (currentOpType == OpType.OP_WRITE){
> - callBackHandler.onWrite(ioEvent);
> - } else if (currentOpType == OpType.OP_CONNECT){
> - callBackHandler.onConnect(ioEvent);
> - }
> - } finally {
> - if (ioEvent != null){
> - // Prevent the CallbackHandler to re-use the
> context.
> - // TODO: This is still dangerous as the Context
> might have been
> - // cached by the CallbackHandler.
> - ioEvent.attach(null);
> - ioEvent = null;
> - }
> - }
> - } else {
> - SelectionKey currentKey = key;
> -
> selectorHandler.getSelectionKeyHandler().process(currentKey);
> - try {
> - protocolChain.execute(this);
> - } finally {
> - selectorHandler.getSelectionKeyHandler().postProcess(currentKey);
> - }
> - }
> - return null;
> - }
> -
> -
> - /**
> * Return <code>ProtocolChain</code> executed by this instance.
> * @return <code>ProtocolChain</code> instance
> */
> @@ -313,9 +288,12 @@
> * Execute this Context using the Controller's Pipeline
> * @throws com.sun.grizzly.PipelineFullException
> */
> - public void execute() throws PipelineFullException{
>
>
> Here we must keep the execute() method available, as removing it will
> break existing applications (like Sailfin).
>
> - getPipeline().execute(this);
> + public void execute(ContextTask contextTask) throws
> PipelineFullException {
> + if (contextTask != null) {
> + contextTask.setContext(this);
> + getPipeline().execute(contextTask);
> }
>
> else throw new IllegalStateException("ContextTask cannot be null");
>
> + }
>
>
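
This is roughly what I meant by option 2: keep the no-arg execute() and have it delegate to the new overload, with the null check Jeanfrancois asks for. A sketch only, with hypothetical stand-in types (CompatContextSketch and its nested Pipeline and ContextTask are not the real classes, and the stand-in Pipeline does not throw PipelineFullException):

```java
import java.util.concurrent.Callable;

/** Hypothetical, stripped-down Context exposing both entry points. */
class CompatContextSketch {
    /** Stand-in for the patch's ContextTask. */
    public abstract static class ContextTask implements Callable<Object> {
        protected CompatContextSketch context;

        public void setContext(CompatContextSketch context) {
            this.context = context;
        }
    }

    /** Stand-in for Pipeline. */
    public interface Pipeline {
        void execute(ContextTask task);
    }

    private final Pipeline pipeline;
    // In the patch this would presumably come from ProtocolChainContextTask.poll().
    private final ContextTask defaultTask;

    public CompatContextSketch(Pipeline pipeline, ContextTask defaultTask) {
        this.pipeline = pipeline;
        this.defaultTask = defaultTask;
    }

    /** Legacy entry point, kept so existing callers still compile and run. */
    public void execute() {
        execute(defaultTask);
    }

    /** New entry point; rejects a null task instead of silently ignoring it. */
    public void execute(ContextTask task) {
        if (task == null) {
            throw new IllegalStateException("ContextTask cannot be null");
        }
        task.setContext(this);
        pipeline.execute(task);
    }
}
```

Existing code that calls context.execute() would keep the old protocol chain behaviour, while new code can pass a CallbackHandlerContextTask or similar.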
> /**
> @@ -392,4 +370,89 @@
> public void setSelectorHandler(SelectorHandler selectorHandler) {
> this.selectorHandler = selectorHandler;
> }
> +
> +
> + /**
> + * Returns <code>AsyncQueueWritable</code>, associated with the
> current
> + * <code>Context</code>. This method is not threadsafe.
> + *
> + * @return <code>AsyncQueueWritable</code>
> + */
> + public AsyncQueueWritable getAsyncQueueWritable() {
> + if (asyncQueueWritable == null) {
> + asyncQueueWritable = new AsyncQueueWritableContextWrapper();
> }
> +
> + return asyncQueueWritable;
> + }
> +
> + /**
> + * Return the <code>AsyncQueueWriter</code>
> + * @return the <code>AsyncQueueWriter</code>
> + */
> + protected AsyncQueueWriter getAsyncQueueWriter() {
> + return asyncQueueWriter;
> + }
> +
> + /**
> + * Set the <code>AsyncQueueWriter</code>
> + * @param asyncQueueWriter <code>AsyncQueueWriter</code>
> + */
> + protected void setAsyncQueueWriter(AsyncQueueWriter
> asyncQueueWriter) {
> + this.asyncQueueWriter = asyncQueueWriter;
> + }
> +
> + private class AsyncQueueWritableContextWrapper implements
> AsyncQueueWritable {
> + /**
> + * {@inheritDoc}
> + */
> + public boolean writeToAsyncQueue(ByteBuffer buffer) throws
> IOException {
> + return asyncQueueWriter.write(key, buffer);
> + }
> +
> + /**
> + * {@inheritDoc}
> + */
> + public boolean writeToAsyncQueue(ByteBuffer buffer,
> + AsyncWriteCallbackHandler callbackHandler) throws
> IOException {
> + return asyncQueueWriter.write(key, buffer, callbackHandler);
> + }
> +
> + /**
> + * {@inheritDoc}
> + */
> + public boolean writeToAsyncQueue(ByteBuffer buffer,
> + AsyncWriteCallbackHandler callbackHandler,
> + boolean isCloneByteBuffer) throws IOException {
> + return asyncQueueWriter.write(key, buffer, callbackHandler,
> + isCloneByteBuffer);
> + }
> +
> + /**
> + * {@inheritDoc}
> + */
> + public boolean writeToAsyncQueue(SocketAddress dstAddress,
> ByteBuffer buffer)
> + throws IOException {
> + return asyncQueueWriter.write(key, dstAddress, buffer);
> + }
> +
> + /**
> + * {@inheritDoc}
> + */
> + public boolean writeToAsyncQueue(SocketAddress dstAddress,
> ByteBuffer buffer,
> + AsyncWriteCallbackHandler callbackHandler) throws
> IOException {
> + return asyncQueueWriter.write(key, dstAddress, buffer,
> + callbackHandler);
> + }
> +
> + /**
> + * {@inheritDoc}
> + */
> + public boolean writeToAsyncQueue(SocketAddress dstAddress,
> ByteBuffer buffer,
> + AsyncWriteCallbackHandler callbackHandler, boolean
> isCloneByteBuffer)
> + throws IOException {
> + return asyncQueueWriter.write(key, dstAddress, buffer,
> + callbackHandler, isCloneByteBuffer);
> + }
> + }
> +}
> Index: ContextTask.java
> --- ContextTask.java Locally New
> +++ ContextTask.java Locally New
> @@ -0,0 +1,70 @@
> +/*
> + * The contents of this file are subject to the terms
> + * of the Common Development and Distribution License
> + * (the License). You may not use this file except in
> + * compliance with the License.
> + *
> + * You can obtain a copy of the license at
> + * https://glassfish.dev.java.net/public/CDDLv1.0.html or
> + * glassfish/bootstrap/legal/CDDLv1.0.txt.
> + * See the License for the specific language governing
> + * permissions and limitations under the License.
> + *
> + * When distributing Covered Code, include this CDDL
> + * Header Notice in each file and include the License file
> + * at glassfish/bootstrap/legal/CDDLv1.0.txt.
> + * If applicable, add the following below the CDDL Header,
> + * with the fields enclosed by brackets [] replaced by
> + * you own identifying information:
> + * "Portions Copyrighted [year] [name of copyright owner]"
> + *
> + * Copyright 2007 Sun Microsystems, Inc. All rights reserved.
> + */
> +
> +package com.sun.grizzly;
> +
> +import java.util.concurrent.Callable;
> +import java.util.concurrent.ConcurrentLinkedQueue;
> +
> +/**
> + * Task, which will be executed by <code>Context</code>, when
> + * Context.execute(<code>ContextTask</code>) is called.
> + *
> + * @author Alexey Stashok
> + */
> +public abstract class ContextTask implements Callable {
> + protected Context context;
> +
> + public Context getContext() {
> + return context;
> + }
> +
> + public void setContext(Context context) {
> + this.context = context;
> + }
> +
> + public void recycle() {
> + context = null;
> + }
> +
> + protected abstract static class TaskPool<E extends ContextTask> {
> + private ConcurrentLinkedQueue<E> pool =
> + new ConcurrentLinkedQueue<E>();
> +
> + public abstract E newTask();
> +
> + public E poll() {
> + E task = pool.poll();
> + if (task == null) {
> + task = newTask();
> + }
> +
> + return task;
> + }
> +
> + public void offer(E task) {
> + task.recycle();
> + pool.offer(task);
> + }
> + }
> +}
> Index: Controller.java
> --- Controller.java Base (BASE)
> +++ Controller.java Locally Modified (Based On LOCAL)
> @@ -298,6 +298,7 @@
> key = iterator.next();
> iterator.remove();
> boolean skipOpWrite = false;
> + delegateToWorkerThread = false;
> if (key.isValid()) {
> if ((key.readyOps() & SelectionKey.OP_ACCEPT)
> == SelectionKey.OP_ACCEPT){
> @@ -356,7 +357,7 @@
> if (logger.isLoggable(Level.FINE)) {
> logger.log(Level.FINE, "OP_WRITE on "
> + key);
> }
> - delegateToWorkerThread = selectorHandler.
> + delegateToWorkerThread |= selectorHandler.
> onWriteInterest(key,serverCtx);
> }
>
> @@ -368,7 +369,9 @@
> Context context =
> pollContext(key,protocolChain);
> context.setSelectorHandler(selectorHandler);
>
> context.setPipeline(selectorHandler.pipeline());
> - context.execute();
> + context.setAsyncQueueWriter(
> +
> selectorHandler.getAsyncQueueWriter());
> + context.execute(ProtocolChainContextTask.poll());
>
> This brings up an interesting point. Instead of calling
> setAsyncQueueWriter on every request, should the Context instance be
> pooled per Selector instead of per Controller? That way, once
> configured, a Context can be pooled and never has to be re-configured
> again.
>
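
If we went that way, each SelectorHandler could own a small pool whose contexts are created already configured. A sketch under that assumption (PerHandlerContextPoolSketch and Ctx are hypothetical names, and W stands in for AsyncQueueWriter):

```java
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical pool, one instance per SelectorHandler. */
class PerHandlerContextPoolSketch<W> {
    /** Stand-in for Context; the writer is fixed at construction. */
    public static final class Ctx<W> {
        public final W asyncQueueWriter;

        Ctx(W asyncQueueWriter) {
            this.asyncQueueWriter = asyncQueueWriter;
        }
    }

    private final W writer;
    private final ConcurrentLinkedQueue<Ctx<W>> pool =
            new ConcurrentLinkedQueue<Ctx<W>>();

    public PerHandlerContextPoolSketch(W writer) {
        this.writer = writer;
    }

    /** Returns a pooled context, or a new one already bound to this handler's writer. */
    public Ctx<W> poll() {
        Ctx<W> ctx = pool.poll();
        return (ctx != null) ? ctx : new Ctx<W>(writer);
    }

    /** Returns the context to the pool without resetting the writer. */
    public void offer(Ctx<W> ctx) {
        pool.offer(ctx);
    }
}
```

Note that Context.recycle() in this diff sets asyncQueueWriter to null, so that line would have to go for the writer to survive pooling.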
>
>
> }
> } else {
>
> selectorHandler.getSelectionKeyHandler().cancel(key);
> Index: filter/EchoAsyncWriteQueueFilter.java
> --- filter/EchoAsyncWriteQueueFilter.java Locally New
> +++ filter/EchoAsyncWriteQueueFilter.java Locally New
> @@ -0,0 +1,81 @@
> +/*
> + * The contents of this file are subject to the terms
> + * of the Common Development and Distribution License
> + * (the License). You may not use this file except in
> + * compliance with the License.
> + *
> + * You can obtain a copy of the license at
> + * https://glassfish.dev.java.net/public/CDDLv1.0.html or
> + * glassfish/bootstrap/legal/CDDLv1.0.txt.
> + * See the License for the specific language governing
> + * permissions and limitations under the License.
> + *
> + * When distributing Covered Code, include this CDDL
> + * Header Notice in each file and include the License file
> + * at glassfish/bootstrap/legal/CDDLv1.0.txt.
> + * If applicable, add the following below the CDDL Header,
> + * with the fields enclosed by brackets [] replaced by
> + * you own identifying information:
> + * "Portions Copyrighted [year] [name of copyright owner]"
> + *
> + * Copyright 2006 Sun Microsystems, Inc. All rights reserved.
> + */
> +
> +package com.sun.grizzly.filter;
> +
> +import com.sun.grizzly.Context;
> +import com.sun.grizzly.Controller;
> +import com.sun.grizzly.ProtocolFilter;
> +import com.sun.grizzly.util.OutputWriter;
> +import com.sun.grizzly.util.WorkerThread;
> +import java.io.IOException;
> +import java.net.SocketAddress;
> +import java.nio.ByteBuffer;
> +import java.nio.channels.DatagramChannel;
> +import java.nio.channels.SelectableChannel;
> +import java.util.concurrent.atomic.AtomicInteger;
> +import java.util.logging.Level;
> +
> +/**
> + * Simple echo filter
> + *
> + * @author Alexey Stashok
> + */
> +public class EchoAsyncWriteQueueFilter implements ProtocolFilter {
> + AtomicInteger counter = new AtomicInteger(1);
> + public boolean execute(Context ctx) throws IOException {
> + final WorkerThread workerThread = ((WorkerThread)Thread.currentThread());
> + ByteBuffer buffer = workerThread.getByteBuffer();
> + buffer.flip();
> + if (buffer.hasRemaining()) {
> + // Depending on protocol perform echo
> + try {
> + if (ctx.getProtocol() == Controller.Protocol.TCP) { // TCP case
> + ctx.getAsyncQueueWritable().writeToAsyncQueue(buffer, null, true);
> + } else if (ctx.getProtocol() == Controller.Protocol.UDP) { // UDP case
> + SocketAddress address = (SocketAddress)
> + ctx.getAttribute(ReadFilter.UDP_SOCKETADDRESS);
> + ctx.getAsyncQueueWritable().writeToAsyncQueue(address,
> + buffer, null, true);
> + }
> + } catch (IOException ex) {
> + // Store incoming data in byte[]
> + byte[] data = new byte[buffer.remaining()];
> + int position = buffer.position();
> + buffer.get(data);
> + buffer.position(position);
> +
> + Controller.logger().log(Level.WARNING,
> + "Exception. Echo \"" + new String(data) + "\"");
> + throw ex;
> + }
> + }
> +
> + buffer.clear();
> + return false;
> + }
> +
> + public boolean postExecute(Context ctx) throws IOException {
> + return true;
> + }
> +}
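
The echo filter above passes true as the last argument (isCloneByteBuffer in the TCPConnectorHandler signature further down) and then calls buffer.clear() on the worker thread's buffer. Presumably the queue needs its own copy because the write may complete after the worker thread has reused that buffer. A minimal sketch of that idea, with a hypothetical cloneRemaining helper that is not part of the patch:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the patch: copies the unread bytes of a
// buffer so that the copy survives later reuse of the original.
final class BufferCloneSketch {
    static ByteBuffer cloneRemaining(ByteBuffer src) {
        ByteBuffer copy = ByteBuffer.allocate(src.remaining());
        // duplicate() shares the content but has its own position, so the
        // position of src is left untouched by the bulk put.
        copy.put(src.duplicate());
        copy.flip();
        return copy;
    }

    public static void main(String[] args) {
        ByteBuffer shared = ByteBuffer.allocate(16);
        shared.put("ping".getBytes(StandardCharsets.US_ASCII));
        shared.flip();

        // What a queue would keep when asked to clone the buffer.
        ByteBuffer queued = cloneRemaining(shared);

        // The worker thread reuses its buffer for the next read.
        shared.clear();
        shared.put("XXXX".getBytes(StandardCharsets.US_ASCII));

        // The queued copy still holds the original bytes.
        System.out.println(StandardCharsets.US_ASCII.decode(queued)); // prints "ping"
    }
}
```

Without the copy, the queued write would see whatever the next read put into the shared buffer.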
> Index: ProtocolChainContextTask.java
> --- ProtocolChainContextTask.java Locally New
> +++ ProtocolChainContextTask.java Locally New
> @@ -0,0 +1,69 @@
> +/*
> + * The contents of this file are subject to the terms
> + * of the Common Development and Distribution License
> + * (the License). You may not use this file except in
> + * compliance with the License.
> + *
> + * You can obtain a copy of the license at
> + * https://glassfish.dev.java.net/public/CDDLv1.0.html or
> + * glassfish/bootstrap/legal/CDDLv1.0.txt.
> + * See the License for the specific language governing
> + * permissions and limitations under the License.
> + *
> + * When distributing Covered Code, include this CDDL
> + * Header Notice in each file and include the License file
> + * at glassfish/bootstrap/legal/CDDLv1.0.txt.
> + * If applicable, add the following below the CDDL Header,
> + * with the fields enclosed by brackets [] replaced by
> + * you own identifying information:
> + * "Portions Copyrighted [year] [name of copyright owner]"
> + *
> + * Copyright 2007 Sun Microsystems, Inc. All rights reserved.
> + */
> +
> +
> +package com.sun.grizzly;
> +
> +import java.nio.channels.SelectionKey;
> +
> +/**
> + * <code>ProtocolChain</code> task, which will be executed by
> + * <code>Context</code>, when Context.execute(<code>ContextTask</code>)
> + * is called.
> + *
> + * @author Alexey Stashok
> + */
> +public class ProtocolChainContextTask extends ContextTask {
> + private static final TaskPool<ProtocolChainContextTask> taskPool =
> + new TaskPool<ProtocolChainContextTask>() {
> + @Override
> + public ProtocolChainContextTask newTask() {
> + return new ProtocolChainContextTask();
> + }
> + };
> +
> + public static ProtocolChainContextTask poll() {
> + return taskPool.poll();
> + }
> +
> + public static void offer(ProtocolChainContextTask contextTask) {
> + contextTask.recycle();
> + taskPool.offer(contextTask);
> + }
> +
> + public Object call() throws Exception {
> + SelectionKey currentKey = context.getSelectionKey();
> + SelectionKeyHandler selectionKeyHandler = context.
> + getSelectorHandler().getSelectionKeyHandler();
> +
> + selectionKeyHandler.process(currentKey);
> + try {
> + context.getProtocolChain().execute(context);
> + } finally {
> + selectionKeyHandler.postProcess(currentKey);
> + offer(this);
> + }
> +
> + return null;
> + }
> +}
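
For the compatibility question, it helps to look at the poll()/offer() lifecycle that ProtocolChainContextTask relies on. A minimal self-contained sketch of that pattern follows; SketchPool and SketchTask are hypothetical stand-ins, and the real TaskPool and ContextTask in the patch may differ:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-in for the patch's TaskPool: hands out recycled
// instances and creates a new one only when the pool is empty.
abstract class SketchPool<T> {
    private final ConcurrentLinkedQueue<T> idle = new ConcurrentLinkedQueue<T>();

    abstract T newTask();

    T poll() {
        T task = idle.poll();
        return task != null ? task : newTask();
    }

    void offer(T task) {
        idle.offer(task);
    }
}

// Hypothetical task that returns itself to the pool when it finishes.
final class SketchTask implements Callable<Object> {
    static final SketchPool<SketchTask> POOL = new SketchPool<SketchTask>() {
        @Override
        SketchTask newTask() {
            return new SketchTask();
        }
    };

    String payload; // per-use state, cleared on recycle

    void recycle() {
        payload = null;
    }

    @Override
    public Object call() {
        try {
            return "done:" + payload;
        } finally {
            // Same shape as offer(this) in the finally block above: the
            // task goes back to the pool even if the work throws.
            recycle();
            POOL.offer(this);
        }
    }

    public static void main(String[] args) {
        SketchTask first = POOL.poll();
        first.payload = "echo";
        System.out.println(first.call());        // prints "done:echo"
        System.out.println(POOL.poll() == first); // prints "true"
    }
}
```

The caller must not touch a task after call() returns, since another thread may already have polled it.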
> Index: SelectorHandler.java
> --- SelectorHandler.java Base (BASE)
> +++ SelectorHandler.java Locally Modified (Based On LOCAL)
> @@ -22,6 +22,7 @@
> */
> package com.sun.grizzly;
>
> +import com.sun.grizzly.async.AsyncQueueWriter;
> import com.sun.grizzly.util.Copyable;
> import java.io.IOException;
> import java.nio.channels.SelectableChannel;
> @@ -203,6 +204,16 @@
>
>
> /**
> + * Returns <code>AsyncQueueWriter</code> associated with this
> + * <code>SelectorHandler</code>. Method will return null, if this
> + * <code>SelectorHandler</code> is not running.
> + *
> + * @return <code>AsyncQueueWriter</code>
> + */
> + public AsyncQueueWriter getAsyncQueueWriter();
> +
> +
> + /**
> * Return the Pipeline used to execute this SelectorHandler's
> * SelectionKey ops
> * @return The pipeline to use, or null if the Controller's Pipeline
> Index: TCPConnectorHandler.java
> --- TCPConnectorHandler.java Base (BASE)
> +++ TCPConnectorHandler.java Locally Modified (Based On LOCAL)
> @@ -23,6 +23,8 @@
> package com.sun.grizzly;
>
> import com.sun.grizzly.Controller.Protocol;
> +import com.sun.grizzly.async.AsyncWriteCallbackHandler;
> +import com.sun.grizzly.async.AsyncQueueWritable;
> import com.sun.grizzly.util.ByteBufferInputStream;
> import com.sun.grizzly.util.OutputWriter;
> import java.io.IOException;
> @@ -70,7 +72,8 @@
> *
> * @author Jeanfrancois Arcand
> */
> -public class TCPConnectorHandler implements ConnectorHandler<TCPSelectorHandler, CallbackHandler>{
> +public class TCPConnectorHandler implements
> + ConnectorHandler<TCPSelectorHandler, CallbackHandler>, AsyncQueueWritable {
>
> /**
> * The underlying TCPSelectorHandler used to mange SelectionKeys.
> @@ -399,6 +402,72 @@
>
>
> /**
> + * {@inheritDoc}
> + */
> + public boolean writeToAsyncQueue(ByteBuffer buffer) throws IOException {
> + return selectorHandler.getAsyncQueueWriter().write(
> + socketChannel.keyFor(selectorHandler.getSelector()), buffer);
> + }
> +
> +
> + /**
> + * {@inheritDoc}
> + */
> + public boolean writeToAsyncQueue(ByteBuffer buffer,
> + AsyncWriteCallbackHandler callbackHandler) throws IOException {
> + return selectorHandler.getAsyncQueueWriter().write(
> + socketChannel.keyFor(selectorHandler.getSelector()), buffer,
> + callbackHandler);
> + }
> +
> +
> + /**
> + * {@inheritDoc}
> + */
> + public boolean writeToAsyncQueue(ByteBuffer buffer,
> + AsyncWriteCallbackHandler callbackHandler,
> + boolean isCloneByteBuffer) throws IOException {
> + return selectorHandler.getAsyncQueueWriter().write(
> + socketChannel.keyFor(selectorHandler.getSelector()), buffer,
> + callbackHandler, isCloneByteBuffer);
> + }
> +
> +
> + /**
> + * {@inheritDoc}
> + */
> + public boolean writeToAsyncQueue(SocketAddress dstAddress, ByteBuffer buffer)
> + throws IOException {
> + return selectorHandler.getAsyncQueueWriter().write(
> + socketChannel.keyFor(selectorHandler.getSelector()), dstAddress,
> + buffer);
> + }
> +
> +
> + /**
> + * {@inheritDoc}
> + */
> + public boolean writeToAsyncQueue(SocketAddress dstAddress, ByteBuffer buffer,
> + AsyncWriteCallbackHandler callbackHandler) throws IOException {
> + return selectorHandler.getAsyncQueueWriter().write(
> + socketChannel.keyFor(selectorHandler.getSelector()), dstAddress,
> + buffer, callbackHandler);
> + }
> +
> +
> + /**
> + * {@inheritDoc}
> + */
> + public boolean writeToAsyncQueue(SocketAddress dstAddress, ByteBuffer buffer,
> + AsyncWriteCallbackHandler callbackHandler, boolean isCloneByteBuffer)
> + throws IOException {
> + return selectorHandler.getAsyncQueueWriter().write(
> + socketChannel.keyFor(selectorHandler.getSelector()), dstAddress,
> + buffer, callbackHandler, isCloneByteBuffer);
> + }
> +
> +
> + /**
> * Close the underlying connection.
> */
> public void close() throws IOException{
> Index: TCPSelectorHandler.java
> --- TCPSelectorHandler.java Base (BASE)
> +++ TCPSelectorHandler.java Locally Modified (Based On LOCAL)
> @@ -23,6 +23,9 @@
>
> package com.sun.grizzly;
>
> +import com.sun.grizzly.async.AsyncQueueWriter;
> +import com.sun.grizzly.async.TCPAsyncQueueWriter;
> +import com.sun.grizzly.async.AsyncQueueWriterContextTask;
> import com.sun.grizzly.util.Cloner;
> import com.sun.grizzly.util.Copyable;
> import com.sun.grizzly.util.SelectionKeyOP;
> @@ -171,6 +174,7 @@
> */
> protected ProtocolChainInstanceHandler instanceHandler;
>
> + protected AsyncQueueWriter asyncQueueWriter;
>
> public TCPSelectorHandler(){
> }
> @@ -234,6 +238,10 @@
> public void preSelect(Context ctx) throws IOException {
> initOpRegistriesIfRequired();
>
> + if (asyncQueueWriter == null) {
> + asyncQueueWriter = new TCPAsyncQueueWriter(this);
> + }
> +
> if (selector == null){
> try{
> connectorInstanceHandler = new ConnectorInstanceHandler.
> @@ -423,7 +431,7 @@
> /**
> * Shuntdown this instance by closing its Selector and associated channels.
> */
> - public void shutdown(){
> + public void shutdown() {
> if (selector != null){
> for (SelectionKey selectionKey : selector.keys()) {
> selectionKeyHandler.close(selectionKey);
> @@ -454,7 +462,11 @@
> "selector.close",ex);
> }
>
> + if (asyncQueueWriter != null) {
> + asyncQueueWriter.close();
> + asyncQueueWriter = null;
> }
> + }
>
> /**
> * {@inheritDoc}
> @@ -494,10 +506,11 @@
> throws IOException{
> // disable OP_READ on key before doing anything else
> key.interestOps(key.interestOps() & (~SelectionKey.OP_READ));
> - if (key.attachment() instanceof CallbackHandler){
> - final Context context = ctx.getController().pollContext(key);
> + Object attach = key.attachment();
> + if (attach instanceof CallbackHandler){
> + final Context context = pollContext(ctx, key);
> context.setCurrentOpType(Context.OpType.OP_READ);
> - invokeCallbackHandler(context);
> + invokeCallbackHandler((CallbackHandler) attach, context);
> return false;
> } else {
> return true;
> @@ -515,12 +528,19 @@
> throws IOException{
> // disable OP_WRITE on key before doing anything else
> key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
> - if (key.attachment() instanceof CallbackHandler){
> - final Context context = ctx.getController().pollContext(key);
> - context.setSelectionKey(key);
> +
> + Object attach = null;
> +
> + if (asyncQueueWriter.hasReadyAsyncWriteData(key)) {
> + final Context context = pollContext(ctx, key);
> context.setCurrentOpType(Context.OpType.OP_WRITE);
> - invokeCallbackHandler(context);
> + invokeAsyncQueueWriter(context);
> return false;
> + } else if ((attach = key.attachment()) instanceof CallbackHandler){
> + final Context context = pollContext(ctx, key);
> + context.setCurrentOpType(Context.OpType.OP_WRITE);
> + invokeCallbackHandler((CallbackHandler) attach, context);
> + return false;
> } else {
> return true;
> }
> @@ -541,11 +561,11 @@
> key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
> key.interestOps(key.interestOps() & (~SelectionKey.OP_READ));
>
> - if (key.attachment() instanceof CallbackHandler){
> - Context context = ctx.getController().pollContext(key);
> - context.setSelectionKey(key);
> + Object attach = key.attachment();
> + if (attach instanceof CallbackHandler){
> + final Context context = pollContext(ctx, key);
> context.setCurrentOpType(Context.OpType.OP_CONNECT);
> - invokeCallbackHandler(context);
> + invokeCallbackHandler((CallbackHandler) attach, context);
> }
> return false;
> }
> @@ -556,13 +576,14 @@
> * @param context <code>Context</code>
> * @throws java.io.IOException
> */
> - protected void invokeCallbackHandler(Context context) throws IOException{
> - context.setSelectorHandler(this);
> -
> + protected void invokeCallbackHandler(CallbackHandler callbackHandler,
> + Context context) throws IOException{
> IOEvent<Context>ioEvent = new IOEvent.DefaultIOEvent<Context>(context);
> context.setIOEvent(ioEvent);
> try {
> - context.execute();
> + CallbackHandlerContextTask task = CallbackHandlerContextTask.poll();
> + task.setCallBackHandler(callbackHandler);
> + context.execute(task);
> } catch (PipelineFullException ex){
> throw new IOException(ex.getMessage());
> }
> @@ -570,6 +591,22 @@
>
>
> /**
> + * Invoke an <code>AsyncQueueWriter</code>
> + * @param context <code>Context</code>
> + * @throws java.io.IOException
> + */
> + protected void invokeAsyncQueueWriter(Context context) throws IOException {
> + AsyncQueueWriterContextTask task = AsyncQueueWriterContextTask.poll();
> + task.setAsyncQueueWriter(asyncQueueWriter);
> + try {
> + context.execute(task);
> + } catch (PipelineFullException ex) {
> + throw new IOException(ex.getMessage());
> + }
> + }
> +
> +
> + /**
> * Return an instance of the default <code>ConnectorHandler</code>,
> * which is the <code>TCPConnectorHandler</code>
> * @return <code>ConnectorHandler</code>
> @@ -654,6 +691,13 @@
> this.selector = selector;
> }
>
> + /**
> + * {@inheritDoc}
> + */
> + public AsyncQueueWriter getAsyncQueueWriter() {
> + return asyncQueueWriter;
> + }
> +
> public long getSelectTimeout() {
> return selectTimeout;
> }
> @@ -855,8 +899,19 @@
> } catch (IOException ex){
> ; // LOG ME
> }
> +
> + asyncQueueWriter.onClose(channel);
> }
>
> + private Context pollContext(final Context serverContext,
> + final SelectionKey key) {
> + final Context context = serverContext.getController().pollContext(key);
> + context.setSelectionKey(key);
> + context.setSelectorHandler(this);
> + context.setAsyncQueueWriter(asyncQueueWriter);
> + return context;
> + }
> +
> //--------------- ConnectorInstanceHandler -----------------------------
> /**
> * Return <Callable>factory<Callable> object, which knows how
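
The OP_WRITE hunk in TCPSelectorHandler above first clears the write interest bit, then checks the async write queue before falling back to the CallbackHandler attachment. A minimal sketch of that ordering follows, assuming that returning true from the handler means "let the ProtocolChain deal with the key"; WriteDispatchSketch and its Route enum are hypothetical names used only for illustration:

```java
import java.nio.channels.SelectionKey;

// Hypothetical illustration of the dispatch order in the OP_WRITE hunk.
final class WriteDispatchSketch {
    enum Route { ASYNC_QUEUE, CALLBACK_HANDLER, PROTOCOL_CHAIN }

    // Clears a single interest bit and leaves the others untouched, as in
    // key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE)).
    static int disable(int interestOps, int op) {
        return interestOps & ~op;
    }

    // Pending queued data wins over a CallbackHandler attachment; with
    // neither, the key is assumed to go on to the ProtocolChain.
    static Route route(boolean queueHasData, boolean attachmentIsCallbackHandler) {
        if (queueHasData) {
            return Route.ASYNC_QUEUE;
        }
        if (attachmentIsCallbackHandler) {
            return Route.CALLBACK_HANDLER;
        }
        return Route.PROTOCOL_CHAIN;
    }

    public static void main(String[] args) {
        int ops = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
        System.out.println(disable(ops, SelectionKey.OP_WRITE) == SelectionKey.OP_READ); // prints "true"
        System.out.println(route(true, true)); // prints "ASYNC_QUEUE"
    }
}
```

One consequence of this order is that a key with both queued data and a CallbackHandler attachment never reaches the handler for that write event.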
> Index: UDPConnectorHandler.java
> --- UDPConnectorHandler.java Base (BASE)
> +++ UDPConnectorHandler.java Locally Modified (Based On LOCAL)
> @@ -23,6 +23,8 @@
>
> package com.sun.grizzly;
>
> +import com.sun.grizzly.async.AsyncWriteCallbackHandler;
> +import com.sun.grizzly.async.AsyncQueueWritable;
> import com.sun.grizzly.util.ByteBufferInputStream;
> import java.io.IOException;
> import java.net.SocketAddress;
> @@ -47,7 +49,8 @@
> *
> * @author Jeanfrancois Arcand
> */
> -public class UDPConnectorHandler implements ConnectorHandler<UDPSelectorHandler, CallbackHandler>{
> +public class UDPConnectorHandler implements
> + ConnectorHandler<UDPSelectorHandler, CallbackHandler>, AsyncQueueWritable {
>
> /**
> * The underlying UDPSelectorHandler used to mange SelectionKeys.
> @@ -339,6 +342,72 @@
>
>
> /**
> + * {@inheritDoc}
> + */
> + public boolean writeToAsyncQueue(ByteBuffer buffer) throws IOException {
> + return selectorHandler.getAsyncQueueWriter().write(
> + datagramChannel.keyFor(selectorHandler.getSelector()), buffer);
> + }
> +
> +
> + /**
> + * {@inheritDoc}
> + */
> + public boolean writeToAsyncQueue(ByteBuffer buffer,
> + AsyncWriteCallbackHandler callbackHandler) throws IOException {
> + return selectorHandler.getAsyncQueueWriter().write(
> + datagramChannel.keyFor(selectorHandler.getSelector()), buffer,
> + callbackHandler);
> + }
> +
> +
> + /**
> + * {@inheritDoc}
> + */
> + public boolean writeToAsyncQueue(ByteBuffer buffer,
> + AsyncWriteCallbackHandler callbackHandler,
> + boolean isCloneByteBuffer) throws IOException {
> + return selectorHandler.getAsyncQueueWriter().write(
> + datagramChannel.keyFor(selectorHandler.getSelector()), buffer,
> + callbackHandler, isCloneByteBuffer);
> + }
> +
> +
> + /**
> + * {@inheritDoc}
> + */
> + public boolean writeToAsyncQueue(SocketAddress dstAddress, ByteBuffer buffer)
> + throws IOException {
> + return selectorHandler.getAsyncQueueWriter().write(
> + datagramChannel.keyFor(selectorHandler.getSelector()), dstAddress,
> + buffer);
> + }
> +
> +
> + /**
> + * {@inheritDoc}
> + */
> + public boolean writeToAsyncQueue(SocketAddress dstAddress, ByteBuffer buffer,
> + AsyncWriteCallbackHandler callbackHandler) throws IOException {
> + return selectorHandler.getAsyncQueueWriter().write(
> + datagramChannel.keyFor(selectorHandler.getSelector()), dstAddress,
> + buffer, callbackHandler);
> + }
> +
> +
> + /**
> + * {@inheritDoc}
> + */
> + public boolean writeToAsyncQueue(SocketAddress dstAddress, ByteBuffer buffer,
> + AsyncWriteCallbackHandler callbackHandler, boolean isCloneByteBuffer)
> + throws IOException {
> + return selectorHandler.getAsyncQueueWriter().write(
> + datagramChannel.keyFor(selectorHandler.getSelector()), dstAddress,
> + buffer, callbackHandler, isCloneByteBuffer);
> + }
> +
> +
> + /**
> * Receive bytes.
> * @param byteBuffer The byteBuffer to store bytes.
> * @param socketAddress
> @@ -469,5 +538,4 @@
> public UDPSelectorHandler getSelectorHandler() {
> return selectorHandler;
> }
> -
> }
> Index: UDPSelectorHandler.java
> --- UDPSelectorHandler.java Base (BASE)
> +++ UDPSelectorHandler.java Locally Modified (Based On LOCAL)
> @@ -23,6 +23,7 @@
>
> package com.sun.grizzly;
>
> +import com.sun.grizzly.async.UDPAsyncQueueWriter;
> import com.sun.grizzly.util.Copyable;
> import com.sun.grizzly.util.SelectionKeyOP;
> import java.io.IOException;
> @@ -92,6 +93,10 @@
> public void preSelect(Context ctx) throws IOException {
> initOpRegistriesIfRequired();
>
> + if (asyncQueueWriter == null) {
> + asyncQueueWriter = new UDPAsyncQueueWriter(this);
> + }
> +
> if (selector == null){
> try{
> connectorInstanceHandler = new ConnectorInstanceHandler.
> @@ -174,7 +179,12 @@
> Controller.logger().log(Level.SEVERE,
> "closeSocketException",ex);
> }
> +
> + if (asyncQueueWriter != null) {
> + asyncQueueWriter.close();
> + asyncQueueWriter = null;
> }
> + }
>
>
> /**
> @@ -258,6 +268,8 @@
> } catch (IOException ex){
> ; // LOG ME
> }
> +
> + asyncQueueWriter.onClose(channel);
> }
>
> //--------------- ConnectorInstanceHandler -----------------------------
>
>
>>> For additional commands, e-mail: dev-help_at_grizzly.dev.java.net
>>>
>>
>> ------------------------------------------------------------------------
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: dev-unsubscribe_at_grizzly.dev.java.net
>> For additional commands, e-mail: dev-help_at_grizzly.dev.java.net