Hi there,
Undoutly, Grizzly is a good NIO framework. For practice purpose, I'm
writing a NIO client sending a HTTP request and receiving its response
based on Grizzly.
1. Performing connecting to server in the ConnectorHandler's
onConnect() method, at the same time, for writing HTTP GET request
purpose, the OP_WRITE interest registered in this method. I think this
registration will be resulted in triggering the ConnectorHandler's
onWrite() method.
2. Performing writing in the ConnectorHandler's onWrite() method,
through the asynchronous way like this:
@Override
public void onWrite(IOEvent<Context> ioEvent) {
System.out.println("onWrite()");
try {
connectorHandler.writeToAsyncQueue(ByteBuffer
.wrap("GET /\r\nHTTP 1.0/1.1\r\nHOST:127.0.0.1\r\n\r\n"
.getBytes()), asyncWriteHandler);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
3. Registering OP_READ interest in the AsyncWriteCallbackHandler's
onWriteCompleted() method. By doing this, I think the
ConnectorHandler's onRead() method will be triggered, so I can read
the HTTP response. If reading the response in blocking way, then there
is no problems.
MY PROBLEM IS: IF I USE ConnectorHandler(in fact a
TcpConnectorHandler).readFromAsyncQueue() METHOD TO READ THE RESPONSE,
THEN I CANNOT GET THE RESPONSE CORRECTLY. WHAT'S MORE, THE
AsyncReadCallbackHandler's onReadCompleted() METHOD NEVER BEING
CALLED!
Below is the testing client code. Thanks for your help!
package tests.grizzly;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.charset.Charset;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import com.sun.grizzly.CallbackHandler;
import com.sun.grizzly.Context;
import com.sun.grizzly.Controller;
import com.sun.grizzly.ControllerStateListenerAdapter;
import com.sun.grizzly.DefaultProtocolChain;
import com.sun.grizzly.DefaultProtocolChainInstanceHandler;
import com.sun.grizzly.IOEvent;
import com.sun.grizzly.ProtocolChain;
import com.sun.grizzly.ProtocolFilter;
import com.sun.grizzly.TCPConnectorHandler;
import com.sun.grizzly.TCPSelectorHandler;
import com.sun.grizzly.Controller.Protocol;
import com.sun.grizzly.async.AsyncReadCallbackHandler;
import com.sun.grizzly.async.AsyncReadQueueRecord;
import com.sun.grizzly.async.AsyncWriteCallbackHandler;
import com.sun.grizzly.async.AsyncWriteQueueRecord;
import com.sun.grizzly.util.WorkerThread;
public class TestNonblockingClient10 {
public class MyProtocolFilter implements ProtocolFilter {
@Override
public boolean execute(Context ctx) throws IOException {
System.out.println("execute()");
ByteBuffer byteBuffer = ((WorkerThread) Thread.currentThread())
.getByteBuffer();
System.out.println(Charset.defaultCharset().decode(byteBuffer));
return true;
}
@Override
public boolean postExecute(Context ctx) throws IOException {
System.out.println("postExecute()");
return true;
}
}
/**
* @param args
* @throws IOException
* @throws InterruptedException
*/
public static void main(String[] args) throws InterruptedException,
IOException {
TestNonblockingClient10 client = new TestNonblockingClient10();
client.start();
}
private Controller controller;
private TCPConnectorHandler connectorHandler;
protected CountDownLatch controllerReadyLatch = new CountDownLatch(1);
ByteBuffer readBuffer = ByteBuffer.allocateDirect(10240);
CallbackHandler<Context> connectorCallbackHandler = new
CallbackHandler<Context>() {
@Override
public void onConnect(IOEvent<Context> ioEvent) {
try {
connectorHandler.finishConnect(ioEvent.attachment()
.getSelectionKey());
System.out.println("connected.");
// trigger the ConnectorHandler's onWrite() method.
ioEvent.attachment().getSelectionKey().interestOps(
SelectionKey.OP_WRITE);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Override
public void onRead(IOEvent<Context> ioEvent) {
System.out.println("onRead()");
Context context = ioEvent.attachment();
// context.getSelectionKey().interestOps(SelectionKey.OP_READ);
// try {
// context.getProtocolChain().execute(context);
// } catch (Exception e) {
// // TODO Auto-generated catch block
// e.printStackTrace();
// }
try {
connectorHandler.read(readBuffer, true);
System.out.println(Charset.defaultCharset().decode(readBuffer));
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Override
public void onWrite(IOEvent<Context> ioEvent) {
System.out.println("onWrite()");
try {
connectorHandler.writeToAsyncQueue(ByteBuffer
.wrap("GET /\r\nHTTP 1.0/1.1\r\nHOST:127.0.0.1\r\n\r\n"
.getBytes()), asyncWriteHandler);
// ioEvent.attachment().getSelectionKey().interestOps(
// SelectionKey.OP_READ);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
};
protected AsyncReadCallbackHandler asyncReadHandler = new
AsyncReadCallbackHandler() {
@Override
public void onIOException(IOException ioException, SelectionKey key,
ByteBuffer buffer, Queue<AsyncReadQueueRecord> remainingQueue) {
System.out.println("onIOException()");
}
@Override
public void onReadCompleted(SelectionKey key, SocketAddress srcAddress,
ByteBuffer buffer) {
System.out.println("onReadCompleted()");
System.out.println(Charset.defaultCharset().decode(buffer));
}
};
protected AsyncWriteCallbackHandler asyncWriteHandler = new
AsyncWriteCallbackHandler() {
@Override
public void onIOException(IOException ioException, SelectionKey key,
ByteBuffer buffer, Queue<AsyncWriteQueueRecord> remainingQueue) {
// TODO Auto-generated method stub
}
@Override
public void onWriteCompleted(SelectionKey key, ByteBuffer buffer) {
System.out.println("onWriteCompleted()");
key.interestOps(SelectionKey.OP_READ);
}
};
private void start() throws InterruptedException, IOException {
controller = createController();
new Thread(controller).start();
controllerReadyLatch.await();
connectorHandler = (TCPConnectorHandler) controller
.acquireConnectorHandler(Protocol.TCP);
// trigger the connector handler's onConnect() method.
connectorHandler.connect(new InetSocketAddress(InetAddress
.getLocalHost(), 8080), connectorCallbackHandler);
}
private Controller createController() {
Controller tmpController = new Controller();
TCPSelectorHandler tcpSelectorHandler = new TCPSelectorHandler(true);
tmpController.setSelectorHandler(tcpSelectorHandler);
DefaultProtocolChainInstanceHandler instanceHandler = new
DefaultProtocolChainInstanceHandler() {
private ProtocolChain protocolChain;
@Override
public ProtocolChain poll() {
if (protocolChain == null) {
protocolChain = new DefaultProtocolChain();
}
return protocolChain;
}
};
// instanceHandler.poll().addFilter(new ReadFilter());
// instanceHandler.poll().addFilter(new MyProtocolFilter());
tmpController.setProtocolChainInstanceHandler(instanceHandler);
tmpController.addStateListener(new ControllerStateListenerAdapter() {
@Override
public void onReady() {
controllerReadyLatch.countDown();
super.onReady();
}
});
return tmpController;
}
}