import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.sun.grizzly.BaseSelectionKeyHandler;
import com.sun.grizzly.CallbackHandler;
import com.sun.grizzly.ConnectorHandler;
import com.sun.grizzly.Controller;
import com.sun.grizzly.ControllerStateListener;
import com.sun.grizzly.DefaultProtocolChain;
import com.sun.grizzly.ProtocolChain;
import com.sun.grizzly.ProtocolChainInstanceHandler;
import com.sun.grizzly.TCPSelectorHandler;
import com.sun.grizzly.connectioncache.client.CacheableConnectorHandler;
import com.sun.grizzly.filter.ReadFilter;
import com.sun.grizzly.util.ByteBufferFactory;
import com.sun.grizzly.util.ConnectionCloseHandler;
import com.sun.grizzly.util.WorkerThreadFactory;
import com.sun.grizzly.util.ByteBufferFactory.ByteBufferType;
/**
* TcpClient is the base class for TCP communication. Starts the
* grizzly framework and does the actual send/receive
*
*/
public class TcpGrizzlyClient implements TcpClientInterface {
private static Log log = LogFactory.getLog(TcpGrizzlyClient.class);
private final static int READBUFFER_SIZE = 5000;
private final static int WRITEBUFFER_SIZE = 0;
final private ByteBuffer readBB = ByteBufferFactory.allocate(ByteBufferType.HEAP_VIEW, READBUFFER_SIZE);
final private ByteBuffer writeBB = ByteBufferFactory.allocate(ByteBufferType.HEAP_VIEW, WRITEBUFFER_SIZE);
/**
* Hostname. Used when sending
*/
private String hostname;
/**
* Port. Used when sending
*/
private int port;
private Controller controller;
/**
* The Grizzly controller to use
*
* @param controller
*/
public void setController(Controller controller) {
this.controller = controller;
}
/**
* Returns the grizzly controller in use
*
* @return A grizzly controller
*/
public Controller getController() {
return controller;
}
/**
* The Grizzly controller
*/
private static Boolean grizzlyInitialized = false;
/**
* grizzlyUpstartPhase
*/
protected static Boolean grizzlyUpstartPhase = false;
private IncomingMessageHandler handler;
/**
* Creates callBackHandlers
*/
protected CallbackHandlerFactory callbackFactory;
/**
* grizzlyStartLatch
Used for waiting for grizzly
* framework to start
*/
protected final CountDownLatch grizzlyStartLatch;
/**
* Creates a new instance of TcpClient
, Grizzly based
*
* @param hostname
* @param port
*/
public TcpGrizzlyClient(String hostname, int port) {
this.hostname = hostname;
this.port = port;
grizzlyStartLatch = new CountDownLatch(1);
}
protected synchronized Boolean initGrizzly(Controller controller) {
synchronized (grizzlyInitialized) {
if (grizzlyInitialized == true) {
return true;
}
log.warn("Initializing Grizzly on: " + hostname + ":" + port);
grizzlyInitialized = true;
}
ProtocolChain protocolChain;
TCPSelectorHandler tcpSelector = new TCPSelectorHandler(true);
BaseSelectionKeyHandler selectionKeyHandler = new BaseSelectionKeyHandler();
// to be notified when a client/server close the connection
selectionKeyHandler.setConnectionCloseHandler(new ConnectionCloseHandler() {
public void locallyClosed(SelectionKey key) {
if (log.isDebugEnabled()) {
log.debug(key + " is closed from local");
}
}
public void remotlyClosed(SelectionKey key) {
if (log.isDebugEnabled()) {
log.debug(key + " is closed from remote");
}
}
});
tcpSelector.setSelectionKeyHandler(selectionKeyHandler);
tcpSelector.setReuseAddress(true);
tcpSelector.setLinger(0);
controller.addSelectorHandler(tcpSelector);
controller.addStateListener(new ControllerStateListener() {
public void onReady() {
log.info("Grizzly ready");
grizzlyStartLatch.countDown();
}
public void onException(Throwable arg0) {
throw new NttException("Exception in Grizzly framework " + arg0.getMessage());
}
public void onStarted() {
log.debug("Grizzly started");
}
public void onStopped() {
log.debug("Grizzly stopped");
}
});
ProtocolChainInstanceHandler pciHandler = new ProtocolChainInstanceHandler() {
final private ProtocolChain pc = new DefaultProtocolChain();
public ProtocolChain poll() {
return pc;
}
public boolean offer(@SuppressWarnings("unused") ProtocolChain arg0) {
return false;
}
};
controller.setProtocolChainInstanceHandler(pciHandler);
final BlockingQueue workQueue = new ArrayBlockingQueue(2000);
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1500, 2000, 100, TimeUnit.MILLISECONDS, workQueue,
new WorkerThreadFactory());
controller.setThreadPool(threadPool);
protocolChain = pciHandler.poll();
protocolChain.addFilter(new ReadFilter());
return true;
}
/**
* sends byte array
*
* @param message
* @throws IOException
*/
public void send(byte[] message) throws IOException {
send(false, message);
}
/**
* Sends byte-array on optional multihomed (round-robin on local
* addresses) socket
*
* @param multiHomed
* @param message
* @throws IOException
*/
@SuppressWarnings("unchecked")
public void send(boolean multiHomed, byte[] message) throws IOException {
if (log.isInfoEnabled()) {
log.info("Multihoming send: multiHomed=" + multiHomed);
}
ConnectorHandler ch = getConnectorHandler(controller, multiHomed);
writeBuffer(ch, message);
}
/**
* Send byte-array on specified local address
*
* @param localAddress
* @param message
* @throws IOException
*/
@SuppressWarnings("unchecked")
public void send(String localAddress, byte[] message) throws IOException {
if (log.isInfoEnabled()) {
log.info("Specified localAddress send: localAddress = " + localAddress);
}
ConnectorHandler ch = getConnectorHandler(controller, localAddress);
writeBuffer(ch, message);
}
/**
* Returns a connectorHandler
*
* @param grizzlyController
* @param localAddress
* @return ConnectorHandler
* @throws IOException
*/
public ConnectorHandler, ?> getConnectorHandler(Controller grizzlyController, String localAddress)
throws IOException {
ConnectorHandler, ?> ch = grizzlyController.acquireConnectorHandler(Controller.Protocol.TCP);
if (!ch.isConnected()) {
connectChannel(ch, localAddress, false);
}
return ch;
}
/**
* Returns a connectionHandler for writing.
*
* @param grizzlyController
* @param multiHomed
* @return ConnectorHandler
* @throws IOException
*/
public ConnectorHandler, ?> getConnectorHandler(Controller grizzlyController, Boolean multiHomed)
throws IOException {
ConnectorHandler, ?> ch = grizzlyController.acquireConnectorHandler(Controller.Protocol.TCP);
if (!ch.isConnected()) {
connectChannel(ch, null, multiHomed);
}
return ch;
}
/**
* @param multiHomed
* @return connectorHandler
* @throws IOException
*/
public ConnectorHandler, ?> getConnectorHandler(Boolean multiHomed) throws IOException {
return getConnectorHandler(controller, multiHomed);
}
/**
* @param localAddress
* @return connectorHandler
* @throws IOException
*/
public ConnectorHandler, ?> getConnectorHandler(String localAddress) throws IOException {
return getConnectorHandler(controller, localAddress);
}
/**
* Connects the channel. Multihomed has precedence over
* localAddress
*
* @param ch
* @param localAddress
* @param multiHomed
* @return channel
* @throws IOException
*/
@SuppressWarnings("unchecked")
protected synchronized SelectableChannel connectChannel(ConnectorHandler ch, String localAddress, boolean multiHomed)
throws IOException {
// ByteBuffers for reading & writing
if (log.isDebugEnabled()) {
log.debug("Allocating buffers");
}
CallbackHandler> callBackHandler = callbackFactory.createCallBackHandler(controller, ch, handler, readBB,
writeBB);
InetSocketAddress remoteAddress = new InetSocketAddress(getHostname(), getPort());
// TODO Log connection
if (multiHomed == false) {
if (localAddress != null) {
InetSocketAddress localSocketAddress = new InetSocketAddress(localAddress, 0);
ch.connect(remoteAddress, localSocketAddress, callBackHandler);
} else {
ch.connect(remoteAddress, callBackHandler);
}
} else {
InetSocketAddress localSocketAddress = new InetSocketAddress(TcpSocketConnection.localAddressToUse(), 0);
ch.connect(remoteAddress, localSocketAddress, callBackHandler);
}
// TODO
if (ch instanceof CacheableConnectorHandler) {
CacheableConnectorHandler cch = (CacheableConnectorHandler) ch;
ch = cch.getUnderlyingConnectorHandler();
}
SocketChannel sc = (SocketChannel) ch.getUnderlyingChannel();
if (log.isDebugEnabled()) {
log.debug("Connect MH:" + multiHomed + " Remote:" + remoteAddress.getHostName() + ":"
+ remoteAddress.getPort() + " Local:" + localAddress);
}
return sc;
}
/**
* Writes the message to the connectorHandler
*
* @param connectorHandler
* @param message
* @throws IOException
*/
protected long writeBuffer(ConnectorHandler, ?> connectorHandler, byte[] message) throws IOException {
long len = 0;
if (connectorHandler.isConnected()) {
len = connectorHandler.write(ByteBuffer.wrap(message), true);
if (log.isDebugEnabled()) {
log.debug("Wrote " + len + " bytes");
}
} else {
if (log.isFatalEnabled()) {
log.fatal("ConnectorHandler: " + connectorHandler + " is not connected. Channel: "
+ connectorHandler.getUnderlyingChannel());
}
len = -1;
}
return len;
}
/**
* Starts Grizzly controller
*/
protected synchronized void startGrizzlyController() {
// Check if someone else has tried to start Grizzly framework
// TODO: Is this really needed? Maybe it is OK with one Grizzly per User instance
if (grizzlyUpstartPhase == true) {
try {
// Then we will wait until started properly
//System.out.println("Grizzly is starting, " + " waiting.");
// grizzlyStartLatch.await();
} catch (Exception e) {
throw new NttException("Grizzly NIO framework failed to start", e);
}
}
//synchronized (controller) {
if (!controller.isStarted()) {
new Thread(controller, "Grizzly client").start();
try {
grizzlyUpstartPhase = true;
grizzlyStartLatch.await(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new NttException("Grizzly NIO framework failed to start", e);
}
}
//}
}
/**
* @return hostname
*/
public String getHostname() {
return hostname;
}
/**
* @return port
*/
public int getPort() {
return port;
}
/**
* @throws IOException
*/
public synchronized void shutdown() throws IOException {
if (controller.isStarted()) {
controller.stop();
}
}
/**
* @param handler
*/
public void setHandler(IncomingMessageHandler handler) {
this.handler = handler;
}
/**
* @param callbackFactory
*/
public void setCallbackFactory(CallbackHandlerFactory callbackFactory) {
this.callbackFactory = callbackFactory;
}
}