import java.io.IOException; import java.io.InputStream; import java.net.BindException; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import org.apache.commons.logging.Log; import com.sun.grizzly.CallbackHandler; import com.sun.grizzly.ConnectorHandler; import com.sun.grizzly.Context; import com.sun.grizzly.Controller; import com.sun.grizzly.IOEvent; /** * Implements Callbackhandler for the HTTP protocol */ public class HttpCallBackHandler implements CallbackHandlerFactory { private static Log log = Logging.getLog(HttpCallBackHandler.class); /** * Creates a callbackhandler for the HTTP protocol * * @param connectorHandler * @param writeBuffer * @param readBuffer * @return callbackhandler */ public CallbackHandler createCallBackHandler(final Controller controller, final ConnectorHandler connectorHandler, final IncomingMessageHandler handler, final ByteBuffer readBuffer, final ByteBuffer writeBuffer) { return new CallbackHandler() { public void onConnect(IOEvent ioEvent) { SelectionKey key = ioEvent.attachment().getSelectionKey(); if (!key.isValid()) { // log.fatal("Key is not valid: " + key.hashCode() + " Socket:" // + ((SocketChannel) key.channel()).socket()); return; } try { connectorHandler.finishConnect(key); } catch (BindException be) { log.fatal("BindException: Out of sockets?"); } catch (IOException e) { e.printStackTrace(); log.fatal("Unable to connect key: "); } catch (Exception ex) { ex.printStackTrace(); } controller.registerKey(key, SelectionKey.OP_READ, Controller.Protocol.TCP); } public void onRead(IOEvent ioEvent) { Context ctx = ioEvent.attachment(); SelectionKey key = ctx.getSelectionKey(); if (!key.isValid()) { log.fatal("onRead() key not valid"); return; } key.interestOps(key.interestOps() & (~SelectionKey.OP_READ)); SocketChannel channel = (SocketChannel) key.channel(); Boolean close = false; try { int count = channel.read(readBuffer); if (log.isDebugEnabled()) { log.debug("onRead: Read " + count + "bytes"); } readBuffer.flip(); if (readBuffer.hasRemaining()) { InputStream is = newInputStream(readBuffer); HttpResponse response = new HttpParser().parseResponse(is); handler.incomingMessage(response); writeBuffer.put(response.getContent()); writeBuffer.flip(); connectorHandler.write(writeBuffer, true); // Close connection on Connection: close close = "close".equalsIgnoreCase(response.getConnection()); if (close == true) { if (log.isDebugEnabled()) { log.debug("Closing channel:" + channel.toString()); log.debug("Closing connectorhandle:" + connectorHandler.toString()); log.debug("Releasing connectorhandler"); } connectorHandler.close(); controller.releaseConnectorHandler(connectorHandler); } } } catch (IOException e) { log.fatal("Exception in onRead: ", e); } finally { if (close == false) { // Reset key interests key.interestOps(SelectionKey.OP_READ); } else { //key.cancel(); //key.interestOps(SelectionKey.OP_CONNECT); } // Clear buffer for reuse readBuffer.clear(); } } public void onWrite(@SuppressWarnings("unused") IOEvent ioEvent) { if (log.isDebugEnabled()) { log.debug("onWrite in httpCallbackhandler"); } } }; } // Returns an input stream for a ByteBuffer. // The read() methods use the relative ByteBuffer get() methods. /** * @param buf * @return InputStream */ public static InputStream newInputStream(final ByteBuffer buf) { return new InputStream() { @SuppressWarnings("unused") @Override public synchronized int read() throws IOException { if (!buf.hasRemaining()) { return -1; } return buf.get(); } @SuppressWarnings("unused") @Override public synchronized int read(byte[] bytes, int off, int len) throws IOException { // Read only what's left int len2 = Math.min(len, buf.remaining()); buf.get(bytes, off, len2); return len2; } }; } }