>> Complete Source Code for the Tutorial
Contributed and maintained by Wenbo Zhu
August 2007 [Revision number: V0-1]
This publication is applicable to Grizzly 1.6.x releases.
This document provides a complete tutorial for understanding and
using the Grizzly Connection Caching framework. The example code also
demonstrates the general use of Grizzly and its underlying NIO
technologies.
For more information about Grizzly, see the Grizzly Home page on the
java.net web site.
Expected duration: 45 minutes
Before you proceed, make sure you review the requirements in this section.
This tutorial assumes that you have some basic knowledge of, or programming experience with, the following technologies.
Before you begin, you need to install the following software on your computer:
This tutorial shows, step by step, how to build a very
simple "messaging ORB" using the Grizzly framework on the server side
and plain NIO on the client side. The underlying design is
briefly discussed first:
Server side
Client side
In the following, we describe how the server-side function
(morb) is implemented. The
server uses the Grizzly framework both for receiving request messages (as
a server) and for sending response messages (as a
client). As a result, both the Grizzly server API and client API are
used, along with their respective connection caching mechanisms.
public interface RequestHandler {
    String handle(String request) throws Exception;
}
public class ServerRequestController {

    // serverId is an application-defined identifier, used for tracing purposes only
    public ServerRequestController(String serverId) { ... }

    ...
}
Now we describe the initialization (open()) and shutdown (close()) routines. The code is
annotated as follows:
public void open(String addr) throws Exception {
    this.localAddr = resolveAddressParam(addr); // defined in the Utils class

    // the Grizzly controller, for both sending and receiving
    this.grizzlyController = new Controller();

    // enable connection caching for outbound messages (sender)
    ConnectorHandlerPool cacheableHandlerPool =
            new CacheableConnectorHandlerPool(grizzlyController, 5, 2, 2);
    grizzlyController.setConnectorHandlerPool(cacheableHandlerPool);

    // enable connection caching for inbound messages (receiver)
    SelectionKeyHandler cacheableKeyHandler = new CacheableSelectionKeyHandler(5, 2);
    grizzlyController.setSelectionKeyHandler(cacheableKeyHandler);

    // typical setup for a Grizzly TCP endpoint
    TCPSelectorHandler tcpHandler = new TCPSelectorHandler();
    final MessageDecoder filter = new MessageDecoder();
    tcpHandler.setInet(localAddr.getAddress());
    tcpHandler.setPort(localAddr.getPort());
    grizzlyController
            .setProtocolChainInstanceHandler(new DefaultProtocolChainInstanceHandler() {
                public ProtocolChain poll() {
                    ProtocolChain protocolChain = protocolChains.poll();
                    if (protocolChain == null) {
                        protocolChain = new DefaultProtocolChain();
                        protocolChain.addFilter(new ReadFilter());
                        protocolChain.addFilter(filter);
                    }
                    return protocolChain;
                }
            });
    grizzlyController.addSelectorHandler(tcpHandler);

    grizzlyController.start(); // Grizzly runs from here ...
}
private class MessageDecoder implements ProtocolFilter {

    public boolean execute(Context context) throws IOException {
        final WorkerThread workerThread = ((WorkerThread) Thread.currentThread());
        ByteBuffer buffer = workerThread.getByteBuffer();
        buffer.flip();
        if (buffer.hasRemaining()) {
            handleRequest(readBuffer(buffer), context);
        }
        buffer.clear();
        return false;
    }

    public boolean postExecute(Context context) throws IOException {
        return true;
    }

    private void handleRequest(String msg, Context context) throws IOException {
        // earlier partial content
        String msgBuffer = (String) context.getAttribute("msgBuffer");
        if (msgBuffer == null) {
            msgBuffer = "";
        }
        msgBuffer = msgBuffer + msg;
        if (msgBuffer.indexOf(PAYLOAD_DELIMITER) != -1) {
            String[] messages = msgBuffer.split(PAYLOAD_DELIMITER);
            for (int i = 0; i < messages.length - 1; i++) {
                handleAtomicRequest(messages[i]); // a complete message body is received
            }
            String lastMessage = messages[messages.length - 1];
            if (msgBuffer.endsWith(PAYLOAD_DELIMITER)) {
                handleAtomicRequest(lastMessage);
                msgBuffer = "";
            } else {
                msgBuffer = lastMessage;
            }
        }
        // save the partial message for subsequent processing
        context.setAttribute("msgBuffer", msgBuffer);
    }

    // message format ::=
    //     <client id - inet address>"\n"
    //     <serverObjectId - app. defined>"\n"
    //     <message content - representing parameters><payload delimiter>
    // <payload delimiter> ::= "###\n"
    private void handleAtomicRequest(String msg) {
        try {
            int pos = msg.indexOf("\n");
            String clientId = msg.substring(0, pos);
            String content = msg.substring(pos + 1);
            pos = content.indexOf("\n");
            String serverObjectId = content.substring(0, pos);
            String msgBody = content.substring(pos + 1);
            RequestHandler handler = handlers.get(serverObjectId);
            if (handler != null) {
                String rsp = handler.handle(msgBody);
                sendResponse(clientId, rsp);
            }
        } catch (Exception ex) {
            trace(ex);
        }
    }

    private void sendResponse(String clientId, String msg) { ... }
}
The message format is deliberately simplistic, but it
carries the essential parts of client-server invocation semantics. The
receiver thread (provided by Grizzly) is also responsible for
writing the response out to the wire (asynchronously). Grizzly's ARP
(Asynchronous Request Processing) could also be used here for a more
sophisticated design.
Overall, processing the received message
is straightforward with the use of Grizzly.
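The delimiter-based framing above can be exercised in isolation. The MorbMessage helper below is hypothetical (it is not part of the tutorial's morb code) and uses String.split with a negative limit instead of the endsWith check used by the MessageDecoder, but it implements the same framing under the same "###\n" delimiter:

```java
// Hypothetical helper illustrating the morb wire format:
// <client id>"\n"<serverObjectId>"\n"<message body>"###\n"
public class MorbMessage {
    static final String PAYLOAD_DELIMITER = "###\n";

    // Build one request message in the format the server-side decoder expects.
    static String frame(String clientId, String serverObjectId, String body) {
        return clientId + "\n" + serverObjectId + "\n" + body + PAYLOAD_DELIMITER;
    }

    // Split a receive buffer into messages; with limit -1 the last element
    // is the trailing partial message ("" when the buffer ends on a delimiter).
    static String[] split(String buffer) {
        return buffer.split(PAYLOAD_DELIMITER, -1);
    }

    public static void main(String[] args) {
        String two = frame("127.0.0.1:9000", "server", "ping")
                   + frame("127.0.0.1:9000", "server", "pong");
        String[] parts = split(two + "partial");
        // parts[0] and parts[1] are complete messages; parts[2] is the partial tail
        System.out.println(parts.length + " " + parts[2]); // prints: 3 partial
    }
}
```

Keeping the partial tail around, as the decoder does with the "msgBuffer" context attribute, is what makes the protocol robust against messages arriving split across multiple reads.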
The following code shows the details of sendResponse. The more obvious use
of Grizzly connection caching actually happens here. As you can see
from the code, within the predefined cache pool limits, an outbound
connection (to a remote client) is acquired directly from the connection
cache pool if one is available. This saves the cost of establishing a
new connection, which involves expensive native socket operations.
private void sendResponse(String clientId, String msg) {
    // the response message format
    String rspMsg = serverId + "\n" + msg + PAYLOAD_DELIMITER;
    InetSocketAddress clientIdAddr = resolveAddress(clientId);

    // initiates a new connection or reuses one from the cache
    ConnectorHandler clientConnector =
            grizzlyController.acquireConnectorHandler(Controller.Protocol.TCP);
    try {
        while (true) { // intermittent - expected ??
            try {
                clientConnector.connect(clientIdAddr,
                        new BlockingCallbackHandler(clientConnector, rspMsg));
                break;
            } catch (AlreadyConnectedException ex) {
                trace("partial failure: already connected");
            }
        }
        try {
            // if the limit is not reached, the connection will be put back into the cache
            clientConnector.close();
        } finally {
            grizzlyController.releaseConnectorHandler(clientConnector);
        }
    } catch (IOException ex) {
        trace(ex);
    }
}
The actual sending of the
response is again asynchronous, via the Grizzly callback API.
private static class BlockingCallbackHandler implements CallbackHandler<Context> {

    private ConnectorHandler connector;
    private String msg;

    public BlockingCallbackHandler(ConnectorHandler connector, String msg) {
        this.connector = connector;
        this.msg = msg;
    }

    public void onConnect(IOEvent<Context> ioEvent) {
        SelectionKey key = ioEvent.attachment().getSelectionKey();
        connector.finishConnect(key);
        connector.getController().registerKey(key, SelectionKey.OP_WRITE,
                Controller.Protocol.TCP);
    }

    public void onRead(IOEvent<Context> ioEvent) {
    }

    public void onWrite(IOEvent<Context> ioEvent) {
        try {
            // a blocking write that uses the Grizzly "sender" API
            connector.write(ByteBuffer.wrap(msg.getBytes()), true);
        } catch (IOException ex) {
            trace(ex);
        }
    }
}
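For readers less familiar with the NIO mechanics that Grizzly wraps, the self-contained sketch below mirrors the same connect-then-write sequence with plain java.nio: finish the non-blocking connect, switch the key's interest to OP_WRITE, and write when the selector reports readiness. The class name and the loopback setup are illustrative only, not part of the tutorial's code:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

// Plain-NIO sketch (no Grizzly) of the onConnect/onWrite callback pattern.
public class SelectorWriteDemo {

    public static String runDemo() throws IOException {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0));
            Selector selector = Selector.open();
            SocketChannel client = SocketChannel.open();
            client.configureBlocking(false);
            client.register(selector, SelectionKey.OP_CONNECT);
            client.connect(server.getLocalAddress());

            SocketChannel accepted = server.accept(); // blocking accept on the receiving side
            ByteBuffer payload =
                    ByteBuffer.wrap("hello-morb".getBytes(StandardCharsets.UTF_8));

            boolean done = false;
            while (!done) {
                selector.select();
                for (SelectionKey key : selector.selectedKeys()) {
                    if (key.isConnectable()) {          // corresponds to onConnect
                        client.finishConnect();
                        key.interestOps(SelectionKey.OP_WRITE);
                    } else if (key.isWritable()) {      // corresponds to onWrite
                        client.write(payload);
                        if (!payload.hasRemaining()) {
                            done = true;
                        }
                    }
                }
                selector.selectedKeys().clear();
            }
            client.close();

            // drain everything the client wrote
            ByteBuffer in = ByteBuffer.allocate(64);
            while (accepted.read(in) != -1) { }
            in.flip();
            String received = StandardCharsets.UTF_8.decode(in).toString();
            accepted.close();
            selector.close();
            return received;
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(runDemo()); // prints: hello-morb
    }
}
```

Grizzly performs the same interest-switching internally; the callback handler only has to react to the events.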
Due to space limitations, we will not show the client code here;
instead, the client-side design is briefly described. It is of
course possible to implement the client runtime (interacting with a
remote MORB) with Grizzly as well, or with any other implementation, as long as
it follows the server morb protocol.
The provided client implementation uses basic NIO and adopts the
guidelines recommended in the Grizzly documentation.
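As a rough illustration of such a basic-NIO client, the hypothetical SimpleRequestChannel below (a stand-in for, not a copy of, the tutorial's RequestChannel) sends one request in the morb wire format over a blocking SocketChannel; the demo method wires it to a loopback acceptor purely to exercise the sketch:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

// Minimal blocking-NIO request channel: one SocketChannel per server,
// kept open so it can be reused across requests.
public class SimpleRequestChannel {
    private final InetSocketAddress server;
    private SocketChannel channel;

    SimpleRequestChannel(InetSocketAddress server) {
        this.server = server;
    }

    void open() throws IOException {
        channel = SocketChannel.open(server); // blocking connect
    }

    // Wire format assumed from the server-side decoder:
    // <client id>"\n"<serverObjectId>"\n"<message body>"###\n"
    void sendRequest(String serverObjectId, String msg, String clientId)
            throws IOException {
        ByteBuffer buf = ByteBuffer.wrap(
                (clientId + "\n" + serverObjectId + "\n" + msg + "###\n")
                        .getBytes(StandardCharsets.UTF_8));
        while (buf.hasRemaining()) {
            channel.write(buf); // loop: one write may not drain the buffer
        }
    }

    void close() throws IOException {
        channel.close();
    }

    // Loopback round-trip used only to exercise the sketch.
    static String demo() throws IOException {
        try (ServerSocketChannel acceptor = ServerSocketChannel.open()) {
            acceptor.bind(new InetSocketAddress("127.0.0.1", 0));
            SimpleRequestChannel out = new SimpleRequestChannel(
                    (InetSocketAddress) acceptor.getLocalAddress());
            out.open();
            try (SocketChannel in = acceptor.accept()) {
                out.sendRequest("server", "hello", "127.0.0.1:9000");
                out.close();
                ByteBuffer buf = ByteBuffer.allocate(256);
                while (in.read(buf) != -1) { }
                buf.flip();
                return StandardCharsets.UTF_8.decode(buf).toString();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(demo());
    }
}
```

Responses would arrive on a separate inbound channel (the ResponseDispatchChannel in the example application), since the server opens its own connection back to the client.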
In this section, we go through the server and client setup, describe what to expect from the example test program, and explain how to observe the connection caching behavior.
public class MessagingORB {

    private ServerRequestController serverController;

    public static void main(String[] args) throws Exception {
        if (args.length < 1) {
            trace("Usage: MessagingORB <local ip>:<local port>");
            return;
        }
        MessagingORB morb = new MessagingORB();
        morb.init(args[0]);
        morb.deploy();
        morb.open(args[0]);
        morb.close();
    }

    public void init(String localAddr) throws Exception {
        serverController = new ServerRequestController("server-" + localAddr);
    }

    public void open(String addr) throws Exception {
        serverController.open(addr);
    }

    public void close() {
        serverController.close();
    }

    public void deploy() {
        // a simple server object instance, named "server", which echoes each request
        serverController.register("server", new RequestHandler() {
            public String handle(String request) throws Exception {
                trace("invoke " + request);
                return "request handled: " + request;
            }
        });
    }
}
public class SimpleMORBClientApp {

    private ResponseDispatchChannel in;
    private Map<InetSocketAddress, RequestChannel> requestChannelCache =
            new HashMap<InetSocketAddress, RequestChannel>();
    private List<InetSocketAddress> servers = new ArrayList<InetSocketAddress>();
    private Thread dispatchThread;

    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            trace("Usage: SimpleClientApp <local ip>:<local port> "
                    + "<server ip>:<server port> [<server ip>:<server port>]");
            return;
        }
        SimpleMORBClientApp app = new SimpleMORBClientApp();
        app.init(args[0]);
        for (int i = 1; i < args.length; i++) {
            app.register(resolveAddressParam(args[i]));
        }
        app.start();
        app.doSomething();
        app.end();
    }

    public void init(String localAddr) throws Exception {
        in = new ResponseDispatchChannel(resolveAddressParam(localAddr));
        in.open();
    }

    public void register(final InetSocketAddress serverId) throws Exception {
        // a simple client response handler that prints out the response text
        servers.add(serverId);
        in.register(serverId, new ResponseHandler() {
            public void handle(String response) throws Exception {
                trace("response from " + serverId + ":" + response);
            }
        });
    }

    public void start() {
        dispatchThread = new Thread(new Runnable() {
            public void run() {
                try {
                    in.dispatch();
                } catch (InterruptedException ex) {
                    trace("dispatch thread interrupted");
                } catch (Exception ex) {
                    trace(ex);
                }
            }
        });
        dispatchThread.start();
    }

    public void end() {
        dispatchThread.interrupt();
        try {
            dispatchThread.join();
        } catch (InterruptedException ex) {
        }
        for (RequestChannel channel : requestChannelCache.values()) {
            channel.close();
        }
        in.close();
    }

    public void doSomething() {
        RequestChannel out;
        int i = 0;
        String msg;
        do {
            for (InetSocketAddress server : servers) {
                // set the String length > 256 to test multiple reads/writes (buffer = 1024b)
                msg = in.getClientId().toString() + "[" + i++ + "]" + "any message";
                out = requestChannelCache.get(server);
                try {
                    if (out == null) {
                        out = new RequestChannel(server);
                        out.open();
                    }
                    out.sendRequest("server", msg, in.getClientId());
                } catch (Exception ex) {
                    trace(ex);
                    trace("retry: " + server);
                    out.close();
                    out = new RequestChannel(server);
                    try {
                        out.open();
                        out.sendRequest("server", msg, in.getClientId());
                    } catch (Exception exx) {
                        trace(exx);
                        trace("failed: " + server);
                        out.close();
                        continue;
                    }
                }
                requestChannelCache.put(server, out);
            }
        } while (step("type c to continue") == 'c');
    }
}
With the above example test application, both the inbound connection
cache and the outbound connection cache of the Grizzly framework are
demonstrated. Interested readers may adjust the number of client
and server instances, and use tools such as "netstat -nt" to
monitor the open connections between clients and servers and how they
vary over time.
I hope you find this tutorial useful in understanding Grizzly and
its connection caching framework. Comments on making the overall tutorial
simpler (i.e., shorter) or more
detailed (i.e., revealing more of Grizzly) are more than welcome!