users@grizzly.java.net

Re: How to implement the client's message flow based on async read/write?

From: JianXing Yi <jianxing.yi_at_gmail.com>
Date: Fri, 6 Jun 2008 13:43:02 +0800

Hi,

1. In my Grizzly-based NIO client, I am trying to establish a relatively
large number of connections to a server, e.g. 30,000. I use the
SelectionKeyHandler to keep idle connections from being closed after 30s.
My problem is that the limit seems to be about 5,000 connections; no more
connections can be established after that. By the way, I have already
adjusted the system TCP parameters, such as the number of TCP connections
(greater than 30,000) and the keep-alive time.

2. I want to iterate over the ConnectorHandlerPool to perform a transaction
on each connection, but ConnectorHandlerPool does not seem to support
iteration explicitly. As a workaround, I collect each ConnectorHandler
manually after calling ConnectorHandler.finishConnect.
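
The workaround in point 2 could look roughly like the sketch below. It is only an illustration in plain Java: ConnectionRegistry and its method names are hypothetical and not part of Grizzly, and the type parameter stands in for whatever handler type the client uses (for example a ConnectorHandler). Each handler is registered once its connect has finished, removed when it is closed, and the set can be iterated while other threads modify it.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical helper (not a Grizzly class): tracks connected handlers so
// that the client can iterate over them later.
final class ConnectionRegistry<H> {
    // Concurrent set: safe to modify from selector/worker threads while
    // another thread iterates over it.
    private final Set<H> connected = ConcurrentHashMap.newKeySet();

    // Call after the connect has finished for this handler.
    void onConnected(H handler) {
        connected.add(handler);
    }

    // Call when the handler's connection is closed.
    void onClosed(H handler) {
        connected.remove(handler);
    }

    // Number of handlers currently registered.
    int size() {
        return connected.size();
    }

    // Apply an action to every handler that is currently registered.
    void forEach(Consumer<? super H> action) {
        connected.forEach(action);
    }
}
```

With something like this, the per-connection transaction becomes a single registry.forEach(...) call, independent of how the pool stores its handlers internally.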

Any help is greatly appreciated. Thanks.
-Yi Bing

2008/5/28 Oleksiy Stashok <Oleksiy.Stashok_at_sun.com>:

> Hi,
>
>
> So once ConnectorHandler is connected - you can start to use async.
> read/write queues, don't wait for CallbackHandler.onRead/Write
> notification calls. But for sure it's possible to combine that as you
> do.
> -------------------------------------------
> That's true.
> After I changed the size of the readBuffer from 10240 to 1024, the
> async. read/write operations work as expected. So, what are the general
> rules for async. buffers? I think I could investigate AsyncReadCondition.
> Any suggestions?
>
> One note on AsyncQueueReader: when data arrives on the client channel,
> the framework checks whether there is a ByteBuffer in the AsyncReaderQueue
> waiting for incoming data; if not, CallbackHandler.onRead() will be called.
> So, if you want to use just the AsyncReaderQueue, you always have to make
> sure that there is a ByteBuffer in the queue waiting for incoming data :)
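
The dispatch rule described above can be modelled in a few lines of plain Java. This is a simplified sketch, not Grizzly code: ReadDispatchSketch and its methods are hypothetical names, and it additionally assumes (this is not confirmed in the thread) that a queued read only completes once its buffer is full. Under that assumption the model also suggests why a smaller read buffer may complete sooner than a large one.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Simplified model (not Grizzly code) of "queued buffer first, callback otherwise".
final class ReadDispatchSketch {
    private final Queue<ByteBuffer> waiting = new ArrayDeque<>();
    // Records which path each arrival took, for illustration only.
    final List<String> events = new ArrayList<>();

    // Corresponds to handing a buffer to the async read queue.
    void queueRead(ByteBuffer buffer) {
        waiting.add(buffer);
    }

    // Called when bytes arrive on the channel.
    void dataArrived(byte[] data) {
        ByteBuffer target = waiting.peek();
        if (target == null) {
            // No buffer is waiting: the application callback gets the event.
            events.add("onRead");
            return;
        }
        // Copy as much as fits; bytes that do not fit are ignored in this sketch.
        int n = Math.min(data.length, target.remaining());
        target.put(data, 0, n);
        // Assumption of this model: the queued read completes when the buffer is full.
        if (!target.hasRemaining()) {
            waiting.remove();
            events.add("onReadCompleted");
        } else {
            events.add("queued");
        }
    }
}
```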
>
> IMHO, for your scenario it makes sense to use a combined method: use the
> CallbackHandler to process incoming client data (implement
> CallbackHandler.onRead()), and use the AsyncWriteQueue to send data to the
> server (don't put any logic in CallbackHandler.onWrite()).
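
To illustrate the write half of that combined approach, here is a minimal model of an asynchronous write queue in plain java.nio. It is a sketch under assumptions, not the Grizzly implementation: WriteQueueSketch, enqueue and drain are hypothetical names. The caller only enqueues data with a completion callback; the queue is drained whenever the channel is writable, so no application logic is needed in an onWrite() callback.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayDeque;
import java.util.Queue;

// Simplified model (not Grizzly code) of an asynchronous write queue.
final class WriteQueueSketch {
    private static final class Entry {
        final ByteBuffer data;
        final Runnable onCompleted;

        Entry(ByteBuffer data, Runnable onCompleted) {
            this.data = data;
            this.onCompleted = onCompleted;
        }
    }

    private final Queue<Entry> pending = new ArrayDeque<>();

    // The caller only enqueues; it does not wait for write readiness.
    void enqueue(ByteBuffer data, Runnable onCompleted) {
        pending.add(new Entry(data, onCompleted));
    }

    // Called whenever the channel is writable.
    // Returns true if the queue is empty afterwards.
    boolean drain(WritableByteChannel channel) {
        try {
            Entry head;
            while ((head = pending.peek()) != null) {
                channel.write(head.data);
                if (head.data.hasRemaining()) {
                    // The channel took only part of the data; retry on the
                    // next writable event.
                    return false;
                }
                pending.remove();
                head.onCompleted.run();
            }
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```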
>
> If you have any questions about that, just let me know :)
>
>
> Another thing: it seems TCPSelectorHandler(true) is the same as
> TCPSelectorHandler() in this client example.
>
> Not sure it's the same. Depending on that argument, TCPSelectorHandler
> goes through different initialization steps.
>
> Thanks.
>
> WBR,
> Alexey.
>
>
>
> Thanks!
>
> 2008/5/28, JianXing Yi <jianxing.yi_at_gmail.com>:
>
> Many thanks for your help!
>
>
> 2008/5/27, Oleksiy Stashok <Oleksiy.Stashok_at_sun.com>:
>
> Hi,
>
> try to call handler.readFromAsyncQueue() before writing anything, like
> the following:
>
> @Override
> public void onWrite(IOEvent<Context> ioEvent) {
>     System.out.println("onWrite()");
>     try {
>         // Queue the async read first, so that a buffer is already
>         // waiting when the response arrives.
>         connectorHandler.readFromAsyncQueue(......);
>
>         // Request line and Host header of a well-formed HTTP/1.0 GET.
>         connectorHandler.writeToAsyncQueue(ByteBuffer
>                 .wrap("GET / HTTP/1.0\r\nHost: 127.0.0.1\r\n\r\n"
>                 .getBytes()), asyncWriteHandler);
>         // ioEvent.attachment().getSelectionKey().interestOps(
>         //         SelectionKey.OP_READ);
>     } catch (IOException e) {
>         // TODO Auto-generated catch block
>         e.printStackTrace();
>     }
> }
>
>
> The trick with the async read/write queues is that you can use them
> without the CallbackHandler's notifications. The framework takes care of
> sending and receiving data asynchronously by itself.
>
> After reading your mail, I tried this, but it does not seem to work. What
> a pity :-(.
>
> But the way you mentioned is exactly what I want. I found some APIs for
> registering IO event handlers with the Grizzly framework, but could not
> get them to work. It seems that Grizzly does not work in a direct
> event-handler way... Am I misunderstanding?
>
> If working in the combined way you mentioned, then the SelectionKey(s)
> should be properly registered in the right place. I do this as below:
>
> 1. ConnectorHandler.connect()
> 2. ConnectorCallbackHandler.onConnect(): register OP_WRITE to trigger
>    ConnectorCallbackHandler.onWrite()
> 3. ConnectorCallbackHandler.onWrite():
>    * first, write the data in the async way
>    * register OP_READ in AsyncWriteCallbackHandler.onWriteCompleted() to
>      trigger ConnectorCallbackHandler.onRead()
> 4. ConnectorCallbackHandler.onRead():
>    * read the data in the async way
>    * get the complete data in AsyncReadCallbackHandler.onReadCompleted()
>
> AM I ON THE RIGHT TRACK, OR AM I MISUNDERSTANDING SOMETHING?
>
>
> Thanks again!
>
> So once ConnectorHandler is connected - you can start to use async.
> read/write queues, don't wait for CallbackHandler.onRead/Write
> notification calls.
> But for sure it's possible to combine that as you do.
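
For comparison, the connect, then write, then read sequence discussed above can be sketched with plain java.nio, without Grizzly. This is only an illustration of switching a key's interest set between OP_CONNECT, OP_WRITE and OP_READ; InterestOpsFlowSketch, exchange and startOneShotServer are hypothetical names, and the one-shot server exists only so the sketch can be run on its own.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

// Plain java.nio sketch (no Grizzly): one key, three interest-set phases.
final class InterestOpsFlowSketch {

    // Sends the request and returns everything the server sends before closing.
    static String exchange(InetSocketAddress server, String request) {
        try (Selector selector = Selector.open();
             SocketChannel channel = SocketChannel.open()) {
            channel.configureBlocking(false);
            ByteBuffer out = ByteBuffer.wrap(request.getBytes(StandardCharsets.US_ASCII));
            ByteBuffer in = ByteBuffer.allocate(1024);
            StringBuilder response = new StringBuilder();

            // Phase 1: start the connect; wait for OP_CONNECT unless the
            // connect completed immediately.
            SelectionKey registered = channel.register(selector, 0);
            registered.interestOps(channel.connect(server)
                    ? SelectionKey.OP_WRITE : SelectionKey.OP_CONNECT);

            while (true) {
                selector.select();
                for (SelectionKey key : selector.selectedKeys()) {
                    if (key.isConnectable() && channel.finishConnect()) {
                        // Phase 2: connected, now wait until we can write.
                        key.interestOps(SelectionKey.OP_WRITE);
                    } else if (key.isWritable()) {
                        channel.write(out);
                        if (!out.hasRemaining()) {
                            // Phase 3: request fully sent, wait for the response.
                            key.interestOps(SelectionKey.OP_READ);
                        }
                    } else if (key.isReadable()) {
                        if (channel.read(in) < 0) {
                            return response.toString(); // server closed the connection
                        }
                        in.flip(); // switch the buffer from filling to draining
                        response.append(StandardCharsets.US_ASCII.decode(in));
                        in.clear();
                    }
                }
                selector.selectedKeys().clear();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Demo helper: a one-shot blocking server on an ephemeral loopback port
    // that reads one request and answers with a fixed reply.
    static InetSocketAddress startOneShotServer(String reply) {
        try {
            ServerSocketChannel server = ServerSocketChannel.open();
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            Thread worker = new Thread(() -> {
                try (ServerSocketChannel s = server; SocketChannel c = s.accept()) {
                    c.read(ByteBuffer.allocate(1024));
                    c.write(ByteBuffer.wrap(reply.getBytes(StandardCharsets.US_ASCII)));
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            worker.setDaemon(true);
            worker.start();
            return (InetSocketAddress) server.getLocalAddress();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In a framework with async queues, phases 2 and 3 are what the queues take over: the application enqueues the request and a read buffer right after the connect, and the framework manages the interest set.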
>
>
> Thanks.
>
> WBR,
> Alexey.
>
>
> On May 27, 2008, at 10:47 , JianXing Yi wrote:
>
>
> Hi there,
>
> Undoubtedly, Grizzly is a good NIO framework. For practice, I'm writing
> a Grizzly-based NIO client that sends an HTTP request and receives the
> response.
>
> 1. The connection to the server is completed in the ConnectorHandler's
> onConnect() method. In the same method, I register the OP_WRITE interest
> so that I can write the HTTP GET request. I think this registration will
> trigger the ConnectorHandler's onWrite() method.
>
> 2. The write is performed in the ConnectorHandler's onWrite() method, in
> the asynchronous way, like this:
>
> @Override
> public void onWrite(IOEvent<Context> ioEvent) {
>     System.out.println("onWrite()");
>     try {
>         // Request line and Host header of a well-formed HTTP/1.0 GET.
>         connectorHandler.writeToAsyncQueue(ByteBuffer
>                 .wrap("GET / HTTP/1.0\r\nHost: 127.0.0.1\r\n\r\n"
>                 .getBytes()), asyncWriteHandler);
>     } catch (IOException e) {
>         // TODO Auto-generated catch block
>         e.printStackTrace();
>     }
> }
>
> 3. I register the OP_READ interest in the AsyncWriteCallbackHandler's
> onWriteCompleted() method. I think this will trigger the
> ConnectorHandler's onRead() method, so that I can read the HTTP
> response. If I read the response in the blocking way, there are no
> problems.
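
As a side note on the request written in step 2: in HTTP/1.0 the method, the path and the version share a single request line, followed by the headers and an empty line. A minimal sketch of building such a request as a ByteBuffer is shown below; HttpGetRequestSketch and build are hypothetical names chosen for this illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: builds a minimal HTTP/1.0 GET request.
final class HttpGetRequestSketch {
    // Produces "GET <path> HTTP/1.0" plus a Host header, ended by an empty line.
    static ByteBuffer build(String host, String path) {
        String request = "GET " + path + " HTTP/1.0\r\n"
                + "Host: " + host + "\r\n"
                + "\r\n";
        // Explicit charset, so the bytes do not depend on the platform default.
        return ByteBuffer.wrap(request.getBytes(StandardCharsets.US_ASCII));
    }
}
```

The returned buffer is ready for writing (position 0, limit at the end of the request), so it can be handed directly to a write call or a write queue.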
>
> MY PROBLEM IS: IF I USE THE ConnectorHandler (in fact a
> TCPConnectorHandler) readFromAsyncQueue() METHOD TO READ THE RESPONSE,
> THEN I CANNOT GET THE RESPONSE CORRECTLY. WHAT'S MORE, THE
> AsyncReadCallbackHandler's onReadCompleted() METHOD IS NEVER CALLED!
>
> Below is the test client code. Thanks for your help!
>
>
>
> package tests.grizzly;
>
> import java.io.IOException;
> import java.net.InetAddress;
> import java.net.InetSocketAddress;
> import java.net.SocketAddress;
> import java.nio.ByteBuffer;
> import java.nio.channels.SelectionKey;
> import java.nio.charset.Charset;
> import java.util.Queue;
> import java.util.concurrent.CountDownLatch;
>
> import com.sun.grizzly.CallbackHandler;
> import com.sun.grizzly.Context;
> import com.sun.grizzly.Controller;
> import com.sun.grizzly.ControllerStateListenerAdapter;
> import com.sun.grizzly.DefaultProtocolChain;
> import com.sun.grizzly.DefaultProtocolChainInstanceHandler;
> import com.sun.grizzly.IOEvent;
> import com.sun.grizzly.ProtocolChain;
> import com.sun.grizzly.ProtocolFilter;
> import com.sun.grizzly.TCPConnectorHandler;
> import com.sun.grizzly.TCPSelectorHandler;
> import com.sun.grizzly.Controller.Protocol;
> import com.sun.grizzly.async.AsyncReadCallbackHandler;
> import com.sun.grizzly.async.AsyncReadQueueRecord;
> import com.sun.grizzly.async.AsyncWriteCallbackHandler;
> import com.sun.grizzly.async.AsyncWriteQueueRecord;
> import com.sun.grizzly.util.WorkerThread;
>
>
> public class TestNonblockingClient10 {
>
>     public class MyProtocolFilter implements ProtocolFilter {
>
>         @Override
>         public boolean execute(Context ctx) throws IOException {
>             System.out.println("execute()");
>             ByteBuffer byteBuffer = ((WorkerThread) Thread.currentThread())
>                     .getByteBuffer();
>             System.out.println(Charset.defaultCharset().decode(byteBuffer));
>             return true;
>         }
>
>         @Override
>         public boolean postExecute(Context ctx) throws IOException {
>             System.out.println("postExecute()");
>             return true;
>         }
>
>     }
>
>     /**
>      * @param args
>      * @throws IOException
>      * @throws InterruptedException
>      */
>     public static void main(String[] args) throws InterruptedException,
>             IOException {
>         TestNonblockingClient10 client = new TestNonblockingClient10();
>         client.start();
>     }
>
>
>     private Controller controller;
>     private TCPConnectorHandler connectorHandler;
>     protected CountDownLatch controllerReadyLatch = new CountDownLatch(1);
>     ByteBuffer readBuffer = ByteBuffer.allocateDirect(10240);
>
>     CallbackHandler<Context> connectorCallbackHandler = new CallbackHandler<Context>() {
>
>         @Override
>         public void onConnect(IOEvent<Context> ioEvent) {
>             try {
>                 connectorHandler.finishConnect(ioEvent.attachment()
>                         .getSelectionKey());
>                 System.out.println("connected.");
>                 // trigger the ConnectorHandler's onWrite() method.
>                 ioEvent.attachment().getSelectionKey().interestOps(
>                         SelectionKey.OP_WRITE);
>             } catch (IOException e) {
>                 // TODO Auto-generated catch block
>                 e.printStackTrace();
>             }
>         }
>
>         @Override
>         public void onRead(IOEvent<Context> ioEvent) {
>             System.out.println("onRead()");
>             Context context = ioEvent.attachment();
>             // context.getSelectionKey().interestOps(SelectionKey.OP_READ);
>             // try {
>             //     context.getProtocolChain().execute(context);
>             // } catch (Exception e) {
>             //     // TODO Auto-generated catch block
>             //     e.printStackTrace();
>             // }
>             try {
>                 // blocking read of the response
>                 connectorHandler.read(readBuffer, true);
>                 System.out.println(Charset.defaultCharset().decode(readBuffer));
>             } catch (IOException e) {
>                 // TODO Auto-generated catch block
>                 e.printStackTrace();
>             }
>         }
>
>         @Override
>         public void onWrite(IOEvent<Context> ioEvent) {
>             System.out.println("onWrite()");
>             try {
>                 // Request line and Host header of a well-formed HTTP/1.0 GET.
>                 connectorHandler.writeToAsyncQueue(ByteBuffer
>                         .wrap("GET / HTTP/1.0\r\nHost: 127.0.0.1\r\n\r\n"
>                         .getBytes()), asyncWriteHandler);
>                 // ioEvent.attachment().getSelectionKey().interestOps(
>                 //         SelectionKey.OP_READ);
>             } catch (IOException e) {
>                 // TODO Auto-generated catch block
>                 e.printStackTrace();
>             }
>         }
>     };
>
>
>     protected AsyncReadCallbackHandler asyncReadHandler = new AsyncReadCallbackHandler() {
>
>         @Override
>         public void onIOException(IOException ioException, SelectionKey key,
>                 ByteBuffer buffer, Queue<AsyncReadQueueRecord> remainingQueue) {
>             System.out.println("onIOException()");
>         }
>
>         @Override
>         public void onReadCompleted(SelectionKey key, SocketAddress srcAddress,
>                 ByteBuffer buffer) {
>             System.out.println("onReadCompleted()");
>             System.out.println(Charset.defaultCharset().decode(buffer));
>         }
>     };
>
>     protected AsyncWriteCallbackHandler asyncWriteHandler = new AsyncWriteCallbackHandler() {
>
>         @Override
>         public void onIOException(IOException ioException, SelectionKey key,
>                 ByteBuffer buffer, Queue<AsyncWriteQueueRecord> remainingQueue) {
>             // TODO Auto-generated method stub
>         }
>
>         @Override
>         public void onWriteCompleted(SelectionKey key, ByteBuffer buffer) {
>             System.out.println("onWriteCompleted()");
>             key.interestOps(SelectionKey.OP_READ);
>         }
>     };
>
>
>
>     private void start() throws InterruptedException, IOException {
>         controller = createController();
>         new Thread(controller).start();
>
>         controllerReadyLatch.await();
>
>         connectorHandler = (TCPConnectorHandler) controller
>                 .acquireConnectorHandler(Protocol.TCP);
>
>         // trigger the connector handler's onConnect() method.
>         connectorHandler.connect(new InetSocketAddress(InetAddress
>                 .getLocalHost(), 8080), connectorCallbackHandler);
>     }
>
>     private Controller createController() {
>         Controller tmpController = new Controller();
>
>         TCPSelectorHandler tcpSelectorHandler = new TCPSelectorHandler(true);
>         tmpController.setSelectorHandler(tcpSelectorHandler);
>
>         DefaultProtocolChainInstanceHandler instanceHandler = new DefaultProtocolChainInstanceHandler() {
>
>             private ProtocolChain protocolChain;
>
>             @Override
>             public ProtocolChain poll() {
>                 if (protocolChain == null) {
>                     protocolChain = new DefaultProtocolChain();
>                 }
>                 return protocolChain;
>             }
>         };
>         // instanceHandler.poll().addFilter(new ReadFilter());
>         // instanceHandler.poll().addFilter(new MyProtocolFilter());
>
>         tmpController.setProtocolChainInstanceHandler(instanceHandler);
>
>         tmpController.addStateListener(new ControllerStateListenerAdapter() {
>             @Override
>             public void onReady() {
>                 controllerReadyLatch.countDown();
>                 super.onReady();
>             }
>         });
>         return tmpController;
>     }
>
> }
>
>
> ---------------------------------------------------------------------
>
> To unsubscribe, e-mail: users-unsubscribe_at_grizzly.dev.java.net
>
> For additional commands, e-mail: users-help_at_grizzly.dev.java.net
>
>
>
>