dev@grizzly.java.net

Re: Async HTTP responses

From: Jeanfrancois Arcand <Jeanfrancois.Arcand_at_Sun.COM>
Date: Thu, 27 Nov 2008 09:37:44 -0500

Salut,

Oleksiy Stashok wrote:
>>
>
> Hi,
>
>> first, Great work as usual!!
> Thank you :)
>
>
>>> Hi,
>>> I've added support for async HTTP responses.
>>> The async HTTP response feature could be set using system properties,
>>> or programmatically:
>>> selectorThread.setAsyncHttpWriteEnabled(true);
>>> It's also possible to tune the max number of bytebuffers stored in pool.
>>> SocketChannelOutputBuffer.setMaxBufferPoolSize(...)
>>> Default value is: 16384 (which is probably more than enough).
>>> Here, I attach the diff file
>>> Will appreciate the feedback.
>>
>> Can you send me a binary so I can test the performance of this new
>> monster behavior?
> I'll probably commit this to the trunk. 1.9.0-SNAPSHOT, which, IMHO, is
> safe enough, because this behavior is not default.
> Also Async HTTP response impl. depends on changes I made for Async
> queue. They are not committed yet.
>
>
>> General questions (with the response you will be able to blog about it
>> :-)):
>>
>> 1. How many threads are you using when executing the write operations?
>> Do we have a dedicated thread pool? I know the answer for the thread
>> pool (no), but are you re-using the current thread pool? AIO does that
>> and there is some chance of locking the entire system if you aren't
>> careful.
> We're reusing Worker thread pool.
> How it could be locked? :)

Because it can lock up if all the WorkerThreads block while trying to write to the queue.


>
>
>> 2. Can you make sure you compile using:
>>
>> % cd trunk
>> % export JAVA_HOME=JDK_7 b98
>> % mvn -f pom-jdk7.pom
>>
>> Just to make sure you don't break the AIO implementation?
>
> Can not :( jdk7 is something my Mac will not work with :)
> But I think it will not break anything... I didn't change any API.

LOL I forgot about your Mac :-)


>
>
>> 3. When you execute async write, do you:
>>
>> {A} clone the bb -> spawn a thread -> write clonedBB
>
>>
>> {B} create a new bb (newBB) -> spawn a thread -> write bb
>
> {B} will be chosen, when socketBuffer is enabled and {A} otherwise.

OK. Did you see my comments inline? Just checking :-)

A+

-- Jeanfrancois


