Thanks for the update guys, Ive just synced up with grizzly/trunk and
have rerun the test. Its gets further (191K msgs), however it still
fails with the same exception,
Ill sort out the protocol handling once I can get the messages flowing
through seemlessly! - seems very close....
Can I verify the change you made and that Im seeing it in my svn checkout?
Thanks again, Neil.
java.io.IOException: Client disconnected
at com.sun.grizzly.util.OutputWriter.flushChannel(OutputWriter.java:123)
at com.sun.grizzly.util.OutputWriter.flushChannel(OutputWriter.java:73)
at com.sun.grizzly.TCPConnectorHandler.write(TCPConnectorHandler.java:416)
at com.liquidlabs.net.GrizzlyTest.testGrizzlyThroughput(GrizzlyTest.java:73)
==========================================================
Date: Mon, 19 May 2008 16:39:16 +0200
From: Oleksiy Stashok <Oleksiy.Stashok_at_Sun.COM>
Content-type: multipart/alternative;
boundary="Boundary_(ID_bODhTAP90MizXacjMQDsvw)"
Subject: Oneway TCP throughput test - timeout and bufferunderflow
Hello Neil,
basically you have 2 issues :)
1) java.io.IOException: Client disconnected, timeout:30000 attempts:11
This is Grizzly bug, which I've just fixed [1]. So if you will build
Grizzly from trunk - you should not see this problem any more.
2) bufferunderflow
This happens because under load ReadFilter doesn't read whole the
packet you need, but just half of it.
So ByteBuffer, which comes to your Server Filter contains just part of
message, and when you try to get it to the byte[] - it throws
exception, because byte[].length > ByteBuffer.remaining().
I would suggest you to take a look at ParserProtocolFilter code and
example how you can implement custom protocol parser [2]. IMHO Server
ProtocolFilter, which you have right now, should be realized as Parser.
Thanks.
WBR,
Alexey.
[1]
https://grizzly.dev.java.net/issues/show_bug.cgi?id=141
[2]
http://weblogs.java.net/blog/sdo/archive/2007/12/grizzly_protoco.html
2008/5/17 Neil Avery <neil.avery_at_gmail.com>:
> Ive been trying to setup a one-way tcp streaming through grizzly. The code
> is below - after sending through 65-70Kmessages (in a couple of seconds) - I
> receive the exception below. Note, Ive increased the timeout and attempt
> value on the OutputWriter. On the serverside I can get bufferunderflows...
> Is there something I can tune or fixable?
> Thanks & Regards Neil
> java.io.IOException: Client disconnected, timeout:30000 attempts:11
> at
> com.sun.grizzly.util.OutputWriter.flushChannel(OutputWriter.java:128)
> at com.sun.grizzly.util.OutputWriter.flushChannel(OutputWriter.java:73)
> at
> com.sun.grizzly.TCPConnectorHandler.write(TCPConnectorHandler.java:416)
> at com.stuff.net.GrizzlyTest.testGrizzlyThroughput(GrizzlyTest.java:82)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
> at java.lang.reflect.Method.invoke(Method.java:597)
> at junit.framework.TestCase.runTest(TestCase.java:168)
> at junit.framework.TestCase.runBare(TestCase.java:134)
> at junit.framework.TestResult$1.protect(TestResult.java:110)
>
> The test is very simple..........as follows...
>
> public class GrizzlyTest extends TestCase {
>
> int receivedCount = 0;
>
> public void testGrizzlyThroughput() throws Exception {
>
> Server s = new Server();
> s.execute(9094);
>
> Controller controller = new Controller();
> MyClient myClient = new MyClient(controller);
>
> final TCPConnectorHandler connectorHandler = (TCPConnectorHandler)
> controller.acquireConnectorHandler(Controller.Protocol.TCP);
> OutputWriter.setDefaultWriteTimeout(30 * 1000);
>
> connectorHandler.connect(new InetSocketAddress("localhost", 9094),
> new CallbackHandler<Context>() {
>
> public void onConnect(IOEvent<Context> e) {
> SelectionKey k = e.attachment().getSelectionKey();
> System.out.println("Callbackhandler: OnConnect...");
> try {
> connectorHandler.finishConnect(k);
> } catch (Exception ex) {
> System.out.println("exception in CallbackHandler:" +
> ex.getMessage());
> }
> e.attachment().getSelectorHandler().register(k,
> SelectionKey.OP_READ);
> }
>
> public void onRead(IOEvent<Context> e) {
> System.out.println("Callbackhandler: OnRead...");
> }
>
> public void onWrite(IOEvent<Context> e) {
> System.out.println("Callbackhandler: OnWrite...");
> }
> });
>
> ByteBuffer buf = ByteBuffer.allocate(100);
> int amount = 10000;
> long start = System.currentTimeMillis();
> for (int i = 0; i < amount ; i++) {
> buf.put(HEADER_BYTES);
> buf.putInt("stuff".getBytes().length);
> buf.put("stuff".getBytes());
> long size = connectorHandler.write(buf, true);
>
> // this doesnt help
> Thread.yield();
> buf.flip();
> buf.clear();
> }
> Thread.sleep(1000);
> long end = System.currentTimeMillis();
> long elapse = start - end;
> // double throughput = amount/(elapse/1000);
> // System.err.println("Throughput:" + throughput + "/second elapse:"
> + elapse);
> assertTrue("nothing was received", receivedCount > 0);
> assertEquals("Expected:" + amount + " but got:" + receivedCount,
> amount, receivedCount);
>
> connectorHandler.close();
> controller.stop();
>
> }
>
> public class MyClient {
> public MyClient(Controller controller) throws InterruptedException
> {
> TCPSelectorHandler tcpSelector = new TCPSelectorHandler(true);
> tcpSelector.setPort(9090);
> controller.addSelectorHandler(tcpSelector);
> controller.addStateListener(new
> ControllerStateListenerAdapter() {
>
> public void onException(Throwable e) {
> System.out.println("Grizzly controller exception:" +
> e.getMessage());
> }
>
> public void onReady() {
> System.out.println("ClientConnectorReady!");
> }
> });
> new Thread(controller).start();
> Thread.sleep(500);
> }
> }
>
> public static String HEADER = "LL_TCP";
> public static byte[] HEADER_BYTES = HEADER.getBytes();
> byte[] bufferSize = new byte[HEADER.length() + Integer.SIZE/8];
>
> public class Server implements ProtocolFilter {
>
> final ProtocolFilter read = new ReadFilter();
> final TCPSelectorHandler tcpHandler = new TCPSelectorHandler();
> final Controller controller = new Controller();
>
> void execute(int pPort) {
> tcpHandler.setPort(pPort);
> controller.setSelectorHandler(tcpHandler);
>
> final ProtocolFilter myFilter = this;
> controller.setProtocolChainInstanceHandler(new
> DefaultProtocolChainInstanceHandler() {
> public ProtocolChain poll() {
> ProtocolChain protocolChain = protocolChains.poll();
> if (protocolChain == null) {
> protocolChain = new DefaultProtocolChain();
> protocolChain.addFilter(read);
> protocolChain.addFilter(myFilter);
> }
> return protocolChain;
> }
>
> });
> new Thread(controller).start();
> }
>
> public boolean execute(Context ctx) throws IOException {
>
> try {
> final WorkerThread workerThread =
> ((WorkerThread)Thread.currentThread());
> String message = "";
> ByteBuffer buffer = workerThread.getByteBuffer();
> buffer.flip();
>
> /**
> * assemble packets from the stream - yep this is wrong
> but done to
> * break up the conjoined way they are received
> */
> while (buffer.hasRemaining()) {
> byte[] header = new byte[HEADER_BYTES.length];
> ByteBuffer byteBuffer = buffer.get(header);
> int packetLength = buffer.getInt();
> byte[] packet = new byte[packetLength];
> buffer.get(packet);
>
> receivedCount++;
> // if (receivedCount % 1000 == 0) {
> System.err.println(receivedCount + " Got
> Packet:" + new String(packet));
> // }
> }
>
> buffer.clear();
>
> } catch (Throwable t) {
> t.printStackTrace();
> }
> return false;
>
> }
>
> public boolean postExecute(Context ctx) throws IOException {
> return true;
> }
> }
>