users@grizzly.java.net

Re: How to implement the client's message flow based on async read/write?

From: Oleksiy Stashok <Oleksiy.Stashok_at_Sun.COM>
Date: Mon, 09 Jun 2008 14:55:09 +0200

Hi Yi,

>
> 1. In the Grizzly-based NIO client, I try to establish a relatively
> large number of connections to a server, e.g. 30,000. I avoid the 30s
> closing of idle connections through the SelectionKeyHandler. My problem
> is that about 5,000 connections seems to be the limit: no more
> connections can be established after that. BTW, I've already
> adjusted the system TCP parameters, such as the maximum number of
> TCP connections (greater than 30,000) and the keep-alive time.
This is most likely a system limit; Grizzly itself doesn't impose one.
If you send me the code, I can test how many connections I can
create on my local machine, just to compare the results.
I'm also curious what kind of scenario you have that requires
30K+ TCP connections? :))
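
To compare numbers between machines without Grizzly in the picture, something like the plain java.nio probe below could help. It is only a minimal sketch: ConnectionLimitProbe, probe and openUntilFailure are names made up for illustration, and it assumes the server side runs in the same process on loopback (so every connection costs two file descriptors here, which hits a per-process descriptor limit sooner than a real client would).

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Grizzly: counts how many TCP connections
// can be opened before the operating system refuses one.
class ConnectionLimitProbe {

    /** Opens up to 'target' client connections; stops at the first failure. */
    static int openUntilFailure(InetSocketAddress server, int target,
                                List<SocketChannel> opened) {
        for (int i = 0; i < target; i++) {
            try {
                opened.add(SocketChannel.open(server));
            } catch (IOException e) {
                // Typically a descriptor or port limit; the message says which.
                System.out.println("stopped at " + opened.size() + ": " + e);
                break;
            }
        }
        return opened.size();
    }

    /** Starts a loopback server, connects 'target' times, returns the count. */
    static int probe(int target) {
        List<SocketChannel> clients = new ArrayList<>();
        List<SocketChannel> accepted = new ArrayList<>();
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            Thread acceptor = new Thread(() -> {
                try {
                    while (true) {
                        SocketChannel ch = server.accept();
                        synchronized (accepted) {
                            accepted.add(ch);
                        }
                    }
                } catch (IOException closed) {
                    // Server channel closed (or out of descriptors): stop accepting.
                }
            });
            acceptor.setDaemon(true);
            acceptor.start();
            InetSocketAddress address = (InetSocketAddress) server.getLocalAddress();
            return openUntilFailure(address, target, clients);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            for (SocketChannel ch : clients) {
                closeQuietly(ch);
            }
            synchronized (accepted) {
                for (SocketChannel ch : accepted) {
                    closeQuietly(ch);
                }
            }
        }
    }

    static void closeQuietly(SocketChannel ch) {
        try {
            ch.close();
        } catch (IOException ignored) {
            // Nothing useful to do while cleaning up.
        }
    }

    public static void main(String[] args) {
        int target = args.length > 0 ? Integer.parseInt(args[0]) : 100;
        System.out.println("opened " + probe(target) + " of " + target);
    }
}
```

Running it with a large target (e.g. 30000) and looking at the exception message at the point where it stops should tell whether the per-process file descriptor limit or the local port range is what runs out first.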

>
> 2. I want to iterate over the ConnectorHandlerPool to perform a
> transaction on each connection, but it seems that
> ConnectorHandlerPool does not support iteration explicitly. So,
> as a workaround, I collect each ConnectorHandler manually after
> calling ConnectorHandler.finishConnect.
Maybe that's the only way right now. ConnectorHandlerPool wasn't
originally introduced to track existing live connections, but to
cache and reuse ones that were released (closed).
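
The manual bookkeeping can stay quite small. Below is a minimal sketch, assuming each handler is registered once finishConnect has succeeded and unregistered when it is closed. LiveConnectionRegistry is a hypothetical helper, not a Grizzly class, and it is generic so that it doesn't depend on any Grizzly type.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical helper, not part of Grizzly: tracks live connections so that
// they can be iterated later, which a release/reuse pool does not offer.
class LiveConnectionRegistry<C> {

    // Concurrent set: callbacks may register from selector or worker threads.
    private final Set<C> live = ConcurrentHashMap.newKeySet();

    /** Call once the connection is established; false if already tracked. */
    boolean register(C connection) {
        return live.add(connection);
    }

    /** Call when the connection is closed; false if it was not tracked. */
    boolean unregister(C connection) {
        return live.remove(connection);
    }

    int size() {
        return live.size();
    }

    /** Weakly consistent: safe while other threads register or unregister. */
    void forEach(Consumer<? super C> action) {
        live.forEach(action);
    }

    public static void main(String[] args) {
        LiveConnectionRegistry<String> registry = new LiveConnectionRegistry<>();
        registry.register("conn-1");
        registry.register("conn-2");
        registry.unregister("conn-1");
        registry.forEach(c -> System.out.println("live: " + c));
    }
}
```

With C bound to the ConnectorHandler type, the per-connection transaction becomes a forEach over the registry.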

Thanks.

WBR,
Alexey.

>
>
> Any help is greatly appreciated. Thanks.
> -Yi Bing
>
> 2008/5/28 Oleksiy Stashok <Oleksiy.Stashok_at_sun.com>:
> Hi,
>
>>
>> So once the ConnectorHandler is connected, you can start using the async.
>> read/write queues; you don't need to wait for the CallbackHandler.onRead/Write
>> notification calls. But it's certainly possible to combine the two, as you
>> do.
>> -------------------------------------------
>> That's true.
>> After I changed the size of the readBuffer from 10240 to 1024, the
>> async. read/write operations work as expected. So, what are the general
>> rules for async. buffers? I think I could investigate AsyncReadCondition.
>> Any suggestions?
>
> One note on AsyncQueueReader: when data arrives on the client
> channel, the framework checks whether there is a ByteBuffer in the
> AsyncReaderQueue waiting for incoming data; if not, CallbackHandler.onRead()
> will be called. So, if you want to use just the AsyncReaderQueue, you
> always have to be sure that there is a ByteBuffer in the queue waiting
> for incoming data :)
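
To make that rule concrete, here is a toy model of the dispatch in plain Java. It is not Grizzly code: ReadDispatchModel, queueRead and dataArrived are invented names, and it assumes (as the 10240 vs. 1024 observation above suggests, but which I state here only as an assumption) that a queued read is reported as completed only once its buffer is full.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model only, not Grizzly code: shows which callback incoming data would
// reach, depending on whether a buffer is waiting in the read queue.
class ReadDispatchModel {

    private final Deque<ByteBuffer> waiting = new ArrayDeque<>();

    /** Names of the callbacks the model would have invoked, in order. */
    final List<String> events = new ArrayList<>();

    /** Stands in for queueing a buffer for an async read. */
    void queueRead(ByteBuffer buffer) {
        waiting.addLast(buffer);
    }

    /** Stands in for the channel becoming readable with 'data' available. */
    void dataArrived(ByteBuffer data) {
        while (data.hasRemaining()) {
            ByteBuffer head = waiting.peekFirst();
            if (head == null) {
                // No buffer is waiting: fall back to the callback handler.
                events.add("onRead(" + data.remaining() + ")");
                return;
            }
            // Copy as many bytes as fit into the waiting buffer.
            int n = Math.min(head.remaining(), data.remaining());
            ByteBuffer slice = data.duplicate();
            slice.limit(slice.position() + n);
            head.put(slice);
            data.position(data.position() + n);
            // Assumption: completion is signalled only when the buffer is full.
            if (!head.hasRemaining()) {
                waiting.removeFirst();
                events.add("onReadCompleted(" + head.position() + ")");
            }
        }
    }

    public static void main(String[] args) {
        ReadDispatchModel small = new ReadDispatchModel();
        small.queueRead(ByteBuffer.allocate(4));
        small.dataArrived(ByteBuffer.wrap(new byte[6]));
        System.out.println("4-byte buffer, 6 bytes in: " + small.events);

        ReadDispatchModel large = new ReadDispatchModel();
        large.queueRead(ByteBuffer.allocate(10));
        large.dataArrived(ByteBuffer.wrap(new byte[3]));
        System.out.println("10-byte buffer, 3 bytes in: " + large.events);
    }
}
```

In the second case the model records no event at all, because the buffer is larger than the data that arrives. If the real completion rule matches the assumption, that would be consistent with onReadCompleted() never being called for the 10240-byte buffer.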
>
> IMHO, for your scenario it makes sense to use the combined method:
> use CallbackHandler to process incoming client data (implement
> CallbackHandler.onRead()), and use AsyncWriteQueue to send data to
> the server (don't put any logic in CallbackHandler.onWrite()).
>
> If you have any questions about that, just let me know :)
>
>>
>> Also, it seems TCPSelectorHandler(true) behaves the same as
>> TCPSelectorHandler() in this client example.
> I'm not sure it's the same. Depending on that argument, TCPSelectorHandler
> goes through different initialization steps.
>
> Thanks.
>
> WBR,
> Alexey.
>
>>
>>
>> Thanks!
>>
>> 2008/5/28, JianXing Yi <jianxing.yi_at_gmail.com>:
>>> Many thanks for your help!
>>>
>>> 2008/5/27, Oleksiy Stashok <Oleksiy.Stashok_at_sun.com>:
>>>> Hi,
>>>>
>>>> Try calling handler.readFromAsyncQueue() before writing anything,
>>>> like the following:
>>>>
>>>>> @Override
>>>>> public void onWrite(IOEvent<Context> ioEvent) {
>>>>>     System.out.println("onWrite()");
>>>>>     try {
>>>>
>>>>         connectorHandler.readFromAsyncQueue(......);
>>>>>
>>>>>         connectorHandler.writeToAsyncQueue(ByteBuffer
>>>>>                 .wrap("GET /\r\nHTTP 1.0/1.1\r\nHOST:127.0.0.1\r\n\r\n"
>>>>>                         .getBytes()), asyncWriteHandler);
>>>>>         // ioEvent.attachment().getSelectionKey().interestOps(
>>>>>         //         SelectionKey.OP_READ);
>>>>>     } catch (IOException e) {
>>>>>         // TODO Auto-generated catch block
>>>>>         e.printStackTrace();
>>>>>     }
>>>>> }
>>>>
>>>> The trick with the async read/write queues is that you can use them
>>>> without CallbackHandler's notifications. The framework will take care
>>>> of sending and receiving data asynchronously itself.
>>> After reading your mail, I tried this, but it doesn't seem to work.
>>> What a pity :-(.
>>> But the way you mentioned is exactly what I want. I found some APIs
>>> for registering I/O event handlers with the Grizzly framework, but
>>> that attempt failed. It seems that Grizzly does not work in a direct
>>> event-handler way... Am I misunderstanding?
>>> If working in the combined way you mentioned, then the SelectionKey
>>> interests should be registered in the right places. I do this as below:
>>> 1. ConnectorHandler.connect()
>>> 2. ConnectorCallbackHandler.onConnect(): register OP_WRITE to trigger
>>>    ConnectorCallbackHandler.onWrite()
>>> 3. ConnectorCallbackHandler.onWrite():
>>>    * first, write the data in the async way
>>>    * register OP_READ in AsyncWriteCallbackHandler.onWriteCompleted()
>>>      to trigger ConnectorCallbackHandler.onRead()
>>> 4. ConnectorCallbackHandler.onRead():
>>>    * read the data in the async way
>>>    * get the complete data in AsyncReadCallbackHandler.onReadCompleted()
>>> AM I ON THE RIGHT TRACK? OR ARE THERE ANY OTHER MISUNDERSTANDINGS?
>>>
>>> Thanks again!
>>>
>>>> So once the ConnectorHandler is connected, you can start using the async.
>>>> read/write queues; you don't need to wait for the CallbackHandler.onRead/Write
>>>> notification calls.
>>>> But it's certainly possible to combine the two, as you do.
>>>>
>>>> Thanks.
>>>>
>>>> WBR,
>>>> Alexey.
>>>>
>>>> On May 27, 2008, at 10:47 , JianXing Yi wrote:
>>>>
>>>>> Hi there,
>>>>>
>>>>> Undoubtedly, Grizzly is a good NIO framework. For practice, I'm
>>>>> writing an NIO client based on Grizzly that sends an HTTP request
>>>>> and receives its response.
>>>>>
>>>>> 1. The connection to the server is completed in the ConnectorHandler's
>>>>> onConnect() method; at the same time, in order to write the HTTP GET
>>>>> request, OP_WRITE interest is registered in this method. I think this
>>>>> registration results in the ConnectorHandler's onWrite() method
>>>>> being triggered.
>>>>> 2. The write is performed in the ConnectorHandler's onWrite() method,
>>>>> asynchronously, like this:
>>>>> @Override
>>>>> public void onWrite(IOEvent<Context> ioEvent) {
>>>>>     System.out.println("onWrite()");
>>>>>     try {
>>>>>         connectorHandler.writeToAsyncQueue(ByteBuffer
>>>>>                 .wrap("GET /\r\nHTTP 1.0/1.1\r\nHOST:127.0.0.1\r\n\r\n"
>>>>>                         .getBytes()), asyncWriteHandler);
>>>>>     } catch (IOException e) {
>>>>>         // TODO Auto-generated catch block
>>>>>         e.printStackTrace();
>>>>>     }
>>>>> }
>>>>> 3. OP_READ interest is registered in the AsyncWriteCallbackHandler's
>>>>> onWriteCompleted() method. By doing this, I think the
>>>>> ConnectorHandler's onRead() method will be triggered, so I can read
>>>>> the HTTP response. If I read the response in the blocking way, there
>>>>> are no problems.
>>>>> MY PROBLEM IS: IF I USE THE ConnectorHandler (in fact a
>>>>> TCPConnectorHandler) readFromAsyncQueue() METHOD TO READ THE RESPONSE,
>>>>> THEN I CANNOT GET THE RESPONSE CORRECTLY. WHAT'S MORE, THE
>>>>> AsyncReadCallbackHandler's onReadCompleted() METHOD IS NEVER CALLED!
>>>>>
>>>>> Below is the testing client code. Thanks for your help!
>>>>>
>>>>>
>>>>> package tests.grizzly;
>>>>>
>>>>> import java.io.IOException;
>>>>> import java.net.InetAddress;
>>>>> import java.net.InetSocketAddress;
>>>>> import java.net.SocketAddress;
>>>>> import java.nio.ByteBuffer;
>>>>> import java.nio.channels.SelectionKey;
>>>>> import java.nio.charset.Charset;
>>>>> import java.util.Queue;
>>>>> import java.util.concurrent.CountDownLatch;
>>>>>
>>>>> import com.sun.grizzly.CallbackHandler;
>>>>> import com.sun.grizzly.Context;
>>>>> import com.sun.grizzly.Controller;
>>>>> import com.sun.grizzly.ControllerStateListenerAdapter;
>>>>> import com.sun.grizzly.DefaultProtocolChain;
>>>>> import com.sun.grizzly.DefaultProtocolChainInstanceHandler;
>>>>> import com.sun.grizzly.IOEvent;
>>>>> import com.sun.grizzly.ProtocolChain;
>>>>> import com.sun.grizzly.ProtocolFilter;
>>>>> import com.sun.grizzly.TCPConnectorHandler;
>>>>> import com.sun.grizzly.TCPSelectorHandler;
>>>>> import com.sun.grizzly.Controller.Protocol;
>>>>> import com.sun.grizzly.async.AsyncReadCallbackHandler;
>>>>> import com.sun.grizzly.async.AsyncReadQueueRecord;
>>>>> import com.sun.grizzly.async.AsyncWriteCallbackHandler;
>>>>> import com.sun.grizzly.async.AsyncWriteQueueRecord;
>>>>> import com.sun.grizzly.util.WorkerThread;
>>>>>
>>>>> public class TestNonblockingClient10 {
>>>>>
>>>>>     public class MyProtocolFilter implements ProtocolFilter {
>>>>>
>>>>>         @Override
>>>>>         public boolean execute(Context ctx) throws IOException {
>>>>>             System.out.println("execute()");
>>>>>             ByteBuffer byteBuffer = ((WorkerThread) Thread.currentThread())
>>>>>                     .getByteBuffer();
>>>>>
>>>>>             System.out.println(Charset.defaultCharset().decode(byteBuffer));
>>>>>             return true;
>>>>>         }
>>>>>
>>>>>         @Override
>>>>>         public boolean postExecute(Context ctx) throws IOException {
>>>>>             System.out.println("postExecute()");
>>>>>             return true;
>>>>>         }
>>>>>
>>>>>     }
>>>>>
>>>>>     /**
>>>>>      * @param args
>>>>>      * @throws IOException
>>>>>      * @throws InterruptedException
>>>>>      */
>>>>>     public static void main(String[] args) throws InterruptedException,
>>>>>             IOException {
>>>>>         TestNonblockingClient10 client = new TestNonblockingClient10();
>>>>>         client.start();
>>>>>     }
>>>>>
>>>>>     private Controller controller;
>>>>>     private TCPConnectorHandler connectorHandler;
>>>>>     protected CountDownLatch controllerReadyLatch = new CountDownLatch(1);
>>>>>     ByteBuffer readBuffer = ByteBuffer.allocateDirect(10240);
>>>>>     CallbackHandler<Context> connectorCallbackHandler =
>>>>>             new CallbackHandler<Context>() {
>>>>>
>>>>>         @Override
>>>>>         public void onConnect(IOEvent<Context> ioEvent) {
>>>>>             try {
>>>>>                 connectorHandler.finishConnect(ioEvent.attachment()
>>>>>                         .getSelectionKey());
>>>>>                 System.out.println("connected.");
>>>>>                 // trigger the ConnectorHandler's onWrite() method.
>>>>>                 ioEvent.attachment().getSelectionKey().interestOps(
>>>>>                         SelectionKey.OP_WRITE);
>>>>>             } catch (IOException e) {
>>>>>                 // TODO Auto-generated catch block
>>>>>                 e.printStackTrace();
>>>>>             }
>>>>>         }
>>>>>
>>>>>         @Override
>>>>>         public void onRead(IOEvent<Context> ioEvent) {
>>>>>             System.out.println("onRead()");
>>>>>             Context context = ioEvent.attachment();
>>>>>             // context.getSelectionKey().interestOps(SelectionKey.OP_READ);
>>>>>             // try {
>>>>>             //     context.getProtocolChain().execute(context);
>>>>>             // } catch (Exception e) {
>>>>>             //     // TODO Auto-generated catch block
>>>>>             //     e.printStackTrace();
>>>>>             // }
>>>>>             try {
>>>>>                 connectorHandler.read(readBuffer, true);
>>>>>
>>>>>                 System.out.println(Charset.defaultCharset().decode(readBuffer));
>>>>>             } catch (IOException e) {
>>>>>                 // TODO Auto-generated catch block
>>>>>                 e.printStackTrace();
>>>>>             }
>>>>>         }
>>>>>
>>>>>         @Override
>>>>>         public void onWrite(IOEvent<Context> ioEvent) {
>>>>>             System.out.println("onWrite()");
>>>>>             try {
>>>>>                 connectorHandler.writeToAsyncQueue(ByteBuffer
>>>>>                         .wrap("GET /\r\nHTTP 1.0/1.1\r\nHOST:127.0.0.1\r\n\r\n"
>>>>>                                 .getBytes()), asyncWriteHandler);
>>>>>                 // ioEvent.attachment().getSelectionKey().interestOps(
>>>>>                 //         SelectionKey.OP_READ);
>>>>>             } catch (IOException e) {
>>>>>                 // TODO Auto-generated catch block
>>>>>                 e.printStackTrace();
>>>>>             }
>>>>>         }
>>>>>     };
>>>>>     protected AsyncReadCallbackHandler asyncReadHandler =
>>>>>             new AsyncReadCallbackHandler() {
>>>>>
>>>>>         @Override
>>>>>         public void onIOException(IOException ioException, SelectionKey key,
>>>>>                 ByteBuffer buffer, Queue<AsyncReadQueueRecord> remainingQueue) {
>>>>>             System.out.println("onIOException()");
>>>>>         }
>>>>>
>>>>>         @Override
>>>>>         public void onReadCompleted(SelectionKey key, SocketAddress srcAddress,
>>>>>                 ByteBuffer buffer) {
>>>>>             System.out.println("onReadCompleted()");
>>>>>             System.out.println(Charset.defaultCharset().decode(buffer));
>>>>>         }
>>>>>     };
>>>>>     protected AsyncWriteCallbackHandler asyncWriteHandler =
>>>>>             new AsyncWriteCallbackHandler() {
>>>>>
>>>>>         @Override
>>>>>         public void onIOException(IOException ioException, SelectionKey key,
>>>>>                 ByteBuffer buffer, Queue<AsyncWriteQueueRecord> remainingQueue) {
>>>>>             // TODO Auto-generated method stub
>>>>>
>>>>>         }
>>>>>
>>>>>         @Override
>>>>>         public void onWriteCompleted(SelectionKey key, ByteBuffer buffer) {
>>>>>             System.out.println("onWriteCompleted()");
>>>>>             key.interestOps(SelectionKey.OP_READ);
>>>>>         }
>>>>>     };
>>>>>
>>>>>     private void start() throws InterruptedException, IOException {
>>>>>         controller = createController();
>>>>>         new Thread(controller).start();
>>>>>
>>>>>         controllerReadyLatch.await();
>>>>>
>>>>>         connectorHandler = (TCPConnectorHandler) controller
>>>>>                 .acquireConnectorHandler(Protocol.TCP);
>>>>>
>>>>>         // trigger the connector handler's onConnect() method.
>>>>>         connectorHandler.connect(new InetSocketAddress(InetAddress
>>>>>                 .getLocalHost(), 8080), connectorCallbackHandler);
>>>>>     }
>>>>>
>>>>>     private Controller createController() {
>>>>>         Controller tmpController = new Controller();
>>>>>
>>>>>         TCPSelectorHandler tcpSelectorHandler = new TCPSelectorHandler(true);
>>>>>         tmpController.setSelectorHandler(tcpSelectorHandler);
>>>>>
>>>>>         DefaultProtocolChainInstanceHandler instanceHandler =
>>>>>                 new DefaultProtocolChainInstanceHandler() {
>>>>>
>>>>>             private ProtocolChain protocolChain;
>>>>>
>>>>>             @Override
>>>>>             public ProtocolChain poll() {
>>>>>                 if (protocolChain == null) {
>>>>>                     protocolChain = new DefaultProtocolChain();
>>>>>                 }
>>>>>                 return protocolChain;
>>>>>             }
>>>>>         };
>>>>>         // instanceHandler.poll().addFilter(new ReadFilter());
>>>>>         // instanceHandler.poll().addFilter(new MyProtocolFilter());
>>>>>
>>>>>         tmpController.setProtocolChainInstanceHandler(instanceHandler);
>>>>>
>>>>>         tmpController.addStateListener(new ControllerStateListenerAdapter() {
>>>>>             @Override
>>>>>             public void onReady() {
>>>>>                 controllerReadyLatch.countDown();
>>>>>                 super.onReady();
>>>>>             }
>>>>>         });
>>>>>         return tmpController;
>>>>>     }
>>>>>
>>>>> }
>>>>>>
>>>>>> ---------------------------------------------------------------------
>>>>>> To unsubscribe, e-mail: users-unsubscribe_at_grizzly.dev.java.net
>>>>>> For additional commands, e-mail: users-help_at_grizzly.dev.java.net
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>>
>
>