import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import com.sun.grizzly.BaseSelectionKeyHandler; import com.sun.grizzly.CallbackHandler; import com.sun.grizzly.ConnectorHandler; import com.sun.grizzly.Controller; import com.sun.grizzly.ControllerStateListener; import com.sun.grizzly.DefaultProtocolChain; import com.sun.grizzly.ProtocolChain; import com.sun.grizzly.ProtocolChainInstanceHandler; import com.sun.grizzly.TCPSelectorHandler; import com.sun.grizzly.connectioncache.client.CacheableConnectorHandler; import com.sun.grizzly.filter.ReadFilter; import com.sun.grizzly.util.ByteBufferFactory; import com.sun.grizzly.util.ConnectionCloseHandler; import com.sun.grizzly.util.WorkerThreadFactory; import com.sun.grizzly.util.ByteBufferFactory.ByteBufferType; /** * TcpClient is the base class for TCP communication. Starts the * grizzly framework and does the actual send/receive * */ public class TcpGrizzlyClient implements TcpClientInterface { private static Log log = LogFactory.getLog(TcpGrizzlyClient.class); private final static int READBUFFER_SIZE = 5000; private final static int WRITEBUFFER_SIZE = 0; final private ByteBuffer readBB = ByteBufferFactory.allocate(ByteBufferType.HEAP_VIEW, READBUFFER_SIZE); final private ByteBuffer writeBB = ByteBufferFactory.allocate(ByteBufferType.HEAP_VIEW, WRITEBUFFER_SIZE); /** * Hostname. Used when sending */ private String hostname; /** * Port. Used when sending */ private int port; private Controller controller; /** * The Grizzly controller to use * * @param controller */ public void setController(Controller controller) { this.controller = controller; } /** * Returns the grizzly controller in use * * @return A grizzly controller */ public Controller getController() { return controller; } /** * The Grizzly controller */ private static Boolean grizzlyInitialized = false; /** * grizzlyUpstartPhase */ protected static Boolean grizzlyUpstartPhase = false; private IncomingMessageHandler handler; /** * Creates callBackHandlers */ protected CallbackHandlerFactory callbackFactory; /** * grizzlyStartLatch Used for waiting for grizzly * framework to start */ protected final CountDownLatch grizzlyStartLatch; /** * Creates a new instance of TcpClient, Grizzly based * * @param hostname * @param port */ public TcpGrizzlyClient(String hostname, int port) { this.hostname = hostname; this.port = port; grizzlyStartLatch = new CountDownLatch(1); } protected synchronized Boolean initGrizzly(Controller controller) { synchronized (grizzlyInitialized) { if (grizzlyInitialized == true) { return true; } log.warn("Initializing Grizzly on: " + hostname + ":" + port); grizzlyInitialized = true; } ProtocolChain protocolChain; TCPSelectorHandler tcpSelector = new TCPSelectorHandler(true); BaseSelectionKeyHandler selectionKeyHandler = new BaseSelectionKeyHandler(); // to be notified when a client/server close the connection selectionKeyHandler.setConnectionCloseHandler(new ConnectionCloseHandler() { public void locallyClosed(SelectionKey key) { if (log.isDebugEnabled()) { log.debug(key + " is closed from local"); } } public void remotlyClosed(SelectionKey key) { if (log.isDebugEnabled()) { log.debug(key + " is closed from remote"); } } }); tcpSelector.setSelectionKeyHandler(selectionKeyHandler); tcpSelector.setReuseAddress(true); tcpSelector.setLinger(0); controller.addSelectorHandler(tcpSelector); controller.addStateListener(new ControllerStateListener() { public void onReady() { log.info("Grizzly ready"); grizzlyStartLatch.countDown(); } public void onException(Throwable arg0) { throw new NttException("Exception in Grizzly framework " + arg0.getMessage()); } public void onStarted() { log.debug("Grizzly started"); } public void onStopped() { log.debug("Grizzly stopped"); } }); ProtocolChainInstanceHandler pciHandler = new ProtocolChainInstanceHandler() { final private ProtocolChain pc = new DefaultProtocolChain(); public ProtocolChain poll() { return pc; } public boolean offer(@SuppressWarnings("unused") ProtocolChain arg0) { return false; } }; controller.setProtocolChainInstanceHandler(pciHandler); final BlockingQueue workQueue = new ArrayBlockingQueue(2000); ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1500, 2000, 100, TimeUnit.MILLISECONDS, workQueue, new WorkerThreadFactory()); controller.setThreadPool(threadPool); protocolChain = pciHandler.poll(); protocolChain.addFilter(new ReadFilter()); return true; } /** * sends byte array * * @param message * @throws IOException */ public void send(byte[] message) throws IOException { send(false, message); } /** * Sends byte-array on optional multihomed (round-robin on local * addresses) socket * * @param multiHomed * @param message * @throws IOException */ @SuppressWarnings("unchecked") public void send(boolean multiHomed, byte[] message) throws IOException { if (log.isInfoEnabled()) { log.info("Multihoming send: multiHomed=" + multiHomed); } ConnectorHandler ch = getConnectorHandler(controller, multiHomed); writeBuffer(ch, message); } /** * Send byte-array on specified local address * * @param localAddress * @param message * @throws IOException */ @SuppressWarnings("unchecked") public void send(String localAddress, byte[] message) throws IOException { if (log.isInfoEnabled()) { log.info("Specified localAddress send: localAddress = " + localAddress); } ConnectorHandler ch = getConnectorHandler(controller, localAddress); writeBuffer(ch, message); } /** * Returns a connectorHandler * * @param grizzlyController * @param localAddress * @return ConnectorHandler * @throws IOException */ public ConnectorHandler getConnectorHandler(Controller grizzlyController, String localAddress) throws IOException { ConnectorHandler ch = grizzlyController.acquireConnectorHandler(Controller.Protocol.TCP); if (!ch.isConnected()) { connectChannel(ch, localAddress, false); } return ch; } /** * Returns a connectionHandler for writing. * * @param grizzlyController * @param multiHomed * @return ConnectorHandler * @throws IOException */ public ConnectorHandler getConnectorHandler(Controller grizzlyController, Boolean multiHomed) throws IOException { ConnectorHandler ch = grizzlyController.acquireConnectorHandler(Controller.Protocol.TCP); if (!ch.isConnected()) { connectChannel(ch, null, multiHomed); } return ch; } /** * @param multiHomed * @return connectorHandler * @throws IOException */ public ConnectorHandler getConnectorHandler(Boolean multiHomed) throws IOException { return getConnectorHandler(controller, multiHomed); } /** * @param localAddress * @return connectorHandler * @throws IOException */ public ConnectorHandler getConnectorHandler(String localAddress) throws IOException { return getConnectorHandler(controller, localAddress); } /** * Connects the channel. Multihomed has precedence over * localAddress * * @param ch * @param localAddress * @param multiHomed * @return channel * @throws IOException */ @SuppressWarnings("unchecked") protected synchronized SelectableChannel connectChannel(ConnectorHandler ch, String localAddress, boolean multiHomed) throws IOException { // ByteBuffers for reading & writing if (log.isDebugEnabled()) { log.debug("Allocating buffers"); } CallbackHandler callBackHandler = callbackFactory.createCallBackHandler(controller, ch, handler, readBB, writeBB); InetSocketAddress remoteAddress = new InetSocketAddress(getHostname(), getPort()); // TODO Log connection if (multiHomed == false) { if (localAddress != null) { InetSocketAddress localSocketAddress = new InetSocketAddress(localAddress, 0); ch.connect(remoteAddress, localSocketAddress, callBackHandler); } else { ch.connect(remoteAddress, callBackHandler); } } else { InetSocketAddress localSocketAddress = new InetSocketAddress(TcpSocketConnection.localAddressToUse(), 0); ch.connect(remoteAddress, localSocketAddress, callBackHandler); } // TODO if (ch instanceof CacheableConnectorHandler) { CacheableConnectorHandler cch = (CacheableConnectorHandler) ch; ch = cch.getUnderlyingConnectorHandler(); } SocketChannel sc = (SocketChannel) ch.getUnderlyingChannel(); if (log.isDebugEnabled()) { log.debug("Connect MH:" + multiHomed + " Remote:" + remoteAddress.getHostName() + ":" + remoteAddress.getPort() + " Local:" + localAddress); } return sc; } /** * Writes the message to the connectorHandler * * @param connectorHandler * @param message * @throws IOException */ protected long writeBuffer(ConnectorHandler connectorHandler, byte[] message) throws IOException { long len = 0; if (connectorHandler.isConnected()) { len = connectorHandler.write(ByteBuffer.wrap(message), true); if (log.isDebugEnabled()) { log.debug("Wrote " + len + " bytes"); } } else { if (log.isFatalEnabled()) { log.fatal("ConnectorHandler: " + connectorHandler + " is not connected. Channel: " + connectorHandler.getUnderlyingChannel()); } len = -1; } return len; } /** * Starts Grizzly controller */ protected synchronized void startGrizzlyController() { // Check if someone else has tried to start Grizzly framework // TODO: Is this really needed? Maybe it is OK with one Grizzly per User instance if (grizzlyUpstartPhase == true) { try { // Then we will wait until started properly //System.out.println("Grizzly is starting, " + " waiting."); // grizzlyStartLatch.await(); } catch (Exception e) { throw new NttException("Grizzly NIO framework failed to start", e); } } //synchronized (controller) { if (!controller.isStarted()) { new Thread(controller, "Grizzly client").start(); try { grizzlyUpstartPhase = true; grizzlyStartLatch.await(5, TimeUnit.SECONDS); } catch (InterruptedException e) { throw new NttException("Grizzly NIO framework failed to start", e); } } //} } /** * @return hostname */ public String getHostname() { return hostname; } /** * @return port */ public int getPort() { return port; } /** * @throws IOException */ public synchronized void shutdown() throws IOException { if (controller.isStarted()) { controller.stop(); } } /** * @param handler */ public void setHandler(IncomingMessageHandler handler) { this.handler = handler; } /** * @param callbackFactory */ public void setCallbackFactory(CallbackHandlerFactory callbackFactory) { this.callbackFactory = callbackFactory; } }