Ahh...yes. I'm actually using the snapshot version of the Riak java driver. You'll have to check that out separately and make sure it's installed in your local maven repo:
https://github.com/basho/riak-java-client/
The tests should fail on the last one (the throughput test).
I was troubleshooting it a little yesterday and it seems I'm not getting all the responses I'm expecting or something along those lines. I tried counting requests and responses and they don't match.
You'll also want to do a fresh git pull of my source to get the latest changes.
Thanks!
Jon Brisbin
http//jbrisbin.com
----- Original Message -----
> From: "Oleksiy Stashok" <oleksiy.stashok_at_oracle.com>
> To: users_at_grizzly.java.net
> Sent: Monday, June 27, 2011 11:48:15 AM
> Subject: Re: Do I need connection pooling? Or am I just doing it
> wrong?
> On 06/25/2011 02:41 PM, Jon Brisbin wrote:
> > I put a rewind in there. Was that not enough?
>
> That should be fine.
> Jon, can I ask you to provide some build instructions for the async
> client?
> I'm trying to build project using
> ./gradlew build
> but see a lot of package misses (see example bellow). Probably some
> of them are protobuf classes, but not sure if all.
> Thanks.
> WBR,
> Alexey.
> /home/oleksiys/Projects/riak-async-java-client/src/main/java/com/jbrisbin/riak/async/RiakAsyncClient.java:17:
> com.basho.riak.pbc.RiakMessageCodes is not public in
> com.basho.riak.pbc; cannot be accessed from outside package
> import static com.basho.riak.pbc.RiakMessageCodes.*;
> ^
> /home/oleksiys/Projects/riak-async-java-client/src/main/java/com/jbrisbin/riak/async/RiakAsyncClient.java:37:
> cannot find symbol
> symbol : class IRiakObject
> location: package com.basho.riak.client
> import com.basho.riak.client.IRiakObject;
> ^
> /home/oleksiys/Projects/riak-async-java-client/src/main/java/com/jbrisbin/riak/async/RiakAsyncClient.java:38:
> package com.basho.riak.client.bucket does not exist
> import com.basho.riak.client.bucket.BucketProperties;
> > Sent from my iPhone
>
> > On Jun 25, 2011, at 2:54 AM, Oleksiy Stashok <
> > oleksiy.stashok_at_oracle.com > wrote:
>
> > > Hi Jon,
> >
>
> > > Have you tried to fix the RpbFilter decoding (see my prev.
> > > email)?
> >
>
> > > WBR,
> >
>
> > > Alexey.
> >
>
> > > On 06/25/2011 01:27 AM, Jon Brisbin wrote:
> >
>
> > > > I updated the project to make the tests hang. To successfully
> > > > execute
> > > > them, you'd need a Riak installation locally on the default
> > > > port.
> > >
> >
>
> > > > It will hang on the future.get() call on line 122 because the
> > > > responses are getting out-of-sync with the requests.
> > >
> >
>
> > > > https://github.com/jbrisbin/riak-async-java-client/blob/master/src/test/groovy/com/jbrisbin/riak/async/RiakAsyncClientSpec.groovy
> > >
> >
>
> > > > Is there anything I should be doing on my end to do some
> > > > request/response accounting? I can't tell whether the problem
> > > > is
> > > > in
> > > > the client or in the server-side implementation of protobuf...
> > >
> >
>
> > > > Thanks for the help! :)
> > >
> >
>
> > > > Jon Brisbin
> > >
> >
>
> > > > http//jbrisbin.com
> > >
> >
>
> > > > ----- Original Message -----
> > >
> >
>
> > > > > From: "Oleksiy Stashok" < oleksiy.stashok_at_oracle.com >
> > > >
> > >
> >
>
> > > > > To: users_at_grizzly.java.net
> > > >
> > >
> >
>
> > > > > Sent: Friday, June 24, 2011 4:38:54 AM
> > > >
> > >
> >
>
> > > > > Subject: Re: Do I need connection pooling? Or am I just doing
> > > > > it
> > > > > wrong?
> > > >
> > >
> >
>
> > > > > Hi Jon,
> > > >
> > >
> >
>
> > > > > it would help if you can commit the test, which doesn't work
> > > > > :)
> > > >
> > >
> >
>
> > > > > Otherwise code looks fine, except here in RpbFilter
> > > >
> > >
> >
>
> > > > > @Override public NextAction handleRead ( FilterChainContext
> > > > > ctx
> > > > > )
> > > > > throws IOException {
> > > >
> > >
> >
>
> > > > > Buffer buffer = ctx . getMessage ();
> > > >
> > >
> >
>
> > > > > int size = buffer . getInt ();
> > > >
> > >
> >
>
> > > > > int code = buffer . get ();
> > > >
> > >
> >
>
> > > > > if ( buffer . remaining () < ( size - 1 )) {
> > > >
> > >
> >
>
> > > > > return ctx . getStopAction ( buffer );
> > > >
> > >
> >
>
> > > > > }
> > > >
> > >
> >
>
> > > > > You'd probably need to restore original buffer position,
> > > > > before
> > > > > returning ctx.getStopAction(buffer), otherwise you save a
> > > > > Buffer
> > > > > with a shifted position, so next time when you read size and
> > > > > code
> > > > > it
> > > > > will be corrupted.
> > > >
> > >
> >
>
> > > > > WBR,
> > > >
> > >
> >
>
> > > > > Alexey.
> > > >
> > >
> >
>
> > > > > On 06/23/2011 09:55 PM, Jon Brisbin wrote:
> > > >
> > >
> >
>
> > > > > > I'm trying to talk protocol buffers in a custom client I'm
> > > > > > writing
> > > > > > and I'm running into some kind of deadlock issue, I think.
> > > > > > I
> > > > > > could
> > > > > > also be exhausting the thread pool unawares (I really want
> > > > > > to
> > > > > > queue
> > > > > > this stuff somewhere...it's just not obvious to me how to
> > > > > > do
> > > > > > that).
> > > > > > I'm able to write and read from the server if I block until
> > > > > > the
> > > > > > response is fully processed (request written and response
> > > > > > processed). But if I try and write multiple requests on the
> > > > > > same
> > > > > > connection, something gets deadlocked somewhere and I have
> > > > > > no
> > > > > > idea
> > > > > > where.
> > > > >
> > > >
> > >
> >
>
> > > > > > I'm setting up my transport like this (workerPool is a
> > > > > > fixed
> > > > > > thread
> > > > > > pool at availableProcessors() * 4):
> > > > >
> > > >
> > >
> >
>
> > > > > > transport.setKeepAlive(true);
> > > > >
> > > >
> > >
> >
>
> > > > > > transport.setTcpNoDelay(true);
> > > > >
> > > >
> > >
> >
>
> > > > > > transport.setWorkerThreadPool(workerPool);
> > > > >
> > > >
> > >
> >
>
> > > > > > transport.setIOStrategy(WorkerThreadIOStrategy.getInstance());
> > > > >
> > > >
> > >
> >
>
> > > > > > My filters are:
> > > > >
> > > >
> > >
> >
>
> > > > > > clientChainBuilder.add(new TransportFilter());
> > > > >
> > > >
> > >
> >
>
> > > > > > clientChainBuilder.add(new RpbFilter(heap));
> > > > >
> > > >
> > >
> >
>
> > > > > > clientChainBuilder.add(new PendingRequestFilter());
> > > > >
> > > >
> > >
> >
>
> > > > > > RpbFilter is a protocol buffers encoder/decoder. The
> > > > > > PendingRequestFilter is part of my client and is supposed
> > > > > > to
> > > > > > call
> > > > > > the "saved" Future that was created for this request to
> > > > > > pass
> > > > > > back
> > > > > > the value. Here's an excerpt:
> > > > >
> > > >
> > >
> >
>
> > > > > > @Override public NextAction handleWrite(FilterChainContext
> > > > > > ctx)
> > > > > > throws IOException {
> > > > >
> > > >
> > >
> >
>
> > > > > > RpbRequest pending = ctx.getMessage();
> > > > >
> > > >
> > >
> >
>
> > > > > > pendingRequests.add(pending);
> > > > >
> > > >
> > >
> >
>
> > > > > > ctx.setMessage(pending.getMessage());
> > > > >
> > > >
> > >
> >
>
> > > > > > return ctx.getInvokeAction();
> > > > >
> > > >
> > >
> >
>
> > > > > > }
> > > > >
> > > >
> > >
> >
>
> > > > > > When a response comes back (which is, I think, where I'm
> > > > > > getting
> > > > > > messed up), I handle it like this (again, excerpted):
> > > > >
> > > >
> > >
> >
>
> > > > > > @Override public NextAction handleRead(final
> > > > > > FilterChainContext
> > > > > > ctx)
> > > > > > throws IOException {
> > > > >
> > > >
> > >
> >
>
> > > > > > final RpbMessage<?> msg = ctx.getMessage();
> > > > >
> > > >
> > >
> >
>
> > > > > > final RpbRequest pending = pendingRequests.peek();
> > > > >
> > > >
> > >
> >
>
> > > > > > if (null == pendingRequests.peek()) {
> > > > >
> > > >
> > >
> >
>
> > > > > > log.warn("Got a response when no pending requests
> > > > > > available!");
> > > > >
> > > >
> > >
> >
>
> > > > > > return ctx.getStopAction();
> > > > >
> > > >
> > >
> >
>
> > > > > > } else {
> > > > >
> > > >
> > >
> >
>
> > > > > > pendingRequests.remove(pending);
> > > > >
> > > >
> > >
> >
>
> > > > > > }
> > > > >
> > > >
> > >
> >
>
> > > > > > ...process the protobuf object and create an
> > > > > > application-specific
> > > > > > one
> > > > > > to pass to the client requestor...
> > > > >
> > > >
> > >
> >
>
> > > > > > if (null != response && null != pending.getCallback())
> > > > >
> > > >
> > >
> >
>
> > > > > > pending.getCallback().completed(response);
> > > > >
> > > >
> > >
> >
>
> > > > > > The project is here (tests pass now because I'm blocking):
> > > > >
> > > >
> > >
> >
>
> > > > > > https://github.com/jbrisbin/riak-async-java-client
> > > > >
> > > >
> > >
> >
>
> > > > > > Any help you could give me here would be greatly
> > > > > > appreciated!
> > > > > > :)
> > > > >
> > > >
> > >
> >
>
> > > > > > Thanks!
> > > > >
> > > >
> > >
> >
>
> > > > > > Jon Brisbin
> > > > >
> > > >
> > >
> >
>
> > > > > > http//jbrisbin.com
> > > > >
> > > >
> > >
> >
>