Hello,
here I would like to describe my initial proposal for the AsyncWriter
implementation.
I think it makes sense to associate AsyncWriter with SelectorHandler, so
we can have good control over the events happening on the Channel.
There are 3 classes attached.
AsyncWriter: This is the main class the user will use to write data in
asynchronous mode. The class has several overloaded write methods (see the javadoc for
more details).
AsyncWriteCallbackHandler: an interface that can be implemented by
custom code to be notified when a Buffer has been written asynchronously. An instance
of AsyncWriteCallbackHandler can be passed as one of the AsyncWriter.write
parameters.
AsyncWriteQueue: represents the collection of Channel <-> WriteQueue correspondences.
The classes are described in more detail in the javadocs.
As I mentioned, I think AsyncWriter should be integrated into the
SelectorHandler. So the way a developer will be able to use its
functionality is:
selectorHandler.getAsyncWriter().write(byteBuffer...).
I will appreciate any feedback! :)
Thanks.
WBR,
Alexey.
/*
* The contents of this file are subject to the terms
* of the Common Development and Distribution License
* (the License). You may not use this file except in
* compliance with the License.
*
* You can obtain a copy of the license at
*
https://glassfish.dev.java.net/public/CDDLv1.0.html or
* glassfish/bootstrap/legal/CDDLv1.0.txt.
* See the License for the specific language governing
* permissions and limitations under the License.
*
* When distributing Covered Code, include this CDDL
* Header Notice in each file and include the License file
* at glassfish/bootstrap/legal/CDDLv1.0.txt.
* If applicable, add the following below the CDDL Header,
* with the fields enclosed by brackets [] replaced by
* you own identifying information:
* "Portions Copyrighted [year] [name of copyright owner]"
*
* Copyright 2007 Sun Microsystems, Inc. All rights reserved.
*/
package com.sun.grizzly;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.WritableByteChannel;
/**
 * Writes <code>ByteBuffer</code>s to <code>SelectableChannel</code>s, queueing
 * whatever could not be written immediately in an <code>AsyncWriteQueue</code>
 * and asking the owning <code>SelectorHandler</code> for an OP_WRITE
 * notification so the rest can be written later from {@link #onWrite}.
 *
 * NOTE(review): the "is anything queued?" check and the direct write in
 * {@link #write(SelectableChannel, ByteBuffer, AsyncWriteCallbackHandler, boolean)}
 * are not performed atomically; confirm how callers serialize writes to one
 * channel.
 *
 * @author Alexey Stashok
 */
public class AsyncWriter {
    private final SelectorHandler selectorHandler;
    private final AsyncWriteQueue writeQueue;

    /**
     * Creates an <code>AsyncWriter</code> with its own, initially empty,
     * <code>AsyncWriteQueue</code>.
     *
     * @param selectorHandler <code>SelectorHandler</code> on which channels
     *        are registered for OP_WRITE when data has to be queued
     */
    public AsyncWriter(SelectorHandler selectorHandler) {
        this.selectorHandler = selectorHandler;
        this.writeQueue = new AsyncWriteQueue();
    }

    /**
     * Writes <code>ByteBuffer</code> to the <code>SelectableChannel</code>
     * without a completion callback and without cloning the buffer.
     * Equivalent to <code>write(channel, buffer, null)</code>.
     *
     * @param channel <code>SelectableChannel</code> the <code>ByteBuffer</code>
     *        should be written to
     * @param buffer <code>ByteBuffer</code>
     * @return true, if <code>ByteBuffer</code> was written completely,
     *         false if the write operation was put to the queue
     * @throws java.io.IOException if the channel write fails
     */
    public boolean write(SelectableChannel channel, ByteBuffer buffer) throws IOException {
        return write(channel, buffer, null);
    }

