# This patch file was generated by NetBeans IDE
# Following Index: paths are relative to: C:\Projects\Grizzly\trunk\modules\grizzly\src\main\java\com\sun\grizzly
# This patch can be applied using context Tools: Patch action on respective folder.
# It uses platform neutral UTF-8 encoding and \n newlines.
# Above lines and this line are ignored by the patching process.
Index: async/AsyncQueueWritable.java
--- async/AsyncQueueWritable.java Locally New
+++ async/AsyncQueueWritable.java Locally New
@@ -0,0 +1,183 @@
+/*
+ * The contents of this file are subject to the terms
+ * of the Common Development and Distribution License
+ * (the License). You may not use this file except in
+ * compliance with the License.
+ *
+ * You can obtain a copy of the license at
+ * https://glassfish.dev.java.net/public/CDDLv1.0.html or
+ * glassfish/bootstrap/legal/CDDLv1.0.txt.
+ * See the License for the specific language governing
+ * permissions and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL
+ * Header Notice in each file and include the License file
+ * at glassfish/bootstrap/legal/CDDLv1.0.txt.
+ * If applicable, add the following below the CDDL Header,
+ * with the fields enclosed by brackets [] replaced by
+ * you own identifying information:
+ * "Portions Copyrighted [year] [name of copyright owner]"
+ *
+ * Copyright 2007 Sun Microsystems, Inc. All rights reserved.
+ */
+
+
+package com.sun.grizzly.async;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+
+/**
+ * Object, which is able to send ByteBuffer
data asynchronously,
+ * using queue.
+ *
+ * @author Alexey Stashok
+ */
+public interface AsyncQueueWritable {
+ /**
+ * Method writes ByteBuffer
using async write queue.
+ * First, if write queue is empty - it tries to write ByteBuffer
+ * directly (without putting to the queue).
+ * If associated write queue is not empty or after direct writing
+ * ByteBuffer
still has ready data to be written -
+ * ByteBuffer
will be added to AsyncWriteQueue
.
+ * If an exception occurs, during direct writing - it will be propogated
+ * to the caller directly, otherwise it will be just logged by
+ * Grizzly framework.
+ *
+ * @param buffer ByteBuffer
+ * @return true, if ByteBuffer
was written completely, false if write operation was put to queue
+ * @throws java.io.IOException
+ */
+ public boolean writeToAsyncQueue(ByteBuffer buffer) throws IOException;
+
+ /**
+ * Method writes ByteBuffer
using async write queue.
+ * First, if write queue is empty - it tries to write ByteBuffer
+ * directly (without putting to the queue).
+ * If associated write queue is not empty or after direct writing
+ * ByteBuffer
still has ready data to be written -
+ * ByteBuffer
will be added to AsyncWriteQueue
.
+ * If an exception occurs, during direct writing - it will be propogated
+ * to the caller directly, otherwise, if the ByteBuffer
is
+ * added to a writing queue - exception notification will come via
+ * AsyncWriteCallbackHandler
+ *
+ * @param buffer ByteBuffer
+ * @param callbackHandler AsyncWriteCallbackHandler
,
+ * which will get notified, when
+ * ByteBuffer
will be completely written
+ * @return true, if ByteBuffer
was written completely,
+ * false if write operation was put to queue
+ * @throws java.io.IOException
+ */
+ public boolean writeToAsyncQueue(ByteBuffer buffer,
+ AsyncWriteCallbackHandler callbackHandler) throws IOException;
+
+ /**
+ * Method writes ByteBuffer
using async write queue.
+ * First, if write queue is empty - it tries to write ByteBuffer
+ * directly (without putting to the queue).
+ * If associated write queue is not empty or after direct writing
+ * ByteBuffer
still has ready data to be written -
+ * ByteBuffer
will be added to AsyncWriteQueue
.
+ * If an exception occurs, during direct writing - it will be propogated
+ * to the caller directly, otherwise, if the ByteBuffer
is
+ * added to a writing queue - exception notification will come via
+ * AsyncWriteCallbackHandler
+ *
+ * @param buffer ByteBuffer
+ * @param callbackHandler AsyncWriteCallbackHandler
,
+ * which will get notified, when
+ * ByteBuffer
will be completely written
+ * @param isCloneByteBuffer if true - AsyncQueueWriter
+ * will clone given
+ * ByteBuffer
before puting it to the
+ * AsyncWriteQueue
+ * @return true, if ByteBuffer
was written completely,
+ * false if write operation was put to queue
+ * @throws java.io.IOException
+ */
+ public boolean writeToAsyncQueue(ByteBuffer buffer,
+ AsyncWriteCallbackHandler callbackHandler,
+ boolean isCloneByteBuffer) throws IOException;
+
+ /**
+ * Method sends ByteBuffer
using async write queue.
+ * First, if write queue is empty - it tries to send ByteBuffer
+ * to the given SocketAddress
directly
+ * (without putting to the queue).
+ * If associated write queue is not empty or after direct sending
+ * ByteBuffer
still has ready data to be written -
+ * ByteBuffer
will be added to AsyncWriteQueue
.
+ * If an exception occurs, during direct writing - it will be propogated
+ * to the caller directly, otherwise it will be just logged by
+ * Grizzly framework.
+ *
+ * @param dstAddress destination SocketAddress
data will
+ * be sent to
+ * @param buffer ByteBuffer
+ * @return true, if ByteBuffer
was written completely, false if write operation was put to queue
+ * @throws java.io.IOException
+ */
+ public boolean writeToAsyncQueue(SocketAddress dstAddress, ByteBuffer buffer)
+ throws IOException;
+
+ /**
+ * Method sends ByteBuffer
using async write queue.
+ * First, if write queue is empty - it tries to send ByteBuffer
+ * to the given SocketAddress
directly
+ * (without putting to the queue).
+ * If associated write queue is not empty or after direct sending
+ * ByteBuffer
still has ready data to be written -
+ * ByteBuffer
will be added to AsyncWriteQueue
.
+ * If an exception occurs, during direct writing - it will be propogated
+ * to the caller directly, otherwise, if the ByteBuffer
is
+ * added to a writing queue - exception notification will come via
+ * AsyncWriteCallbackHandler
+ *
+ * @param dstAddress destination SocketAddress
data will
+ * be sent to
+ * @param buffer ByteBuffer
+ * @param callbackHandler AsyncWriteCallbackHandler
,
+ * which will get notified, when
+ * ByteBuffer
will be completely written
+ * @return true, if ByteBuffer
was written completely,
+ * false if write operation was put to queue
+ * @throws java.io.IOException
+ */
+ public boolean writeToAsyncQueue(SocketAddress dstAddress, ByteBuffer buffer,
+ AsyncWriteCallbackHandler callbackHandler) throws IOException;
+
+ /**
+ * Method sends ByteBuffer
using async write queue.
+ * First, if write queue is empty - it tries to send ByteBuffer
+ * to the given SocketAddress
directly
+ * (without putting to the queue).
+ * If associated write queue is not empty or after direct sending
+ * ByteBuffer
still has ready data to be written -
+ * ByteBuffer
will be added to AsyncWriteQueue
.
+ * If an exception occurs, during direct writing - it will be propogated
+ * to the caller directly, otherwise, if the ByteBuffer
is
+ * added to a writing queue - exception notification will come via
+ * AsyncWriteCallbackHandler
+ *
+ * @param dstAddress destination SocketAddress
data will
+ * be sent to
+ * @param buffer ByteBuffer
+ * @param callbackHandler AsyncWriteCallbackHandler
,
+ * which will get notified, when
+ * ByteBuffer
will be completely written
+ * @param isCloneByteBuffer if true - AsyncQueueWriter
+ * will clone given
+ * ByteBuffer
before puting it to the
+ * AsyncWriteQueue
+ * @return true, if ByteBuffer
was written completely,
+ * false if write operation was put to queue
+ * @throws java.io.IOException
+ */
+ public boolean writeToAsyncQueue(SocketAddress dstAddress, ByteBuffer buffer,
+ AsyncWriteCallbackHandler callbackHandler,
+ boolean isCloneByteBuffer) throws IOException;
+}
Index: async/AsyncQueueWriter.java
--- async/AsyncQueueWriter.java Locally New
+++ async/AsyncQueueWriter.java Locally New
@@ -0,0 +1,247 @@
+/*
+ * The contents of this file are subject to the terms
+ * of the Common Development and Distribution License
+ * (the License). You may not use this file except in
+ * compliance with the License.
+ *
+ * You can obtain a copy of the license at
+ * https://glassfish.dev.java.net/public/CDDLv1.0.html or
+ * glassfish/bootstrap/legal/CDDLv1.0.txt.
+ * See the License for the specific language governing
+ * permissions and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL
+ * Header Notice in each file and include the License file
+ * at glassfish/bootstrap/legal/CDDLv1.0.txt.
+ * If applicable, add the following below the CDDL Header,
+ * with the fields enclosed by brackets [] replaced by
+ * you own identifying information:
+ * "Portions Copyrighted [year] [name of copyright owner]"
+ *
+ * Copyright 2007 Sun Microsystems, Inc. All rights reserved.
+ */
+
+package com.sun.grizzly.async;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+
+/**
+ * Common inteface to be implemented by protocol dependant asynchronous queue
+ * writers implementations
+ *
+ * @author Alexey Stashok
+ */
+public interface AsyncQueueWriter {
+ /**
+ * Method writes ByteBuffer
to the SelectableChannel
+ * First, if SelectableChannel
associated write queue is empty -
+ * it tries to write ByteBuffer
to the given
+ * SelectableChannel
directly (without putting to the queue).
+ * If associated write queue is not empty or after direct writing
+ * ByteBuffer
still has ready data to be written -
+ * ByteBuffer
will be added to AsyncWriteQueue
+ * and SelectableChannel will be registered on
+ * SelectorHandler
, waiting for OP_WRITE event.
+ * If an exception occurs, during direct writing - it will be propogated
+ * to the caller directly, otherwise it will be just logged by
+ * Grizzly framework.
+ *
+ * @param key SelectionKey
associated with
+ * SelectableChannel
ByteBuffer
+ * should be written to
+ * @param buffer ByteBuffer
+ * @return true, if ByteBuffer
was written completely, false if write operation was put to queue
+ * @throws java.io.IOException
+ */
+ public boolean write(SelectionKey key, ByteBuffer buffer) throws IOException;
+
+ /**
+ * Method writes ByteBuffer
to the SelectableChannel
+ * First, if SelectableChannel
associated write queue is empty -
+ * it tries to write ByteBuffer
to the given
+ * SelectableChannel
directly (without putting to the queue).
+ * If associated write queue is not empty or after direct writing
+ * ByteBuffer
still has ready data to be written -
+ * ByteBuffer
will be added to AsyncWriteQueue
+ * and SelectableChannel will be registered on
+ * SelectorHandler
, waiting for OP_WRITE event.
+ * If an exception occurs, during direct writing - it will be propogated
+ * to the caller directly, otherwise, if the ByteBuffer
is
+ * added to a writing queue - exception notification will come via
+ * AsyncWriteCallbackHandler
+ *
+ * @param key SelectionKey
associated with
+ * SelectableChannel
ByteBuffer
+ * should be written to
+ * @param buffer ByteBuffer
+ * @param callbackHandler AsyncWriteCallbackHandler
,
+ * which will get notified, when
+ * ByteBuffer
will be completely written
+ * @return true, if ByteBuffer
was written completely,
+ * false if write operation was put to queue
+ * @throws java.io.IOException
+ */
+ public boolean write(SelectionKey key, ByteBuffer buffer,
+ AsyncWriteCallbackHandler callbackHandler) throws IOException;
+
+ /**
+ * Method writes ByteBuffer
to the SelectableChannel
+ * First, if SelectableChannel
associated write queue is empty -
+ * it tries to write ByteBuffer
to the given
+ * SelectableChannel
directly (without putting to the queue).
+ * If associated write queue is not empty or after direct writing
+ * ByteBuffer
still has ready data to be written -
+ * ByteBuffer
will be added to AsyncWriteQueue
+ * and SelectableChannel will be registered on
+ * SelectorHandler
, waiting for OP_WRITE event.
+ * If an exception occurs, during direct writing - it will be propogated
+ * to the caller directly, otherwise, if the ByteBuffer
is
+ * added to a writing queue - exception notification will come via
+ * AsyncWriteCallbackHandler
+ *
+ * @param key SelectionKey
associated with
+ * SelectableChannel
ByteBuffer
+ * should be written to
+ * @param buffer ByteBuffer
+ * @param callbackHandler AsyncWriteCallbackHandler
,
+ * which will get notified, when
+ * ByteBuffer
will be completely written
+ * @param isCloneByteBuffer if true - AsyncQueueWriter
will
+ * clone given
+ * ByteBuffer
before puting it to the
+ * AsyncWriteQueue
+ * @return true, if ByteBuffer
was written completely,
+ * false if write operation was put to queue
+ * @throws java.io.IOException
+ */
+ public boolean write(SelectionKey key, ByteBuffer buffer,
+ AsyncWriteCallbackHandler callbackHandler, boolean isCloneByteBuffer)
+ throws IOException;
+
+ /**
+ * Method sends ByteBuffer
to the SocketAddress
+ * First, if SelectableChannel
associated write queue is empty -
+ * it tries to write ByteBuffer
to the given
+ * SocketAddress
directly (without putting to the queue).
+ * If associated write queue is not empty or after direct writing
+ * ByteBuffer
still has ready data to be written -
+ * ByteBuffer
will be added to AsyncWriteQueue
+ * and SelectableChannel will be registered on
+ * SelectorHandler
, waiting for OP_WRITE event.
+ * If an exception occurs, during direct writing - it will be propogated
+ * to the caller directly, otherwise it will be just logged by
+ * Grizzly framework.
+ *
+ * @param key SelectionKey
associated with
+ * SelectableChannel
, which will be used to
+ * sendByteBuffer
to
+ * @param dstAddress destination address ByteBuffer
will be sent to
+ * @param buffer ByteBuffer
+ * @return true, if ByteBuffer
was written completely, false if write operation was put to queue
+ * @throws java.io.IOException
+ */
+ public boolean write(SelectionKey key, SocketAddress dstAddress,
+ ByteBuffer buffer) throws IOException;
+
+ /**
+ * Method sends ByteBuffer
to the SocketAddress
+ * First, if SelectableChannel
associated write queue is empty -
+ * it tries to write ByteBuffer
to the given
+ * SocketAddress
directly (without putting to the queue).
+ * If associated write queue is not empty or after direct writing
+ * ByteBuffer
still has ready data to be written -
+ * ByteBuffer
will be added to AsyncWriteQueue
+ * and SelectableChannel will be registered on
+ * SelectorHandler
, waiting for OP_WRITE event.
+ * If an exception occurs, during direct writing - it will be propogated
+ * to the caller directly, otherwise, if the ByteBuffer
is
+ * added to a writing queue - exception notification will come via
+ * AsyncWriteCallbackHandler
+ *
+ * @param key SelectionKey
associated with
+ * SelectableChannel
ByteBuffer
+ * should be written to
+ * @param dstAddress destination address ByteBuffer
will be sent to
+ * @param buffer ByteBuffer
+ * @param callbackHandler AsyncWriteCallbackHandler
,
+ * which will get notified, when
+ * ByteBuffer
will be completely written
+ * @return true, if ByteBuffer
was written completely,
+ * false if write operation was put to queue
+ * @throws java.io.IOException
+ */
+ public boolean write(SelectionKey key, SocketAddress dstAddress,
+ ByteBuffer buffer, AsyncWriteCallbackHandler callbackHandler)
+ throws IOException;
+
+ /**
+ * Method sends ByteBuffer
to the SocketAddress
+ * First, if SelectableChannel
associated write queue is empty -
+ * it tries to write ByteBuffer
to the given
+ * SocketAddress
directly (without putting to the queue).
+ * If associated write queue is not empty or after direct writing
+ * ByteBuffer
still has ready data to be written -
+ * ByteBuffer
will be added to AsyncWriteQueue
+ * and SelectableChannel will be registered on
+ * SelectorHandler
, waiting for OP_WRITE event.
+ * If an exception occurs, during direct writing - it will be propogated
+ * to the caller directly, otherwise, if the ByteBuffer
is
+ * added to a writing queue - exception notification will come via
+ * AsyncWriteCallbackHandler
+ *
+ * @param key SelectionKey
associated with
+ * SelectableChannel
ByteBuffer
+ * should be written to
+ * @param dstAddress destination address ByteBuffer
will be sent to
+ * @param buffer ByteBuffer
+ * @param callbackHandler AsyncWriteCallbackHandler
,
+ * which will get notified, when
+ * ByteBuffer
will be completely written
+ * @param isCloneByteBuffer if true - AsyncQueueWriter
will
+ * clone given
+ * ByteBuffer
before puting it to the
+ * AsyncWriteQueue
+ * @return true, if ByteBuffer
was written completely,
+ * false if write operation was put to queue
+ * @throws java.io.IOException
+ */
+ public boolean write(SelectionKey key, SocketAddress dstAddress,
+ ByteBuffer buffer, AsyncWriteCallbackHandler callbackHandler,
+ boolean isCloneByteBuffer) throws IOException;
+
+ /**
+ * Checks whether there is any data in AsyncWriteQueue
ready
+ * for the given SelectableChannel
+ *
+ * @param key SelectionKey
associated with SelectableChannel
+ * @return true, if there is ready data. False otherwise.
+ */
+ public boolean hasReadyAsyncWriteData(SelectionKey key);
+
+ /**
+ * Callback method, which should be called by SelectorHandler
to
+ * notify, that given SelectableChannel
is ready to transmit data.
+ *
+ * @param key SelectionKey
associated with SelectableChannel
+ * @throws java.io.IOException
+ */
+ public void onWrite(SelectionKey key) throws IOException;
+
+ /**
+ * Callback method, which should be called by SelectorHandler
to
+ * notify, that given SelectableChannel
is going to be closed, so
+ * related SelectableChannel
data could be released from
+ * AsyncWriteQueue
+ *
+ * @param SelectableChannel
+ * @throws java.io.IOException
+ */
+ public void onClose(SelectableChannel channel);
+
+ public void close();
+
+}
Index: async/AsyncQueueWriterContextTask.java
--- async/AsyncQueueWriterContextTask.java Locally New
+++ async/AsyncQueueWriterContextTask.java Locally New
@@ -0,0 +1,78 @@
+/*
+ * The contents of this file are subject to the terms
+ * of the Common Development and Distribution License
+ * (the License). You may not use this file except in
+ * compliance with the License.
+ *
+ * You can obtain a copy of the license at
+ * https://glassfish.dev.java.net/public/CDDLv1.0.html or
+ * glassfish/bootstrap/legal/CDDLv1.0.txt.
+ * See the License for the specific language governing
+ * permissions and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL
+ * Header Notice in each file and include the License file
+ * at glassfish/bootstrap/legal/CDDLv1.0.txt.
+ * If applicable, add the following below the CDDL Header,
+ * with the fields enclosed by brackets [] replaced by
+ * you own identifying information:
+ * "Portions Copyrighted [year] [name of copyright owner]"
+ *
+ * Copyright 2007 Sun Microsystems, Inc. All rights reserved.
+ */
+
+package com.sun.grizzly.async;
+
+import com.sun.grizzly.*;
+
+/**
+ * AsyncQueueWriter
task, which will be executed by
+ * Context
, when Context.execute(ContextTask
)
+ * is called.
+ *
+ * @author Alexey Stashok
+ */
+public class AsyncQueueWriterContextTask extends ContextTask {
+ private static final TaskPool taskPool =
+ new TaskPool() {
+ @Override
+ public AsyncQueueWriterContextTask newTask() {
+ return new AsyncQueueWriterContextTask();
+ }
+ };
+
+ private AsyncQueueWriter asyncQueueWriter;
+
+ public static AsyncQueueWriterContextTask poll() {
+ return taskPool.poll();
+ }
+
+ public static void offer(AsyncQueueWriterContextTask contextTask) {
+ contextTask.recycle();
+ taskPool.offer(contextTask);
+ }
+
+ public Object call() throws Exception {
+ try {
+ asyncQueueWriter.onWrite(context.getSelectionKey());
+ } finally {
+ offer(this);
+ }
+
+ return null;
+ }
+
+ public AsyncQueueWriter getAsyncQueueWriter() {
+ return asyncQueueWriter;
+ }
+
+ public void setAsyncQueueWriter(AsyncQueueWriter asyncWriter) {
+ this.asyncQueueWriter = asyncWriter;
+ }
+
+ @Override
+ public void recycle() {
+ asyncQueueWriter = null;
+ super.recycle();
+ }
+}
Index: async/AsyncWriteCallbackHandler.java
--- async/AsyncWriteCallbackHandler.java Locally New
+++ async/AsyncWriteCallbackHandler.java Locally New
@@ -0,0 +1,38 @@
+/*
+ * The contents of this file are subject to the terms
+ * of the Common Development and Distribution License
+ * (the License). You may not use this file except in
+ * compliance with the License.
+ *
+ * You can obtain a copy of the license at
+ * https://glassfish.dev.java.net/public/CDDLv1.0.html or
+ * glassfish/bootstrap/legal/CDDLv1.0.txt.
+ * See the License for the specific language governing
+ * permissions and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL
+ * Header Notice in each file and include the License file
+ * at glassfish/bootstrap/legal/CDDLv1.0.txt.
+ * If applicable, add the following below the CDDL Header,
+ * with the fields enclosed by brackets [] replaced by
+ * you own identifying information:
+ * "Portions Copyrighted [year] [name of copyright owner]"
+ *
+ * Copyright 2007 Sun Microsystems, Inc. All rights reserved.
+ */
+
+package com.sun.grizzly.async;
+
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+
+/**
+ * Callback handler interface, used by AsyncQueueWriter
to notify
+ * custom code about completion of specific ByteBuffer
writing.
+ *
+ * @author Alexey Stashok
+ */
+public interface AsyncWriteCallbackHandler {
+ public void onWriteCompleted(SelectionKey key, ByteBuffer buffer);
+ public void onIOException(SelectionKey key, ByteBuffer buffer);
+}
Index: async/AsyncWriteQueue.java
--- async/AsyncWriteQueue.java Locally New
+++ async/AsyncWriteQueue.java Locally New
@@ -0,0 +1,144 @@
+/*
+ * The contents of this file are subject to the terms
+ * of the Common Development and Distribution License
+ * (the License). You may not use this file except in
+ * compliance with the License.
+ *
+ * You can obtain a copy of the license at
+ * https://glassfish.dev.java.net/public/CDDLv1.0.html or
+ * glassfish/bootstrap/legal/CDDLv1.0.txt.
+ * See the License for the specific language governing
+ * permissions and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL
+ * Header Notice in each file and include the License file
+ * at glassfish/bootstrap/legal/CDDLv1.0.txt.
+ * If applicable, add the following below the CDDL Header,
+ * with the fields enclosed by brackets [] replaced by
+ * you own identifying information:
+ * "Portions Copyrighted [year] [name of copyright owner]"
+ *
+ * Copyright 2007 Sun Microsystems, Inc. All rights reserved.
+ */
+
+package com.sun.grizzly.async;
+
+import java.nio.channels.SelectableChannel;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Class represents collection of
+ * SelectableChannel
s and correspondent queue of
+ * ByteBuffer
, which should be written asynchronously.
+ * This implementation is TCP protocol specific.
+ *
+ * @author Alexey Stashok
+ */
+public class AsyncWriteQueue{
+ private Map queueMap =
+ new ConcurrentHashMap();
+
+ /**
+ * Add data to the AsyncWriteQueue
, corresponding to the given
+ * SelectableChannel
+ *
+ * @param channel SelectableChannel
+ * @param queueRecord write data unit
+ */
+ public void offer(SelectableChannel channel, E queueRecord) {
+ ChannelAsyncWriteEntry entry = obtainChannelAsyncWriteEntry(channel);
+ entry.queue.offer(queueRecord);
+ }
+
+ /**
+ * Get head element of SelectableChannel
write queue.
+ * Element will not be removed from queue.
+ *
+ * @param channel SelectableChannel
+ *
+ * @return AsyncQueueRecord
write data unit
+ */
+ public E peek(SelectableChannel channel) {
+ ChannelAsyncWriteEntry entry = queueMap.get(channel);
+ if (entry != null) {
+ return entry.queue.peek();
+ }
+
+ return null;
+ }
+
+ /**
+ * Get head element of SelectableChannel
write queue.
+ * Element will be removed from queue.
+ *
+ * @param channel SelectableChannel
+ *
+ * @return AsyncQueueRecord
write data unit
+ */
+ public E poll(SelectableChannel channel) {
+ ChannelAsyncWriteEntry entry = queueMap.get(channel);
+ if (entry != null) {
+ return entry.queue.poll();
+ }
+
+ return null;
+ }
+
+ /**
+ * Remove head element of SelectableChannel
write queue.
+ *
+ * @param channel SelectableChannel
+ */
+ public void removeEntry(SelectableChannel channel) {
+ queueMap.remove(channel);
+ }
+
+ /**
+ * Get the size of SelectableChannel
write queue.
+ *
+ * @param channel SelectableChannel
+ * @return size of SelectableChannel
write queue.
+ */
+ public int size(SelectableChannel channel) {
+ ChannelAsyncWriteEntry entry = queueMap.get(channel);
+ return entry == null ? 0 : entry.queue.size();
+ }
+
+ public void clear() {
+ queueMap.clear();
+ }
+
+ protected ChannelAsyncWriteEntry obtainChannelAsyncWriteEntry(SelectableChannel channel) {
+ ChannelAsyncWriteEntry entry = queueMap.get(channel);
+ if (entry == null) {
+ synchronized(channel) {
+ entry = queueMap.get(channel);
+ if (entry == null) {
+ entry = new ChannelAsyncWriteEntry();
+ queueMap.put(channel, entry);
+ }
+ }
+ }
+ return entry;
+ }
+
+ /**
+ * AsyncWriteQueue
data unit
+ */
+ protected class ChannelAsyncWriteEntry {
+ public ConcurrentLinkedQueue queue;
+ public ReentrantLock writeLock;
+ public ReentrantLock onWriteLock;
+ public ReentrantLock updateLock;
+
+ public ChannelAsyncWriteEntry() {
+ queue = new ConcurrentLinkedQueue();
+ writeLock = new ReentrantLock();
+ onWriteLock = new ReentrantLock();
+ updateLock = new ReentrantLock();
+ }
+ }
+}
\ No newline at end of file
Index: async/TCPAsyncQueueWriter.java
--- async/TCPAsyncQueueWriter.java Locally New
+++ async/TCPAsyncQueueWriter.java Locally New
@@ -0,0 +1,255 @@
+/*
+ * The contents of this file are subject to the terms
+ * of the Common Development and Distribution License
+ * (the License). You may not use this file except in
+ * compliance with the License.
+ *
+ * You can obtain a copy of the license at
+ * https://glassfish.dev.java.net/public/CDDLv1.0.html or
+ * glassfish/bootstrap/legal/CDDLv1.0.txt.
+ * See the License for the specific language governing
+ * permissions and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL
+ * Header Notice in each file and include the License file
+ * at glassfish/bootstrap/legal/CDDLv1.0.txt.
+ * If applicable, add the following below the CDDL Header,
+ * with the fields enclosed by brackets [] replaced by
+ * you own identifying information:
+ * "Portions Copyrighted [year] [name of copyright owner]"
+ *
+ * Copyright 2007 Sun Microsystems, Inc. All rights reserved.
+ */
+
+package com.sun.grizzly.async;
+
+import com.sun.grizzly.*;
+import com.sun.grizzly.async.AsyncWriteQueue.ChannelAsyncWriteEntry;
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.WritableByteChannel;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * TCP implementation of AsyncQueueWriter
+ *
+ * @author Alexey Stashok
+ */
+public class TCPAsyncQueueWriter implements AsyncQueueWriter {
+ private SelectorHandler selectorHandler;
+ private AsyncWriteQueue writeQueue;
+
+ public TCPAsyncQueueWriter(SelectorHandler selectorHandler) {
+ this.selectorHandler = selectorHandler;
+ writeQueue = new AsyncWriteQueue();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean write(SelectionKey key, ByteBuffer buffer) throws IOException {
+ return write(key, buffer, null);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean write(SelectionKey key, ByteBuffer buffer,
+ AsyncWriteCallbackHandler callbackHandler) throws IOException {
+ return write(key, buffer, callbackHandler, false);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean write(SelectionKey key, ByteBuffer buffer,
+ AsyncWriteCallbackHandler callbackHandler, boolean isCloneByteBuffer) throws IOException {
+
+ SelectableChannel channel = key.channel();
+ // If AsyncWriteQueue is empty - try to write ByteBuffer here
+ ChannelAsyncWriteEntry channelEntry =
+ writeQueue.obtainChannelAsyncWriteEntry(channel);
+
+ ConcurrentLinkedQueue queue = channelEntry.queue;
+
+ boolean hasWriteLock = false;
+
+ try {
+ if (queue.isEmpty() &&
+ (hasWriteLock = channelEntry.writeLock.tryLock())) {
+ ((WritableByteChannel) channel).write(buffer);
+ }
+
+ if (buffer.hasRemaining()) {
+ // clone ByteBuffer if required
+ if (isCloneByteBuffer) {
+ int size = buffer.remaining();
+ ByteBuffer newBuffer = buffer.isDirect() ?
+ ByteBuffer.allocateDirect(size) :
+ ByteBuffer.allocate(size);
+
+ newBuffer.put(buffer);
+ newBuffer.position(0);
+ buffer = newBuffer;
+ }
+
+ // add new element to the queue
+ channelEntry.updateLock.lock();
+ queue.offer(new TCPAsyncQueueRecord(buffer,
+ callbackHandler));
+
+ /*
+ * This check helps to avoid not required key registering
+ */
+ if (!channelEntry.onWriteLock.isLocked()) {
+ registerForWriting(key);
+ }
+
+ channelEntry.updateLock.unlock();
+
+ return false;
+ }
+ } finally {
+ if (hasWriteLock) {
+ channelEntry.writeLock.unlock();
+ }
+ }
+
+ return true;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean hasReadyAsyncWriteData(SelectionKey key) {
+ return writeQueue.size(key.channel()) > 0;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void onWrite(SelectionKey key) throws IOException {
+ SelectableChannel channel = key.channel();
+
+ ChannelAsyncWriteEntry channelEntry =
+ writeQueue.obtainChannelAsyncWriteEntry(channel);
+
+ boolean hasUpdateLock = false;
+ boolean hasOnWriteLock = false;
+
+ if ((hasOnWriteLock = channelEntry.onWriteLock.tryLock())) {
+ try {
+ ConcurrentLinkedQueue queue = channelEntry.queue;
+
+ while (queue.size() > 0) {
+ if (hasUpdateLock) {
+ channelEntry.updateLock.unlock();
+ hasUpdateLock = false;
+ }
+
+ TCPAsyncQueueRecord queueRecord = queue.peek();
+
+ ByteBuffer byteBuffer = queueRecord.byteBuffer;
+ try {
+ ((WritableByteChannel) channel).write(byteBuffer);
+ } catch (IOException e) {
+ if (queueRecord.callbackHandler != null) {
+ queueRecord.callbackHandler.onIOException(key,
+ byteBuffer);
+ }
+ }
+
+ if (!byteBuffer.hasRemaining()) {
+ TCPAsyncQueueRecord removedQueueRecord =
+ queue.poll();
+ assert removedQueueRecord == queueRecord;
+
+ if (removedQueueRecord.callbackHandler != null) {
+ removedQueueRecord.callbackHandler.onWriteCompleted(key, byteBuffer);
+ }
+ } else {
+ hasOnWriteLock = false;
+ channelEntry.onWriteLock.unlock();
+ registerForWriting(key);
+ break;
+ }
+
+ if (queue.size() == 0) {
+ /*
+ * If queue is empty - there is possibility that write method
+ * will add entry, but it will not be processed, because
+ * onWriteLock is currently locked
+ */
+ channelEntry.updateLock.lock();
+ hasUpdateLock = true;
+ }
+ }
+ } finally {
+ if (hasOnWriteLock) {
+ channelEntry.onWriteLock.unlock();
+ }
+
+ if (hasUpdateLock) {
+ channelEntry.updateLock.unlock();
+ }
+ }
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean write(SelectionKey key, SocketAddress dstAddress, ByteBuffer buffer) throws IOException {
+ throw new UnsupportedOperationException("Not supported for TCP transport.");
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean write(SelectionKey key, SocketAddress dstAddress, ByteBuffer buffer, AsyncWriteCallbackHandler callbackHandler) throws IOException {
+ throw new UnsupportedOperationException("Not supported for TCP transport.");
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean write(SelectionKey key, SocketAddress dstAddress, ByteBuffer buffer, AsyncWriteCallbackHandler callbackHandler, boolean isCloneByteBuffer) throws IOException {
+ throw new UnsupportedOperationException("Not supported for TCP transport.");
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void onClose(SelectableChannel channel) {
+ writeQueue.removeEntry(channel);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void close() {
+ writeQueue.clear();
+ writeQueue = null;
+ }
+
+ private void registerForWriting(SelectionKey key) {
+ selectorHandler.register(key, SelectionKey.OP_WRITE);
+ }
+
+ /**
+ * AsyncWriteQueue
data unit specific for TCP protocol
+ */
+ protected static class TCPAsyncQueueRecord {
+ public ByteBuffer byteBuffer;
+ public AsyncWriteCallbackHandler callbackHandler;
+
+ public TCPAsyncQueueRecord(ByteBuffer byteBuffer,
+ AsyncWriteCallbackHandler callbackHandler) {
+ this.byteBuffer = byteBuffer;
+ this.callbackHandler = callbackHandler;
+ }
+ }
+}
Index: async/UDPAsyncQueueWriter.java
--- async/UDPAsyncQueueWriter.java Locally New
+++ async/UDPAsyncQueueWriter.java Locally New
@@ -0,0 +1,271 @@
+/*
+ * The contents of this file are subject to the terms
+ * of the Common Development and Distribution License
+ * (the License). You may not use this file except in
+ * compliance with the License.
+ *
+ * You can obtain a copy of the license at
+ * https://glassfish.dev.java.net/public/CDDLv1.0.html or
+ * glassfish/bootstrap/legal/CDDLv1.0.txt.
+ * See the License for the specific language governing
+ * permissions and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL
+ * Header Notice in each file and include the License file
+ * at glassfish/bootstrap/legal/CDDLv1.0.txt.
+ * If applicable, add the following below the CDDL Header,
+ * with the fields enclosed by brackets [] replaced by
+ * you own identifying information:
+ * "Portions Copyrighted [year] [name of copyright owner]"
+ *
+ * Copyright 2007 Sun Microsystems, Inc. All rights reserved.
+ */
+
+package com.sun.grizzly.async;
+
+import com.sun.grizzly.*;
+import com.sun.grizzly.async.AsyncWriteQueue.ChannelAsyncWriteEntry;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.WritableByteChannel;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.net.SocketAddress;
+import java.nio.channels.DatagramChannel;
+
+/**
+ * UDP implementation of AsyncQueueWriter
+ *
+ * @author Alexey Stashok
+ */
+public class UDPAsyncQueueWriter implements AsyncQueueWriter {
+ private SelectorHandler selectorHandler;
+ private AsyncWriteQueue writeQueue;
+
+ public UDPAsyncQueueWriter(SelectorHandler selectorHandler) {
+ this.selectorHandler = selectorHandler;
+ writeQueue = new AsyncWriteQueue();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean write(SelectionKey key, ByteBuffer buffer) throws IOException {
+ return write(key, null, buffer, null);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean write(SelectionKey key, ByteBuffer buffer,
+ AsyncWriteCallbackHandler callbackHandler) throws IOException {
+ return write(key, null, buffer, callbackHandler, false);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean write(SelectionKey key, ByteBuffer buffer,
+ AsyncWriteCallbackHandler callbackHandler, boolean isCloneByteBuffer) throws IOException {
+ return write(key, null, buffer, callbackHandler, isCloneByteBuffer);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean write(SelectionKey key, SocketAddress dstAddress,
+ ByteBuffer buffer) throws IOException {
+ return write(key, dstAddress, buffer, null);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean write(SelectionKey key, SocketAddress dstAddress,
+ ByteBuffer buffer, AsyncWriteCallbackHandler callbackHandler)
+ throws IOException {
+ return write(key, dstAddress, buffer, callbackHandler, false);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean write(SelectionKey key, SocketAddress dstAddress,
+ ByteBuffer buffer, AsyncWriteCallbackHandler callbackHandler,
+ boolean isCloneByteBuffer) throws IOException {
+
+ SelectableChannel channel = key.channel();
+ // If AsyncWriteQueue is empty - try to write ByteBuffer here
+ ChannelAsyncWriteEntry channelEntry =
+ writeQueue.obtainChannelAsyncWriteEntry(channel);
+
+ ConcurrentLinkedQueue queue = channelEntry.queue;
+
+ boolean hasWriteLock = false;
+
+ try {
+ if (queue.isEmpty() &&
+ (hasWriteLock = channelEntry.writeLock.tryLock())) {
+ doWrite((WritableByteChannel) channel, dstAddress, buffer);
+ }
+
+ if (buffer.hasRemaining()) {
+ // clone ByteBuffer if required
+ if (isCloneByteBuffer) {
+ int size = buffer.remaining();
+ ByteBuffer newBuffer = buffer.isDirect() ?
+ ByteBuffer.allocateDirect(size) :
+ ByteBuffer.allocate(size);
+
+ newBuffer.put(buffer);
+ newBuffer.position(0);
+ buffer = newBuffer;
+ }
+
+ // add new element to the queue
+ channelEntry.updateLock.lock();
+ queue.offer(new UDPAsyncQueueRecord(dstAddress, buffer,
+ callbackHandler));
+
+ /*
+ * This check helps to avoid not required key registering
+ */
+ if (!channelEntry.onWriteLock.isLocked()) {
+ registerForWriting(key);
+ }
+
+ channelEntry.updateLock.unlock();
+
+ return false;
+ }
+ } finally {
+ if (hasWriteLock) {
+ channelEntry.writeLock.unlock();
+ }
+ }
+
+ return true;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean hasReadyAsyncWriteData(SelectionKey key) {
+ return writeQueue.size(key.channel()) > 0;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void onWrite(SelectionKey key) throws IOException {
+ SelectableChannel channel = key.channel();
+
+ ChannelAsyncWriteEntry channelEntry =
+ writeQueue.obtainChannelAsyncWriteEntry(channel);
+
+ boolean hasUpdateLock = false;
+ boolean hasOnWriteLock = false;
+
+ if ((hasOnWriteLock = channelEntry.onWriteLock.tryLock())) {
+ try {
+ ConcurrentLinkedQueue queue =
+ channelEntry.queue;
+
+ while (queue.size() > 0) {
+ if (hasUpdateLock) {
+ channelEntry.updateLock.unlock();
+ hasUpdateLock = false;
+ }
+
+ UDPAsyncQueueRecord queueRecord = queue.peek();
+
+ ByteBuffer byteBuffer = queueRecord.byteBuffer;
+ try {
+ doWrite((WritableByteChannel) channel,
+ queueRecord.dstAddress, byteBuffer);
+ } catch (IOException e) {
+ if (queueRecord.callbackHandler != null) {
+ queueRecord.callbackHandler.onIOException(key,
+ byteBuffer);
+ }
+ }
+
+ if (!byteBuffer.hasRemaining()) {
+ UDPAsyncQueueRecord removedQueueRecord = queue.poll();
+ assert removedQueueRecord == queueRecord;
+
+ if (removedQueueRecord.callbackHandler != null) {
+ removedQueueRecord.callbackHandler.onWriteCompleted(key, byteBuffer);
+ }
+ } else {
+ hasOnWriteLock = false;
+ channelEntry.onWriteLock.unlock();
+ registerForWriting(key);
+ break;
+ }
+
+ if (queue.size() == 0) {
+ /*
+ * If queue is empty - there is possibility that write method
+ * will add entry, but it will not be processed, because
+ * onWriteLock is currently locked
+ */
+ channelEntry.updateLock.lock();
+ hasUpdateLock = true;
+ }
+ }
+ } finally {
+ if (hasOnWriteLock) {
+ channelEntry.onWriteLock.unlock();
+ }
+
+ if (hasUpdateLock) {
+ channelEntry.updateLock.unlock();
+ }
+ }
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void onClose(SelectableChannel channel) {
+ writeQueue.removeEntry(channel);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void close() {
+ writeQueue.clear();
+ writeQueue = null;
+ }
+
+ private void doWrite(WritableByteChannel channel, SocketAddress dstAddress,
+ ByteBuffer byteBuffer) throws IOException {
+ if (dstAddress != null) {
+ ((DatagramChannel) channel).send(byteBuffer, dstAddress);
+ } else {
+ channel.write(byteBuffer);
+ }
+ }
+
+ private void registerForWriting(SelectionKey key) {
+ selectorHandler.register(key, SelectionKey.OP_WRITE);
+ }
+
+ /**
+ * AsyncWriteQueue
data unit specific for TCP protocol
+ */
+ protected static class UDPAsyncQueueRecord extends
+ TCPAsyncQueueWriter.TCPAsyncQueueRecord {
+ public SocketAddress dstAddress;
+
+ public UDPAsyncQueueRecord(SocketAddress dstAddress, ByteBuffer byteBuffer,
+ AsyncWriteCallbackHandler callbackHandler) {
+ super(byteBuffer, callbackHandler);
+ this.dstAddress = dstAddress;
+ }
+ }
+}
Index: CallbackHandlerContextTask.java
--- CallbackHandlerContextTask.java Locally New
+++ CallbackHandlerContextTask.java Locally New
@@ -0,0 +1,98 @@
+/*
+ * The contents of this file are subject to the terms
+ * of the Common Development and Distribution License
+ * (the License). You may not use this file except in
+ * compliance with the License.
+ *
+ * You can obtain a copy of the license at
+ * https://glassfish.dev.java.net/public/CDDLv1.0.html or
+ * glassfish/bootstrap/legal/CDDLv1.0.txt.
+ * See the License for the specific language governing
+ * permissions and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL
+ * Header Notice in each file and include the License file
+ * at glassfish/bootstrap/legal/CDDLv1.0.txt.
+ * If applicable, add the following below the CDDL Header,
+ * with the fields enclosed by brackets [] replaced by
+ * you own identifying information:
+ * "Portions Copyrighted [year] [name of copyright owner]"
+ *
+ * Copyright 2007 Sun Microsystems, Inc. All rights reserved.
+ */
+
+package com.sun.grizzly;
+
+import com.sun.grizzly.Context.OpType;
+import com.sun.grizzly.ContextTask.TaskPool;
+
+/**
+ * CallbackHandler
task, which will be executed by
+ * Context
, when Context.execute(ContextTask
)
+ * is called.
+ *
+ * @author Alexey Stashok
+ */
+public class CallbackHandlerContextTask extends ContextTask {
+ private static final TaskPool taskPool =
+ new TaskPool() {
+ @Override
+ public CallbackHandlerContextTask newTask() {
+ return new CallbackHandlerContextTask();
+ }
+ };
+
+ private CallbackHandler callBackHandler;
+
+ public static CallbackHandlerContextTask poll() {
+ return taskPool.poll();
+ }
+
+ public static void offer(CallbackHandlerContextTask contextTask) {
+ contextTask.recycle();
+ taskPool.offer(contextTask);
+ }
+
+ public Object call() throws Exception {
+ IOEvent ioEvent = context.getIOEvent();
+ OpType currentOpType = context.getCurrentOpType();
+
+ try {
+ if (currentOpType == OpType.OP_READ) {
+ callBackHandler.onRead(ioEvent);
+ } else if (currentOpType == OpType.OP_WRITE) {
+ callBackHandler.onWrite(ioEvent);
+ } else if (currentOpType == OpType.OP_CONNECT) {
+ callBackHandler.onConnect(ioEvent);
+ }
+ } finally {
+ if (ioEvent != null) {
+ // Prevent the CallbackHandler to re-use the context.
+ // TODO: This is still dangerous as the Context might have been
+ // cached by the CallbackHandler.
+ ioEvent.attach(null);
+ ioEvent = null;
+ }
+
+ offer(this);
+ }
+
+ return null;
+ }
+
+ public CallbackHandler getCallBackHandler() {
+ return callBackHandler;
+ }
+
+ public void setCallBackHandler(CallbackHandler callBackHandler) {
+ this.callBackHandler = callBackHandler;
+ }
+
+ @Override
+ public void recycle() {
+ callBackHandler = null;
+ super.recycle();
+ }
+
+
+}
Index: Context.java
--- Context.java Base (BASE)
+++ Context.java Locally Modified (Based On LOCAL)
@@ -23,9 +23,14 @@
package com.sun.grizzly;
+import com.sun.grizzly.async.AsyncQueueWritable;
+import com.sun.grizzly.async.AsyncQueueWriter;
+import com.sun.grizzly.async.AsyncWriteCallbackHandler;
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.util.HashMap;
-import java.util.concurrent.Callable;
/**
* This Object is used to share information between the Grizzly Framework
@@ -33,7 +38,7 @@
*
* @author Jeanfrancois Arcand
*/
-public class Context implements Callable {
+public class Context {
/**
* A SelectionKey's registration state.
@@ -119,6 +124,16 @@
/**
+ * AsyncQueueWriter
+ */
+ private AsyncQueueWriter asyncQueueWriter;
+
+ /**
+ * AsyncQueueWritable
+ */
+ private AsyncQueueWritable asyncQueueWritable;
+
+ /**
* Constructor
*/
public Context() {
@@ -210,6 +225,7 @@
currentOpType = null;
protocolChain = null;
ioEvent = null;
+ asyncQueueWriter = null;
}
@@ -233,47 +249,6 @@
/**
- * Execute the ProtocolChain
.
- * @throws java.lang.Exception Exception thrown by protocol chain
- */
- public Object call() throws Exception {
- // If a IOEvent has been defined, invoke it first and
- // let its associated CallbackHandler decide if the ProtocolChain
- // be invoked or not.
- Object attachment = key.attachment();
- if (ioEvent != null && (attachment instanceof CallbackHandler)){
- try{
- CallbackHandler callBackHandler = ((CallbackHandler)attachment);
- if (currentOpType == OpType.OP_READ){
- callBackHandler.onRead(ioEvent);
- } else if (currentOpType == OpType.OP_WRITE){
- callBackHandler.onWrite(ioEvent);
- } else if (currentOpType == OpType.OP_CONNECT){
- callBackHandler.onConnect(ioEvent);
- }
- } finally {
- if (ioEvent != null){
- // Prevent the CallbackHandler to re-use the context.
- // TODO: This is still dangerous as the Context might have been
- // cached by the CallbackHandler.
- ioEvent.attach(null);
- ioEvent = null;
- }
- }
- } else {
- SelectionKey currentKey = key;
- selectorHandler.getSelectionKeyHandler().process(currentKey);
- try {
- protocolChain.execute(this);
- } finally {
- selectorHandler.getSelectionKeyHandler().postProcess(currentKey);
- }
- }
- return null;
- }
-
-
- /**
* Return ProtocolChain
executed by this instance.
* @return ProtocolChain
instance
*/
@@ -313,9 +288,12 @@
* Execute this Context using the Controller's Pipeline
* @throws com.sun.grizzly.PipelineFullException
*/
- public void execute() throws PipelineFullException{
- getPipeline().execute(this);
+ public void execute(ContextTask contextTask) throws PipelineFullException {
+ if (contextTask != null) {
+ contextTask.setContext(this);
+ getPipeline().execute(contextTask);
}
+ }
/**
@@ -392,4 +370,89 @@
public void setSelectorHandler(SelectorHandler selectorHandler) {
this.selectorHandler = selectorHandler;
}
+
+
+ /**
+ * Returns AsyncQueueWritable
, assciated with the current
+ * Context
. This method is not threadsafe.
+ *
+ * @return AsyncQueueWritable
+ */
+ public AsyncQueueWritable getAsyncQueueWritable() {
+ if (asyncQueueWritable == null) {
+ asyncQueueWritable = new AsyncQueueWritableContextWrapper();
}
+
+ return asyncQueueWritable;
+ }
+
+ /**
+ * Return the AsyncQueueWriter
+ * @return the AsyncQueueWriter
+ */
+ protected AsyncQueueWriter getAsyncQueueWriter() {
+ return asyncQueueWriter;
+ }
+
+ /**
+ * Set the AsyncQueueWriter
+ * @param asyncQueueWriter AsyncQueueWriter
+ */
+ protected void setAsyncQueueWriter(AsyncQueueWriter asyncQueueWriter) {
+ this.asyncQueueWriter = asyncQueueWriter;
+ }
+
+ private class AsyncQueueWritableContextWrapper implements AsyncQueueWritable {
+ /**
+ * {@inheritDoc}
+ */
+ public boolean writeToAsyncQueue(ByteBuffer buffer) throws IOException {
+ return asyncQueueWriter.write(key, buffer);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean writeToAsyncQueue(ByteBuffer buffer,
+ AsyncWriteCallbackHandler callbackHandler) throws IOException {
+ return asyncQueueWriter.write(key, buffer, callbackHandler);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean writeToAsyncQueue(ByteBuffer buffer,
+ AsyncWriteCallbackHandler callbackHandler,
+ boolean isCloneByteBuffer) throws IOException {
+ return asyncQueueWriter.write(key, buffer, callbackHandler,
+ isCloneByteBuffer);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean writeToAsyncQueue(SocketAddress dstAddress, ByteBuffer buffer)
+ throws IOException {
+ return asyncQueueWriter.write(key, dstAddress, buffer);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean writeToAsyncQueue(SocketAddress dstAddress, ByteBuffer buffer,
+ AsyncWriteCallbackHandler callbackHandler) throws IOException {
+ return asyncQueueWriter.write(key, dstAddress, buffer,
+ callbackHandler);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean writeToAsyncQueue(SocketAddress dstAddress, ByteBuffer buffer,
+ AsyncWriteCallbackHandler callbackHandler, boolean isCloneByteBuffer)
+ throws IOException {
+ return asyncQueueWriter.write(key, dstAddress, buffer,
+ callbackHandler, isCloneByteBuffer);
+ }
+ }
+}
Index: ContextTask.java
--- ContextTask.java Locally New
+++ ContextTask.java Locally New
@@ -0,0 +1,70 @@
+/*
+ * The contents of this file are subject to the terms
+ * of the Common Development and Distribution License
+ * (the License). You may not use this file except in
+ * compliance with the License.
+ *
+ * You can obtain a copy of the license at
+ * https://glassfish.dev.java.net/public/CDDLv1.0.html or
+ * glassfish/bootstrap/legal/CDDLv1.0.txt.
+ * See the License for the specific language governing
+ * permissions and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL
+ * Header Notice in each file and include the License file
+ * at glassfish/bootstrap/legal/CDDLv1.0.txt.
+ * If applicable, add the following below the CDDL Header,
+ * with the fields enclosed by brackets [] replaced by
+ * you own identifying information:
+ * "Portions Copyrighted [year] [name of copyright owner]"
+ *
+ * Copyright 2007 Sun Microsystems, Inc. All rights reserved.
+ */
+
+package com.sun.grizzly;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * Task, which will be executed by Context
, when
+ * Context.execute(ContextTask
) is called.
+ *
+ * @author Alexey Stashok
+ */
+public abstract class ContextTask implements Callable {
+ protected Context context;
+
+ public Context getContext() {
+ return context;
+ }
+
+ public void setContext(Context context) {
+ this.context = context;
+ }
+
+ public void recycle() {
+ context = null;
+ }
+
+ protected abstract static class TaskPool {
+ private ConcurrentLinkedQueue pool =
+ new ConcurrentLinkedQueue();
+
+ public abstract E newTask();
+
+ public E poll() {
+ E task = pool.poll();
+ if (task == null) {
+ task = newTask();
+ }
+
+ return task;
+ }
+
+ public void offer(E task) {
+ task.recycle();
+ pool.offer(task);
+ }
+ }
+}
Index: Controller.java
--- Controller.java Base (BASE)
+++ Controller.java Locally Modified (Based On LOCAL)
@@ -298,6 +298,7 @@
key = iterator.next();
iterator.remove();
boolean skipOpWrite = false;
+ delegateToWorkerThread = false;
if (key.isValid()) {
if ((key.readyOps() & SelectionKey.OP_ACCEPT)
== SelectionKey.OP_ACCEPT){
@@ -356,7 +357,7 @@
if (logger.isLoggable(Level.FINE)) {
logger.log(Level.FINE, "OP_WRITE on " + key);
}
- delegateToWorkerThread = selectorHandler.
+ delegateToWorkerThread |= selectorHandler.
onWriteInterest(key,serverCtx);
}
@@ -368,7 +369,9 @@
Context context = pollContext(key,protocolChain);
context.setSelectorHandler(selectorHandler);
context.setPipeline(selectorHandler.pipeline());
- context.execute();
+ context.setAsyncQueueWriter(
+ selectorHandler.getAsyncQueueWriter());
+ context.execute(ProtocolChainContextTask.poll());
}
} else {
selectorHandler.getSelectionKeyHandler().cancel(key);
Index: filter/EchoAsyncWriteQueueFilter.java
--- filter/EchoAsyncWriteQueueFilter.java Locally New
+++ filter/EchoAsyncWriteQueueFilter.java Locally New
@@ -0,0 +1,81 @@
+/*
+ * The contents of this file are subject to the terms
+ * of the Common Development and Distribution License
+ * (the License). You may not use this file except in
+ * compliance with the License.
+ *
+ * You can obtain a copy of the license at
+ * https://glassfish.dev.java.net/public/CDDLv1.0.html or
+ * glassfish/bootstrap/legal/CDDLv1.0.txt.
+ * See the License for the specific language governing
+ * permissions and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL
+ * Header Notice in each file and include the License file
+ * at glassfish/bootstrap/legal/CDDLv1.0.txt.
+ * If applicable, add the following below the CDDL Header,
+ * with the fields enclosed by brackets [] replaced by
+ * you own identifying information:
+ * "Portions Copyrighted [year] [name of copyright owner]"
+ *
+ * Copyright 2006 Sun Microsystems, Inc. All rights reserved.
+ */
+
+package com.sun.grizzly.filter;
+
+import com.sun.grizzly.Context;
+import com.sun.grizzly.Controller;
+import com.sun.grizzly.ProtocolFilter;
+import com.sun.grizzly.util.OutputWriter;
+import com.sun.grizzly.util.WorkerThread;
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.SelectableChannel;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
+
+/**
+ * Simple echo filter
+ *
+ * @author Alexey Stashok
+ */
+public class EchoAsyncWriteQueueFilter implements ProtocolFilter {
+ AtomicInteger counter = new AtomicInteger(1);
+ public boolean execute(Context ctx) throws IOException {
+ final WorkerThread workerThread = ((WorkerThread)Thread.currentThread());
+ ByteBuffer buffer = workerThread.getByteBuffer();
+ buffer.flip();
+ if (buffer.hasRemaining()) {
+ // Depending on protocol perform echo
+ try {
+ if (ctx.getProtocol() == Controller.Protocol.TCP) { // TCP case
+ ctx.getAsyncQueueWritable().writeToAsyncQueue(buffer, null, true);
+ } else if (ctx.getProtocol() == Controller.Protocol.UDP) { // UDP case
+ SocketAddress address = (SocketAddress)
+ ctx.getAttribute(ReadFilter.UDP_SOCKETADDRESS);
+ ctx.getAsyncQueueWritable().writeToAsyncQueue(address,
+ buffer, null, true);
+ }
+ } catch (IOException ex) {
+ // Store incoming data in byte[]
+ byte[] data = new byte[buffer.remaining()];
+ int position = buffer.position();
+ buffer.get(data);
+ buffer.position(position);
+
+ Controller.logger().log(Level.WARNING,
+ "Exception. Echo \"" + new String(data) + "\"");
+ throw ex;
+ }
+ }
+
+ buffer.clear();
+ return false;
+ }
+
+ public boolean postExecute(Context ctx) throws IOException {
+ return true;
+ }
+}
Index: ProtocolChainContextTask.java
--- ProtocolChainContextTask.java Locally New
+++ ProtocolChainContextTask.java Locally New
@@ -0,0 +1,69 @@
+/*
+ * The contents of this file are subject to the terms
+ * of the Common Development and Distribution License
+ * (the License). You may not use this file except in
+ * compliance with the License.
+ *
+ * You can obtain a copy of the license at
+ * https://glassfish.dev.java.net/public/CDDLv1.0.html or
+ * glassfish/bootstrap/legal/CDDLv1.0.txt.
+ * See the License for the specific language governing
+ * permissions and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL
+ * Header Notice in each file and include the License file
+ * at glassfish/bootstrap/legal/CDDLv1.0.txt.
+ * If applicable, add the following below the CDDL Header,
+ * with the fields enclosed by brackets [] replaced by
+ * you own identifying information:
+ * "Portions Copyrighted [year] [name of copyright owner]"
+ *
+ * Copyright 2007 Sun Microsystems, Inc. All rights reserved.
+ */
+
+
+package com.sun.grizzly;
+
+import java.nio.channels.SelectionKey;
+
+/**
+ * ProtocolChain
task, which will be executed by
+ * Context
, when Context.execute(ContextTask
)
+ * is called.
+ *
+ * @author Alexey Stashok
+ */
+public class ProtocolChainContextTask extends ContextTask {
+ private static final TaskPool taskPool =
+ new TaskPool() {
+ @Override
+ public ProtocolChainContextTask newTask() {
+ return new ProtocolChainContextTask();
+ }
+ };
+
+ public static ProtocolChainContextTask poll() {
+ return taskPool.poll();
+ }
+
+ public static void offer(ProtocolChainContextTask contextTask) {
+ contextTask.recycle();
+ taskPool.offer(contextTask);
+ }
+
+ public Object call() throws Exception {
+ SelectionKey currentKey = context.getSelectionKey();
+ SelectionKeyHandler selectionKeyHandler = context.
+ getSelectorHandler().getSelectionKeyHandler();
+
+ selectionKeyHandler.process(currentKey);
+ try {
+ context.getProtocolChain().execute(context);
+ } finally {
+ selectionKeyHandler.postProcess(currentKey);
+ offer(this);
+ }
+
+ return null;
+ }
+}
Index: SelectorHandler.java
--- SelectorHandler.java Base (BASE)
+++ SelectorHandler.java Locally Modified (Based On LOCAL)
@@ -22,6 +22,7 @@
*/
package com.sun.grizzly;
+import com.sun.grizzly.async.AsyncQueueWriter;
import com.sun.grizzly.util.Copyable;
import java.io.IOException;
import java.nio.channels.SelectableChannel;
@@ -203,6 +204,16 @@
/**
+ * Returns AsyncQueueWriter
associated with this
+ * SelectorHandler
. Method will return null, if this
+ * TCPSelectorHandler
is not running.
+ *
+ * @return AsyncQuquqWriter
+ */
+ public AsyncQueueWriter getAsyncQueueWriter();
+
+
+ /**
* Return the Pipeline used to execute this SelectorHandler's
* SelectionKey ops
* @return The pipeline to use, or null if the Controller's Pipeline
Index: TCPConnectorHandler.java
--- TCPConnectorHandler.java Base (BASE)
+++ TCPConnectorHandler.java Locally Modified (Based On LOCAL)
@@ -23,6 +23,8 @@
package com.sun.grizzly;
import com.sun.grizzly.Controller.Protocol;
+import com.sun.grizzly.async.AsyncWriteCallbackHandler;
+import com.sun.grizzly.async.AsyncQueueWritable;
import com.sun.grizzly.util.ByteBufferInputStream;
import com.sun.grizzly.util.OutputWriter;
import java.io.IOException;
@@ -70,7 +72,8 @@
*
* @author Jeanfrancois Arcand
*/
-public class TCPConnectorHandler implements ConnectorHandler{
+public class TCPConnectorHandler implements
+ ConnectorHandler, AsyncQueueWritable {
/**
* The underlying TCPSelectorHandler used to mange SelectionKeys.
@@ -399,6 +402,72 @@
/**
+ * {@inheritDoc}
+ */
+ public boolean writeToAsyncQueue(ByteBuffer buffer) throws IOException {
+ return selectorHandler.getAsyncQueueWriter().write(
+ socketChannel.keyFor(selectorHandler.getSelector()), buffer);
+ }
+
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean writeToAsyncQueue(ByteBuffer buffer,
+ AsyncWriteCallbackHandler callbackHandler) throws IOException {
+ return selectorHandler.getAsyncQueueWriter().write(
+ socketChannel.keyFor(selectorHandler.getSelector()), buffer,
+ callbackHandler);
+ }
+
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean writeToAsyncQueue(ByteBuffer buffer,
+ AsyncWriteCallbackHandler callbackHandler,
+ boolean isCloneByteBuffer) throws IOException {
+ return selectorHandler.getAsyncQueueWriter().write(
+ socketChannel.keyFor(selectorHandler.getSelector()), buffer,
+ callbackHandler, isCloneByteBuffer);
+ }
+
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean writeToAsyncQueue(SocketAddress dstAddress, ByteBuffer buffer)
+ throws IOException {
+ return selectorHandler.getAsyncQueueWriter().write(
+ socketChannel.keyFor(selectorHandler.getSelector()), dstAddress,
+ buffer);
+ }
+
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean writeToAsyncQueue(SocketAddress dstAddress, ByteBuffer buffer,
+ AsyncWriteCallbackHandler callbackHandler) throws IOException {
+ return selectorHandler.getAsyncQueueWriter().write(
+ socketChannel.keyFor(selectorHandler.getSelector()), dstAddress,
+ buffer, callbackHandler);
+ }
+
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean writeToAsyncQueue(SocketAddress dstAddress, ByteBuffer buffer,
+ AsyncWriteCallbackHandler callbackHandler, boolean isCloneByteBuffer)
+ throws IOException {
+ return selectorHandler.getAsyncQueueWriter().write(
+ socketChannel.keyFor(selectorHandler.getSelector()), dstAddress,
+ buffer, callbackHandler, isCloneByteBuffer);
+ }
+
+
+ /**
* Close the underlying connection.
*/
public void close() throws IOException{
Index: TCPSelectorHandler.java
--- TCPSelectorHandler.java Base (BASE)
+++ TCPSelectorHandler.java Locally Modified (Based On LOCAL)
@@ -23,6 +23,9 @@
package com.sun.grizzly;
+import com.sun.grizzly.async.AsyncQueueWriter;
+import com.sun.grizzly.async.TCPAsyncQueueWriter;
+import com.sun.grizzly.async.AsyncQueueWriterContextTask;
import com.sun.grizzly.util.Cloner;
import com.sun.grizzly.util.Copyable;
import com.sun.grizzly.util.SelectionKeyOP;
@@ -171,6 +174,7 @@
*/
protected ProtocolChainInstanceHandler instanceHandler;
+ protected AsyncQueueWriter asyncQueueWriter;
public TCPSelectorHandler(){
}
@@ -234,6 +238,10 @@
public void preSelect(Context ctx) throws IOException {
initOpRegistriesIfRequired();
+ if (asyncQueueWriter == null) {
+ asyncQueueWriter = new TCPAsyncQueueWriter(this);
+ }
+
if (selector == null){
try{
connectorInstanceHandler = new ConnectorInstanceHandler.
@@ -423,7 +431,7 @@
/**
* Shuntdown this instance by closing its Selector and associated channels.
*/
- public void shutdown(){
+ public void shutdown() {
if (selector != null){
for (SelectionKey selectionKey : selector.keys()) {
selectionKeyHandler.close(selectionKey);
@@ -454,7 +462,11 @@
"selector.close",ex);
}
+ if (asyncQueueWriter != null) {
+ asyncQueueWriter.close();
+ asyncQueueWriter = null;
}
+ }
/**
* {@inheritDoc}
@@ -494,10 +506,11 @@
throws IOException{
// disable OP_READ on key before doing anything else
key.interestOps(key.interestOps() & (~SelectionKey.OP_READ));
- if (key.attachment() instanceof CallbackHandler){
- final Context context = ctx.getController().pollContext(key);
+ Object attach = key.attachment();
+ if (attach instanceof CallbackHandler){
+ final Context context = pollContext(ctx, key);
context.setCurrentOpType(Context.OpType.OP_READ);
- invokeCallbackHandler(context);
+ invokeCallbackHandler((CallbackHandler) attach, context);
return false;
} else {
return true;
@@ -515,12 +528,19 @@
throws IOException{
// disable OP_WRITE on key before doing anything else
key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
- if (key.attachment() instanceof CallbackHandler){
- final Context context = ctx.getController().pollContext(key);
- context.setSelectionKey(key);
+
+ Object attach = null;
+
+ if (asyncQueueWriter.hasReadyAsyncWriteData(key)) {
+ final Context context = pollContext(ctx, key);
context.setCurrentOpType(Context.OpType.OP_WRITE);
- invokeCallbackHandler(context);
+ invokeAsyncQueueWriter(context);
return false;
+ } else if ((attach = key.attachment()) instanceof CallbackHandler){
+ final Context context = pollContext(ctx, key);
+ context.setCurrentOpType(Context.OpType.OP_WRITE);
+ invokeCallbackHandler((CallbackHandler) attach, context);
+ return false;
} else {
return true;
}
@@ -541,11 +561,11 @@
key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
key.interestOps(key.interestOps() & (~SelectionKey.OP_READ));
- if (key.attachment() instanceof CallbackHandler){
- Context context = ctx.getController().pollContext(key);
- context.setSelectionKey(key);
+ Object attach = key.attachment();
+ if (attach instanceof CallbackHandler){
+ final Context context = pollContext(ctx, key);
context.setCurrentOpType(Context.OpType.OP_CONNECT);
- invokeCallbackHandler(context);
+ invokeCallbackHandler((CallbackHandler) attach, context);
}
return false;
}
@@ -556,13 +576,14 @@
* @param context Context
* @throws java.io.IOException
*/
- protected void invokeCallbackHandler(Context context) throws IOException{
- context.setSelectorHandler(this);
-
+ protected void invokeCallbackHandler(CallbackHandler callbackHandler,
+ Context context) throws IOException{
IOEventioEvent = new IOEvent.DefaultIOEvent(context);
context.setIOEvent(ioEvent);
try {
- context.execute();
+ CallbackHandlerContextTask task = CallbackHandlerContextTask.poll();
+ task.setCallBackHandler(callbackHandler);
+ context.execute(task);
} catch (PipelineFullException ex){
throw new IOException(ex.getMessage());
}
@@ -570,6 +591,22 @@
/**
+ * Invoke a AsyncQueueWriter
+ * @param context Context
+ * @throws java.io.IOException
+ */
+ protected void invokeAsyncQueueWriter(Context context) throws IOException {
+ AsyncQueueWriterContextTask task = AsyncQueueWriterContextTask.poll();
+ task.setAsyncQueueWriter(asyncQueueWriter);
+ try {
+ context.execute(task);
+ } catch (PipelineFullException ex) {
+ throw new IOException(ex.getMessage());
+ }
+ }
+
+
+ /**
* Return an instance of the default ConnectorHandler
,
* which is the TCPConnectorHandler
* @return ConnectorHandler
@@ -654,6 +691,13 @@
this.selector = selector;
}
+ /**
+ * {@inheritDoc}
+ */
+ public AsyncQueueWriter getAsyncQueueWriter() {
+ return asyncQueueWriter;
+ }
+
public long getSelectTimeout() {
return selectTimeout;
}
@@ -855,8 +899,19 @@
} catch (IOException ex){
; // LOG ME
}
+
+ asyncQueueWriter.onClose(channel);
}
+ private Context pollContext(final Context serverContext,
+ final SelectionKey key) {
+ final Context context = serverContext.getController().pollContext(key);
+ context.setSelectionKey(key);
+ context.setSelectorHandler(this);
+ context.setAsyncQueueWriter(asyncQueueWriter);
+ return context;
+ }
+
//--------------- ConnectorInstanceHandler -----------------------------
/**
* Return factory object, which knows how
Index: UDPConnectorHandler.java
--- UDPConnectorHandler.java Base (BASE)
+++ UDPConnectorHandler.java Locally Modified (Based On LOCAL)
@@ -23,6 +23,8 @@
package com.sun.grizzly;
+import com.sun.grizzly.async.AsyncWriteCallbackHandler;
+import com.sun.grizzly.async.AsyncQueueWritable;
import com.sun.grizzly.util.ByteBufferInputStream;
import java.io.IOException;
import java.net.SocketAddress;
@@ -47,7 +49,8 @@
*
* @author Jeanfrancois Arcand
*/
-public class UDPConnectorHandler implements ConnectorHandler{
+public class UDPConnectorHandler implements
+ ConnectorHandler, AsyncQueueWritable {
/**
* The underlying UDPSelectorHandler used to mange SelectionKeys.
@@ -339,6 +342,72 @@
/**
+ * {@inheritDoc}
+ */
+ public boolean writeToAsyncQueue(ByteBuffer buffer) throws IOException {
+ return selectorHandler.getAsyncQueueWriter().write(
+ datagramChannel.keyFor(selectorHandler.getSelector()), buffer);
+ }
+
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean writeToAsyncQueue(ByteBuffer buffer,
+ AsyncWriteCallbackHandler callbackHandler) throws IOException {
+ return selectorHandler.getAsyncQueueWriter().write(
+ datagramChannel.keyFor(selectorHandler.getSelector()), buffer,
+ callbackHandler);
+ }
+
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean writeToAsyncQueue(ByteBuffer buffer,
+ AsyncWriteCallbackHandler callbackHandler,
+ boolean isCloneByteBuffer) throws IOException {
+ return selectorHandler.getAsyncQueueWriter().write(
+ datagramChannel.keyFor(selectorHandler.getSelector()), buffer,
+ callbackHandler, isCloneByteBuffer);
+ }
+
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean writeToAsyncQueue(SocketAddress dstAddress, ByteBuffer buffer)
+ throws IOException {
+ return selectorHandler.getAsyncQueueWriter().write(
+ datagramChannel.keyFor(selectorHandler.getSelector()), dstAddress,
+ buffer);
+ }
+
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean writeToAsyncQueue(SocketAddress dstAddress, ByteBuffer buffer,
+ AsyncWriteCallbackHandler callbackHandler) throws IOException {
+ return selectorHandler.getAsyncQueueWriter().write(
+ datagramChannel.keyFor(selectorHandler.getSelector()), dstAddress,
+ buffer, callbackHandler);
+ }
+
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean writeToAsyncQueue(SocketAddress dstAddress, ByteBuffer buffer,
+ AsyncWriteCallbackHandler callbackHandler, boolean isCloneByteBuffer)
+ throws IOException {
+ return selectorHandler.getAsyncQueueWriter().write(
+ datagramChannel.keyFor(selectorHandler.getSelector()), dstAddress,
+ buffer, callbackHandler, isCloneByteBuffer);
+ }
+
+
+ /**
* Receive bytes.
* @param byteBuffer The byteBuffer to store bytes.
* @param socketAddress
@@ -469,5 +538,4 @@
public UDPSelectorHandler getSelectorHandler() {
return selectorHandler;
}
-
}
Index: UDPSelectorHandler.java
--- UDPSelectorHandler.java Base (BASE)
+++ UDPSelectorHandler.java Locally Modified (Based On LOCAL)
@@ -23,6 +23,7 @@
package com.sun.grizzly;
+import com.sun.grizzly.async.UDPAsyncQueueWriter;
import com.sun.grizzly.util.Copyable;
import com.sun.grizzly.util.SelectionKeyOP;
import java.io.IOException;
@@ -92,6 +93,10 @@
public void preSelect(Context ctx) throws IOException {
initOpRegistriesIfRequired();
+ if (asyncQueueWriter == null) {
+ asyncQueueWriter = new UDPAsyncQueueWriter(this);
+ }
+
if (selector == null){
try{
connectorInstanceHandler = new ConnectorInstanceHandler.
@@ -174,7 +179,12 @@
Controller.logger().log(Level.SEVERE,
"closeSocketException",ex);
}
+
+ if (asyncQueueWriter != null) {
+ asyncQueueWriter.close();
+ asyncQueueWriter = null;
}
+ }
/**
@@ -258,6 +268,8 @@
} catch (IOException ex){
; // LOG ME
}
+
+ asyncQueueWriter.onClose(channel);
}
//--------------- ConnectorInstanceHandler -----------------------------