Ive been trying to setup a one-way tcp streaming through grizzly. The code
is below - after sending through 65-70Kmessages (in a couple of seconds) - I
receive the exception below. Note, Ive increased the timeout and attempt
value on the OutputWriter. On the serverside I can get bufferunderflows...
Is there something I can tune or fixable?
Thanks & Regards Neil
java.io.IOException: Client disconnected, timeout:30000 attempts:11
at com.sun.grizzly.util.OutputWriter.flushChannel(OutputWriter.java:128)
at com.sun.grizzly.util.OutputWriter.flushChannel(OutputWriter.java:73)
at
com.sun.grizzly.TCPConnectorHandler.write(TCPConnectorHandler.java:416)
at com.stuff.net.GrizzlyTest.testGrizzlyThroughput(GrizzlyTest.java:82)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at junit.framework.TestCase.runTest(TestCase.java:168)
at junit.framework.TestCase.runBare(TestCase.java:134)
at junit.framework.TestResult$1.protect(TestResult.java:110)
The test is very simple..........as follows...
public class GrizzlyTest extends TestCase {
int receivedCount = 0;
public void testGrizzlyThroughput() throws Exception {
Server s = new Server();
s.execute(9094);
Controller controller = new Controller();
MyClient myClient = new MyClient(controller);
final TCPConnectorHandler connectorHandler = (TCPConnectorHandler)
controller.acquireConnectorHandler(Controller.Protocol.TCP);
OutputWriter.setDefaultWriteTimeout(30 * 1000);
connectorHandler.connect(new InetSocketAddress("localhost", 9094),
new CallbackHandler<Context>() {
public void onConnect(IOEvent<Context> e) {
SelectionKey k = e.attachment().getSelectionKey();
System.out.println("Callbackhandler: OnConnect...");
try {
connectorHandler.finishConnect(k);
} catch (Exception ex) {
System.out.println("exception in CallbackHandler:" +
ex.getMessage());
}
e.attachment().getSelectorHandler().register(k,
SelectionKey.OP_READ);
}
public void onRead(IOEvent<Context> e) {
System.out.println("Callbackhandler: OnRead...");
}
public void onWrite(IOEvent<Context> e) {
System.out.println("Callbackhandler: OnWrite...");
}
});
ByteBuffer buf = ByteBuffer.allocate(100);
int amount = 10000;
long start = System.currentTimeMillis();
for (int i = 0; i < amount ; i++) {
buf.put(HEADER_BYTES);
buf.putInt("stuff".getBytes().length);
buf.put("stuff".getBytes());
long size = connectorHandler.write(buf, true);
// this doesnt help
Thread.yield();
buf.flip();
buf.clear();
}
Thread.sleep(1000);
long end = System.currentTimeMillis();
long elapse = start - end;
// double throughput = amount/(elapse/1000);
// System.err.println("Throughput:" + throughput + "/second elapse:"
+ elapse);
assertTrue("nothing was received", receivedCount > 0);
assertEquals("Expected:" + amount + " but got:" + receivedCount,
amount, receivedCount);
connectorHandler.close();
controller.stop();
}
public class MyClient {
public MyClient(Controller controller) throws InterruptedException {
TCPSelectorHandler tcpSelector = new TCPSelectorHandler(true);
tcpSelector.setPort(9090);
controller.addSelectorHandler(tcpSelector);
controller.addStateListener(new ControllerStateListenerAdapter()
{
public void onException(Throwable e) {
System.out.println("Grizzly controller exception:" +
e.getMessage());
}
public void onReady() {
System.out.println("ClientConnectorReady!");
}
});
new Thread(controller).start();
Thread.sleep(500);
}
}
public static String HEADER = "LL_TCP";
public static byte[] HEADER_BYTES = HEADER.getBytes();
byte[] bufferSize = new byte[HEADER.length() + Integer.SIZE/8];
public class Server implements ProtocolFilter {
final ProtocolFilter read = new ReadFilter();
final TCPSelectorHandler tcpHandler = new TCPSelectorHandler();
final Controller controller = new Controller();
void execute(int pPort) {
tcpHandler.setPort(pPort);
controller.setSelectorHandler(tcpHandler);
final ProtocolFilter myFilter = this;
controller.setProtocolChainInstanceHandler(new
DefaultProtocolChainInstanceHandler() {
public ProtocolChain poll() {
ProtocolChain protocolChain = protocolChains.poll();
if (protocolChain == null) {
protocolChain = new DefaultProtocolChain();
protocolChain.addFilter(read);
protocolChain.addFilter(myFilter);
}
return protocolChain;
}
});
new Thread(controller).start();
}
public boolean execute(Context ctx) throws IOException {
try {
final WorkerThread workerThread =
((WorkerThread)Thread.currentThread());
String message = "";
ByteBuffer buffer = workerThread.getByteBuffer();
buffer.flip();
/**
* assemble packets from the stream - yep this is wrong
but done to
* break up the conjoined way they are received
*/
while (buffer.hasRemaining()) {
byte[] header = new byte[HEADER_BYTES.length];
ByteBuffer byteBuffer = buffer.get(header);
int packetLength = buffer.getInt();
byte[] packet = new byte[packetLength];
buffer.get(packet);
receivedCount++;
// if (receivedCount % 1000 == 0) {
System.err.println(receivedCount + " Got Packet:"
+ new String(packet));
// }
}
buffer.clear();
} catch (Throwable t) {
t.printStackTrace();
}
return false;
}
public boolean postExecute(Context ctx) throws IOException {
return true;
}
}