    /**
     * Writes <code>ByteBuffer</code> to the <code>SelectableChannel</code>
     * without cloning the buffer.
     * Equivalent to <code>write(channel, buffer, callbackHandler, false)</code>.
     *
     * @param channel <code>SelectableChannel</code> the <code>ByteBuffer</code>
     *        should be written to
     * @param buffer <code>ByteBuffer</code>
     * @param callbackHandler <code>AsyncWriteCallbackHandler</code> which
     *        gets notified when a queued <code>ByteBuffer</code> has been
     *        completely written; may be null
     * @return true, if <code>ByteBuffer</code> was written completely,
     *         false if the write operation was put to the queue
     * @throws java.io.IOException if the channel write fails
     */
    public boolean write(SelectableChannel channel, ByteBuffer buffer,
            AsyncWriteCallbackHandler callbackHandler) throws IOException {
        return write(channel, buffer, callbackHandler, false);
    }

    /**
     * Writes <code>ByteBuffer</code> to the <code>SelectableChannel</code>.
     * If nothing is queued for the channel, a direct write is attempted first.
     * If the <code>ByteBuffer</code> still has remaining data afterwards, it
     * is added to the <code>AsyncWriteQueue</code> and the
     * <code>SelectableChannel</code> is registered on the
     * <code>SelectorHandler</code> for the OP_WRITE event.
     *
     * The callback is only stored with a queued record; when this method
     * returns true the callback is not invoked.
     *
     * @param channel <code>SelectableChannel</code> the <code>ByteBuffer</code>
     *        should be written to; it is cast to
     *        <code>WritableByteChannel</code>
     * @param buffer <code>ByteBuffer</code>
     * @param callbackHandler <code>AsyncWriteCallbackHandler</code> which
     *        gets notified when the queued <code>ByteBuffer</code> has been
     *        completely written; may be null
     * @param isCloneByteBuffer if true, the remaining bytes are copied into a
     *        new <code>ByteBuffer</code> before being put to the
     *        <code>AsyncWriteQueue</code>; the copy (not the original) is
     *        what the callback later receives, and the original buffer's
     *        position is advanced to its limit by the copy
     * @return true, if <code>ByteBuffer</code> was written completely,
     *         false if the write operation was put to the queue
     * @throws java.io.IOException if the channel write fails
     */
    public boolean write(SelectableChannel channel, ByteBuffer buffer,
            AsyncWriteCallbackHandler callbackHandler, boolean isCloneByteBuffer) throws IOException {
        // Only write directly when nothing is queued for this channel;
        // otherwise the new data would overtake the records already waiting.
        if (!hasReadyAsyncWriteData(channel)) {
            ((WritableByteChannel) channel).write(buffer);
        }

        if (!buffer.hasRemaining()) {
            return true;
        }

        ByteBuffer pending = isCloneByteBuffer ? copyRemaining(buffer) : buffer;
        writeQueue.offer(channel, new AsyncWriteQueue.AsyncQueueRecord(pending, callbackHandler));
        selectorHandler.register(channel, SelectionKey.OP_WRITE);
        return false;
    }

    /**
     * Checks whether there is any data in <code>AsyncWriteQueue</code> ready
     * for the given <code>SelectableChannel</code>.
     *
     * @param channel <code>SelectableChannel</code>
     * @return true, if there is ready data. False otherwise.
     */
    public boolean hasReadyAsyncWriteData(SelectableChannel channel) {
        // peek() answers "is there a head record?" without walking the whole
        // queue, which size() has to do on a ConcurrentLinkedQueue.
        return writeQueue.peek(channel) != null;
    }

    /**
     * Callback method, which should be called by <code>SelectorHandler</code>
     * to notify that the given <code>SelectableChannel</code> is ready to
     * transmit data. Writes the head record of the channel's queue; when the
     * record is finished it is removed and its callback (if any) is notified.
     * The channel is registered for OP_WRITE again whenever data is still
     * pending. Does nothing if no record is queued for the channel.
     *
     * @param channel <code>SelectableChannel</code>
     * @throws java.io.IOException if the channel write fails
     */
    public void onWrite(SelectableChannel channel) throws IOException {
        AsyncWriteQueue.AsyncQueueRecord queueRecord = writeQueue.peek(channel);
        if (queueRecord == null) {
            // Nothing pending, e.g. the entry was already released by onClose.
            return;
        }

        ByteBuffer byteBuffer = queueRecord.byteBuffer;
        ((WritableByteChannel) channel).write(byteBuffer);

        if (byteBuffer.hasRemaining()) {
            selectorHandler.register(channel, SelectionKey.OP_WRITE);
            return;
        }

        AsyncWriteQueue.AsyncQueueRecord removedQueueRecord = writeQueue.poll(channel);
        assert removedQueueRecord == queueRecord;

        // Use the record obtained from peek(): it is known to be non-null
        // even if the queue was removed between peek() and poll().
        if (queueRecord.callbackHandler != null) {
            queueRecord.callbackHandler.notifyWriteCompleted(channel, byteBuffer);
        }

        // Further records may be waiting behind the one just completed; ask
        // for another OP_WRITE event, as the partial-write branch does.
        if (hasReadyAsyncWriteData(channel)) {
            selectorHandler.register(channel, SelectionKey.OP_WRITE);
        }
    }

