import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.DatagramPacket; import java.net.DatagramSocket; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.util.concurrent.CountDownLatch; import java.util.logging.Level; import com.sun.grizzly.CallbackHandler; import com.sun.grizzly.ConnectorHandler; import com.sun.grizzly.Context; import com.sun.grizzly.Controller; import com.sun.grizzly.ControllerStateListenerAdapter; import com.sun.grizzly.DefaultProtocolChain; import com.sun.grizzly.DefaultSelectionKeyHandler; import com.sun.grizzly.IOEvent; import com.sun.grizzly.ProtocolChain; import com.sun.grizzly.ProtocolChainInstanceHandler; import com.sun.grizzly.ProtocolFilter; import com.sun.grizzly.UDPSelectorHandler; import com.sun.grizzly.util.DefaultThreadPool; import com.sun.grizzly.Controller.Protocol; import com.sun.grizzly.connectioncache.client.CacheableConnectorHandlerPool; import com.sun.grizzly.filter.ReadFilter; public class GrizzlyCacheTest { private Controller clientController = null; /* ----------- Server ----------------- */ public Controller initServer(){ Controller controller = new Controller(); UDPSelectorHandler udpSelector = new UDPSelectorHandler(); udpSelector.setPort(5060); udpSelector.setSelectionKeyHandler(new DefaultSelectionKeyHandler()); controller.addSelectorHandler(udpSelector); controller.setThreadPool(new DefaultThreadPool()); ProtocolChainInstanceHandler pciHandler = new ProtocolChainInstanceHandler() { final private ProtocolChain protocolChain = new DefaultProtocolChain(); public ProtocolChain poll() { return protocolChain; } public boolean offer(ProtocolChain instance) { return true; } }; controller.setProtocolChainInstanceHandler(pciHandler); ProtocolChain protocolChain = pciHandler.poll(); protocolChain.addFilter(new ReadFilter()); protocolChain.addFilter(new TestFilter()); return controller; } private class TestFilter implements ProtocolFilter{ private int counter = 0; public boolean execute(Context ctx) throws IOException { ConnectorHandler connectorHandler = clientController.acquireConnectorHandler(Protocol.UDP); InetSocketAddress destination = null; synchronized(this){ int number = counter % 4; if (number == 0) destination = new InetSocketAddress(InetAddress.getLocalHost(), 5063); else if (number == 1) destination = new InetSocketAddress(InetAddress.getLocalHost(), 5064); else if (number == 2) destination = new InetSocketAddress(InetAddress.getLocalHost(), 5065); else if (number == 3) destination = new InetSocketAddress(InetAddress.getLocalHost(), 5066); counter++; } TestCallbackHandler callbackHandler = new TestCallbackHandler(connectorHandler); connectorHandler.connect(destination, callbackHandler); ByteBuffer buffer = ByteBuffer.allocate(10); buffer.put(new byte[]{'r', 'e', 't', 'u', 'r', 'n'}); try{ connectorHandler.write(buffer, false); }catch(Exception e){ }finally{ connectorHandler.close(); } clientController.releaseConnectorHandler(connectorHandler); return true; } public boolean postExecute(Context ctx) throws IOException { return true; } } private class TestCallbackHandler implements CallbackHandler{ private ConnectorHandler connectorHandler = null; public TestCallbackHandler(ConnectorHandler handler){ this.connectorHandler = handler; } public void onConnect(IOEvent ioEvent) { SelectionKey key = ioEvent.attachment().getSelectionKey(); try { connectorHandler.finishConnect(key); clientController.registerKey(key, SelectionKey.OP_READ, Controller.Protocol.UDP); } catch (IOException ex) { ex.printStackTrace(); clientController.getSelectorHandler(Protocol.UDP).getSelectionKeyHandler().cancel(key); } } public void onRead(IOEvent ioEvent) { try { Context ctx = ioEvent.attachment(); SelectionKey key = ctx.getSelectionKey(); if (!key.isValid()) { return; } key.interestOps(key.interestOps() & (~SelectionKey.OP_READ)); ctx.getProtocolChain().execute(ioEvent.attachment()); } catch (Throwable e) { e.printStackTrace(); } } public void onWrite(IOEvent ioEvent) { Context ctx = ioEvent.attachment(); SelectionKey key = ctx.getSelectionKey(); key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE)); } } /* --------- Client ----------------- */ public Controller initClient(){ this.clientController = new Controller(); UDPSelectorHandler udpSelector = new UDPSelectorHandler(true); udpSelector.setPort(5061); udpSelector.setSelectionKeyHandler(new DefaultSelectionKeyHandler()); this.clientController.addSelectorHandler(udpSelector); this.clientController.setThreadPool(new DefaultThreadPool()); ProtocolChainInstanceHandler pciHandler = new ProtocolChainInstanceHandler() { final private ProtocolChain protocolChain = new DefaultProtocolChain(); public ProtocolChain poll() { return protocolChain; } public boolean offer(ProtocolChain instance) { return true; } }; this.clientController.setProtocolChainInstanceHandler(pciHandler); CacheableConnectorHandlerPool connectorHandlerPool = new CacheableConnectorHandlerPool(this.clientController, 100, 1, 1); this.clientController.setConnectorHandlerPool(connectorHandlerPool); return this.clientController; } public void start(){ final CountDownLatch latch = new CountDownLatch(1); Controller controller = initServer(); controller.addStateListener(new ControllerStateListenerAdapter() { @Override public void onReady() { latch.countDown(); } @Override public void onException(Throwable e) { if (latch.getCount() > 0) { Controller.logger().log(Level.SEVERE, "Exception during " + "starting the controller", e); latch.countDown(); } else { Controller.logger().log(Level.SEVERE, "Exception during " + "controller processing", e); } } }); new Thread(controller).start(); try{ latch.await(); }catch(InterruptedException e){ } final CountDownLatch clientLatch = new CountDownLatch(1); Controller clientController = initClient(); clientController.addStateListener(new ControllerStateListenerAdapter() { @Override public void onReady() { clientLatch.countDown(); } @Override public void onException(Throwable e) { if (latch.getCount() > 0) { Controller.logger().log(Level.SEVERE, "Exception during " + "starting the controller", e); clientLatch.countDown(); } else { Controller.logger().log(Level.SEVERE, "Exception during " + "controller processing", e); } } }); new Thread(clientController).start(); try{ clientLatch.await(); }catch(InterruptedException e){ } } public static void main(String[] args){ GrizzlyCacheTest test = new GrizzlyCacheTest(); test.start(); try{ DatagramSocket socket = new DatagramSocket(5062); InetAddress local = InetAddress.getLocalHost(); byte[] data = new byte[]{'t', 'e', 's', 't'}; for (int i = 0; i < 20; i++){ DatagramPacket packet = new DatagramPacket(data, 0, data.length, local, 5060); socket.send(packet); socket.send(packet); socket.send(packet); } System.out.println("Send done."); }catch(Exception e){ e.printStackTrace(); } } }