Hello Neil,
Basically, you have two issues :)
1) java.io.IOException: Client disconnected, timeout:30000 attempts:11
This is a Grizzly bug, which I've just fixed [1]. If you build
Grizzly from trunk, you should not see this problem any more.
2) BufferUnderflowException
This happens because, under load, ReadFilter doesn't always read the
whole packet you need, but only part of it.
So the ByteBuffer that arrives at your Server filter contains just part
of a message, and when you try to copy it into the byte[], it throws an
exception, because byte[].length > ByteBuffer.remaining().
I would suggest you take a look at the ParserProtocolFilter code and at
an example of how to implement a custom protocol parser [2]. IMHO the
Server ProtocolFilter you have right now should be implemented as a parser.
Thanks.
WBR,
Alexey.
[1]
https://grizzly.dev.java.net/issues/show_bug.cgi?id=141
[2]
http://weblogs.java.net/blog/sdo/archive/2007/12/grizzly_protoco.html
On May 17, 2008, at 19:54 , Neil Avery wrote:
> I've been trying to set up one-way TCP streaming through Grizzly.
> The code is below - after sending through 65-70K messages (in a
> couple of seconds) - I receive the exception below. Note, I've
> increased the timeout and attempt value on the OutputWriter. On the
> server side I can get buffer underflows...
> Is there something I can tune, or is this fixable?
> Thanks & Regards Neil
> java.io.IOException: Client disconnected, timeout:30000 attempts:11
> at com.sun.grizzly.util.OutputWriter.flushChannel(OutputWriter.java:128)
> at com.sun.grizzly.util.OutputWriter.flushChannel(OutputWriter.java:73)
> at com.sun.grizzly.TCPConnectorHandler.write(TCPConnectorHandler.java:416)
> at com.stuff.net.GrizzlyTest.testGrizzlyThroughput(GrizzlyTest.java:82)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
> at java.lang.reflect.Method.invoke(Method.java:597)
> at junit.framework.TestCase.runTest(TestCase.java:168)
> at junit.framework.TestCase.runBare(TestCase.java:134)
> at junit.framework.TestResult$1.protect(TestResult.java:110)
>
> The test is very simple..........as follows...
>
> public class GrizzlyTest extends TestCase {
>
> int receivedCount = 0;
>
> public void testGrizzlyThroughput() throws Exception {
>
> Server s = new Server();
> s.execute(9094);
>
> Controller controller = new Controller();
> MyClient myClient = new MyClient(controller);
>
> final TCPConnectorHandler connectorHandler =
> (TCPConnectorHandler)
> controller.acquireConnectorHandler(Controller.Protocol.TCP);
> OutputWriter.setDefaultWriteTimeout(30 * 1000);
>
> connectorHandler.connect(new InetSocketAddress("localhost",
> 9094), new CallbackHandler<Context>() {
>
> public void onConnect(IOEvent<Context> e) {
> SelectionKey k = e.attachment().getSelectionKey();
> System.out.println("Callbackhandler: OnConnect...");
> try {
> connectorHandler.finishConnect(k);
> } catch (Exception ex) {
> System.out.println("exception in
> CallbackHandler:" + ex.getMessage());
> }
> e.attachment().getSelectorHandler().register(k,
> SelectionKey.OP_READ);
> }
>
> public void onRead(IOEvent<Context> e) {
> System.out.println("Callbackhandler: OnRead...");
> }
>
> public void onWrite(IOEvent<Context> e) {
> System.out.println("Callbackhandler: OnWrite...");
> }
> });
>
> ByteBuffer buf = ByteBuffer.allocate(100);
> int amount = 10000;
> long start = System.currentTimeMillis();
> for (int i = 0; i < amount ; i++) {
> buf.put(HEADER_BYTES);
> buf.putInt("stuff".getBytes().length);
> buf.put("stuff".getBytes());
> long size = connectorHandler.write(buf, true);
>
> // this doesnt help
> Thread.yield();
> buf.flip();
> buf.clear();
> }
> Thread.sleep(1000);
> long end = System.currentTimeMillis();
> long elapse = start - end;
> // double throughput = amount/(elapse/1000);
> // System.err.println("Throughput:" + throughput + "/second
> elapse:" + elapse);
> assertTrue("nothing was received", receivedCount > 0);
> assertEquals("Expected:" + amount + " but got:" +
> receivedCount, amount, receivedCount);
>
> connectorHandler.close();
> controller.stop();
>
> }
>
> public class MyClient {
> public MyClient(Controller controller) throws
> InterruptedException {
> TCPSelectorHandler tcpSelector = new
> TCPSelectorHandler(true);
> tcpSelector.setPort(9090);
> controller.addSelectorHandler(tcpSelector);
> controller.addStateListener(new
> ControllerStateListenerAdapter() {
>
> public void onException(Throwable e) {
> System.out.println("Grizzly controller
> exception:" + e.getMessage());
> }
>
> public void onReady() {
> System.out.println("ClientConnectorReady!");
> }
> });
> new Thread(controller).start();
> Thread.sleep(500);
> }
> }
>
> public static String HEADER = "LL_TCP";
> public static byte[] HEADER_BYTES = HEADER.getBytes();
> byte[] bufferSize = new byte[HEADER.length() + Integer.SIZE/8];
>
> public class Server implements ProtocolFilter {
>
> final ProtocolFilter read = new ReadFilter();
> final TCPSelectorHandler tcpHandler = new
> TCPSelectorHandler();
> final Controller controller = new Controller();
>
> void execute(int pPort) {
> tcpHandler.setPort(pPort);
> controller.setSelectorHandler(tcpHandler);
>
> final ProtocolFilter myFilter = this;
> controller.setProtocolChainInstanceHandler(new
> DefaultProtocolChainInstanceHandler() {
> public ProtocolChain poll() {
> ProtocolChain protocolChain =
> protocolChains.poll();
> if (protocolChain == null) {
> protocolChain = new DefaultProtocolChain();
> protocolChain.addFilter(read);
> protocolChain.addFilter(myFilter);
> }
> return protocolChain;
> }
>
> });
> new Thread(controller).start();
> }
>
> public boolean execute(Context ctx) throws IOException {
>
> try {
> final WorkerThread workerThread =
> ((WorkerThread)Thread.currentThread());
> String message = "";
> ByteBuffer buffer = workerThread.getByteBuffer();
> buffer.flip();
>
> /**
> * assemble packets from the stream - yep this is
> wrong but done to
> * break up the conjoined way they are received
> */
> while (buffer.hasRemaining()) {
> byte[] header = new byte[HEADER_BYTES.length];
> ByteBuffer byteBuffer = buffer.get(header);
> int packetLength = buffer.getInt();
> byte[] packet = new byte[packetLength];
> buffer.get(packet);
>
> receivedCount++;
> // if (receivedCount % 1000 == 0) {
> System.err.println(receivedCount + " Got
> Packet:" + new String(packet));
> // }
> }
>
> buffer.clear();
>
> } catch (Throwable t) {
> t.printStackTrace();
> }
> return false;
>
> }
>
> public boolean postExecute(Context ctx) throws IOException {
> return true;
> }
> }