>
> Thanks.
>
> WBR,
> Alexey.
>
>>
>>
>> With {b} you are avoiding a byte copy. I haven't dived deep into the
>> code to see how async write works exactly yet :-)
>>
>>
>>> Thanks.
>>> WBR,
>>> Alexey.
>>> # This patch file was generated by NetBeans IDE
>>> # Following Index: paths are relative to:
>>> /Users/oleksiys/Projects/Grizzly/trunk/modules/http
>>> # This patch can be applied using context Tools: Patch action on
>>> respective folder.
>>> # It uses platform neutral UTF-8 encoding and \n newlines.
>>> # Above lines and this line are ignored by the patching process.
>>> Index: src/main/java/com/sun/grizzly/http/DefaultProcessorTask.java
>>> --- src/main/java/com/sun/grizzly/http/DefaultProcessorTask.java Base
>>> (BASE)
>>> +++ src/main/java/com/sun/grizzly/http/DefaultProcessorTask.java
>>> Locally Modified (Based On LOCAL)
>>> @@ -403,6 +403,11 @@
>>> protected boolean disableUploadTimeout = true;
>>> + /**
>>> + * Flag, which indicates if async HTTP write is enabled
>>> + */
>>> + protected boolean isAsyncHttpWriteEnabled; + //
>>> ----------------------------------------------------- Constructor
>>> ---- //
>>> public DefaultProcessorTask(){
>>> @@ -449,6 +454,7 @@
>>> maxHttpHeaderSize,
>>>
>>> bufferResponse); }
>>> +
>>> request.setInputBuffer(inputBuffer);
>>> response.setOutputBuffer(outputBuffer);
>>> @@ -508,7 +514,11 @@
>>> inputStream = (InputReader)input;
>>> SocketChannelOutputBuffer channelOutputBuffer
>>> = ((SocketChannelOutputBuffer)outputBuffer);
>>> -
>>> channelOutputBuffer.setChannel((SocketChannel)key.channel());
>>> + channelOutputBuffer.setAsyncHttpWriteEnabled(
>>> + isAsyncHttpWriteEnabled);
>>> + channelOutputBuffer.setAsyncQueueWriter(
>>> +
>>> selectorThread.getSelectorHandler().getAsyncQueueWriter());
>>> + channelOutputBuffer.setSelectionKey(key);
>>> response.setChannel((SocketChannel)key.channel());
>>> } configPreProcess();
>>> @@ -1613,8 +1623,25 @@
>>> return uploadTimeout;
>>> }
>>> + /**
>>> + * Is async HTTP write enabled.
>>> + * @return <tt>true</tt>, if async HTTP write enabled, or
>>> <tt>false</tt>
>>> + * otherwise.
>>> + */
>>> + public boolean isAsyncHttpWriteEnabled() {
>>> + return isAsyncHttpWriteEnabled;
>>> + }
>>> /**
>>> + * Set if async HTTP write enabled.
>>> + * @param isAsyncHttpWriteEnabled <tt>true</tt>, if async HTTP
>>> write
>>> + * enabled, or <tt>false</tt> otherwise.
>>> + */
>>> + public void setAsyncHttpWriteEnabled(boolean
>>> isAsyncHttpWriteEnabled) {
>>> + this.isAsyncHttpWriteEnabled = isAsyncHttpWriteEnabled;
>>> + }
>>> + + /**
>>> * Register a new <code>RequestProcessor</code> instance.
>>> */
>>> private void registerMonitoring(){
>>> Index: src/main/java/com/sun/grizzly/http/SelectorThread.java
>>> --- src/main/java/com/sun/grizzly/http/SelectorThread.java Base (BASE)
>>> +++ src/main/java/com/sun/grizzly/http/SelectorThread.java Locally
>>> Modified (Based On LOCAL)
>>> @@ -195,6 +195,12 @@
>>> protected int maxHttpHeaderSize = Constants.DEFAULT_HEADER_SIZE;
>>> + /**
>>> + * Is Async HTTP write enabled.
>>> + */
>>> + protected boolean isAsyncHttpWriteEnabled;
>>> +
>>> +
>>> protected int maxPostSize = 2 * 1024 * 1024;
>>> @@ -964,7 +970,7 @@
>>> }
>>> - protected ProcessorTask
>>> configureProcessorTask(DefaultProcessorTask task){
>>> + protected ProcessorTask
>>> configureProcessorTask(DefaultProcessorTask task) {
>>> task.setAdapter(adapter);
>>> task.setMaxHttpHeaderSize(maxHttpHeaderSize);
>>> task.setBufferSize(requestBufferSize);
>>> @@ -975,6 +981,7 @@
>>> task.setMaxPostSize(maxPostSize);
>>> task.setTimeout(uploadTimeout);
>>> task.setDisableUploadTimeout(disableUploadTimeout);
>>> + task.setAsyncHttpWriteEnabled(isAsyncHttpWriteEnabled);
>>> if ( keepAliveCounter.dropConnection() ) {
>>> task.setDropConnection(true);
>>> @@ -1681,8 +1688,25 @@
>>> this.maxHttpHeaderSize = maxHttpHeaderSize;
>>> }
>>> + /**
>>> + * Is async HTTP write enabled
>>> + * @return <tt>true</tt>, if async HTTP write enabled,
>>> + * or <tt>false</tt> otherwise.
>>> + */
>>> + public boolean isAsyncHttpWriteEnabled() {
>>> + return isAsyncHttpWriteEnabled;
>>> + }
>>> /**
>>> + * Set if async HTTP write enabled
>>> + * @param isAsyncHttpWriteEnabled <tt>true</tt>, if async HTTP
>>> + * write enabled, or <tt>false</tt> otherwise.
>>> + */
>>> + public void setAsyncHttpWriteEnabled(boolean
>>> isAsyncHttpWriteEnabled) {
>>> + this.isAsyncHttpWriteEnabled = isAsyncHttpWriteEnabled;
>>> + }
>>> + + /**
>>> * The minimun threads created at startup.
>>> */ public void setMinThreads(int minWorkerThreads){
>>> @@ -1719,6 +1743,10 @@
>>> }
>>> + public TCPSelectorHandler getSelectorHandler() {
>>> + return selectorHandler;
>>> + }
>>> + public Controller getController() {
>>> return controller;
>>> }
>>> Index: src/main/java/com/sun/grizzly/http/SelectorThreadConfig.java
>>> --- src/main/java/com/sun/grizzly/http/SelectorThreadConfig.java Base
>>> (BASE)
>>> +++ src/main/java/com/sun/grizzly/http/SelectorThreadConfig.java
>>> Locally Modified (Based On LOCAL)
>>> @@ -139,6 +139,12 @@
>>> "com.sun.grizzly.useFileCache";
>>> + private final static String IS_ASYNC_HTTP_WRITE =
>>> + "com.sun.grizzly.http.asyncwrite.enabled";
>>> +
>>> + private final static String ASYNC_HTTP_WRITE_MAX_BUFFER_POOL_SIZE =
>>> + "com.sun.grizzly.http.asyncwrite.maxBufferPoolSize";
>>> +
>>> /**
>>> * The string manager for this package.
>>> */
>>> @@ -311,16 +317,25 @@
>>> }
>>> }
>>> - if (System.getProperty(USE_FILE_CACHE)!= null){
>>> selectorThread.setFileCacheIsEnabled(
>>> Boolean.valueOf(
>>>
>>> System.getProperty(USE_FILE_CACHE)).booleanValue());
>>>
>>> selectorThread.setLargeFileCacheEnabled(selectorThread.isFileCacheEnabled());
>>>
>>> }
>>> +
>>> + if (System.getProperty(IS_ASYNC_HTTP_WRITE)!= null) {
>>> + selectorThread.setAsyncHttpWriteEnabled(
>>> + Boolean.getBoolean(IS_ASYNC_HTTP_WRITE));
>>> }
>>> + if
>>> (System.getProperty(ASYNC_HTTP_WRITE_MAX_BUFFER_POOL_SIZE) != null) {
>>> +
>>> SocketChannelOutputBuffer.setMaxBufferPoolSize(Integer.getInteger(
>>> + ASYNC_HTTP_WRITE_MAX_BUFFER_POOL_SIZE, -1));
>>> + }
>>> + }
>>> + /**
>>> * Configure properties on {_at_link SelectorThread}
>>> */
>>> Index: src/main/java/com/sun/grizzly/http/SocketChannelOutputBuffer.java
>>> --- src/main/java/com/sun/grizzly/http/SocketChannelOutputBuffer.java
>>> Base (BASE)
>>> +++ src/main/java/com/sun/grizzly/http/SocketChannelOutputBuffer.java
>>> Locally Modified (Based On LOCAL)
>>> @@ -38,6 +38,10 @@
>>> package com.sun.grizzly.http;
>>> +import com.sun.grizzly.async.AsyncQueueWriteUnit;
>>> +import com.sun.grizzly.async.AsyncQueueWriter;
>>> +import com.sun.grizzly.async.AsyncWriteCallbackHandler;
>>> +import com.sun.grizzly.async.ByteBufferCloner;
>>> import java.io.IOException;
>>> import java.io.OutputStream;
>>> import java.nio.ByteBuffer;
>>> @@ -45,7 +49,14 @@
>>> import com.sun.grizzly.util.OutputWriter;
>>> import com.sun.grizzly.tcp.Response;
>>> import com.sun.grizzly.tcp.http11.InternalOutputBuffer;
>>> +import com.sun.grizzly.util.ByteBufferFactory;
>>> import java.nio.channels.Channel;
>>> +import java.nio.channels.SelectionKey;
>>> +import java.util.Queue;
>>> +import java.util.concurrent.ArrayBlockingQueue;
>>> +import java.util.concurrent.Future;
>>> +import java.util.logging.Level;
>>> +import java.util.logging.Logger;
>>> /**
>>> * Output buffer.
>>> @@ -54,16 +65,60 @@
>>> * * @author Jean-Francois Arcand
>>> * @author Scott Oaks
>>> + * @author Alexey Stashok
>>> */
>>> -public class SocketChannelOutputBuffer extends InternalOutputBuffer{
>>> +public class SocketChannelOutputBuffer extends InternalOutputBuffer {
>>> + private static Logger logger = SelectorThread.logger();
>>> + private static final int DEFAULT_BUFFER_POOL_SIZE = 16384;
>>> +
>>> + private static int maxBufferPoolSize = DEFAULT_BUFFER_POOL_SIZE;
>>> +
>>> /**
>>> + * ByteBuffer pool to be used with async write
>>> + */
>>> + private static Queue<ByteBuffer> bufferPool =
>>> + new ArrayBlockingQueue<ByteBuffer>(maxBufferPoolSize);
>>
>> Can you instead have a pool per instance, and limit the size of the
>> pool based on the number of maxThreads? Take a look at AIO
>>
>> http-aio/src/main/java/com/sun/grizzly/aio/http/AsyncSocketChannelOutputBuffer.java
>>
>>
>> there I avoid synchronizing on a single queue.
>>
>>
>>> +
>>> + /**
>>> + * {_at_link ByteBufferCloner} implementation, which is called by
>>> Grizzly
>>> + * framework at the time, when asynchronous write queue can not
>>> write
>>> + * the buffer direcly on socket and instead will put it in queue.
>>> + * This implementation tries to get temporary ByteBuffer from
>>> the pool,
>>> + * if no ByteBuffer is available - then new one will be created.
>>> + */
>>> + private static final ByteBufferCloner asyncHttpByteBufferCloner =
>>> + new ByteBufferClonerImpl();
>>> +
>>> + /**
>>> + * {_at_link AsyncWriteCallbackHandler} implementation, which is
>>> responsible
>>> + * for returning cloned ByteBuffers to the pool
>>> + */
>>> + private static final AsyncWriteCallbackHandler
>>> asyncHttpWriteCallbackHandler =
>>> + new AsyncWriteCallbackHandlerImpl();
>>> +
>>> + /**
>>> * Underlying output channel.
>>> */
>>> protected Channel channel;
>>> + /**
>>> + * Underlying selection key of the output channel.
>>> + */
>>> + protected SelectionKey selectionKey;
>>> /**
>>> + * Flag, which indicates if async HTTP write is enabled
>>> + */
>>> + protected boolean isAsyncHttpWriteEnabled;
>>> + + /**
>>> + * Asynchronous queue writer, which will be used if asyncHttp mode
>>> + * is enabled
>>> + */
>>> + protected AsyncQueueWriter asyncQueueWriter;
>>> + + /**
>>> * Underlying ByteByteBuffer
>>> */
>>> protected ByteBuffer outputByteBuffer;
>>> @@ -132,6 +187,66 @@
>>> return channel;
>>> }
>>> +
>>> + /**
>>> + * Gets the underlying selection key of the output channel.
>>> + * @return the underlying selection key of the output channel.
>>> + */
>>> + public SelectionKey getSelectionKey() {
>>> + return selectionKey;
>>> + }
>>> +
>>> + + /**
>>> + * Sets the underlying selection key of the output channel.
>>> + * @param selectionKey the underlying selection key of the
>>> output channel.
>>> + */
>>> + public void setSelectionKey(SelectionKey selectionKey) {
>>> + this.selectionKey = selectionKey;
>>> + channel = selectionKey.channel();
>>> + }
>>
>> setChannel() is always invoked by DefaultProcessorTask
>>
>>> +
>>> + /**
>>> + * Is async HTTP write enabled.
>>> + * @return <tt>true</tt>, if async HTTP write enabled, or
>>> <tt>false</tt>
>>> + * otherwise.
>>> + */
>>> + public boolean isAsyncHttpWriteEnabled() {
>>> + return isAsyncHttpWriteEnabled;
>>> + }
>>> +
>>> + /**
>>> + * Set if async HTTP write enabled.
>>> + * @param isAsyncHttpWriteEnabled <tt>true</tt>, if async HTTP
>>> write
>>> + * enabled, or <tt>false</tt> otherwise.
>>> + */
>>> + public void setAsyncHttpWriteEnabled(boolean
>>> isAsyncHttpWriteEnabled) {
>>> + this.isAsyncHttpWriteEnabled = isAsyncHttpWriteEnabled;
>>> + }
>>> +
>>> + /**
>>> + * Gets the asynchronous queue writer, which will be used if
>>> asyncHttp mode
>>> + * is enabled
>>> + * + * @return The asynchronous queue writer, which will be
>>> used if asyncHttp
>>> + * mode is enabled
>>> + */
>>> + protected AsyncQueueWriter getAsyncQueueWriter() {
>>> + return asyncQueueWriter;
>>> + }
>>> +
>>> + /**
>>> + * Sets the asynchronous queue writer, which will be used if
>>> asyncHttp mode
>>> + * is enabled
>>> + * + * @param asyncQueueWriter The asynchronous queue
>>> writer, which will be
>>> + * used if asyncHttp mode is enabled
>>> + */
>>> + protected void setAsyncQueueWriter(AsyncQueueWriter
>>> asyncQueueWriter) {
>>> + this.asyncQueueWriter = asyncQueueWriter;
>>> + }
>>> + +
>>> // ---------------------------------------------------------
>>> Public Methods
>>> /**
>>> @@ -180,10 +295,28 @@
>>> * Flush the buffer by looping until the {_at_link ByteBuffer} is empty
>>> * @param bb the ByteBuffer to write.
>>> */ - public void flushChannel(ByteBuffer bb) throws
>>> IOException{
>>> - OutputWriter.flushChannel(((SocketChannel)channel), bb);
>>> + public void flushChannel(ByteBuffer bb) throws IOException {
>>> + if (logger.isLoggable(Level.FINEST)) {
>>> + logger.finest("flushChannel isAsyncHttpWriteEnabled=" +
>>> + isAsyncHttpWriteEnabled + " bb=" + bb);
>>> + }
>>> +
>>> + if (!isAsyncHttpWriteEnabled) {
>>> + OutputWriter.flushChannel(((SocketChannel) channel), bb);
>>> bb.clear();
>>> + } else if (asyncQueueWriter != null) {
>>> + Future future = asyncQueueWriter.write(selectionKey, bb,
>>> + asyncHttpWriteCallbackHandler, null,
>>> + asyncHttpByteBufferCloner);
>>> + if (logger.isLoggable(Level.FINEST)) {
>>> + logger.finest("async flushChannel isDone=" +
>>> future.isDone());
>>> }
>>> + bb.clear();
>>> + } else {
>>> + SelectorThread.logger().log(Level.WARNING,
>>> + "HTTP async write is enabled, but AsyncWriter is
>>> null.");
>>> + }
>>> + }
>>> /**
>>> @@ -270,6 +403,102 @@
>>> }
>>> + /**
>>> + * {_at_link AsyncWriteCallbackHandler} implementation, which is
>>> responsible
>>> + * for returning cloned ByteBuffers to the pool
>>> + */
>>> + private static final class AsyncWriteCallbackHandlerImpl implements
>>> + AsyncWriteCallbackHandler {
>>> + public void onWriteCompleted(SelectionKey key,
>>> + AsyncQueueWriteUnit writtenRecord) {
>>> + if (logger.isLoggable(Level.FINEST)) {
>>> + logger.finest("onWriteCompleted isCloned=" +
>>> + writtenRecord.isCloned());
>>> + }
>>> + + if (writtenRecord.isCloned()) {
>>> + returnBuffer(writtenRecord.getByteBuffer());
>>> + }
>>> + }
>>> +
>>> + public void onIOException(IOException ioException,
>>> SelectionKey key,
>>> + ByteBuffer buffer, Queue<AsyncQueueWriteUnit>
>>> remainingQueue) {
>>> + if (logger.isLoggable(Level.FINEST)) {
>>> + logger.finest("onIOException key=" + key +
>>> + " exception=" + ioException);
>>> + }
>>> + returnBuffer(buffer);
>>> + + for(AsyncQueueWriteUnit unit :
>>> remainingQueue) {
>>> + returnBuffer(unit.getByteBuffer());
>>> + }
>>> + }
>>> +
>>> + private boolean returnBuffer(ByteBuffer buffer) {
>>> + buffer.clear();
>>> + int size = buffer.remaining();
>>> + if (logger.isLoggable(Level.FINEST)) {
>>> + logger.finest("return buffer buffer=" + buffer + "
>>> maxSize=" +
>>> + maxBufferedBytes);
>>> + }
>>> +
>>> + if (size <= maxBufferedBytes) {
>>> + boolean wasReturned = bufferPool.offer(buffer);
>>> + if (logger.isLoggable(Level.FINEST)) {
>>> + logger.finest("return buffer to pool. result=" +
>>> wasReturned);
>>> + }
>>> +
>>> + return wasReturned;
>>> + }
>>> +
>>> + return false;
>>> + }
>>> + }
>>> +
>>> + /**
>>> + * {_at_link ByteBufferCloner} implementation, which is called by
>>> Grizzly
>>> + * framework at the time, when asynchronous write queue can not
>>> write
>>> + * the buffer direcly on socket and instead will put it in queue.
>>> + * This implementation tries to get temporary ByteBuffer from
>>> the pool,
>>> + * if no ByteBuffer is available - then new one will be created.
>>> + */
>>> + private static final class ByteBufferClonerImpl
>>> + implements ByteBufferCloner {
>>> + + public ByteBuffer clone(ByteBuffer
>>> originalByteBuffer) {
>>> + if (logger.isLoggable(Level.FINEST)) {
>>> + logger.finest("clone buffer=" + originalByteBuffer +
>>> + " maxBufferedBytes=" + maxBufferedBytes);
>>> + }
>>> + + int size = originalByteBuffer.remaining();
>>> +
>>> + ByteBuffer clone = null;
>>> + + if (size <= maxBufferedBytes) {
>>> + clone = bufferPool.poll();
>>> + }
>>> +
>>> + if (logger.isLoggable(Level.FINEST)) {
>>> + logger.finest("clone buffer from pool=" + clone);
>>> + }
>>> + + if (clone == null || clone.remaining() <
>>> size) {
>>> + int allocateSize = Math.max(size, maxBufferedBytes /
>>> 2);
>>> + clone = createByteBuffer(allocateSize,
>>> + originalByteBuffer.isDirect());
>>> + }
>>> +
>>> + clone.put(originalByteBuffer);
>>> + clone.flip();
>>> + return clone;
>>> + }
>>> + }
>>> +
>>> + private static ByteBuffer createByteBuffer(int size, boolean
>>> isDirect) {
>>> + return ByteBufferFactory.allocateView(size, isDirect);
>>> + }
>>> +
>>> public static int getMaxBufferedBytes() {
>>> return maxBufferedBytes;
>>> }
>>> @@ -278,4 +507,20 @@
>>> public static void setMaxBufferedBytes(int aMaxBufferedBytes) {
>>> maxBufferedBytes = aMaxBufferedBytes;
>>> }
>>> +
>>> +
>>> + public static void setMaxBufferPoolSize(int size) {
>>> + int poolSize = (size >= 0) ? size : DEFAULT_BUFFER_POOL_SIZE;
>>> +
>>> + if (maxBufferPoolSize == poolSize) return;
>>> + + maxBufferPoolSize = poolSize;
>>> +
>>> + bufferPool = new
>>> ArrayBlockingQueue<ByteBuffer>(maxBufferPoolSize);
>>> }
>>> +
>>
>> Hum, I don't like a set method that does work :-) Can we change the name
>> to adjustMaxBufferPoolSize()?
>>
>>
>>
>>> +
>>> + public static int getMaxBufferPoolSize() {
>>> + return maxBufferPoolSize;
>>> + }
>>> +}
>>> Index: src/test/java/com/sun/grizzly/http/ArpSSLTest.java
>>> --- src/test/java/com/sun/grizzly/http/ArpSSLTest.java Base (BASE)
>>> +++ src/test/java/com/sun/grizzly/http/ArpSSLTest.java Locally
>>> Modified (Based On LOCAL)
>>> @@ -37,7 +37,6 @@
>>> */
>>> package com.sun.grizzly.http;
>>> -import com.sun.grizzly.arp.AsyncTask;
>>> import com.sun.grizzly.arp.AsyncHandler;
>>> import com.sun.grizzly.arp.AsyncFilter;
>>> import com.sun.grizzly.arp.AsyncExecutor;
>>> Index: src/test/java/com/sun/grizzly/http/AsyncHTTPResponseTest.java
>>> --- src/test/java/com/sun/grizzly/http/AsyncHTTPResponseTest.java
>>> Locally New
>>> +++ src/test/java/com/sun/grizzly/http/AsyncHTTPResponseTest.java
>>> Locally New
>>> @@ -0,0 +1,132 @@
>>> +/*
>>> + *
>>> + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
>>> + *
>>> + * Copyright 2007-2008 Sun Microsystems, Inc. All rights reserved.
>>> + *
>>> + * The contents of this file are subject to the terms of either the GNU
>>> + * General Public License Version 2 only ("GPL") or the Common
>>> Development
>>> + * and Distribution License("CDDL") (collectively, the "License"). You
>>> + * may not use this file except in compliance with the License. You
>>> can obtain
>>> + * a copy of the License at
>>> https://glassfish.dev.java.net/public/CDDL+GPL.html
>>> + * or glassfish/bootstrap/legal/LICENSE.txt. See the License for
>>> the specific
>>> + * language governing permissions and limitations under the License.
>>> + *
>>> + * When distributing the software, include this License Header
>>> Notice in each
>>> + * file and include the License file at
>>> glassfish/bootstrap/legal/LICENSE.txt.
>>> + * Sun designates this particular file as subject to the "Classpath"
>>> exception
>>> + * as provided by Sun in the GPL Version 2 section of the License
>>> file that
>>> + * accompanied this code. If applicable, add the following below
>>> the License
>>> + * Header, with the fields enclosed by brackets [] replaced by your own
>>> + * identifying information: "Portions Copyrighted [year]
>>> + * [name of copyright owner]"
>>> + *
>>> + * Contributor(s):
>>> + *
>>> + * If you wish your version of this file to be governed by only the
>>> CDDL or
>>> + * only the GPL Version 2, indicate your decision by adding
>>> "[Contributor]
>>> + * elects to include this software in this distribution under the
>>> [CDDL or GPL
>>> + * Version 2] license." If you don't indicate a single choice of
>>> license, a
>>> + * recipient has the option to distribute your version of this file
>>> under
>>> + * either the CDDL, the GPL Version 2 or to extend the choice of
>>> license to
>>> + * its licensees as provided above. However, if you add GPL Version
>>> 2 code
>>> + * and therefore, elected the GPL Version 2 license, then the option
>>> applies
>>> + * only if the new code is made subject to such option by the copyright
>>> + * holder.
>>> + *
>>> + */
>>> +
>>> +package com.sun.grizzly.http;
>>> +
>>> +import com.sun.grizzly.tcp.Adapter;
>>> +import com.sun.grizzly.tcp.Request;
>>> +import com.sun.grizzly.tcp.Response;
>>> +import com.sun.grizzly.tcp.StaticResourcesAdapter;
>>> +import com.sun.grizzly.util.buf.ByteChunk;
>>> +import com.sun.grizzly.util.buf.CharChunk;
>>> +import java.net.HttpURLConnection;
>>> +import java.net.URL;
>>> +import junit.framework.TestCase;
>>> +
>>> +/**
>>> + *
>>> + * @author Alexey Stashok
>>> + */
>>> +public class AsyncHTTPResponseTest extends TestCase {
>>> + public static final int PORT = 18890;
>>> +
>>> + private static final byte[] abcd = new byte[] {'a', 'b', 'c', 'd'};
>>> +
>>> + public void testSimpleAsyncResponse() throws Exception {
>>> + int responseLength = 1024 * 1024;
>>> +
>>> + SelectorThread selectorThread = new SelectorThread();
>>> + try {
>>> + selectorThread.setPort(PORT);
>>> + selectorThread.setAsyncHttpWriteEnabled(true);
>>> + selectorThread.setAdapter(
>>> + new BigResponseAdapter(responseLength));
>>> + selectorThread.listen();
>>> +
>>> + HttpURLConnection connection = (HttpURLConnection)
>>> + new URL("http://localhost:" +
>>> PORT).openConnection();
>>> +
>>> + int code = connection.getResponseCode();
>>> + assertEquals(code, 200);
>>> +
>>> + int length = connection.getContentLength();
>>> + assertEquals(length, responseLength);
>>> +
>>> + byte[] content = new byte[length];
>>> +
>>> + int readBytes;
>>> + int offset = 0;
>>> + do {
>>> + readBytes =
>>> connection.getInputStream().read(content, offset,
>>> + length - offset);
>>> + offset += readBytes;
>>> + } while(readBytes != -1 && offset < length);
>>> +
>>> + assertEquals(offset, length);
>>> +
>>> + checkResult(content);
>>> + } finally {
>>> + selectorThread.stopEndpoint();
>>> + }
>>> + }
>>> +
>>> + public static class BigResponseAdapter implements Adapter {
>>> + private int length;
>>> +
>>> + public BigResponseAdapter(int length) {
>>> + this.length = length;
>>> + }
>>> +
>>> + public void service(Request req, Response res) throws
>>> Exception {
>>> + ByteChunk chunk = new ByteChunk(length);
>>> + byte[] content = new byte[length];
>>> + for (int i=0; i<length; i++) {
>>> + content[i] = abcd[i % abcd.length];
>>> + }
>>> + + res.setStatus(200);
>>> + res.setContentLength(length);
>>> + res.setContentType("text/html");
>>> + chunk.append(content, 0, length);
>>> + res.getOutputBuffer().doWrite(chunk, res);
>>> + }
>>> +
>>> + public void afterService(Request req, Response res) throws
>>> Exception {
>>> + }
>>> + }
>>> +
>>> + private static boolean checkResult(byte[] content) {
>>> + for (int i=0; i<content.length; i++) {
>>> + if (content[i] != abcd[i % abcd.length]) {
>>> + return false;
>>> + }
>>> + }
>>> +
>>> + return true;
>>> + }
>>> +}
>>
>> Looks good!!!
>>
>> -- Jeanfrancois
>>
>>
>>
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: dev-unsubscribe_at_grizzly.dev.java.net
>> For additional commands, e-mail: dev-help_at_grizzly.dev.java.net
>>
>
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: dev-unsubscribe_at_grizzly.dev.java.net
> For additional commands, e-mail: dev-help_at_grizzly.dev.java.net
>