users@grizzly.java.net

Re: How to implement the client's message flow based on async read/write?

From: JianXing Yi <jianxing.yi_at_gmail.com>
Date: Wed, 28 May 2008 17:27:04 +0800

Hi Alexey,

Thanks for your help!

So once ConnectorHandler is connected - you can start to use async.
read/write queues, don't wait for CallbackHandler.onRead/Write
notification calls. But for sure it's possible to combine that as you
do.
-------------------------------------------
It's the truth.
After I revise the size of the readBuffer from 10240 to 1024, the
async. read/write operations work as expected. So, what the general
async. buffer rules? I think I could investigate AsyncReadCondition.
Any suggestion?

Another, it seems TcpSelectorHandler(true) is the same as
TcpSelectorHandler() in this client example.

Thanks!

2008/5/28, JianXing Yi <jianxing.yi_at_gmail.com>:
> Many thanks for your help!
>
> 2008/5/27, Oleksiy Stashok <Oleksiy.Stashok_at_sun.com>:
>> Hi,
>>
>> try to call handler.readFromAsyncQueue() before writing anything...
>> like following...
>>
>>> @Override
>>> public void onWrite(IOEvent<Context> ioEvent) {
>>> System.out.println("onWrite()");
>>> try {
>>
>> connectorHandler.readFromAsyncQueue(......);
>>>
>>> connectorHandler.writeToAsyncQueue(ByteBuffer
>>> .wrap("GET /\r\nHTTP 1.0/1.1\r\nHOST:127.0.0.1\r\n\r\n"
>>> .getBytes()), asyncWriteHandler);
>>> // ioEvent.attachment().getSelectionKey().interestOps(
>>> // SelectionKey.OP_READ);
>>> } catch (IOException e) {
>>> // TODO Auto-generated catch block
>>> e.printStackTrace();
>>> }
>>> }
>>
>> The trick with async read/write queues is that you can use it without
>> CallbackHandler's notifications. Framework will take care about
>> sending and receiving data async. itself.
> After reading your mail, I've tried this but it seems not work. What a pity
> :-(.
> But the way your mentioned is exactly what I want. I've found some API
> performing registration IO EVENT HANDLERs to Grizzly framework but failed.
> It seems that Grizzly does not work in a direct EVENT-HANDLER way... Am I
> misunderstanding?
> If working in the conbination way you mentioned, then SelectionKey(s) should
> be properly registered in right place. I do this as below:
> 1. ConnectorHandler.connect()
> 2. ConnectorCallbackHandler.onConnect(): register OP_WRITE to trigger
> ConnectorCallbackHandler.onWrite()
> 3. ConnectorCallbackHandler.onWrite()
> *firstly write the data in async way
> *register OP_READ to trigger ConnectorCallbackHandler.onRead() in
> AsyncWriteCallbackHandler.onWriteCompleted() method.
> 4. ConnectorCallbackHandler.onRead():
> *read the data in async way
> *get the complete data in AsyncReadCallbackHandler.onReadCompleted()
> AM I IN THE RIGHT WAY? OR ANY OTHER MISUNDERSTANDINGS?
>
> Thanks again!
>
>> So once ConnectorHandler is connected - you can start to use async.
>> read/write queues, don't wait for CallbackHandler.onRead/Write
>> notification calls.
>> But for sure it's possible to combine that as you do.
>>
>> Thanks.
>>
>> WBR,
>> Alexey.
>>
>> On May 27, 2008, at 10:47 , JianXing Yi wrote:
>>
>>> Hi there,
>>>
>>> Undoutly, Grizzly is a good NIO framework. For practice purpose, I'm
>>> writing a NIO client sending a HTTP request and receiving its response
>>> based on Grizzly.
>>>
>>> 1. Performing connecting to server in the ConnectorHandler's
>>> onConnect() method, at the same time, for writing HTTP GET request
>>> purpose, the OP_WRITE interest registered in this method. I think this
>>> registration will be resulted in triggering the ConnectorHandler's
>>> onWrite() method.
>>> 2. Performing writing in the ConnectorHandler's onWrite() method,
>>> through the asynchronous way like this:
>>> @Override
>>> public void onWrite(IOEvent<Context> ioEvent) {
>>> System.out.println("onWrite()");
>>> try {
>>> connectorHandler.writeToAsyncQueue(ByteBuffer
>>> .wrap("GET /\r\nHTTP 1.0/1.1\r\nHOST:127.0.0.1\r\n\r\n"
>>> .getBytes()), asyncWriteHandler);
>>> } catch (IOException e) {
>>> // TODO Auto-generated catch block
>>> e.printStackTrace();
>>> }
>>> }
>>> 3. Registering OP_READ interest in the AsyncWriteCallbackHandler's
>>> onWriteCompleted() method. By doing this, I think the
>>> ConnectorHandler's onRead() method will be triggered, so I can read
>>> the HTTP response. If reading the response in blocking way, then there
>>> is no problems.
>>> MY PROBLEM IS: IF I USE ConnectorHandler(in fact a
>>> TcpConnectorHandler).readFromAsyncQueue() METHOD TO READ THE RESPONSE,
>>> THEN I CANNOT GET THE RESPONSE CORRECTLY. WHAT'S MORE, THE
>>> AsyncReadCallbackHandler's onReadCompleted() METHOD NEVER BEING
>>> CALLED!
>>>
>>> Below is the testing client code. Thanks for your help!
>>>
>>>
>>> package tests.grizzly;
>>>
>>> import java.io.IOException;
>>> import java.net.InetAddress;
>>> import java.net.InetSocketAddress;
>>> import java.net.SocketAddress;
>>> import java.nio.ByteBuffer;
>>> import java.nio.channels.SelectionKey;
>>> import java.nio.charset.Charset;
>>> import java.util.Queue;
>>> import java.util.concurrent.CountDownLatch;
>>>
>>> import com.sun.grizzly.CallbackHandler;
>>> import com.sun.grizzly.Context;
>>> import com.sun.grizzly.Controller;
>>> import com.sun.grizzly.ControllerStateListenerAdapter;
>>> import com.sun.grizzly.DefaultProtocolChain;
>>> import com.sun.grizzly.DefaultProtocolChainInstanceHandler;
>>> import com.sun.grizzly.IOEvent;
>>> import com.sun.grizzly.ProtocolChain;
>>> import com.sun.grizzly.ProtocolFilter;
>>> import com.sun.grizzly.TCPConnectorHandler;
>>> import com.sun.grizzly.TCPSelectorHandler;
>>> import com.sun.grizzly.Controller.Protocol;
>>> import com.sun.grizzly.async.AsyncReadCallbackHandler;
>>> import com.sun.grizzly.async.AsyncReadQueueRecord;
>>> import com.sun.grizzly.async.AsyncWriteCallbackHandler;
>>> import com.sun.grizzly.async.AsyncWriteQueueRecord;
>>> import com.sun.grizzly.util.WorkerThread;
>>>
>>> public class TestNonblockingClient10 {
>>>
>>> public class MyProtocolFilter implements ProtocolFilter {
>>>
>>> @Override
>>> public boolean execute(Context ctx) throws IOException {
>>> System.out.println("execute()");
>>> ByteBuffer byteBuffer = ((WorkerThread) Thread.currentThread())
>>> .getByteBuffer();
>>> System.out.println(Charset.defaultCharset().decode(byteBuffer));
>>> return true;
>>> }
>>>
>>> @Override
>>> public boolean postExecute(Context ctx) throws IOException {
>>> System.out.println("postExecute()");
>>> return true;
>>> }
>>>
>>> }
>>>
>>> /**
>>> * @param args
>>> * @throws IOException
>>> * @throws InterruptedException
>>> */
>>> public static void main(String[] args) throws InterruptedException,
>>> IOException {
>>> TestNonblockingClient10 client = new TestNonblockingClient10();
>>> client.start();
>>> }
>>>
>>> private Controller controller;
>>> private TCPConnectorHandler connectorHandler;
>>> protected CountDownLatch controllerReadyLatch = new
>>> CountDownLatch(1);
>>> ByteBuffer readBuffer = ByteBuffer.allocateDirect(10240);
>>> CallbackHandler<Context> connectorCallbackHandler = new
>>> CallbackHandler<Context>() {
>>>
>>> @Override
>>> public void onConnect(IOEvent<Context> ioEvent) {
>>> try {
>>> connectorHandler.finishConnect(ioEvent.attachment()
>>> .getSelectionKey());
>>> System.out.println("connected.");
>>> // trigger the ConnectorHandler's onWrite() method.
>>> ioEvent.attachment().getSelectionKey().interestOps(
>>> SelectionKey.OP_WRITE);
>>> } catch (IOException e) {
>>> // TODO Auto-generated catch block
>>> e.printStackTrace();
>>> }
>>> }
>>>
>>> @Override
>>> public void onRead(IOEvent<Context> ioEvent) {
>>> System.out.println("onRead()");
>>> Context context = ioEvent.attachment();
>>> // context.getSelectionKey().interestOps(SelectionKey.OP_READ);
>>> // try {
>>> // context.getProtocolChain().execute(context);
>>> // } catch (Exception e) {
>>> // // TODO Auto-generated catch block
>>> // e.printStackTrace();
>>> // }
>>> try {
>>> connectorHandler.read(readBuffer, true);
>>> System.out.println(Charset.defaultCharset().decode(readBuffer));
>>> } catch (IOException e) {
>>> // TODO Auto-generated catch block
>>> e.printStackTrace();
>>> }
>>> }
>>>
>>> @Override
>>> public void onWrite(IOEvent<Context> ioEvent) {
>>> System.out.println("onWrite()");
>>> try {
>>> connectorHandler.writeToAsyncQueue(ByteBuffer
>>> .wrap("GET /\r\nHTTP 1.0/1.1\r\nHOST:127.0.0.1\r\n\r\n"
>>> .getBytes()), asyncWriteHandler);
>>> // ioEvent.attachment().getSelectionKey().interestOps(
>>> // SelectionKey.OP_READ);
>>> } catch (IOException e) {
>>> // TODO Auto-generated catch block
>>> e.printStackTrace();
>>> }
>>> }
>>> };
>>> protected AsyncReadCallbackHandler asyncReadHandler = new
>>> AsyncReadCallbackHandler() {
>>>
>>> @Override
>>> public void onIOException(IOException ioException, SelectionKey key,
>>> ByteBuffer buffer, Queue<AsyncReadQueueRecord> remainingQueue) {
>>> System.out.println("onIOException()");
>>> }
>>>
>>> @Override
>>> public void onReadCompleted(SelectionKey key, SocketAddress
>>> srcAddress,
>>> ByteBuffer buffer) {
>>> System.out.println("onReadCompleted()");
>>> System.out.println(Charset.defaultCharset().decode(buffer));
>>> }
>>> };
>>> protected AsyncWriteCallbackHandler asyncWriteHandler = new
>>> AsyncWriteCallbackHandler() {
>>>
>>> @Override
>>> public void onIOException(IOException ioException, SelectionKey key,
>>> ByteBuffer buffer, Queue<AsyncWriteQueueRecord> remainingQueue) {
>>> // TODO Auto-generated method stub
>>>
>>> }
>>>
>>> @Override
>>> public void onWriteCompleted(SelectionKey key, ByteBuffer buffer) {
>>> System.out.println("onWriteCompleted()");
>>> key.interestOps(SelectionKey.OP_READ);
>>> }
>>> };
>>>
>>> private void start() throws InterruptedException, IOException {
>>> controller = createController();
>>> new Thread(controller).start();
>>>
>>> controllerReadyLatch.await();
>>>
>>> connectorHandler = (TCPConnectorHandler) controller
>>> .acquireConnectorHandler(Protocol.TCP);
>>>
>>> // trigger the connector handler's onConnect() method.
>>> connectorHandler.connect(new InetSocketAddress(InetAddress
>>> .getLocalHost(), 8080), connectorCallbackHandler);
>>> }
>>>
>>> private Controller createController() {
>>> Controller tmpController = new Controller();
>>>
>>> TCPSelectorHandler tcpSelectorHandler = new
>>> TCPSelectorHandler(true);
>>> tmpController.setSelectorHandler(tcpSelectorHandler);
>>>
>>> DefaultProtocolChainInstanceHandler instanceHandler = new
>>> DefaultProtocolChainInstanceHandler() {
>>>
>>> private ProtocolChain protocolChain;
>>>
>>> @Override
>>> public ProtocolChain poll() {
>>> if (protocolChain == null) {
>>> protocolChain = new DefaultProtocolChain();
>>> }
>>> return protocolChain;
>>> }
>>> };
>>> // instanceHandler.poll().addFilter(new ReadFilter());
>>> // instanceHandler.poll().addFilter(new MyProtocolFilter());
>>>
>>> tmpController.setProtocolChainInstanceHandler(instanceHandler);
>>>
>>> tmpController.addStateListener(new
>>> ControllerStateListenerAdapter() {
>>> @Override
>>> public void onReady() {
>>> controllerReadyLatch.countDown();
>>> super.onReady();
>>> }
>>> });
>>> return tmpController;
>>> }
>>>
>>> }
>>>
>>> ---------------------------------------------------------------------
>>> To unsubscribe, e-mail: users-unsubscribe_at_grizzly.dev.java.net
>>> For additional commands, e-mail: users-help_at_grizzly.dev.java.net
>>>
>>
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: users-unsubscribe_at_grizzly.dev.java.net
>> For additional commands, e-mail: users-help_at_grizzly.dev.java.net
>>
>>
>