    /**
     * Callback method, which should be called by <code>SelectorHandler</code>
     * to notify that the given <code>SelectableChannel</code> is going to be
     * closed, so the data queued for it can be released from the
     * <code>AsyncWriteQueue</code>.
     *
     * @param channel <code>SelectableChannel</code>
     */
    public void onClose(SelectableChannel channel) {
        writeQueue.removeEntry(channel);
    }

    /**
     * Copies the remaining bytes of <code>source</code> into a new buffer of
     * the same kind (direct or heap) that is ready to be read from position 0.
     * The source's position is advanced to its limit by the copy.
     */
    private static ByteBuffer copyRemaining(ByteBuffer source) {
        int size = source.remaining();
        ByteBuffer copy = source.isDirect()
                ? ByteBuffer.allocateDirect(size)
                : ByteBuffer.allocate(size);
        copy.put(source);
        copy.flip();
        return copy;
    }
}
/*
* The contents of this file are subject to the terms
* of the Common Development and Distribution License
* (the License). You may not use this file except in
* compliance with the License.
*
* You can obtain a copy of the license at
*
https://glassfish.dev.java.net/public/CDDLv1.0.html or
* glassfish/bootstrap/legal/CDDLv1.0.txt.
* See the License for the specific language governing
* permissions and limitations under the License.
*
* When distributing Covered Code, include this CDDL
* Header Notice in each file and include the License file
* at glassfish/bootstrap/legal/CDDLv1.0.txt.
* If applicable, add the following below the CDDL Header,
* with the fields enclosed by brackets [] replaced by
* you own identifying information:
* "Portions Copyrighted [year] [name of copyright owner]"
*
* Copyright 2007 Sun Microsystems, Inc. All rights reserved.
*/
package com.sun.grizzly;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
/**
 * Completion listener for writes performed through <code>AsyncWriter</code>.
 * Custom code implements this interface and passes an instance to one of the
 * <code>AsyncWriter.write</code> methods to learn when a queued
 * <code>ByteBuffer</code> has been written.
 *
 * @author Alexey Stashok
 */
public interface AsyncWriteCallbackHandler {

