users@grizzly.java.net

Do I need connection pooling? Or am I just doing it wrong?

From: Jon Brisbin <jon_at_jbrisbin.com>
Date: Thu, 23 Jun 2011 14:55:58 -0500 (CDT)

I'm trying to talk protocol buffers in a custom client I'm writing, and I think I'm running into some kind of deadlock. I could also be exhausting the thread pool without realizing it (I really want to queue this stuff somewhere; it's just not obvious to me how to do that). I'm able to write to and read from the server if I block until each request is written and its response is fully processed. But if I try to write multiple requests on the same connection, something deadlocks and I have no idea where.

I'm setting up my transport like this (workerPool is a fixed thread pool at availableProcessors() * 4):

transport.setKeepAlive(true);
transport.setTcpNoDelay(true);
transport.setWorkerThreadPool(workerPool);
transport.setIOStrategy(WorkerThreadIOStrategy.getInstance());
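
For reference, a minimal sketch of how a pool of that size can be built with the standard java.util.concurrent API. The class and method names (WorkerPoolSketch, poolSize, newWorkerPool) are hypothetical and only for illustration; they are not taken from the project:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical helper, not part of Grizzly or the client project.
class WorkerPoolSketch {
    // Four threads per available processor, as described above.
    static int poolSize() {
        return Runtime.getRuntime().availableProcessors() * 4;
    }

    // A fixed pool never grows past poolSize(); extra tasks wait in its
    // unbounded internal queue until a thread becomes free.
    static ExecutorService newWorkerPool() {
        return Executors.newFixedThreadPool(poolSize());
    }
}
```

Because the pool is fixed, any task that blocks while waiting on a response holds one of those threads for the whole wait.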

My filters are:

clientChainBuilder.add(new TransportFilter());
clientChainBuilder.add(new RpbFilter(heap));
clientChainBuilder.add(new PendingRequestFilter());

RpbFilter is a protocol buffers encoder/decoder. The PendingRequestFilter is part of my client and is supposed to call the "saved" Future that was created for this request to pass back the value. Here's an excerpt:

@Override
public NextAction handleWrite(FilterChainContext ctx) throws IOException {
    RpbRequest pending = ctx.getMessage();
    pendingRequests.add(pending);
    ctx.setMessage(pending.getMessage());
    return ctx.getInvokeAction();
}

When a response comes back (which is, I think, where I'm getting messed up), I handle it like this (again, excerpted):

@Override
public NextAction handleRead(final FilterChainContext ctx) throws IOException {
    final RpbMessage<?> msg = ctx.getMessage();
    final RpbRequest pending = pendingRequests.peek();
    if (null == pendingRequests.peek()) {
        log.warn("Got a response when no pending requests available!");
        return ctx.getStopAction();
    } else {
        pendingRequests.remove(pending);
    }
    ...process the protobuf object and create an application-specific one to pass to the client requestor...
    if (null != response && null != pending.getCallback())
        pending.getCallback().completed(response);
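
The matching that this filter is meant to do can be sketched in plain Java, without any Grizzly types. This is only an illustration under two assumptions: the server answers requests on a connection in the order they were sent, and requests are registered in the same order they go out on the wire. The names (PendingRequests, register, complete) are hypothetical, and CompletableFuture stands in for the callback:

```java
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-in for the pending-request bookkeeping; not Grizzly API.
class PendingRequests<R> {
    private final Queue<CompletableFuture<R>> pending = new ConcurrentLinkedQueue<>();

    // Write path: remember a future for the request that is about to be sent.
    CompletableFuture<R> register() {
        CompletableFuture<R> future = new CompletableFuture<>();
        pending.add(future);
        return future;
    }

    // Read path: hand the response to the oldest pending request.
    // poll() removes and returns the head in a single step, so two reader
    // threads cannot both pick up the same entry, which separate
    // peek() and remove() calls could allow.
    boolean complete(R response) {
        CompletableFuture<R> future = pending.poll();
        if (future == null) {
            return false; // a response arrived with nothing pending
        }
        return future.complete(response);
    }
}
```

With this shape the caller does not have to block a worker thread per request: it keeps the future returned by register() and is notified when complete() is called from the read side.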

The project is here (tests pass now because I'm blocking):

https://github.com/jbrisbin/riak-async-java-client

Any help you could give me here would be greatly appreciated! :)

Thanks!

Jon Brisbin
http://jbrisbin.com