On 06/25/2011 02:41 PM, Jon Brisbin wrote:
> I put a rewind in there. Was that not enough?
That should be fine.
Jon, can I ask you to provide some build instructions for the async client?
I'm trying to build project using
./gradlew build
but see a lot of package misses (see example bellow). Probably some of
them are protobuf classes, but not sure if all.
Thanks.
WBR,
Alexey.
/home/oleksiys/Projects/riak-async-java-client/src/main/java/com/jbrisbin/riak/async/RiakAsyncClient.java:17:
com.basho.riak.pbc.RiakMessageCodes is not public in com.basho.riak.pbc;
cannot be accessed from outside package
import static com.basho.riak.pbc.RiakMessageCodes.*;
^
/home/oleksiys/Projects/riak-async-java-client/src/main/java/com/jbrisbin/riak/async/RiakAsyncClient.java:37:
cannot find symbol
symbol : class IRiakObject
location: package com.basho.riak.client
import com.basho.riak.client.IRiakObject;
^
/home/oleksiys/Projects/riak-async-java-client/src/main/java/com/jbrisbin/riak/async/RiakAsyncClient.java:38:
package com.basho.riak.client.bucket does not exist
import com.basho.riak.client.bucket.BucketProperties;
>
> Sent from my iPhone
>
> On Jun 25, 2011, at 2:54 AM, Oleksiy Stashok
> <oleksiy.stashok_at_oracle.com <mailto:oleksiy.stashok_at_oracle.com>> wrote:
>
>> Hi Jon,
>>
>> Have you tried to fix the RpbFilter decoding (see my prev. email)?
>>
>> WBR,
>> Alexey.
>>
>> On 06/25/2011 01:27 AM, Jon Brisbin wrote:
>>> I updated the project to make the tests hang. To successfully
>>> execute them, you'd need a Riak installation locally on the default
>>> port.
>>>
>>> It will hang on the future.get() call on line 122 because the
>>> responses are getting out-of-sync with the requests.
>>>
>>> https://github.com/jbrisbin/riak-async-java-client/blob/master/src/test/groovy/com/jbrisbin/riak/async/RiakAsyncClientSpec.groovy
>>>
>>> Is there anything I should be doing on my end to do some
>>> request/response accounting? I can't tell whether the problem is in
>>> the client or in the server-side implementation of protobuf...
>>>
>>> Thanks for the help! :)
>>>
>>>
>>> Jon Brisbin
>>> http//jbrisbin.com
>>>
>>>
>>> ------------------------------------------------------------------------
>>>
>>> *From: *"Oleksiy Stashok" <oleksiy.stashok_at_oracle.com
>>> <mailto:oleksiy.stashok_at_oracle.com>>
>>> *To: *users_at_grizzly.java.net <mailto:users_at_grizzly.java.net>
>>> *Sent: *Friday, June 24, 2011 4:38:54 AM
>>> *Subject: *Re: Do I need connection pooling? Or am I just doing
>>> it wrong?
>>>
>>> Hi Jon,
>>>
>>> it would help if you can commit the test, which doesn't work :)
>>>
>>> Otherwise code looks fine, except here in RpbFilter
>>>
>>> @Override public NextAction handleRead(FilterChainContext ctx)
>>> throws IOException {
>>> Buffer buffer = ctx.getMessage();
>>> int size = buffer.getInt();
>>> int code = buffer.get();
>>> if (buffer.remaining() < (size - 1)) {
>>> return ctx.getStopAction(buffer);
>>> }
>>>
>>>
>>> You'd probably need to restore original buffer position, before
>>> returning ctx.getStopAction(buffer), otherwise you save a Buffer
>>> with a shifted position, so next time when you read size and
>>> code it will be corrupted.
>>>
>>> WBR,
>>> Alexey.
>>>
>>> On 06/23/2011 09:55 PM, Jon Brisbin wrote:
>>>
>>> I'm trying to talk protocol buffers in a custom client I'm
>>> writing and I'm running into some kind of deadlock issue, I
>>> think. I could also be exhausting the thread pool unawares
>>> (I really want to queue this stuff somewhere...it's just not
>>> obvious to me how to do that). I'm able to write and read
>>> from the server if I block until the response is fully
>>> processed (request written and response processed). But if I
>>> try and write multiple requests on the same connection,
>>> something gets deadlocked somewhere and I have no idea where.
>>>
>>> I'm setting up my transport like this (workerPool is a fixed
>>> thread pool at availableProcessors() * 4):
>>>
>>> transport.setKeepAlive(true);
>>> transport.setTcpNoDelay(true);
>>> transport.setWorkerThreadPool(workerPool);
>>>
>>> transport.setIOStrategy(WorkerThreadIOStrategy.getInstance());
>>>
>>> My filters are:
>>>
>>> clientChainBuilder.add(new TransportFilter());
>>> clientChainBuilder.add(new RpbFilter(heap));
>>> clientChainBuilder.add(new PendingRequestFilter());
>>>
>>> RpbFilter is a protocol buffers encoder/decoder. The
>>> PendingRequestFilter is part of my client and is supposed to
>>> call the "saved" Future that was created for this request to
>>> pass back the value. Here's an excerpt:
>>>
>>> @Override public NextAction
>>> handleWrite(FilterChainContext ctx) throws IOException {
>>> RpbRequest pending = ctx.getMessage();
>>> pendingRequests.add(pending);
>>> ctx.setMessage(pending.getMessage());
>>> return ctx.getInvokeAction();
>>> }
>>>
>>> When a response comes back (which is, I think, where I'm
>>> getting messed up), I handle it like this (again, excerpted):
>>>
>>> @Override public NextAction handleRead(final
>>> FilterChainContext ctx) throws IOException {
>>> final RpbMessage<?> msg = ctx.getMessage();
>>> final RpbRequest pending = pendingRequests.peek();
>>> if (null == pendingRequests.peek()) {
>>> log.warn("Got a response when no pending
>>> requests available!");
>>> return ctx.getStopAction();
>>> } else {
>>> pendingRequests.remove(pending);
>>> }
>>> ...process the protobuf object and create an
>>> application-specific one to pass to the client requestor...
>>> if (null != response && null !=
>>> pending.getCallback())
>>> pending.getCallback().completed(response);
>>>
>>> The project is here (tests pass now because I'm blocking):
>>>
>>> https://github.com/jbrisbin/riak-async-java-client
>>>
>>> Any help you could give me here would be greatly appreciated! :)
>>>
>>> Thanks!
>>>
>>> Jon Brisbin
>>> http//jbrisbin.com
>>>
>>>
>>>
>>>
>>