    /**
     * Invoked once the given <code>ByteBuffer</code> has no remaining data
     * left to write to the given channel.
     *
     * @param channel <code>SelectableChannel</code> the data was written to
     * @param buffer <code>ByteBuffer</code> whose write has completed
     */
    void notifyWriteCompleted(SelectableChannel channel, ByteBuffer buffer);
}
/*
* The contents of this file are subject to the terms
* of the Common Development and Distribution License
* (the License). You may not use this file except in
* compliance with the License.
*
* You can obtain a copy of the license at
*
https://glassfish.dev.java.net/public/CDDLv1.0.html or
* glassfish/bootstrap/legal/CDDLv1.0.txt.
* See the License for the specific language governing
* permissions and limitations under the License.
*
* When distributing Covered Code, include this CDDL
* Header Notice in each file and include the License file
* at glassfish/bootstrap/legal/CDDLv1.0.txt.
* If applicable, add the following below the CDDL Header,
* with the fields enclosed by brackets [] replaced by
* you own identifying information:
* "Portions Copyrighted [year] [name of copyright owner]"
*
* Copyright 2007 Sun Microsystems, Inc. All rights reserved.
*/
package com.sun.grizzly;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
 * Maps each <code>SelectableChannel</code> to the queue of
 * <code>ByteBuffer</code> records that still have to be written to it
 * asynchronously.
 *
 * NOTE(review): a record offered concurrently with {@link #removeEntry} may
 * land in a queue that has just been detached from the map; confirm callers
 * never offer to a channel that is being closed.
 *
 * @author Alexey Stashok
 */
public class AsyncWriteQueue {
    // Declared with the concrete type so the atomic putIfAbsent is available
    // without relying on anything beyond the classes this file already imports.
    private final ConcurrentHashMap<SelectableChannel, ConcurrentLinkedQueue<AsyncQueueRecord>> queueMap =
            new ConcurrentHashMap<SelectableChannel, ConcurrentLinkedQueue<AsyncQueueRecord>>();

    /**
     * Add data to the <code>AsyncWriteQueue</code>, corresponding to the given
     * <code>SelectableChannel</code>. The channel's queue is created on first
     * use.
     *
     * @param channel <code>SelectableChannel</code>
     * @param queueRecord write data unit
     */
    public void offer(SelectableChannel channel, AsyncQueueRecord queueRecord) {
        ConcurrentLinkedQueue<AsyncQueueRecord> queue = queueMap.get(channel);
        if (queue == null) {
            ConcurrentLinkedQueue<AsyncQueueRecord> created =
                    new ConcurrentLinkedQueue<AsyncQueueRecord>();
            // putIfAbsent installs the queue atomically, so there is no need
            // to lock on the caller-owned channel object; if another thread
            // won the race, its queue is returned and used instead.
            queue = queueMap.putIfAbsent(channel, created);
            if (queue == null) {
                queue = created;
            }
        }
        queue.offer(queueRecord);
    }

    /**
     * Get head element of <code>SelectableChannel</code> write queue.
     * Element will not be removed from queue.
     *
     * @param channel <code>SelectableChannel</code>
     *
     * @return <code>AsyncQueueRecord</code> write data unit, or null if the
     *         channel has no queue or its queue is empty
     */
    public AsyncQueueRecord peek(SelectableChannel channel) {
        ConcurrentLinkedQueue<AsyncQueueRecord> queue = queueMap.get(channel);
        return queue == null ? null : queue.peek();
    }

    /**
     * Get head element of <code>SelectableChannel</code> write queue.
     * Element will be removed from queue.
     *
     * @param channel <code>SelectableChannel</code>
     *
     * @return <code>AsyncQueueRecord</code> write data unit, or null if the
     *         channel has no queue or its queue is empty
     */
    public AsyncQueueRecord poll(SelectableChannel channel) {
        ConcurrentLinkedQueue<AsyncQueueRecord> queue = queueMap.get(channel);
        return queue == null ? null : queue.poll();
    }

    /**
     * Remove the whole write queue of the given <code>SelectableChannel</code>,
     * including every record still waiting in it.
     *
     * @param channel <code>SelectableChannel</code>
     */
    public void removeEntry(SelectableChannel channel) {
        queueMap.remove(channel);
    }

    /**
     * Get the size of <code>SelectableChannel</code> write queue.
     *
     * @param channel <code>SelectableChannel</code>
     * @return size of <code>SelectableChannel</code> write queue; 0 if the
     *         channel has no queue
     */
    public int size(SelectableChannel channel) {
        ConcurrentLinkedQueue<AsyncQueueRecord> queue = queueMap.get(channel);
        return queue == null ? 0 : queue.size();
    }

    /**
     * <code>AsyncWriteQueue</code> data unit: a buffer awaiting writing and
     * the optional handler stored alongside it.
     */
    public static class AsyncQueueRecord {
        /** Buffer holding the data that still has to be written. */
        public ByteBuffer byteBuffer;
        /** Handler stored with the buffer; may be null. */
        public AsyncWriteCallbackHandler callbackHandler;

        /**
         * @param byteBuffer buffer to be written
         * @param callbackHandler handler to store with the buffer; may be null
         */
        public AsyncQueueRecord(ByteBuffer byteBuffer,
                AsyncWriteCallbackHandler callbackHandler) {
            this.byteBuffer = byteBuffer;
            this.callbackHandler = callbackHandler;
        }
    }
}