users@grizzly.java.net

Oneway TCP throughput test - timeout and bufferunderflow

From: Neil Avery <neil.avery_at_gmail.com>
Date: Sat, 17 May 2008 18:54:05 +0100

I've been trying to set up one-way TCP streaming through Grizzly. The code
is below - after sending 65-70K messages (in a couple of seconds) I
receive the exception below. Note that I've increased the timeout and attempt
values on the OutputWriter. On the server side I can also get buffer underflows...
Is there something I can tune or fix?
Thanks & Regards Neil
java.io.IOException: Client disconnected, timeout:30000 attempts:11
    at com.sun.grizzly.util.OutputWriter.flushChannel(OutputWriter.java:128)
    at com.sun.grizzly.util.OutputWriter.flushChannel(OutputWriter.java:73)
    at
com.sun.grizzly.TCPConnectorHandler.write(TCPConnectorHandler.java:416)
    at com.stuff.net.GrizzlyTest.testGrizzlyThroughput(GrizzlyTest.java:82)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at junit.framework.TestCase.runTest(TestCase.java:168)
    at junit.framework.TestCase.runBare(TestCase.java:134)
    at junit.framework.TestResult$1.protect(TestResult.java:110)

The test is very simple..........as follows...

public class GrizzlyTest extends TestCase {

    // Number of packets parsed so far. Incremented by Server.execute(Context)
    // for every frame it decodes and checked by the assertions at the end of
    // testGrizzlyThroughput().
    // NOTE(review): this is a plain int that is written on the thread running
    // the server filter and read on the test thread; there is no visibility or
    // atomicity guarantee - consider volatile/AtomicInteger.
    int receivedCount = 0;

    public void testGrizzlyThroughput() throws Exception {

        Server s = new Server();
        s.execute(9094);

        Controller controller = new Controller();
        MyClient myClient = new MyClient(controller);

        final TCPConnectorHandler connectorHandler = (TCPConnectorHandler)
controller.acquireConnectorHandler(Controller.Protocol.TCP);
        OutputWriter.setDefaultWriteTimeout(30 * 1000);

        connectorHandler.connect(new InetSocketAddress("localhost", 9094),
new CallbackHandler<Context>() {

            public void onConnect(IOEvent<Context> e) {
                SelectionKey k = e.attachment().getSelectionKey();
                System.out.println("Callbackhandler: OnConnect...");
                try {
                    connectorHandler.finishConnect(k);
                } catch (Exception ex) {
                    System.out.println("exception in CallbackHandler:" +
ex.getMessage());
                }
                e.attachment().getSelectorHandler().register(k,
SelectionKey.OP_READ);
            }

            public void onRead(IOEvent<Context> e) {
                System.out.println("Callbackhandler: OnRead...");
            }

            public void onWrite(IOEvent<Context> e) {
                System.out.println("Callbackhandler: OnWrite...");
            }
        });

        ByteBuffer buf = ByteBuffer.allocate(100);
        int amount = 10000;
        long start = System.currentTimeMillis();
        for (int i = 0; i < amount ; i++) {
            buf.put(HEADER_BYTES);
            buf.putInt("stuff".getBytes().length);
            buf.put("stuff".getBytes());
            long size = connectorHandler.write(buf, true);

            // this doesnt help
            Thread.yield();
            buf.flip();
            buf.clear();
        }
        Thread.sleep(1000);
        long end = System.currentTimeMillis();
        long elapse = start - end;
// double throughput = amount/(elapse/1000);
// System.err.println("Throughput:" + throughput + "/second elapse:"
+ elapse);
        assertTrue("nothing was received", receivedCount > 0);
        assertEquals("Expected:" + amount + " but got:" + receivedCount,
amount, receivedCount);

        connectorHandler.close();
        controller.stop();

    }

    public class MyClient {
        public MyClient(Controller controller) throws InterruptedException {
            TCPSelectorHandler tcpSelector = new TCPSelectorHandler(true);
            tcpSelector.setPort(9090);
            controller.addSelectorHandler(tcpSelector);
            controller.addStateListener(new ControllerStateListenerAdapter()
{

                public void onException(Throwable e) {
                    System.out.println("Grizzly controller exception:" +
e.getMessage());
                }

                public void onReady() {
                    System.out.println("ClientConnectorReady!");
                }
            });
            new Thread(controller).start();
            Thread.sleep(500);
        }
    }

    // Marker string written at the start of every frame by the test client.
    public static String HEADER = "LL_TCP";
    // HEADER encoded with the platform default charset (no-arg getBytes());
    // the server filter reads this many bytes as the frame header.
    public static byte[] HEADER_BYTES = HEADER.getBytes();
    // Scratch array sized for the header characters plus one 4-byte int
    // (Integer.SIZE/8). Not referenced anywhere in the visible code.
    byte[] bufferSize = new byte[HEADER.length() + Integer.SIZE/8];

    public class Server implements ProtocolFilter {

        final ProtocolFilter read = new ReadFilter();
        final TCPSelectorHandler tcpHandler = new TCPSelectorHandler();
        final Controller controller = new Controller();

        void execute(int pPort) {
            tcpHandler.setPort(pPort);
            controller.setSelectorHandler(tcpHandler);

            final ProtocolFilter myFilter = this;
            controller.setProtocolChainInstanceHandler(new
DefaultProtocolChainInstanceHandler() {
                public ProtocolChain poll() {
                    ProtocolChain protocolChain = protocolChains.poll();
                    if (protocolChain == null) {
                        protocolChain = new DefaultProtocolChain();
                        protocolChain.addFilter(read);
                         protocolChain.addFilter(myFilter);
                    }
                    return protocolChain;
                }

            });
            new Thread(controller).start();
        }

        public boolean execute(Context ctx) throws IOException {

            try {
                final WorkerThread workerThread =
((WorkerThread)Thread.currentThread());
                   String message = "";
                   ByteBuffer buffer = workerThread.getByteBuffer();
                   buffer.flip();

                   /**
                    * assemble packets from the stream - yep this is wrong
but done to
                    * break up the conjoined way they are received
                    */
                   while (buffer.hasRemaining()) {
                       byte[] header = new byte[HEADER_BYTES.length];
                       ByteBuffer byteBuffer = buffer.get(header);
                       int packetLength = buffer.getInt();
                       byte[] packet = new byte[packetLength];
                       buffer.get(packet);

                       receivedCount++;
// if (receivedCount % 1000 == 0) {
                           System.err.println(receivedCount + " Got Packet:"
+ new String(packet));
// }
                   }

                   buffer.clear();

            } catch (Throwable t) {
                t.printStackTrace();
            }
            return false;

        }

        public boolean postExecute(Context ctx) throws IOException {
            return true;
        }
    }