users@grizzly.java.net

Re: Oneway TCP throughput test - timeout and bufferunderflow

From: Oleksiy Stashok <Oleksiy.Stashok_at_Sun.COM>
Date: Tue, 20 May 2008 10:57:47 +0200

Hi,

IMHO it should work.
Please check the following commit, #1153 [1].

Thanks.
WBR,
Alexey.

[1]
Modified: trunk/modules/grizzly/src/main/java/com/sun/grizzly/util/CallbackHandlerSelectionKeyAttachment.java
Url: https://grizzly.dev.java.net/source/browse/grizzly/trunk/modules/grizzly/src/main/java/com/sun/grizzly/util/CallbackHandlerSelectionKeyAttachment.java?view=diff&rev=1153&p1=trunk/modules/grizzly/src/main/java/com/sun/grizzly/util/CallbackHandlerSelectionKeyAttachment.java&p2=trunk/modules/grizzly/src/main/java/com/sun/grizzly/util/CallbackHandlerSelectionKeyAttachment.java&r1=1152&r2=1153
==============================================================================
--- trunk/modules/grizzly/src/main/java/com/sun/grizzly/util/CallbackHandlerSelectionKeyAttachment.java (original)
+++ trunk/modules/grizzly/src/main/java/com/sun/grizzly/util/CallbackHandlerSelectionKeyAttachment.java 2008-05-19 14:20:08+0000
@@ -85,4 +85,14 @@
             pool.offer(this);
         }
     }
+
+    /**
+     * Callback handler never should lead to the timeout situation,
+     * as connections could use it, but perform IO operations in sync. mode.
+     * @return null
+     */
+    @Override
+    public Long getTimeout() {
+        return null;
+    }
}
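
The effect of returning null from getTimeout() can be pictured with a small sketch. Everything here is hypothetical and for illustration only: the Attachment interface, isExpired and the attachment helper are not Grizzly types or methods, and the sketch simply assumes that an idle-connection check treats a null timeout as "never expire".

```java
public class TimeoutSketch {

    /** Hypothetical stand-in for a selection-key attachment; not a Grizzly type. */
    public interface Attachment {
        /** Idle timeout in milliseconds, or null to opt out of expiry. */
        Long getTimeout();

        long getLastActivityMillis();
    }

    /** Assumed expiry rule: a null timeout means the attachment never expires. */
    public static boolean isExpired(Attachment a, long nowMillis) {
        Long timeout = a.getTimeout();
        if (timeout == null) {
            return false;
        }
        return nowMillis - a.getLastActivityMillis() > timeout;
    }

    /** Builds a fixed attachment for the demonstration below. */
    public static Attachment attachment(final Long timeout, final long lastActivityMillis) {
        return new Attachment() {
            public Long getTimeout() {
                return timeout;
            }

            public long getLastActivityMillis() {
                return lastActivityMillis;
            }
        };
    }

    public static void main(String[] args) {
        long now = 60000L;
        // Idle for 60 s with a 30 s timeout: expired.
        System.out.println(isExpired(attachment(30000L, 0L), now)); // true
        // Same idle time, but a null timeout: never expired.
        System.out.println(isExpired(attachment(null, 0L), now));   // false
    }
}
```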

On May 19, 2008, at 20:57 , Neil Avery wrote:

> Thanks for the update, guys. I've just synced up with grizzly/trunk
> and have rerun the test. It gets further (191K msgs), but it
> still fails with the same exception.
>
> I'll sort out the protocol handling once I can get the messages
> flowing through seamlessly! It seems very close...
>
> Can I verify the change you made, and that I'm seeing it in my svn
> checkout?
>
> Thanks again, Neil.
>
> java.io.IOException: Client disconnected
>     at com.sun.grizzly.util.OutputWriter.flushChannel(OutputWriter.java:123)
>     at com.sun.grizzly.util.OutputWriter.flushChannel(OutputWriter.java:73)
>     at com.sun.grizzly.TCPConnectorHandler.write(TCPConnectorHandler.java:416)
>     at com.liquidlabs.net.GrizzlyTest.testGrizzlyThroughput(GrizzlyTest.java:73)
>
>
>
> ==========================================================
>
> Date: Mon, 19 May 2008 16:39:16 +0200
> From: Oleksiy Stashok <Oleksiy.Stashok_at_Sun.COM>
>
> Subject: Oneway TCP throughput test - timeout and bufferunderflow
>
> Hello Neil,
>
> basically you have 2 issues :)
>
> 1) java.io.IOException: Client disconnected, timeout:30000 attempts:11
> This is a Grizzly bug, which I've just fixed [1]. If you build
> Grizzly from trunk, you should not see this problem any more.
>
>
> 2) bufferunderflow
> This happens because under load ReadFilter doesn't read the whole
> packet you need, only part of it.
> So the ByteBuffer that reaches your Server filter contains just part
> of a message, and when you try to copy it into the byte[], it throws
> an exception, because byte[].length > ByteBuffer.remaining().
> I would suggest taking a look at the ParserProtocolFilter code and
> the example of how to implement a custom protocol parser [2]. IMHO the
> Server ProtocolFilter you have right now should be implemented as a
> parser.
>
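
To illustrate point 2, here is a minimal sketch of decoding length-prefixed frames from partial reads. It uses only java.nio and is not Grizzly's ParserProtocolFilter API. FrameDecoder, feed and frame are hypothetical names. The frame layout (the 6-byte "LL_TCP" header, a 4-byte length, then the payload) is taken from the test in this thread. The sketch assumes the leftover bytes plus each chunk fit in the 8 KB holding buffer, and it skips header validation.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class FrameDecoder {
    static final byte[] HEADER = "LL_TCP".getBytes(StandardCharsets.US_ASCII);

    // Holds bytes of an incomplete frame between reads (kept in "write mode").
    private final ByteBuffer pending = ByteBuffer.allocate(8192);

    /** Appends one chunk from the network and returns every complete payload. */
    public List<String> feed(ByteBuffer chunk) {
        pending.put(chunk); // assumption: leftover + chunk fit in the buffer
        pending.flip();     // switch to reading what has accumulated
        List<String> messages = new ArrayList<>();
        while (pending.remaining() >= HEADER.length + 4) {
            pending.mark();
            pending.position(pending.position() + HEADER.length); // skip header
            int length = pending.getInt();
            if (length < 0) {
                throw new IllegalStateException("negative frame length: " + length);
            }
            if (pending.remaining() < length) {
                pending.reset(); // payload incomplete: rewind to the frame start
                break;
            }
            byte[] payload = new byte[length];
            pending.get(payload);
            messages.add(new String(payload, StandardCharsets.US_ASCII));
        }
        pending.compact(); // keep the unread tail for the next call
        return messages;
    }

    /** Builds one frame the way the test client does: header, length, payload. */
    public static byte[] frame(String text) {
        byte[] body = text.getBytes(StandardCharsets.US_ASCII);
        ByteBuffer b = ByteBuffer.allocate(HEADER.length + 4 + body.length);
        b.put(HEADER).putInt(body.length).put(body);
        return b.array();
    }

    public static void main(String[] args) {
        byte[] one = frame("stuff"); // 15 bytes
        byte[] two = new byte[30];
        System.arraycopy(one, 0, two, 0, 15);
        System.arraycopy(one, 0, two, 15, 15);
        FrameDecoder decoder = new FrameDecoder();
        // The second frame is split across two reads, as happens under load.
        System.out.println(decoder.feed(ByteBuffer.wrap(two, 0, 20)));  // [stuff]
        System.out.println(decoder.feed(ByteBuffer.wrap(two, 20, 10))); // [stuff]
    }
}
```

The key point is that the decoder checks remaining() before every read and keeps a partial frame for the next call instead of assuming each read delivers whole packets.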
>
> Thanks.
>
> WBR,
> Alexey.
>
> [1] https://grizzly.dev.java.net/issues/show_bug.cgi?id=141
> [2] http://weblogs.java.net/blog/sdo/archive/2007/12/grizzly_protoco.html
>
>
>
>
> 2008/5/17 Neil Avery <neil.avery_at_gmail.com>:
> I've been trying to set up one-way TCP streaming through Grizzly.
> The code is below. After sending 65-70K messages (in a
> couple of seconds), I receive the exception below. Note that I've
> increased the timeout and attempt values on the OutputWriter. On the
> server side I can also get buffer underflows...
> Is there something I can tune or fix?
> Thanks & Regards, Neil
> java.io.IOException: Client disconnected, timeout:30000 attempts:11
>     at com.sun.grizzly.util.OutputWriter.flushChannel(OutputWriter.java:128)
>     at com.sun.grizzly.util.OutputWriter.flushChannel(OutputWriter.java:73)
>     at com.sun.grizzly.TCPConnectorHandler.write(TCPConnectorHandler.java:416)
>     at com.stuff.net.GrizzlyTest.testGrizzlyThroughput(GrizzlyTest.java:82)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>     at java.lang.reflect.Method.invoke(Method.java:597)
>     at junit.framework.TestCase.runTest(TestCase.java:168)
>     at junit.framework.TestCase.runBare(TestCase.java:134)
>     at junit.framework.TestResult$1.protect(TestResult.java:110)
>
> The test is very simple, as follows:
>
> public class GrizzlyTest extends TestCase {
>
>     int receivedCount = 0;
>
>     public void testGrizzlyThroughput() throws Exception {
>
>         Server s = new Server();
>         s.execute(9094);
>
>         Controller controller = new Controller();
>         MyClient myClient = new MyClient(controller);
>
>         final TCPConnectorHandler connectorHandler = (TCPConnectorHandler)
>                 controller.acquireConnectorHandler(Controller.Protocol.TCP);
>         OutputWriter.setDefaultWriteTimeout(30 * 1000);
>
>         connectorHandler.connect(new InetSocketAddress("localhost", 9094),
>                 new CallbackHandler<Context>() {
>
>             public void onConnect(IOEvent<Context> e) {
>                 SelectionKey k = e.attachment().getSelectionKey();
>                 System.out.println("Callbackhandler: OnConnect...");
>                 try {
>                     connectorHandler.finishConnect(k);
>                 } catch (Exception ex) {
>                     System.out.println("exception in CallbackHandler:" + ex.getMessage());
>                 }
>                 e.attachment().getSelectorHandler().register(k, SelectionKey.OP_READ);
>             }
>
>             public void onRead(IOEvent<Context> e) {
>                 System.out.println("Callbackhandler: OnRead...");
>             }
>
>             public void onWrite(IOEvent<Context> e) {
>                 System.out.println("Callbackhandler: OnWrite...");
>             }
>         });
>
>         ByteBuffer buf = ByteBuffer.allocate(100);
>         int amount = 10000;
>         long start = System.currentTimeMillis();
>         for (int i = 0; i < amount; i++) {
>             buf.put(HEADER_BYTES);
>             buf.putInt("stuff".getBytes().length);
>             buf.put("stuff".getBytes());
>             // flip before writing, so the bytes just put are the ones sent
>             buf.flip();
>             long size = connectorHandler.write(buf, true);
>
>             // this doesn't help
>             Thread.yield();
>             buf.clear();
>         }
>         Thread.sleep(1000);
>         long end = System.currentTimeMillis();
>         long elapse = end - start;
>         // double throughput = amount/(elapse/1000);
>         // System.err.println("Throughput:" + throughput + "/second elapse:" + elapse);
>         assertTrue("nothing was received", receivedCount > 0);
>         assertEquals("Expected:" + amount + " but got:" + receivedCount,
>                 amount, receivedCount);
>
>         connectorHandler.close();
>         controller.stop();
>
>     }
>
>     public class MyClient {
>         public MyClient(Controller controller) throws InterruptedException {
>             TCPSelectorHandler tcpSelector = new TCPSelectorHandler(true);
>             tcpSelector.setPort(9090);
>             controller.addSelectorHandler(tcpSelector);
>             controller.addStateListener(new ControllerStateListenerAdapter() {
>
>                 public void onException(Throwable e) {
>                     System.out.println("Grizzly controller exception:" + e.getMessage());
>                 }
>
>                 public void onReady() {
>                     System.out.println("ClientConnectorReady!");
>                 }
>             });
>             new Thread(controller).start();
>             Thread.sleep(500);
>         }
>     }
>
> public static String HEADER = "LL_TCP";
> public static byte[] HEADER_BYTES = HEADER.getBytes();
> byte[] bufferSize = new byte[HEADER.length() + Integer.SIZE/8];
>
>     public class Server implements ProtocolFilter {
>
>         final ProtocolFilter read = new ReadFilter();
>         final TCPSelectorHandler tcpHandler = new TCPSelectorHandler();
>         final Controller controller = new Controller();
>
>         void execute(int pPort) {
>             tcpHandler.setPort(pPort);
>             controller.setSelectorHandler(tcpHandler);
>
>             final ProtocolFilter myFilter = this;
>             controller.setProtocolChainInstanceHandler(new DefaultProtocolChainInstanceHandler() {
>                 public ProtocolChain poll() {
>                     ProtocolChain protocolChain = protocolChains.poll();
>                     if (protocolChain == null) {
>                         protocolChain = new DefaultProtocolChain();
>                         protocolChain.addFilter(read);
>                         protocolChain.addFilter(myFilter);
>                     }
>                     return protocolChain;
>                 }
>
>             });
>             new Thread(controller).start();
>         }
>
>         public boolean execute(Context ctx) throws IOException {
>
>             try {
>                 final WorkerThread workerThread = ((WorkerThread) Thread.currentThread());
>                 String message = "";
>                 ByteBuffer buffer = workerThread.getByteBuffer();
>                 buffer.flip();
>
>                 /**
>                  * assemble packets from the stream - yep this is wrong but done to
>                  * break up the conjoined way they are received
>                  */
>                 while (buffer.hasRemaining()) {
>                     byte[] header = new byte[HEADER_BYTES.length];
>                     ByteBuffer byteBuffer = buffer.get(header);
>                     int packetLength = buffer.getInt();
>                     byte[] packet = new byte[packetLength];
>                     buffer.get(packet);
>
>                     receivedCount++;
>                     // if (receivedCount % 1000 == 0) {
>                     System.err.println(receivedCount + " Got Packet:" + new String(packet));
>                     // }
>                 }
>
>                 buffer.clear();
>
>             } catch (Throwable t) {
>                 t.printStackTrace();
>             }
>             return false;
>
>         }
>
>         public boolean postExecute(Context ctx) throws IOException {
>             return true;
>         }
>     }
>
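
The put / flip / write / clear cycle used by a send loop like the one in the test can be checked in isolation, without Grizzly. The sketch below is only an illustration: SendLoopSketch and send are hypothetical names, and an in-memory channel from java.nio.channels.Channels stands in for the socket. A channel write sends the bytes between the buffer's position and limit, so the buffer has to be flipped after filling it and before writing.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;

public class SendLoopSketch {
    static final byte[] HEADER_BYTES = "LL_TCP".getBytes(StandardCharsets.US_ASCII);

    /** Writes 'count' frames (header, length, payload) and returns the bytes sent. */
    public static byte[] send(int count, String payload) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        WritableByteChannel channel = Channels.newChannel(sink); // stands in for the socket
        byte[] body = payload.getBytes(StandardCharsets.US_ASCII);
        ByteBuffer buf = ByteBuffer.allocate(100);
        for (int i = 0; i < count; i++) {
            buf.clear();            // position = 0, limit = capacity: ready to fill
            buf.put(HEADER_BYTES);
            buf.putInt(body.length);
            buf.put(body);
            buf.flip();             // limit = bytes written, position = 0: ready to drain
            while (buf.hasRemaining()) {
                channel.write(buf); // sends only the bytes between position and limit
            }
        }
        return sink.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        // Each frame is 6 (header) + 4 (length) + 5 ("stuff") = 15 bytes.
        System.out.println(send(3, "stuff").length); // 45
    }
}
```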