# This patch file was generated by NetBeans IDE # Following Index: paths are relative to: C:\Projects\Grizzly\trunk\modules\grizzly\src\main\java\com\sun\grizzly # This patch can be applied using context Tools: Patch action on respective folder. # It uses platform neutral UTF-8 encoding and \n newlines. # Above lines and this line are ignored by the patching process. Index: async/AsyncQueueWritable.java --- async/AsyncQueueWritable.java Locally New +++ async/AsyncQueueWritable.java Locally New @@ -0,0 +1,183 @@ +/* + * The contents of this file are subject to the terms + * of the Common Development and Distribution License + * (the License). You may not use this file except in + * compliance with the License. + * + * You can obtain a copy of the license at + * https://glassfish.dev.java.net/public/CDDLv1.0.html or + * glassfish/bootstrap/legal/CDDLv1.0.txt. + * See the License for the specific language governing + * permissions and limitations under the License. + * + * When distributing Covered Code, include this CDDL + * Header Notice in each file and include the License file + * at glassfish/bootstrap/legal/CDDLv1.0.txt. + * If applicable, add the following below the CDDL Header, + * with the fields enclosed by brackets [] replaced by + * you own identifying information: + * "Portions Copyrighted [year] [name of copyright owner]" + * + * Copyright 2007 Sun Microsystems, Inc. All rights reserved. + */ + + +package com.sun.grizzly.async; + +import java.io.IOException; +import java.net.SocketAddress; +import java.nio.ByteBuffer; + +/** + * Object, which is able to send ByteBuffer data asynchronously, + * using queue. + * + * @author Alexey Stashok + */ +public interface AsyncQueueWritable { + /** + * Method writes ByteBuffer using async write queue. + * First, if write queue is empty - it tries to write ByteBuffer + * directly (without putting to the queue). + * If associated write queue is not empty or after direct writing + * ByteBuffer still has ready data to be written - + * ByteBuffer will be added to AsyncWriteQueue. + * If an exception occurs, during direct writing - it will be propogated + * to the caller directly, otherwise it will be just logged by + * Grizzly framework. + * + * @param buffer ByteBuffer + * @return true, if ByteBuffer was written completely, false if write operation was put to queue + * @throws java.io.IOException + */ + public boolean writeToAsyncQueue(ByteBuffer buffer) throws IOException; + + /** + * Method writes ByteBuffer using async write queue. + * First, if write queue is empty - it tries to write ByteBuffer + * directly (without putting to the queue). + * If associated write queue is not empty or after direct writing + * ByteBuffer still has ready data to be written - + * ByteBuffer will be added to AsyncWriteQueue. + * If an exception occurs, during direct writing - it will be propogated + * to the caller directly, otherwise, if the ByteBuffer is + * added to a writing queue - exception notification will come via + * AsyncWriteCallbackHandler + * + * @param buffer ByteBuffer + * @param callbackHandler AsyncWriteCallbackHandler, + * which will get notified, when + * ByteBuffer will be completely written + * @return true, if ByteBuffer was written completely, + * false if write operation was put to queue + * @throws java.io.IOException + */ + public boolean writeToAsyncQueue(ByteBuffer buffer, + AsyncWriteCallbackHandler callbackHandler) throws IOException; + + /** + * Method writes ByteBuffer using async write queue. + * First, if write queue is empty - it tries to write ByteBuffer + * directly (without putting to the queue). + * If associated write queue is not empty or after direct writing + * ByteBuffer still has ready data to be written - + * ByteBuffer will be added to AsyncWriteQueue. + * If an exception occurs, during direct writing - it will be propogated + * to the caller directly, otherwise, if the ByteBuffer is + * added to a writing queue - exception notification will come via + * AsyncWriteCallbackHandler + * + * @param buffer ByteBuffer + * @param callbackHandler AsyncWriteCallbackHandler, + * which will get notified, when + * ByteBuffer will be completely written + * @param isCloneByteBuffer if true - AsyncQueueWriter + * will clone given + * ByteBuffer before puting it to the + * AsyncWriteQueue + * @return true, if ByteBuffer was written completely, + * false if write operation was put to queue + * @throws java.io.IOException + */ + public boolean writeToAsyncQueue(ByteBuffer buffer, + AsyncWriteCallbackHandler callbackHandler, + boolean isCloneByteBuffer) throws IOException; + + /** + * Method sends ByteBuffer using async write queue. + * First, if write queue is empty - it tries to send ByteBuffer + * to the given SocketAddress directly + * (without putting to the queue). + * If associated write queue is not empty or after direct sending + * ByteBuffer still has ready data to be written - + * ByteBuffer will be added to AsyncWriteQueue. + * If an exception occurs, during direct writing - it will be propogated + * to the caller directly, otherwise it will be just logged by + * Grizzly framework. + * + * @param dstAddress destination SocketAddress data will + * be sent to + * @param buffer ByteBuffer + * @return true, if ByteBuffer was written completely, false if write operation was put to queue + * @throws java.io.IOException + */ + public boolean writeToAsyncQueue(SocketAddress dstAddress, ByteBuffer buffer) + throws IOException; + + /** + * Method sends ByteBuffer using async write queue. + * First, if write queue is empty - it tries to send ByteBuffer + * to the given SocketAddress directly + * (without putting to the queue). + * If associated write queue is not empty or after direct sending + * ByteBuffer still has ready data to be written - + * ByteBuffer will be added to AsyncWriteQueue. + * If an exception occurs, during direct writing - it will be propogated + * to the caller directly, otherwise, if the ByteBuffer is + * added to a writing queue - exception notification will come via + * AsyncWriteCallbackHandler + * + * @param dstAddress destination SocketAddress data will + * be sent to + * @param buffer ByteBuffer + * @param callbackHandler AsyncWriteCallbackHandler, + * which will get notified, when + * ByteBuffer will be completely written + * @return true, if ByteBuffer was written completely, + * false if write operation was put to queue + * @throws java.io.IOException + */ + public boolean writeToAsyncQueue(SocketAddress dstAddress, ByteBuffer buffer, + AsyncWriteCallbackHandler callbackHandler) throws IOException; + + /** + * Method sends ByteBuffer using async write queue. + * First, if write queue is empty - it tries to send ByteBuffer + * to the given SocketAddress directly + * (without putting to the queue). + * If associated write queue is not empty or after direct sending + * ByteBuffer still has ready data to be written - + * ByteBuffer will be added to AsyncWriteQueue. + * If an exception occurs, during direct writing - it will be propogated + * to the caller directly, otherwise, if the ByteBuffer is + * added to a writing queue - exception notification will come via + * AsyncWriteCallbackHandler + * + * @param dstAddress destination SocketAddress data will + * be sent to + * @param buffer ByteBuffer + * @param callbackHandler AsyncWriteCallbackHandler, + * which will get notified, when + * ByteBuffer will be completely written + * @param isCloneByteBuffer if true - AsyncQueueWriter + * will clone given + * ByteBuffer before puting it to the + * AsyncWriteQueue + * @return true, if ByteBuffer was written completely, + * false if write operation was put to queue + * @throws java.io.IOException + */ + public boolean writeToAsyncQueue(SocketAddress dstAddress, ByteBuffer buffer, + AsyncWriteCallbackHandler callbackHandler, + boolean isCloneByteBuffer) throws IOException; +} Index: async/AsyncQueueWriter.java --- async/AsyncQueueWriter.java Locally New +++ async/AsyncQueueWriter.java Locally New @@ -0,0 +1,247 @@ +/* + * The contents of this file are subject to the terms + * of the Common Development and Distribution License + * (the License). You may not use this file except in + * compliance with the License. + * + * You can obtain a copy of the license at + * https://glassfish.dev.java.net/public/CDDLv1.0.html or + * glassfish/bootstrap/legal/CDDLv1.0.txt. + * See the License for the specific language governing + * permissions and limitations under the License. + * + * When distributing Covered Code, include this CDDL + * Header Notice in each file and include the License file + * at glassfish/bootstrap/legal/CDDLv1.0.txt. + * If applicable, add the following below the CDDL Header, + * with the fields enclosed by brackets [] replaced by + * you own identifying information: + * "Portions Copyrighted [year] [name of copyright owner]" + * + * Copyright 2007 Sun Microsystems, Inc. All rights reserved. + */ + +package com.sun.grizzly.async; + +import java.io.IOException; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SelectableChannel; +import java.nio.channels.SelectionKey; + +/** + * Common inteface to be implemented by protocol dependant asynchronous queue + * writers implementations + * + * @author Alexey Stashok + */ +public interface AsyncQueueWriter { + /** + * Method writes ByteBuffer to the SelectableChannel + * First, if SelectableChannel associated write queue is empty - + * it tries to write ByteBuffer to the given + * SelectableChannel directly (without putting to the queue). + * If associated write queue is not empty or after direct writing + * ByteBuffer still has ready data to be written - + * ByteBuffer will be added to AsyncWriteQueue + * and SelectableChannel will be registered on + * SelectorHandler, waiting for OP_WRITE event. + * If an exception occurs, during direct writing - it will be propogated + * to the caller directly, otherwise it will be just logged by + * Grizzly framework. + * + * @param key SelectionKey associated with + * SelectableChannel ByteBuffer + * should be written to + * @param buffer ByteBuffer + * @return true, if ByteBuffer was written completely, false if write operation was put to queue + * @throws java.io.IOException + */ + public boolean write(SelectionKey key, ByteBuffer buffer) throws IOException; + + /** + * Method writes ByteBuffer to the SelectableChannel + * First, if SelectableChannel associated write queue is empty - + * it tries to write ByteBuffer to the given + * SelectableChannel directly (without putting to the queue). + * If associated write queue is not empty or after direct writing + * ByteBuffer still has ready data to be written - + * ByteBuffer will be added to AsyncWriteQueue + * and SelectableChannel will be registered on + * SelectorHandler, waiting for OP_WRITE event. + * If an exception occurs, during direct writing - it will be propogated + * to the caller directly, otherwise, if the ByteBuffer is + * added to a writing queue - exception notification will come via + * AsyncWriteCallbackHandler + * + * @param key SelectionKey associated with + * SelectableChannel ByteBuffer + * should be written to + * @param buffer ByteBuffer + * @param callbackHandler AsyncWriteCallbackHandler, + * which will get notified, when + * ByteBuffer will be completely written + * @return true, if ByteBuffer was written completely, + * false if write operation was put to queue + * @throws java.io.IOException + */ + public boolean write(SelectionKey key, ByteBuffer buffer, + AsyncWriteCallbackHandler callbackHandler) throws IOException; + + /** + * Method writes ByteBuffer to the SelectableChannel + * First, if SelectableChannel associated write queue is empty - + * it tries to write ByteBuffer to the given + * SelectableChannel directly (without putting to the queue). + * If associated write queue is not empty or after direct writing + * ByteBuffer still has ready data to be written - + * ByteBuffer will be added to AsyncWriteQueue + * and SelectableChannel will be registered on + * SelectorHandler, waiting for OP_WRITE event. + * If an exception occurs, during direct writing - it will be propogated + * to the caller directly, otherwise, if the ByteBuffer is + * added to a writing queue - exception notification will come via + * AsyncWriteCallbackHandler + * + * @param key SelectionKey associated with + * SelectableChannel ByteBuffer + * should be written to + * @param buffer ByteBuffer + * @param callbackHandler AsyncWriteCallbackHandler, + * which will get notified, when + * ByteBuffer will be completely written + * @param isCloneByteBuffer if true - AsyncQueueWriter will + * clone given + * ByteBuffer before puting it to the + * AsyncWriteQueue + * @return true, if ByteBuffer was written completely, + * false if write operation was put to queue + * @throws java.io.IOException + */ + public boolean write(SelectionKey key, ByteBuffer buffer, + AsyncWriteCallbackHandler callbackHandler, boolean isCloneByteBuffer) + throws IOException; + + /** + * Method sends ByteBuffer to the SocketAddress + * First, if SelectableChannel associated write queue is empty - + * it tries to write ByteBuffer to the given + * SocketAddress directly (without putting to the queue). + * If associated write queue is not empty or after direct writing + * ByteBuffer still has ready data to be written - + * ByteBuffer will be added to AsyncWriteQueue + * and SelectableChannel will be registered on + * SelectorHandler, waiting for OP_WRITE event. + * If an exception occurs, during direct writing - it will be propogated + * to the caller directly, otherwise it will be just logged by + * Grizzly framework. + * + * @param key SelectionKey associated with + * SelectableChannel, which will be used to + * sendByteBuffer to + * @param dstAddress destination address ByteBuffer will be sent to + * @param buffer ByteBuffer + * @return true, if ByteBuffer was written completely, false if write operation was put to queue + * @throws java.io.IOException + */ + public boolean write(SelectionKey key, SocketAddress dstAddress, + ByteBuffer buffer) throws IOException; + + /** + * Method sends ByteBuffer to the SocketAddress + * First, if SelectableChannel associated write queue is empty - + * it tries to write ByteBuffer to the given + * SocketAddress directly (without putting to the queue). + * If associated write queue is not empty or after direct writing + * ByteBuffer still has ready data to be written - + * ByteBuffer will be added to AsyncWriteQueue + * and SelectableChannel will be registered on + * SelectorHandler, waiting for OP_WRITE event. + * If an exception occurs, during direct writing - it will be propogated + * to the caller directly, otherwise, if the ByteBuffer is + * added to a writing queue - exception notification will come via + * AsyncWriteCallbackHandler + * + * @param key SelectionKey associated with + * SelectableChannel ByteBuffer + * should be written to + * @param dstAddress destination address ByteBuffer will be sent to + * @param buffer ByteBuffer + * @param callbackHandler AsyncWriteCallbackHandler, + * which will get notified, when + * ByteBuffer will be completely written + * @return true, if ByteBuffer was written completely, + * false if write operation was put to queue + * @throws java.io.IOException + */ + public boolean write(SelectionKey key, SocketAddress dstAddress, + ByteBuffer buffer, AsyncWriteCallbackHandler callbackHandler) + throws IOException; + + /** + * Method sends ByteBuffer to the SocketAddress + * First, if SelectableChannel associated write queue is empty - + * it tries to write ByteBuffer to the given + * SocketAddress directly (without putting to the queue). + * If associated write queue is not empty or after direct writing + * ByteBuffer still has ready data to be written - + * ByteBuffer will be added to AsyncWriteQueue + * and SelectableChannel will be registered on + * SelectorHandler, waiting for OP_WRITE event. + * If an exception occurs, during direct writing - it will be propogated + * to the caller directly, otherwise, if the ByteBuffer is + * added to a writing queue - exception notification will come via + * AsyncWriteCallbackHandler + * + * @param key SelectionKey associated with + * SelectableChannel ByteBuffer + * should be written to + * @param dstAddress destination address ByteBuffer will be sent to + * @param buffer ByteBuffer + * @param callbackHandler AsyncWriteCallbackHandler, + * which will get notified, when + * ByteBuffer will be completely written + * @param isCloneByteBuffer if true - AsyncQueueWriter will + * clone given + * ByteBuffer before puting it to the + * AsyncWriteQueue + * @return true, if ByteBuffer was written completely, + * false if write operation was put to queue + * @throws java.io.IOException + */ + public boolean write(SelectionKey key, SocketAddress dstAddress, + ByteBuffer buffer, AsyncWriteCallbackHandler callbackHandler, + boolean isCloneByteBuffer) throws IOException; + + /** + * Checks whether there is any data in AsyncWriteQueue ready + * for the given SelectableChannel + * + * @param key SelectionKey associated with SelectableChannel + * @return true, if there is ready data. False otherwise. + */ + public boolean hasReadyAsyncWriteData(SelectionKey key); + + /** + * Callback method, which should be called by SelectorHandler to + * notify, that given SelectableChannel is ready to transmit data. + * + * @param key SelectionKey associated with SelectableChannel + * @throws java.io.IOException + */ + public void onWrite(SelectionKey key) throws IOException; + + /** + * Callback method, which should be called by SelectorHandler to + * notify, that given SelectableChannel is going to be closed, so + * related SelectableChannel data could be released from + * AsyncWriteQueue + * + * @param SelectableChannel + * @throws java.io.IOException + */ + public void onClose(SelectableChannel channel); + + public void close(); + +} Index: async/AsyncQueueWriterContextTask.java --- async/AsyncQueueWriterContextTask.java Locally New +++ async/AsyncQueueWriterContextTask.java Locally New @@ -0,0 +1,78 @@ +/* + * The contents of this file are subject to the terms + * of the Common Development and Distribution License + * (the License). You may not use this file except in + * compliance with the License. + * + * You can obtain a copy of the license at + * https://glassfish.dev.java.net/public/CDDLv1.0.html or + * glassfish/bootstrap/legal/CDDLv1.0.txt. + * See the License for the specific language governing + * permissions and limitations under the License. + * + * When distributing Covered Code, include this CDDL + * Header Notice in each file and include the License file + * at glassfish/bootstrap/legal/CDDLv1.0.txt. + * If applicable, add the following below the CDDL Header, + * with the fields enclosed by brackets [] replaced by + * you own identifying information: + * "Portions Copyrighted [year] [name of copyright owner]" + * + * Copyright 2007 Sun Microsystems, Inc. All rights reserved. + */ + +package com.sun.grizzly.async; + +import com.sun.grizzly.*; + +/** + * AsyncQueueWriter task, which will be executed by + * Context, when Context.execute(ContextTask) + * is called. + * + * @author Alexey Stashok + */ +public class AsyncQueueWriterContextTask extends ContextTask { + private static final TaskPool taskPool = + new TaskPool() { + @Override + public AsyncQueueWriterContextTask newTask() { + return new AsyncQueueWriterContextTask(); + } + }; + + private AsyncQueueWriter asyncQueueWriter; + + public static AsyncQueueWriterContextTask poll() { + return taskPool.poll(); + } + + public static void offer(AsyncQueueWriterContextTask contextTask) { + contextTask.recycle(); + taskPool.offer(contextTask); + } + + public Object call() throws Exception { + try { + asyncQueueWriter.onWrite(context.getSelectionKey()); + } finally { + offer(this); + } + + return null; + } + + public AsyncQueueWriter getAsyncQueueWriter() { + return asyncQueueWriter; + } + + public void setAsyncQueueWriter(AsyncQueueWriter asyncWriter) { + this.asyncQueueWriter = asyncWriter; + } + + @Override + public void recycle() { + asyncQueueWriter = null; + super.recycle(); + } +} Index: async/AsyncWriteCallbackHandler.java --- async/AsyncWriteCallbackHandler.java Locally New +++ async/AsyncWriteCallbackHandler.java Locally New @@ -0,0 +1,38 @@ +/* + * The contents of this file are subject to the terms + * of the Common Development and Distribution License + * (the License). You may not use this file except in + * compliance with the License. + * + * You can obtain a copy of the license at + * https://glassfish.dev.java.net/public/CDDLv1.0.html or + * glassfish/bootstrap/legal/CDDLv1.0.txt. + * See the License for the specific language governing + * permissions and limitations under the License. + * + * When distributing Covered Code, include this CDDL + * Header Notice in each file and include the License file + * at glassfish/bootstrap/legal/CDDLv1.0.txt. + * If applicable, add the following below the CDDL Header, + * with the fields enclosed by brackets [] replaced by + * you own identifying information: + * "Portions Copyrighted [year] [name of copyright owner]" + * + * Copyright 2007 Sun Microsystems, Inc. All rights reserved. + */ + +package com.sun.grizzly.async; + +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; + +/** + * Callback handler interface, used by AsyncQueueWriter to notify + * custom code about completion of specific ByteBuffer writing. + * + * @author Alexey Stashok + */ +public interface AsyncWriteCallbackHandler { + public void onWriteCompleted(SelectionKey key, ByteBuffer buffer); + public void onIOException(SelectionKey key, ByteBuffer buffer); +} Index: async/AsyncWriteQueue.java --- async/AsyncWriteQueue.java Locally New +++ async/AsyncWriteQueue.java Locally New @@ -0,0 +1,144 @@ +/* + * The contents of this file are subject to the terms + * of the Common Development and Distribution License + * (the License). You may not use this file except in + * compliance with the License. + * + * You can obtain a copy of the license at + * https://glassfish.dev.java.net/public/CDDLv1.0.html or + * glassfish/bootstrap/legal/CDDLv1.0.txt. + * See the License for the specific language governing + * permissions and limitations under the License. + * + * When distributing Covered Code, include this CDDL + * Header Notice in each file and include the License file + * at glassfish/bootstrap/legal/CDDLv1.0.txt. + * If applicable, add the following below the CDDL Header, + * with the fields enclosed by brackets [] replaced by + * you own identifying information: + * "Portions Copyrighted [year] [name of copyright owner]" + * + * Copyright 2007 Sun Microsystems, Inc. All rights reserved. + */ + +package com.sun.grizzly.async; + +import java.nio.channels.SelectableChannel; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Class represents collection of + * SelectableChannels and correspondent queue of + * ByteBuffer, which should be written asynchronously. + * This implementation is TCP protocol specific. + * + * @author Alexey Stashok + */ +public class AsyncWriteQueue{ + private Map queueMap = + new ConcurrentHashMap(); + + /** + * Add data to the AsyncWriteQueue, corresponding to the given + * SelectableChannel + * + * @param channel SelectableChannel + * @param queueRecord write data unit + */ + public void offer(SelectableChannel channel, E queueRecord) { + ChannelAsyncWriteEntry entry = obtainChannelAsyncWriteEntry(channel); + entry.queue.offer(queueRecord); + } + + /** + * Get head element of SelectableChannel write queue. + * Element will not be removed from queue. + * + * @param channel SelectableChannel + * + * @return AsyncQueueRecord write data unit + */ + public E peek(SelectableChannel channel) { + ChannelAsyncWriteEntry entry = queueMap.get(channel); + if (entry != null) { + return entry.queue.peek(); + } + + return null; + } + + /** + * Get head element of SelectableChannel write queue. + * Element will be removed from queue. + * + * @param channel SelectableChannel + * + * @return AsyncQueueRecord write data unit + */ + public E poll(SelectableChannel channel) { + ChannelAsyncWriteEntry entry = queueMap.get(channel); + if (entry != null) { + return entry.queue.poll(); + } + + return null; + } + + /** + * Remove head element of SelectableChannel write queue. + * + * @param channel SelectableChannel + */ + public void removeEntry(SelectableChannel channel) { + queueMap.remove(channel); + } + + /** + * Get the size of SelectableChannel write queue. + * + * @param channel SelectableChannel + * @return size of SelectableChannel write queue. + */ + public int size(SelectableChannel channel) { + ChannelAsyncWriteEntry entry = queueMap.get(channel); + return entry == null ? 0 : entry.queue.size(); + } + + public void clear() { + queueMap.clear(); + } + + protected ChannelAsyncWriteEntry obtainChannelAsyncWriteEntry(SelectableChannel channel) { + ChannelAsyncWriteEntry entry = queueMap.get(channel); + if (entry == null) { + synchronized(channel) { + entry = queueMap.get(channel); + if (entry == null) { + entry = new ChannelAsyncWriteEntry(); + queueMap.put(channel, entry); + } + } + } + return entry; + } + + /** + * AsyncWriteQueue data unit + */ + protected class ChannelAsyncWriteEntry { + public ConcurrentLinkedQueue queue; + public ReentrantLock writeLock; + public ReentrantLock onWriteLock; + public ReentrantLock updateLock; + + public ChannelAsyncWriteEntry() { + queue = new ConcurrentLinkedQueue(); + writeLock = new ReentrantLock(); + onWriteLock = new ReentrantLock(); + updateLock = new ReentrantLock(); + } + } +} \ No newline at end of file Index: async/TCPAsyncQueueWriter.java --- async/TCPAsyncQueueWriter.java Locally New +++ async/TCPAsyncQueueWriter.java Locally New @@ -0,0 +1,255 @@ +/* + * The contents of this file are subject to the terms + * of the Common Development and Distribution License + * (the License). You may not use this file except in + * compliance with the License. + * + * You can obtain a copy of the license at + * https://glassfish.dev.java.net/public/CDDLv1.0.html or + * glassfish/bootstrap/legal/CDDLv1.0.txt. + * See the License for the specific language governing + * permissions and limitations under the License. + * + * When distributing Covered Code, include this CDDL + * Header Notice in each file and include the License file + * at glassfish/bootstrap/legal/CDDLv1.0.txt. + * If applicable, add the following below the CDDL Header, + * with the fields enclosed by brackets [] replaced by + * you own identifying information: + * "Portions Copyrighted [year] [name of copyright owner]" + * + * Copyright 2007 Sun Microsystems, Inc. All rights reserved. + */ + +package com.sun.grizzly.async; + +import com.sun.grizzly.*; +import com.sun.grizzly.async.AsyncWriteQueue.ChannelAsyncWriteEntry; +import java.io.IOException; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SelectableChannel; +import java.nio.channels.SelectionKey; +import java.nio.channels.WritableByteChannel; +import java.util.concurrent.ConcurrentLinkedQueue; + +/** + * TCP implementation of AsyncQueueWriter + * + * @author Alexey Stashok + */ +public class TCPAsyncQueueWriter implements AsyncQueueWriter { + private SelectorHandler selectorHandler; + private AsyncWriteQueue writeQueue; + + public TCPAsyncQueueWriter(SelectorHandler selectorHandler) { + this.selectorHandler = selectorHandler; + writeQueue = new AsyncWriteQueue(); + } + + /** + * {@inheritDoc} + */ + public boolean write(SelectionKey key, ByteBuffer buffer) throws IOException { + return write(key, buffer, null); + } + + /** + * {@inheritDoc} + */ + public boolean write(SelectionKey key, ByteBuffer buffer, + AsyncWriteCallbackHandler callbackHandler) throws IOException { + return write(key, buffer, callbackHandler, false); + } + + /** + * {@inheritDoc} + */ + public boolean write(SelectionKey key, ByteBuffer buffer, + AsyncWriteCallbackHandler callbackHandler, boolean isCloneByteBuffer) throws IOException { + + SelectableChannel channel = key.channel(); + // If AsyncWriteQueue is empty - try to write ByteBuffer here + ChannelAsyncWriteEntry channelEntry = + writeQueue.obtainChannelAsyncWriteEntry(channel); + + ConcurrentLinkedQueue queue = channelEntry.queue; + + boolean hasWriteLock = false; + + try { + if (queue.isEmpty() && + (hasWriteLock = channelEntry.writeLock.tryLock())) { + ((WritableByteChannel) channel).write(buffer); + } + + if (buffer.hasRemaining()) { + // clone ByteBuffer if required + if (isCloneByteBuffer) { + int size = buffer.remaining(); + ByteBuffer newBuffer = buffer.isDirect() ? + ByteBuffer.allocateDirect(size) : + ByteBuffer.allocate(size); + + newBuffer.put(buffer); + newBuffer.position(0); + buffer = newBuffer; + } + + // add new element to the queue + channelEntry.updateLock.lock(); + queue.offer(new TCPAsyncQueueRecord(buffer, + callbackHandler)); + + /* + * This check helps to avoid not required key registering + */ + if (!channelEntry.onWriteLock.isLocked()) { + registerForWriting(key); + } + + channelEntry.updateLock.unlock(); + + return false; + } + } finally { + if (hasWriteLock) { + channelEntry.writeLock.unlock(); + } + } + + return true; + } + + /** + * {@inheritDoc} + */ + public boolean hasReadyAsyncWriteData(SelectionKey key) { + return writeQueue.size(key.channel()) > 0; + } + + /** + * {@inheritDoc} + */ + public void onWrite(SelectionKey key) throws IOException { + SelectableChannel channel = key.channel(); + + ChannelAsyncWriteEntry channelEntry = + writeQueue.obtainChannelAsyncWriteEntry(channel); + + boolean hasUpdateLock = false; + boolean hasOnWriteLock = false; + + if ((hasOnWriteLock = channelEntry.onWriteLock.tryLock())) { + try { + ConcurrentLinkedQueue queue = channelEntry.queue; + + while (queue.size() > 0) { + if (hasUpdateLock) { + channelEntry.updateLock.unlock(); + hasUpdateLock = false; + } + + TCPAsyncQueueRecord queueRecord = queue.peek(); + + ByteBuffer byteBuffer = queueRecord.byteBuffer; + try { + ((WritableByteChannel) channel).write(byteBuffer); + } catch (IOException e) { + if (queueRecord.callbackHandler != null) { + queueRecord.callbackHandler.onIOException(key, + byteBuffer); + } + } + + if (!byteBuffer.hasRemaining()) { + TCPAsyncQueueRecord removedQueueRecord = + queue.poll(); + assert removedQueueRecord == queueRecord; + + if (removedQueueRecord.callbackHandler != null) { + removedQueueRecord.callbackHandler.onWriteCompleted(key, byteBuffer); + } + } else { + hasOnWriteLock = false; + channelEntry.onWriteLock.unlock(); + registerForWriting(key); + break; + } + + if (queue.size() == 0) { + /* + * If queue is empty - there is possibility that write method + * will add entry, but it will not be processed, because + * onWriteLock is currently locked + */ + channelEntry.updateLock.lock(); + hasUpdateLock = true; + } + } + } finally { + if (hasOnWriteLock) { + channelEntry.onWriteLock.unlock(); + } + + if (hasUpdateLock) { + channelEntry.updateLock.unlock(); + } + } + } + } + + /** + * {@inheritDoc} + */ + public boolean write(SelectionKey key, SocketAddress dstAddress, ByteBuffer buffer) throws IOException { + throw new UnsupportedOperationException("Not supported for TCP transport."); + } + + /** + * {@inheritDoc} + */ + public boolean write(SelectionKey key, SocketAddress dstAddress, ByteBuffer buffer, AsyncWriteCallbackHandler callbackHandler) throws IOException { + throw new UnsupportedOperationException("Not supported for TCP transport."); + } + + /** + * {@inheritDoc} + */ + public boolean write(SelectionKey key, SocketAddress dstAddress, ByteBuffer buffer, AsyncWriteCallbackHandler callbackHandler, boolean isCloneByteBuffer) throws IOException { + throw new UnsupportedOperationException("Not supported for TCP transport."); + } + + /** + * {@inheritDoc} + */ + public void onClose(SelectableChannel channel) { + writeQueue.removeEntry(channel); + } + + /** + * {@inheritDoc} + */ + public void close() { + writeQueue.clear(); + writeQueue = null; + } + + private void registerForWriting(SelectionKey key) { + selectorHandler.register(key, SelectionKey.OP_WRITE); + } + + /** + * AsyncWriteQueue data unit specific for TCP protocol + */ + protected static class TCPAsyncQueueRecord { + public ByteBuffer byteBuffer; + public AsyncWriteCallbackHandler callbackHandler; + + public TCPAsyncQueueRecord(ByteBuffer byteBuffer, + AsyncWriteCallbackHandler callbackHandler) { + this.byteBuffer = byteBuffer; + this.callbackHandler = callbackHandler; + } + } +} Index: async/UDPAsyncQueueWriter.java --- async/UDPAsyncQueueWriter.java Locally New +++ async/UDPAsyncQueueWriter.java Locally New @@ -0,0 +1,271 @@ +/* + * The contents of this file are subject to the terms + * of the Common Development and Distribution License + * (the License). You may not use this file except in + * compliance with the License. + * + * You can obtain a copy of the license at + * https://glassfish.dev.java.net/public/CDDLv1.0.html or + * glassfish/bootstrap/legal/CDDLv1.0.txt. + * See the License for the specific language governing + * permissions and limitations under the License. + * + * When distributing Covered Code, include this CDDL + * Header Notice in each file and include the License file + * at glassfish/bootstrap/legal/CDDLv1.0.txt. + * If applicable, add the following below the CDDL Header, + * with the fields enclosed by brackets [] replaced by + * you own identifying information: + * "Portions Copyrighted [year] [name of copyright owner]" + * + * Copyright 2007 Sun Microsystems, Inc. All rights reserved. + */ + +package com.sun.grizzly.async; + +import com.sun.grizzly.*; +import com.sun.grizzly.async.AsyncWriteQueue.ChannelAsyncWriteEntry; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SelectableChannel; +import java.nio.channels.SelectionKey; +import java.nio.channels.WritableByteChannel; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.net.SocketAddress; +import java.nio.channels.DatagramChannel; + +/** + * UDP implementation of AsyncQueueWriter + * + * @author Alexey Stashok + */ +public class UDPAsyncQueueWriter implements AsyncQueueWriter { + private SelectorHandler selectorHandler; + private AsyncWriteQueue writeQueue; + + public UDPAsyncQueueWriter(SelectorHandler selectorHandler) { + this.selectorHandler = selectorHandler; + writeQueue = new AsyncWriteQueue(); + } + + /** + * {@inheritDoc} + */ + public boolean write(SelectionKey key, ByteBuffer buffer) throws IOException { + return write(key, null, buffer, null); + } + + /** + * {@inheritDoc} + */ + public boolean write(SelectionKey key, ByteBuffer buffer, + AsyncWriteCallbackHandler callbackHandler) throws IOException { + return write(key, null, buffer, callbackHandler, false); + } + + /** + * {@inheritDoc} + */ + public boolean write(SelectionKey key, ByteBuffer buffer, + AsyncWriteCallbackHandler callbackHandler, boolean isCloneByteBuffer) throws IOException { + return write(key, null, buffer, callbackHandler, isCloneByteBuffer); + } + + /** + * {@inheritDoc} + */ + public boolean write(SelectionKey key, SocketAddress dstAddress, + ByteBuffer buffer) throws IOException { + return write(key, dstAddress, buffer, null); + } + + /** + * {@inheritDoc} + */ + public boolean write(SelectionKey key, SocketAddress dstAddress, + ByteBuffer buffer, AsyncWriteCallbackHandler callbackHandler) + throws IOException { + return write(key, dstAddress, buffer, callbackHandler, false); + } + + /** + * {@inheritDoc} + */ + public boolean write(SelectionKey key, SocketAddress dstAddress, + ByteBuffer buffer, AsyncWriteCallbackHandler callbackHandler, + boolean isCloneByteBuffer) throws IOException { + + SelectableChannel channel = key.channel(); + // If AsyncWriteQueue is empty - try to write ByteBuffer here + ChannelAsyncWriteEntry channelEntry = + writeQueue.obtainChannelAsyncWriteEntry(channel); + + ConcurrentLinkedQueue queue = channelEntry.queue; + + boolean hasWriteLock = false; + + try { + if (queue.isEmpty() && + (hasWriteLock = channelEntry.writeLock.tryLock())) { + doWrite((WritableByteChannel) channel, dstAddress, buffer); + } + + if (buffer.hasRemaining()) { + // clone ByteBuffer if required + if (isCloneByteBuffer) { + int size = buffer.remaining(); + ByteBuffer newBuffer = buffer.isDirect() ? + ByteBuffer.allocateDirect(size) : + ByteBuffer.allocate(size); + + newBuffer.put(buffer); + newBuffer.position(0); + buffer = newBuffer; + } + + // add new element to the queue + channelEntry.updateLock.lock(); + queue.offer(new UDPAsyncQueueRecord(dstAddress, buffer, + callbackHandler)); + + /* + * This check helps to avoid not required key registering + */ + if (!channelEntry.onWriteLock.isLocked()) { + registerForWriting(key); + } + + channelEntry.updateLock.unlock(); + + return false; + } + } finally { + if (hasWriteLock) { + channelEntry.writeLock.unlock(); + } + } + + return true; + } + + /** + * {@inheritDoc} + */ + public boolean hasReadyAsyncWriteData(SelectionKey key) { + return writeQueue.size(key.channel()) > 0; + } + + /** + * {@inheritDoc} + */ + public void onWrite(SelectionKey key) throws IOException { + SelectableChannel channel = key.channel(); + + ChannelAsyncWriteEntry channelEntry = + writeQueue.obtainChannelAsyncWriteEntry(channel); + + boolean hasUpdateLock = false; + boolean hasOnWriteLock = false; + + if ((hasOnWriteLock = channelEntry.onWriteLock.tryLock())) { + try { + ConcurrentLinkedQueue queue = + channelEntry.queue; + + while (queue.size() > 0) { + if (hasUpdateLock) { + channelEntry.updateLock.unlock(); + hasUpdateLock = false; + } + + UDPAsyncQueueRecord queueRecord = queue.peek(); + + ByteBuffer byteBuffer = queueRecord.byteBuffer; + try { + doWrite((WritableByteChannel) channel, + queueRecord.dstAddress, byteBuffer); + } catch (IOException e) { + if (queueRecord.callbackHandler != null) { + queueRecord.callbackHandler.onIOException(key, + byteBuffer); + } + } + + if (!byteBuffer.hasRemaining()) { + UDPAsyncQueueRecord removedQueueRecord = queue.poll(); + assert removedQueueRecord == queueRecord; + + if (removedQueueRecord.callbackHandler != null) { + removedQueueRecord.callbackHandler.onWriteCompleted(key, byteBuffer); + } + } else { + hasOnWriteLock = false; + channelEntry.onWriteLock.unlock(); + registerForWriting(key); + break; + } + + if (queue.size() == 0) { + /* + * If queue is empty - there is possibility that write method + * will add entry, but it will not be processed, because + * onWriteLock is currently locked + */ + channelEntry.updateLock.lock(); + hasUpdateLock = true; + } + } + } finally { + if (hasOnWriteLock) { + channelEntry.onWriteLock.unlock(); + } + + if (hasUpdateLock) { + channelEntry.updateLock.unlock(); + } + } + } + } + + /** + * {@inheritDoc} + */ + public void onClose(SelectableChannel channel) { + writeQueue.removeEntry(channel); + } + + /** + * {@inheritDoc} + */ + public void close() { + writeQueue.clear(); + writeQueue = null; + } + + private void doWrite(WritableByteChannel channel, SocketAddress dstAddress, + ByteBuffer byteBuffer) throws IOException { + if (dstAddress != null) { + ((DatagramChannel) channel).send(byteBuffer, dstAddress); + } else { + channel.write(byteBuffer); + } + } + + private void registerForWriting(SelectionKey key) { + selectorHandler.register(key, SelectionKey.OP_WRITE); + } + + /** + * AsyncWriteQueue data unit specific for TCP protocol + */ + protected static class UDPAsyncQueueRecord extends + TCPAsyncQueueWriter.TCPAsyncQueueRecord { + public SocketAddress dstAddress; + + public UDPAsyncQueueRecord(SocketAddress dstAddress, ByteBuffer byteBuffer, + AsyncWriteCallbackHandler callbackHandler) { + super(byteBuffer, callbackHandler); + this.dstAddress = dstAddress; + } + } +} Index: CallbackHandlerContextTask.java --- CallbackHandlerContextTask.java Locally New +++ CallbackHandlerContextTask.java Locally New @@ -0,0 +1,98 @@ +/* + * The contents of this file are subject to the terms + * of the Common Development and Distribution License + * (the License). You may not use this file except in + * compliance with the License. + * + * You can obtain a copy of the license at + * https://glassfish.dev.java.net/public/CDDLv1.0.html or + * glassfish/bootstrap/legal/CDDLv1.0.txt. + * See the License for the specific language governing + * permissions and limitations under the License. + * + * When distributing Covered Code, include this CDDL + * Header Notice in each file and include the License file + * at glassfish/bootstrap/legal/CDDLv1.0.txt. + * If applicable, add the following below the CDDL Header, + * with the fields enclosed by brackets [] replaced by + * you own identifying information: + * "Portions Copyrighted [year] [name of copyright owner]" + * + * Copyright 2007 Sun Microsystems, Inc. All rights reserved. + */ + +package com.sun.grizzly; + +import com.sun.grizzly.Context.OpType; +import com.sun.grizzly.ContextTask.TaskPool; + +/** + * CallbackHandler task, which will be executed by + * Context, when Context.execute(ContextTask) + * is called. + * + * @author Alexey Stashok + */ +public class CallbackHandlerContextTask extends ContextTask { + private static final TaskPool taskPool = + new TaskPool() { + @Override + public CallbackHandlerContextTask newTask() { + return new CallbackHandlerContextTask(); + } + }; + + private CallbackHandler callBackHandler; + + public static CallbackHandlerContextTask poll() { + return taskPool.poll(); + } + + public static void offer(CallbackHandlerContextTask contextTask) { + contextTask.recycle(); + taskPool.offer(contextTask); + } + + public Object call() throws Exception { + IOEvent ioEvent = context.getIOEvent(); + OpType currentOpType = context.getCurrentOpType(); + + try { + if (currentOpType == OpType.OP_READ) { + callBackHandler.onRead(ioEvent); + } else if (currentOpType == OpType.OP_WRITE) { + callBackHandler.onWrite(ioEvent); + } else if (currentOpType == OpType.OP_CONNECT) { + callBackHandler.onConnect(ioEvent); + } + } finally { + if (ioEvent != null) { + // Prevent the CallbackHandler to re-use the context. + // TODO: This is still dangerous as the Context might have been + // cached by the CallbackHandler. + ioEvent.attach(null); + ioEvent = null; + } + + offer(this); + } + + return null; + } + + public CallbackHandler getCallBackHandler() { + return callBackHandler; + } + + public void setCallBackHandler(CallbackHandler callBackHandler) { + this.callBackHandler = callBackHandler; + } + + @Override + public void recycle() { + callBackHandler = null; + super.recycle(); + } + + +} Index: Context.java --- Context.java Base (BASE) +++ Context.java Locally Modified (Based On LOCAL) @@ -23,9 +23,14 @@ package com.sun.grizzly; +import com.sun.grizzly.async.AsyncQueueWritable; +import com.sun.grizzly.async.AsyncQueueWriter; +import com.sun.grizzly.async.AsyncWriteCallbackHandler; +import java.io.IOException; +import java.net.SocketAddress; +import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.util.HashMap; -import java.util.concurrent.Callable; /** * This Object is used to share information between the Grizzly Framework @@ -33,7 +38,7 @@ * * @author Jeanfrancois Arcand */ -public class Context implements Callable { +public class Context { /** * A SelectionKey's registration state. @@ -119,6 +124,16 @@ /** + * AsyncQueueWriter + */ + private AsyncQueueWriter asyncQueueWriter; + + /** + * AsyncQueueWritable + */ + private AsyncQueueWritable asyncQueueWritable; + + /** * Constructor */ public Context() { @@ -210,6 +225,7 @@ currentOpType = null; protocolChain = null; ioEvent = null; + asyncQueueWriter = null; } @@ -233,47 +249,6 @@ /** - * Execute the ProtocolChain. - * @throws java.lang.Exception Exception thrown by protocol chain - */ - public Object call() throws Exception { - // If a IOEvent has been defined, invoke it first and - // let its associated CallbackHandler decide if the ProtocolChain - // be invoked or not. - Object attachment = key.attachment(); - if (ioEvent != null && (attachment instanceof CallbackHandler)){ - try{ - CallbackHandler callBackHandler = ((CallbackHandler)attachment); - if (currentOpType == OpType.OP_READ){ - callBackHandler.onRead(ioEvent); - } else if (currentOpType == OpType.OP_WRITE){ - callBackHandler.onWrite(ioEvent); - } else if (currentOpType == OpType.OP_CONNECT){ - callBackHandler.onConnect(ioEvent); - } - } finally { - if (ioEvent != null){ - // Prevent the CallbackHandler to re-use the context. - // TODO: This is still dangerous as the Context might have been - // cached by the CallbackHandler. - ioEvent.attach(null); - ioEvent = null; - } - } - } else { - SelectionKey currentKey = key; - selectorHandler.getSelectionKeyHandler().process(currentKey); - try { - protocolChain.execute(this); - } finally { - selectorHandler.getSelectionKeyHandler().postProcess(currentKey); - } - } - return null; - } - - - /** * Return ProtocolChain executed by this instance. * @return ProtocolChain instance */ @@ -313,9 +288,12 @@ * Execute this Context using the Controller's Pipeline * @throws com.sun.grizzly.PipelineFullException */ - public void execute() throws PipelineFullException{ - getPipeline().execute(this); + public void execute(ContextTask contextTask) throws PipelineFullException { + if (contextTask != null) { + contextTask.setContext(this); + getPipeline().execute(contextTask); } + } /** @@ -392,4 +370,89 @@ public void setSelectorHandler(SelectorHandler selectorHandler) { this.selectorHandler = selectorHandler; } + + + /** + * Returns AsyncQueueWritable, assciated with the current + * Context. This method is not threadsafe. + * + * @return AsyncQueueWritable + */ + public AsyncQueueWritable getAsyncQueueWritable() { + if (asyncQueueWritable == null) { + asyncQueueWritable = new AsyncQueueWritableContextWrapper(); } + + return asyncQueueWritable; + } + + /** + * Return the AsyncQueueWriter + * @return the AsyncQueueWriter + */ + protected AsyncQueueWriter getAsyncQueueWriter() { + return asyncQueueWriter; + } + + /** + * Set the AsyncQueueWriter + * @param asyncQueueWriter AsyncQueueWriter + */ + protected void setAsyncQueueWriter(AsyncQueueWriter asyncQueueWriter) { + this.asyncQueueWriter = asyncQueueWriter; + } + + private class AsyncQueueWritableContextWrapper implements AsyncQueueWritable { + /** + * {@inheritDoc} + */ + public boolean writeToAsyncQueue(ByteBuffer buffer) throws IOException { + return asyncQueueWriter.write(key, buffer); + } + + /** + * {@inheritDoc} + */ + public boolean writeToAsyncQueue(ByteBuffer buffer, + AsyncWriteCallbackHandler callbackHandler) throws IOException { + return asyncQueueWriter.write(key, buffer, callbackHandler); + } + + /** + * {@inheritDoc} + */ + public boolean writeToAsyncQueue(ByteBuffer buffer, + AsyncWriteCallbackHandler callbackHandler, + boolean isCloneByteBuffer) throws IOException { + return asyncQueueWriter.write(key, buffer, callbackHandler, + isCloneByteBuffer); + } + + /** + * {@inheritDoc} + */ + public boolean writeToAsyncQueue(SocketAddress dstAddress, ByteBuffer buffer) + throws IOException { + return asyncQueueWriter.write(key, dstAddress, buffer); + } + + /** + * {@inheritDoc} + */ + public boolean writeToAsyncQueue(SocketAddress dstAddress, ByteBuffer buffer, + AsyncWriteCallbackHandler callbackHandler) throws IOException { + return asyncQueueWriter.write(key, dstAddress, buffer, + callbackHandler); + } + + /** + * {@inheritDoc} + */ + public boolean writeToAsyncQueue(SocketAddress dstAddress, ByteBuffer buffer, + AsyncWriteCallbackHandler callbackHandler, boolean isCloneByteBuffer) + throws IOException { + return asyncQueueWriter.write(key, dstAddress, buffer, + callbackHandler, isCloneByteBuffer); + } + } +} Index: ContextTask.java --- ContextTask.java Locally New +++ ContextTask.java Locally New @@ -0,0 +1,70 @@ +/* + * The contents of this file are subject to the terms + * of the Common Development and Distribution License + * (the License). You may not use this file except in + * compliance with the License. + * + * You can obtain a copy of the license at + * https://glassfish.dev.java.net/public/CDDLv1.0.html or + * glassfish/bootstrap/legal/CDDLv1.0.txt. + * See the License for the specific language governing + * permissions and limitations under the License. + * + * When distributing Covered Code, include this CDDL + * Header Notice in each file and include the License file + * at glassfish/bootstrap/legal/CDDLv1.0.txt. + * If applicable, add the following below the CDDL Header, + * with the fields enclosed by brackets [] replaced by + * you own identifying information: + * "Portions Copyrighted [year] [name of copyright owner]" + * + * Copyright 2007 Sun Microsystems, Inc. All rights reserved. + */ + +package com.sun.grizzly; + +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentLinkedQueue; + +/** + * Task, which will be executed by Context, when + * Context.execute(ContextTask) is called. + * + * @author Alexey Stashok + */ +public abstract class ContextTask implements Callable { + protected Context context; + + public Context getContext() { + return context; + } + + public void setContext(Context context) { + this.context = context; + } + + public void recycle() { + context = null; + } + + protected abstract static class TaskPool { + private ConcurrentLinkedQueue pool = + new ConcurrentLinkedQueue(); + + public abstract E newTask(); + + public E poll() { + E task = pool.poll(); + if (task == null) { + task = newTask(); + } + + return task; + } + + public void offer(E task) { + task.recycle(); + pool.offer(task); + } + } +} Index: Controller.java --- Controller.java Base (BASE) +++ Controller.java Locally Modified (Based On LOCAL) @@ -298,6 +298,7 @@ key = iterator.next(); iterator.remove(); boolean skipOpWrite = false; + delegateToWorkerThread = false; if (key.isValid()) { if ((key.readyOps() & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT){ @@ -356,7 +357,7 @@ if (logger.isLoggable(Level.FINE)) { logger.log(Level.FINE, "OP_WRITE on " + key); } - delegateToWorkerThread = selectorHandler. + delegateToWorkerThread |= selectorHandler. onWriteInterest(key,serverCtx); } @@ -368,7 +369,9 @@ Context context = pollContext(key,protocolChain); context.setSelectorHandler(selectorHandler); context.setPipeline(selectorHandler.pipeline()); - context.execute(); + context.setAsyncQueueWriter( + selectorHandler.getAsyncQueueWriter()); + context.execute(ProtocolChainContextTask.poll()); } } else { selectorHandler.getSelectionKeyHandler().cancel(key); Index: filter/EchoAsyncWriteQueueFilter.java --- filter/EchoAsyncWriteQueueFilter.java Locally New +++ filter/EchoAsyncWriteQueueFilter.java Locally New @@ -0,0 +1,81 @@ +/* + * The contents of this file are subject to the terms + * of the Common Development and Distribution License + * (the License). You may not use this file except in + * compliance with the License. + * + * You can obtain a copy of the license at + * https://glassfish.dev.java.net/public/CDDLv1.0.html or + * glassfish/bootstrap/legal/CDDLv1.0.txt. + * See the License for the specific language governing + * permissions and limitations under the License. + * + * When distributing Covered Code, include this CDDL + * Header Notice in each file and include the License file + * at glassfish/bootstrap/legal/CDDLv1.0.txt. + * If applicable, add the following below the CDDL Header, + * with the fields enclosed by brackets [] replaced by + * you own identifying information: + * "Portions Copyrighted [year] [name of copyright owner]" + * + * Copyright 2006 Sun Microsystems, Inc. All rights reserved. + */ + +package com.sun.grizzly.filter; + +import com.sun.grizzly.Context; +import com.sun.grizzly.Controller; +import com.sun.grizzly.ProtocolFilter; +import com.sun.grizzly.util.OutputWriter; +import com.sun.grizzly.util.WorkerThread; +import java.io.IOException; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.DatagramChannel; +import java.nio.channels.SelectableChannel; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Level; + +/** + * Simple echo filter + * + * @author Alexey Stashok + */ +public class EchoAsyncWriteQueueFilter implements ProtocolFilter { + AtomicInteger counter = new AtomicInteger(1); + public boolean execute(Context ctx) throws IOException { + final WorkerThread workerThread = ((WorkerThread)Thread.currentThread()); + ByteBuffer buffer = workerThread.getByteBuffer(); + buffer.flip(); + if (buffer.hasRemaining()) { + // Depending on protocol perform echo + try { + if (ctx.getProtocol() == Controller.Protocol.TCP) { // TCP case + ctx.getAsyncQueueWritable().writeToAsyncQueue(buffer, null, true); + } else if (ctx.getProtocol() == Controller.Protocol.UDP) { // UDP case + SocketAddress address = (SocketAddress) + ctx.getAttribute(ReadFilter.UDP_SOCKETADDRESS); + ctx.getAsyncQueueWritable().writeToAsyncQueue(address, + buffer, null, true); + } + } catch (IOException ex) { + // Store incoming data in byte[] + byte[] data = new byte[buffer.remaining()]; + int position = buffer.position(); + buffer.get(data); + buffer.position(position); + + Controller.logger().log(Level.WARNING, + "Exception. Echo \"" + new String(data) + "\""); + throw ex; + } + } + + buffer.clear(); + return false; + } + + public boolean postExecute(Context ctx) throws IOException { + return true; + } +} Index: ProtocolChainContextTask.java --- ProtocolChainContextTask.java Locally New +++ ProtocolChainContextTask.java Locally New @@ -0,0 +1,69 @@ +/* + * The contents of this file are subject to the terms + * of the Common Development and Distribution License + * (the License). You may not use this file except in + * compliance with the License. + * + * You can obtain a copy of the license at + * https://glassfish.dev.java.net/public/CDDLv1.0.html or + * glassfish/bootstrap/legal/CDDLv1.0.txt. + * See the License for the specific language governing + * permissions and limitations under the License. + * + * When distributing Covered Code, include this CDDL + * Header Notice in each file and include the License file + * at glassfish/bootstrap/legal/CDDLv1.0.txt. + * If applicable, add the following below the CDDL Header, + * with the fields enclosed by brackets [] replaced by + * you own identifying information: + * "Portions Copyrighted [year] [name of copyright owner]" + * + * Copyright 2007 Sun Microsystems, Inc. All rights reserved. + */ + + +package com.sun.grizzly; + +import java.nio.channels.SelectionKey; + +/** + * ProtocolChain task, which will be executed by + * Context, when Context.execute(ContextTask) + * is called. + * + * @author Alexey Stashok + */ +public class ProtocolChainContextTask extends ContextTask { + private static final TaskPool taskPool = + new TaskPool() { + @Override + public ProtocolChainContextTask newTask() { + return new ProtocolChainContextTask(); + } + }; + + public static ProtocolChainContextTask poll() { + return taskPool.poll(); + } + + public static void offer(ProtocolChainContextTask contextTask) { + contextTask.recycle(); + taskPool.offer(contextTask); + } + + public Object call() throws Exception { + SelectionKey currentKey = context.getSelectionKey(); + SelectionKeyHandler selectionKeyHandler = context. + getSelectorHandler().getSelectionKeyHandler(); + + selectionKeyHandler.process(currentKey); + try { + context.getProtocolChain().execute(context); + } finally { + selectionKeyHandler.postProcess(currentKey); + offer(this); + } + + return null; + } +} Index: SelectorHandler.java --- SelectorHandler.java Base (BASE) +++ SelectorHandler.java Locally Modified (Based On LOCAL) @@ -22,6 +22,7 @@ */ package com.sun.grizzly; +import com.sun.grizzly.async.AsyncQueueWriter; import com.sun.grizzly.util.Copyable; import java.io.IOException; import java.nio.channels.SelectableChannel; @@ -203,6 +204,16 @@ /** + * Returns AsyncQueueWriter associated with this + * SelectorHandler. Method will return null, if this + * TCPSelectorHandler is not running. + * + * @return AsyncQuquqWriter + */ + public AsyncQueueWriter getAsyncQueueWriter(); + + + /** * Return the Pipeline used to execute this SelectorHandler's * SelectionKey ops * @return The pipeline to use, or null if the Controller's Pipeline Index: TCPConnectorHandler.java --- TCPConnectorHandler.java Base (BASE) +++ TCPConnectorHandler.java Locally Modified (Based On LOCAL) @@ -23,6 +23,8 @@ package com.sun.grizzly; import com.sun.grizzly.Controller.Protocol; +import com.sun.grizzly.async.AsyncWriteCallbackHandler; +import com.sun.grizzly.async.AsyncQueueWritable; import com.sun.grizzly.util.ByteBufferInputStream; import com.sun.grizzly.util.OutputWriter; import java.io.IOException; @@ -70,7 +72,8 @@ * * @author Jeanfrancois Arcand */ -public class TCPConnectorHandler implements ConnectorHandler{ +public class TCPConnectorHandler implements + ConnectorHandler, AsyncQueueWritable { /** * The underlying TCPSelectorHandler used to mange SelectionKeys. @@ -399,6 +402,72 @@ /** + * {@inheritDoc} + */ + public boolean writeToAsyncQueue(ByteBuffer buffer) throws IOException { + return selectorHandler.getAsyncQueueWriter().write( + socketChannel.keyFor(selectorHandler.getSelector()), buffer); + } + + + /** + * {@inheritDoc} + */ + public boolean writeToAsyncQueue(ByteBuffer buffer, + AsyncWriteCallbackHandler callbackHandler) throws IOException { + return selectorHandler.getAsyncQueueWriter().write( + socketChannel.keyFor(selectorHandler.getSelector()), buffer, + callbackHandler); + } + + + /** + * {@inheritDoc} + */ + public boolean writeToAsyncQueue(ByteBuffer buffer, + AsyncWriteCallbackHandler callbackHandler, + boolean isCloneByteBuffer) throws IOException { + return selectorHandler.getAsyncQueueWriter().write( + socketChannel.keyFor(selectorHandler.getSelector()), buffer, + callbackHandler, isCloneByteBuffer); + } + + + /** + * {@inheritDoc} + */ + public boolean writeToAsyncQueue(SocketAddress dstAddress, ByteBuffer buffer) + throws IOException { + return selectorHandler.getAsyncQueueWriter().write( + socketChannel.keyFor(selectorHandler.getSelector()), dstAddress, + buffer); + } + + + /** + * {@inheritDoc} + */ + public boolean writeToAsyncQueue(SocketAddress dstAddress, ByteBuffer buffer, + AsyncWriteCallbackHandler callbackHandler) throws IOException { + return selectorHandler.getAsyncQueueWriter().write( + socketChannel.keyFor(selectorHandler.getSelector()), dstAddress, + buffer, callbackHandler); + } + + + /** + * {@inheritDoc} + */ + public boolean writeToAsyncQueue(SocketAddress dstAddress, ByteBuffer buffer, + AsyncWriteCallbackHandler callbackHandler, boolean isCloneByteBuffer) + throws IOException { + return selectorHandler.getAsyncQueueWriter().write( + socketChannel.keyFor(selectorHandler.getSelector()), dstAddress, + buffer, callbackHandler, isCloneByteBuffer); + } + + + /** * Close the underlying connection. */ public void close() throws IOException{ Index: TCPSelectorHandler.java --- TCPSelectorHandler.java Base (BASE) +++ TCPSelectorHandler.java Locally Modified (Based On LOCAL) @@ -23,6 +23,9 @@ package com.sun.grizzly; +import com.sun.grizzly.async.AsyncQueueWriter; +import com.sun.grizzly.async.TCPAsyncQueueWriter; +import com.sun.grizzly.async.AsyncQueueWriterContextTask; import com.sun.grizzly.util.Cloner; import com.sun.grizzly.util.Copyable; import com.sun.grizzly.util.SelectionKeyOP; @@ -171,6 +174,7 @@ */ protected ProtocolChainInstanceHandler instanceHandler; + protected AsyncQueueWriter asyncQueueWriter; public TCPSelectorHandler(){ } @@ -234,6 +238,10 @@ public void preSelect(Context ctx) throws IOException { initOpRegistriesIfRequired(); + if (asyncQueueWriter == null) { + asyncQueueWriter = new TCPAsyncQueueWriter(this); + } + if (selector == null){ try{ connectorInstanceHandler = new ConnectorInstanceHandler. @@ -423,7 +431,7 @@ /** * Shuntdown this instance by closing its Selector and associated channels. */ - public void shutdown(){ + public void shutdown() { if (selector != null){ for (SelectionKey selectionKey : selector.keys()) { selectionKeyHandler.close(selectionKey); @@ -454,7 +462,11 @@ "selector.close",ex); } + if (asyncQueueWriter != null) { + asyncQueueWriter.close(); + asyncQueueWriter = null; } + } /** * {@inheritDoc} @@ -494,10 +506,11 @@ throws IOException{ // disable OP_READ on key before doing anything else key.interestOps(key.interestOps() & (~SelectionKey.OP_READ)); - if (key.attachment() instanceof CallbackHandler){ - final Context context = ctx.getController().pollContext(key); + Object attach = key.attachment(); + if (attach instanceof CallbackHandler){ + final Context context = pollContext(ctx, key); context.setCurrentOpType(Context.OpType.OP_READ); - invokeCallbackHandler(context); + invokeCallbackHandler((CallbackHandler) attach, context); return false; } else { return true; @@ -515,12 +528,19 @@ throws IOException{ // disable OP_WRITE on key before doing anything else key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE)); - if (key.attachment() instanceof CallbackHandler){ - final Context context = ctx.getController().pollContext(key); - context.setSelectionKey(key); + + Object attach = null; + + if (asyncQueueWriter.hasReadyAsyncWriteData(key)) { + final Context context = pollContext(ctx, key); context.setCurrentOpType(Context.OpType.OP_WRITE); - invokeCallbackHandler(context); + invokeAsyncQueueWriter(context); return false; + } else if ((attach = key.attachment()) instanceof CallbackHandler){ + final Context context = pollContext(ctx, key); + context.setCurrentOpType(Context.OpType.OP_WRITE); + invokeCallbackHandler((CallbackHandler) attach, context); + return false; } else { return true; } @@ -541,11 +561,11 @@ key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE)); key.interestOps(key.interestOps() & (~SelectionKey.OP_READ)); - if (key.attachment() instanceof CallbackHandler){ - Context context = ctx.getController().pollContext(key); - context.setSelectionKey(key); + Object attach = key.attachment(); + if (attach instanceof CallbackHandler){ + final Context context = pollContext(ctx, key); context.setCurrentOpType(Context.OpType.OP_CONNECT); - invokeCallbackHandler(context); + invokeCallbackHandler((CallbackHandler) attach, context); } return false; } @@ -556,13 +576,14 @@ * @param context Context * @throws java.io.IOException */ - protected void invokeCallbackHandler(Context context) throws IOException{ - context.setSelectorHandler(this); - + protected void invokeCallbackHandler(CallbackHandler callbackHandler, + Context context) throws IOException{ IOEventioEvent = new IOEvent.DefaultIOEvent(context); context.setIOEvent(ioEvent); try { - context.execute(); + CallbackHandlerContextTask task = CallbackHandlerContextTask.poll(); + task.setCallBackHandler(callbackHandler); + context.execute(task); } catch (PipelineFullException ex){ throw new IOException(ex.getMessage()); } @@ -570,6 +591,22 @@ /** + * Invoke a AsyncQueueWriter + * @param context Context + * @throws java.io.IOException + */ + protected void invokeAsyncQueueWriter(Context context) throws IOException { + AsyncQueueWriterContextTask task = AsyncQueueWriterContextTask.poll(); + task.setAsyncQueueWriter(asyncQueueWriter); + try { + context.execute(task); + } catch (PipelineFullException ex) { + throw new IOException(ex.getMessage()); + } + } + + + /** * Return an instance of the default ConnectorHandler, * which is the TCPConnectorHandler * @return ConnectorHandler @@ -654,6 +691,13 @@ this.selector = selector; } + /** + * {@inheritDoc} + */ + public AsyncQueueWriter getAsyncQueueWriter() { + return asyncQueueWriter; + } + public long getSelectTimeout() { return selectTimeout; } @@ -855,8 +899,19 @@ } catch (IOException ex){ ; // LOG ME } + + asyncQueueWriter.onClose(channel); } + private Context pollContext(final Context serverContext, + final SelectionKey key) { + final Context context = serverContext.getController().pollContext(key); + context.setSelectionKey(key); + context.setSelectorHandler(this); + context.setAsyncQueueWriter(asyncQueueWriter); + return context; + } + //--------------- ConnectorInstanceHandler ----------------------------- /** * Return factory object, which knows how Index: UDPConnectorHandler.java --- UDPConnectorHandler.java Base (BASE) +++ UDPConnectorHandler.java Locally Modified (Based On LOCAL) @@ -23,6 +23,8 @@ package com.sun.grizzly; +import com.sun.grizzly.async.AsyncWriteCallbackHandler; +import com.sun.grizzly.async.AsyncQueueWritable; import com.sun.grizzly.util.ByteBufferInputStream; import java.io.IOException; import java.net.SocketAddress; @@ -47,7 +49,8 @@ * * @author Jeanfrancois Arcand */ -public class UDPConnectorHandler implements ConnectorHandler{ +public class UDPConnectorHandler implements + ConnectorHandler, AsyncQueueWritable { /** * The underlying UDPSelectorHandler used to mange SelectionKeys. @@ -339,6 +342,72 @@ /** + * {@inheritDoc} + */ + public boolean writeToAsyncQueue(ByteBuffer buffer) throws IOException { + return selectorHandler.getAsyncQueueWriter().write( + datagramChannel.keyFor(selectorHandler.getSelector()), buffer); + } + + + /** + * {@inheritDoc} + */ + public boolean writeToAsyncQueue(ByteBuffer buffer, + AsyncWriteCallbackHandler callbackHandler) throws IOException { + return selectorHandler.getAsyncQueueWriter().write( + datagramChannel.keyFor(selectorHandler.getSelector()), buffer, + callbackHandler); + } + + + /** + * {@inheritDoc} + */ + public boolean writeToAsyncQueue(ByteBuffer buffer, + AsyncWriteCallbackHandler callbackHandler, + boolean isCloneByteBuffer) throws IOException { + return selectorHandler.getAsyncQueueWriter().write( + datagramChannel.keyFor(selectorHandler.getSelector()), buffer, + callbackHandler, isCloneByteBuffer); + } + + + /** + * {@inheritDoc} + */ + public boolean writeToAsyncQueue(SocketAddress dstAddress, ByteBuffer buffer) + throws IOException { + return selectorHandler.getAsyncQueueWriter().write( + datagramChannel.keyFor(selectorHandler.getSelector()), dstAddress, + buffer); + } + + + /** + * {@inheritDoc} + */ + public boolean writeToAsyncQueue(SocketAddress dstAddress, ByteBuffer buffer, + AsyncWriteCallbackHandler callbackHandler) throws IOException { + return selectorHandler.getAsyncQueueWriter().write( + datagramChannel.keyFor(selectorHandler.getSelector()), dstAddress, + buffer, callbackHandler); + } + + + /** + * {@inheritDoc} + */ + public boolean writeToAsyncQueue(SocketAddress dstAddress, ByteBuffer buffer, + AsyncWriteCallbackHandler callbackHandler, boolean isCloneByteBuffer) + throws IOException { + return selectorHandler.getAsyncQueueWriter().write( + datagramChannel.keyFor(selectorHandler.getSelector()), dstAddress, + buffer, callbackHandler, isCloneByteBuffer); + } + + + /** * Receive bytes. * @param byteBuffer The byteBuffer to store bytes. * @param socketAddress @@ -469,5 +538,4 @@ public UDPSelectorHandler getSelectorHandler() { return selectorHandler; } - } Index: UDPSelectorHandler.java --- UDPSelectorHandler.java Base (BASE) +++ UDPSelectorHandler.java Locally Modified (Based On LOCAL) @@ -23,6 +23,7 @@ package com.sun.grizzly; +import com.sun.grizzly.async.UDPAsyncQueueWriter; import com.sun.grizzly.util.Copyable; import com.sun.grizzly.util.SelectionKeyOP; import java.io.IOException; @@ -92,6 +93,10 @@ public void preSelect(Context ctx) throws IOException { initOpRegistriesIfRequired(); + if (asyncQueueWriter == null) { + asyncQueueWriter = new UDPAsyncQueueWriter(this); + } + if (selector == null){ try{ connectorInstanceHandler = new ConnectorInstanceHandler. @@ -174,7 +179,12 @@ Controller.logger().log(Level.SEVERE, "closeSocketException",ex); } + + if (asyncQueueWriter != null) { + asyncQueueWriter.close(); + asyncQueueWriter = null; } + } /** @@ -258,6 +268,8 @@ } catch (IOException ex){ ; // LOG ME } + + asyncQueueWriter.onClose(channel); } //--------------- ConnectorInstanceHandler -----------------------------