users@grizzly.java.net

Re: Do I need connection pooling? Or am I just doing it wrong?

From: Oleksiy Stashok <oleksiy.stashok_at_oracle.com>
Date: Sat, 25 Jun 2011 09:54:25 +0200

Hi Jon,

Have you tried to fix the RpbFilter decoding (see my prev. email)?

WBR,
Alexey.

On 06/25/2011 01:27 AM, Jon Brisbin wrote:
> I updated the project to make the tests hang. To successfully execute
> them, you'd need a Riak installation locally on the default port.
>
> It will hang on the future.get() call on line 122 because the
> responses are getting out-of-sync with the requests.
>
> https://github.com/jbrisbin/riak-async-java-client/blob/master/src/test/groovy/com/jbrisbin/riak/async/RiakAsyncClientSpec.groovy
>
> Is there anything I should be doing on my end to do some
> request/response accounting? I can't tell whether the problem is in
> the client or in the server-side implementation of protobuf...
>
> Thanks for the help! :)
>
>
> Jon Brisbin
> http//jbrisbin.com
>
>
> ------------------------------------------------------------------------
>
> *From: *"Oleksiy Stashok" <oleksiy.stashok_at_oracle.com>
> *To: *users_at_grizzly.java.net
> *Sent: *Friday, June 24, 2011 4:38:54 AM
> *Subject: *Re: Do I need connection pooling? Or am I just doing it
> wrong?
>
> Hi Jon,
>
> it would help if you can commit the test, which doesn't work :)
>
> Otherwise code looks fine, except here in RpbFilter
>
> @Override public NextAction handleRead(FilterChainContext ctx)
> throws IOException {
> Buffer buffer = ctx.getMessage();
> int size = buffer.getInt();
> int code = buffer.get();
> if (buffer.remaining() < (size - 1)) {
> return ctx.getStopAction(buffer);
> }
>
>
> You'd probably need to restore original buffer position, before
> returning ctx.getStopAction(buffer), otherwise you save a Buffer
> with a shifted position, so next time when you read size and code
> it will be corrupted.
>
> WBR,
> Alexey.
>
> On 06/23/2011 09:55 PM, Jon Brisbin wrote:
>
> I'm trying to talk protocol buffers in a custom client I'm
> writing and I'm running into some kind of deadlock issue, I
> think. I could also be exhausting the thread pool unawares (I
> really want to queue this stuff somewhere...it's just not
> obvious to me how to do that). I'm able to write and read from
> the server if I block until the response is fully processed
> (request written and response processed). But if I try and
> write multiple requests on the same connection, something gets
> deadlocked somewhere and I have no idea where.
>
> I'm setting up my transport like this (workerPool is a fixed
> thread pool at availableProcessors() * 4):
>
> transport.setKeepAlive(true);
> transport.setTcpNoDelay(true);
> transport.setWorkerThreadPool(workerPool);
>
> transport.setIOStrategy(WorkerThreadIOStrategy.getInstance());
>
> My filters are:
>
> clientChainBuilder.add(new TransportFilter());
> clientChainBuilder.add(new RpbFilter(heap));
> clientChainBuilder.add(new PendingRequestFilter());
>
> RpbFilter is a protocol buffers encoder/decoder. The
> PendingRequestFilter is part of my client and is supposed to
> call the "saved" Future that was created for this request to
> pass back the value. Here's an excerpt:
>
> @Override public NextAction
> handleWrite(FilterChainContext ctx) throws IOException {
> RpbRequest pending = ctx.getMessage();
> pendingRequests.add(pending);
> ctx.setMessage(pending.getMessage());
> return ctx.getInvokeAction();
> }
>
> When a response comes back (which is, I think, where I'm
> getting messed up), I handle it like this (again, excerpted):
>
> @Override public NextAction handleRead(final
> FilterChainContext ctx) throws IOException {
> final RpbMessage<?> msg = ctx.getMessage();
> final RpbRequest pending = pendingRequests.peek();
> if (null == pendingRequests.peek()) {
> log.warn("Got a response when no pending
> requests available!");
> return ctx.getStopAction();
> } else {
> pendingRequests.remove(pending);
> }
> ...process the protobuf object and create an
> application-specific one to pass to the client requestor...
> if (null != response && null != pending.getCallback())
> pending.getCallback().completed(response);
>
> The project is here (tests pass now because I'm blocking):
>
> https://github.com/jbrisbin/riak-async-java-client
>
> Any help you could give me here would be greatly appreciated! :)
>
> Thanks!
>
> Jon Brisbin
> http//jbrisbin.com
>
>
>
>