package client; import java.io.IOException; import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import org.apache.log4j.Logger; import com.sun.grizzly.CallbackHandler; import com.sun.grizzly.ConnectorHandler; import com.sun.grizzly.Context; import com.sun.grizzly.Controller; import com.sun.grizzly.ControllerStateListener; import com.sun.grizzly.DefaultProtocolChain; import com.sun.grizzly.IOEvent; import com.sun.grizzly.Pipeline; import com.sun.grizzly.ProtocolChain; import com.sun.grizzly.ProtocolChainInstanceHandler; import com.sun.grizzly.TCPSelectorHandler; import com.sun.grizzly.UDPSelectorHandler; import com.sun.grizzly.filter.EchoFilter; import com.sun.grizzly.filter.ReadFilter; public class MyNIOClient { private static final Logger LOG = Logger.getLogger(MyNIOClient.class); private Controller grizzlyController; private Pipeline selectorPipeline; private CountDownLatch controllerStartedLatch; public void setGrizzlyController(Controller grizzlyController) { this.grizzlyController = grizzlyController; } public void setSelectorPipeline(Pipeline selectorPipeline) { this.selectorPipeline = selectorPipeline; } /* * selector and connector must both be of the same transport protocol, * either TCP or UDP * * Note: TCPSelectorHandler is the superclass of UDPSelectorHandler */ private TCPSelectorHandler selectorHandler; private ConnectorHandler connectorHandler; /** * The default constructor is used when private member initialization is * done using dependency injection or by explicitly calling the setter * methods for the private members. */ public MyNIOClient() { } public MyNIOClient(Controller grizzlyController) { this(grizzlyController, null); } public MyNIOClient(Controller grizzlyController, Pipeline selectorPipeline) { this.grizzlyController = grizzlyController; this.selectorPipeline = selectorPipeline; } /** * Initialize the grizzly controller, start the controller and initialize * the connectorHandler according to the transport protocol, either TCP or * UDP. * * @param transportProtocol */ public void init(Controller.Protocol transportProtocol) { if(transportProtocol == Controller.Protocol.TCP) { selectorHandler = new TCPSelectorHandler(true); } else if(transportProtocol == Controller.Protocol.UDP) { selectorHandler = new UDPSelectorHandler(true); } /* * Set the pipeline to use with the selectorHandler if declared. If not, * it will use the one from the grizzly Controller by default */ if(selectorPipeline != null) { selectorHandler.setPipeline(selectorPipeline); } /* * Setting up the protocolChainInstanceHandler for the selectorHandler */ ProtocolChainInstanceHandler pciHandler = new ClientProtocolChainInstanceHandler(); ProtocolChain protocolChain = pciHandler.poll(); /* * We don't need to add to the protocolChain a ReadFilter because a * ParserProtocolFilter extends a ReadFilter * * */ protocolChain.addFilter(new ReadFilter()); protocolChain.addFilter(new EchoFilter()); selectorHandler.setProtocolChainInstanceHandler(pciHandler); grizzlyController.addSelectorHandler(selectorHandler); /* * Start the grizzly controller */ controllerStartedLatch = new CountDownLatch(1); ControllerStateListener controllerStateListener = new ClientControllerStateListenerAdapter(controllerStartedLatch); grizzlyController.addStateListener(controllerStateListener); new Thread(grizzlyController).start(); try { controllerStartedLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } /* * Initialize the connector */ connectorHandler = selectorHandler.acquireConnectorHandler(); connectorHandler.setController(grizzlyController); } /** * Establishes a TCP/IP Connection to a remoteAddress from the localHost * ethernet interface * * @param remoteAddress * @throws IOException * @throws InterruptedException */ public void connect(SocketAddress remoteAddress) throws IOException { connectorHandler.connect(remoteAddress, new ClientCallBackHandler(remoteAddress, null), this.selectorHandler); } /** * Establishes a TCP/IP Connection to a remoteAddress from a user defined * ethernet interface specified by a localAddress * * @param remoteAddress * @param localAddress * @throws IOException * @throws InterruptedException */ public void connect(SocketAddress remoteAddress, SocketAddress localAddress) throws IOException { connectorHandler.connect(remoteAddress, localAddress, new ClientCallBackHandler(remoteAddress, localAddress), this.selectorHandler); } public void send(ByteBuffer outputByteBuffer) throws IOException { /* * Make a blocking write operation */ long byteWritten = connectorHandler.write(outputByteBuffer, false); System.err.println("Writing "+byteWritten+" bytes"); outputByteBuffer.clear(); } /* * PRIVATE INNER CLASS SECTION */ private class ClientControllerStateListenerAdapter implements ControllerStateListener { private CountDownLatch controllerStartedLatch; private long controllerStartTime; private long controllerStopTime; public ClientControllerStateListenerAdapter(CountDownLatch controllerStartedLatch) { this.controllerStartedLatch = controllerStartedLatch; } public void onException(Throwable e) { e.printStackTrace(); } public void onReady() { controllerStartedLatch.countDown(); if(LOG.isTraceEnabled()) { LOG.trace("Client GrizzlyController Ready"); } } public void onStarted() { controllerStartTime = System.currentTimeMillis(); if(LOG.isTraceEnabled()) { LOG.trace("Client GrizzlyController started at: " + controllerStartTime); } } public void onStopped() { controllerStopTime = System.currentTimeMillis(); if(LOG.isTraceEnabled()) { LOG.trace("Client GrizzlyController stopped at: " + controllerStopTime); LOG.trace("Client GrizzlyController was up during: " + (controllerStopTime - controllerStartTime)); } } } private class ClientProtocolChainInstanceHandler implements ProtocolChainInstanceHandler { private final ProtocolChain protocolChain = new DefaultProtocolChain(); public boolean offer(ProtocolChain instance) { System.err.println("offer"); return true; } public ProtocolChain poll() { System.err.println("poll"); return protocolChain; } } private class ClientCallBackHandler implements CallbackHandler { /* * This constructor is added so we can pass the context of the * connection to allow the callbackHandler to log the connection context */ private SocketAddress remoteAddress; private SocketAddress localAddress; public ClientCallBackHandler(SocketAddress remoteAddress, SocketAddress localAddress) { this.remoteAddress = remoteAddress; this.localAddress = localAddress; } public void onConnect(IOEvent ioEvent) { SelectionKey k = ioEvent.attachment().getSelectionKey(); try { connectorHandler.finishConnect(k); if(LOG.isTraceEnabled()) { String localAddressStr = "LOCALHOST"; if(localAddress != null) { localAddressStr = localAddress.toString(); } LOG.trace("connection established from " + localAddressStr + " to " + remoteAddress.toString()); } } catch (IOException e) { e.printStackTrace(); } ioEvent.attachment().getSelectorHandler().register(k, SelectionKey.OP_READ); } public void onRead(IOEvent ioEvent) { if(LOG.isTraceEnabled()) { LOG.trace("Packet received from " + remoteAddress.toString()); } } public void onWrite(IOEvent ioEvent) { if(LOG.isTraceEnabled()) { LOG.trace("Packet sent to " + remoteAddress.toString()); } } } }