Hmmmm in the end it seems that this sessionCode story has no effect at all!
forget my previous messages.
The workflow of my parallel tasks is:
1) Get a sessionCode (through an AHC request, or cached)
2) Upload a document, using that sessionCode
The problem is that when I use a sessionCode cache, the very first AHC
request done is a document upload through the FeedableBodyGenerator, which
as we previously saw, could lead to an OOM if the
initializeAsynchronousTransfer is set too late and too much Buffers are
already written to the queue.
When I don't use a sessionCode cache, before the upload, the application
thread does an AHC request to get this sessionCode. Thus I guess the AHC is
"warm". So at the time of the upload, everything in AHC is probably
initialized correctly, and the initializeAsynchronousTransfer is called
faster.
This AHC things that seems to take time to initialize seems to be bound to
the applicative thread that make the request.
In my test I simulate 10 applicative threads performing the same Runnable
task:
private Runnable uploadSingleDocumentRunnable = new Runnable() {
@Override
public void run() {
try {
uploadSingleDocument(); // This retrieves the sessionCode from
cache and uploads the doc using the sessionCode
} catch ( Exception e ) {
//throw new RuntimeException("file upload failed",e);
}
}
};
The above doesn't work because only 1 of the 10 threads will make an AHC
request to retrieve the sessionCode, the result being cached (Guava Cache)
If before starting the 10 threads I add a loop that perform 10 AHC requests
to get 10 sessionCode (that won't be used anyway), it doesn't work either.
But if I rework the code above and do:
private Runnable uploadSingleDocumentRunnable = new Runnable() {
@Override
public void run() {
try {
getSessionCodeWithAHCRequestThatWontBeUsed();
uploadSingleDocument(); // This retrieves the sessionCode from
cache and uploads the doc using the sessionCode
} catch ( Exception e ) {
//throw new RuntimeException("file upload failed",e);
}
}
};
Then it works perfectly fine.
It seems that the object that takes time to initialize may be
the org.glassfish.grizzly.ThreadCache right? The ThreadLocalPool seems to
be bound to worker threads while my applicative has a ThreadCache with
buffers in ThreadLocal.
Anyway it's not a problem, I think the FeedableBodyGenerator.feed() method
just has to block until a context has been (and ThreadCache initialized) to
avoid OOM errors
2013/9/5 Sébastien Lorber <lorber.sebastien_at_gmail.com>
>
> What is very strange is that I tested with/without the same sessionCode
> with our previous code, the one not using FeedableBodyGenerator, which has
> a high memory consumption.
> Despites the fact it had high memory consumption, it seems work fine to
> upload multiple documents if allocated with a large heap, and the
> sessionCode seems to have no effect.
>
> On the new impl using the FeedableBodyGenerator, the sessionCode sent as a
> multipart bodypart seems to have an effect.
>
> I have tried to feed the queue before sending the request to AHC, but this
> leads to this exception (with/without sessionCode switching)
> Caused by: java.util.concurrent.TimeoutException: Timeout exceeded
> at
> com.ning.http.client.providers.grizzly.GrizzlyAsyncHttpProvider.timeout(GrizzlyAsyncHttpProvider.java:528)
> at
> com.ning.http.client.providers.grizzly.GrizzlyAsyncHttpProvider$3.onTimeout(GrizzlyAsyncHttpProvider.java:361)
> at
> org.glassfish.grizzly.utils.IdleTimeoutFilter$DefaultWorker.doWork(IdleTimeoutFilter.java:383)
> at
> org.glassfish.grizzly.utils.IdleTimeoutFilter$DefaultWorker.doWork(IdleTimeoutFilter.java:362)
> at
> org.glassfish.grizzly.utils.DelayedExecutor$DelayedRunnable.run(DelayedExecutor.java:158)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>
>
>
>
>
>
> 2013/9/5 Sébastien Lorber <lorber.sebastien_at_gmail.com>
>
>> By the way, by using a low timeout with the same sessioncode, I got the
>> following NPE:
>>
>> Caused by: java.lang.NullPointerException
>> at
>> com.ning.http.client.providers.grizzly.FeedableBodyGenerator.block(FeedableBodyGenerator.java:184)
>> at
>> com.ning.http.client.providers.grizzly.FeedableBodyGenerator.blockUntilQueueFree(FeedableBodyGenerator.java:167)
>> at
>> com.ning.http.client.providers.grizzly.FeedableBodyGenerator.flushQueue(FeedableBodyGenerator.java:124)
>> at
>> com.ning.http.client.providers.grizzly.FeedableBodyGenerator.feed(FeedableBodyGenerator.java:94)
>>
>> GrizzlyAsyncHttpProvider.HttpTransactionContext httpCtx =
>> getHttpTransactionContext(c);
>> httpCtx.abort(e.getCause());
>>
>> I guess the httpCtx is not already available to be aborted
>>
>>
>> 2013/9/5 Sébastien Lorber <lorber.sebastien_at_gmail.com>
>>
>>>
>>>
>>> This is right, here's a log I have when I use the same session code, ie
>>> the remote host is blocking the data or something.
>>> This is obtained by running 5 parallel uploads.
>>>
>>> *Flushing queue of size 0 with allowBlocking = false*
>>> Flushing queue of size 1 with allowBlocking = true
>>> *Flushing queue of size 97 with allowBlocking = false*
>>> *Flushing queue of size 100 with allowBlocking = false*
>>> *Flushing queue of size 160 with allowBlocking = true*
>>> Flushing queue of size 1 with allowBlocking = true
>>> Flushing queue of size 1 with allowBlocking = true
>>> Flushing queue of size 1 with allowBlocking = true
>>> Flushing queue of size 1 with allowBlocking = true
>>> Flushing queue of size 1 with allowBlocking = true
>>> Flushing queue of size 0 with allowBlocking = true
>>> Flushing queue of size 1 with allowBlocking = true
>>> Flushing queue of size 1 with allowBlocking = true
>>> Flushing queue of size 1 with allowBlocking = true
>>> Flushing queue of size 1 with allowBlocking = true
>>> Flushing queue of size 1 with allowBlocking = true
>>> Flushing queue of size 1 with allowBlocking = true
>>> Flushing queue of size 1 with allowBlocking = true
>>> Flushing queue of size 1 with allowBlocking = true
>>> Flushing queue of size 1 with allowBlocking = true
>>> Flushing queue of size 1 with allowBlocking = true
>>> Flushing queue of size 1 with allowBlocking = true
>>> Flushing queue of size 1 with allowBlocking = true
>>> Flushing queue of size 1 with allowBlocking = true
>>> Flushing queue of size 1 with allowBlocking = true
>>> Flushing queue of size 1 with allowBlocking = true
>>> Flushing queue of size 1 with allowBlocking = true
>>> Flushing queue of size 1 with allowBlocking = true
>>> Flushing queue of size 1 with allowBlocking = true
>>> Flushing queue of size 1 with allowBlocking = true
>>> Flushing queue of size 1 with allowBlocking = true
>>> Flushing queue of size 1 with allowBlocking = true
>>> Flushing queue of size 1 with allowBlocking = true
>>> Flushing queue of size 1 with allowBlocking = true
>>> Flushing queue of size 1 with allowBlocking = true
>>> Flushing queue of size 1 with allowBlocking = true
>>> Flushing queue of size 1 with allowBlocking = true
>>> Flushing queue of size 1 with allowBlocking = true
>>> Flushing queue of size 1 with allowBlocking = true
>>> Flushing queue of size 1 with allowBlocking = true
>>> Flushing queue of size 1 with allowBlocking = true
>>> Flushing queue of size 1 with allowBlocking = true
>>> Flushing queue of size 1 with allowBlocking = true
>>> Flushing queue of size 1 with allowBlocking = true
>>> Flushing queue of size 1 with allowBlocking = true
>>> Flushing queue of size 1 with allowBlocking = true
>>> Flushing queue of size 1 with allowBlocking = true
>>> Flushing queue of size 1 with allowBlocking = true
>>> Flushing queue of size 1 with allowBlocking = true
>>> Flushing queue of size 1 with allowBlocking = true
>>> Flushing queue of size 1 with allowBlocking = true
>>> Flushing queue of size 1 with allowBlocking = true
>>> Flushing queue of size 1 with allowBlocking = true
>>> Flushing queue of size 1 with allowBlocking = true
>>> Flushing queue of size 1 with allowBlocking = true
>>> Flushing queue of size 1 with allowBlocking = true
>>> java.lang.OutOfMemoryError: GC overhead limit exceeded
>>> Dumping heap to /ome/lorber/ureau/om ...
>>> Unable to create /ome/lorber/ureau/om: Le fichier existe
>>> Flushing queue of size 1 with allowBlocking = true
>>> Flushing queue of size 1 with allowBlocking = true
>>> Flushing queue of size 1 with allowBlocking = true
>>> Flushing queue of size 1 with allowBlocking = true
>>> Flushing queue of size 1 with allowBlocking = true
>>> Disconnected from the target VM, address: '127.0.0.1:49268', transport:
>>> 'socket'
>>>
>>>
>>>
>>>
>>>
>>> Otherwise, with different session codes, I get the following:
>>>
>>> *Flushing queue of size 0 with allowBlocking = false*
>>> Flushing queue of size 1 with allowBlocking = true
>>> Flushing queue of size 1 with allowBlocking = true
>>> Flushing queue of size 1 with allowBlocking = true
>>> *Flushing queue of size 0 with allowBlocking = false*
>>> Flushing queue of size 1 with allowBlocking = true
>>> Flushing queue of size 1 with allowBlocking = true
>>> *Flushing queue of size 0 with allowBlocking = false*
>>> *Flushing queue of size 0 with allowBlocking = false*
>>> Flushing queue of size 1 with allowBlocking = true
>>> Flushing queue of size 1 with allowBlocking = true
>>> Flushing queue of size 1 with allowBlocking = true
>>> Flushing queue of size 1 with allowBlocking = true
>>> Flushing queue of size 1 with allowBlocking = true
>>> Flushing queue of size 1 with allowBlocking = true
>>> Flushing queue of size 1 with allowBlocking = true
>>> Flushing queue of size 1 with allowBlocking = true
>>> Flushing queue of size 1 with allowBlocking = true
>>> ... and this continues without OOM
>>>
>>>
>>> So, this seems to be the problem.
>>>
>>>
>>>
>>> I think it would be great to be able to be able to choose the queue impl
>>> behind that FeedableBodyGenerator, like I suggested in my pull request.
>>>
>>> See here:
>>>
>>> https://github.com/slorber/async-http-client/blob/79b0c3b28a61b0aa4c4b055bca8f6be11d9ed1e6/src/main/java/com/ning/http/client/providers/grizzly/FeedableBodyGenerator.java
>>>
>>> Using a LinkedBlockingQueue seems to be a nice idea in this context, and
>>> in my case I would probably use a queue of size 1
>>>
>>>
>>> This would handle the blocking of the feed method, without having to use
>>> this:
>>> if (context != null) {
>>> flushQueue(true);
>>> }
>>>
>>>
>>> Or perhaps the feed() method have to wait until a context is set in the
>>> BodyGenerator ?
>>>
>>>
>>> I think it would be more clear if the initializeAsynchronousTransfer
>>> simply didn't flush the queue but just setup the context.
>>> Then the feed method would block until there's a context set, and then
>>> flush the queue with blocking behavior.
>>>
>>> This is probably the next step, but as we are using AHC for async, it
>>> would probably be great if that blocking feed() method was called in a
>>> worker thread instead of our main thread.
>>> I won't use this but someone who really wants a non-blocking impl of
>>> performant multipart fileupload would probably need this, or will use an
>>> ExecutorService for the feeding operations as a workaround.
>>>
>>>
>>>
>>> Thanks again for your reactivity
>>>
>>>
>>>
>>>
>>>
>>> 2013/9/4 Ryan Lubke <ryan.lubke_at_oracle.com>
>>>
>>>>
>>>>
>>>> Sébastien Lorber wrote:
>>>>
>>>> I've integrated this change and it works fine except a little detail.
>>>>
>>>>
>>>>
>>>>
>>>> I'm uploading files to a third party API (a bit like S3).
>>>> This API requires a "sessionCode" in each request. So there is a
>>>> multipart StringPart with that SessionCode.
>>>>
>>>> We used to have a cache which holds the sessionCode 30min per user so
>>>> that we do not need to init a new session each time.
>>>>
>>>> I had troubles in this very specific case: when I upload 5 docs with
>>>> the same session code.
>>>> When I remove the cache and use 5 different session codes, it works
>>>> fine.
>>>>
>>>> I guess the remote service is blocking concurrent uploads with the same
>>>> session code. I don't know at all.
>>>>
>>>>
>>>>
>>>> Where I want to go is that I wouldn't have expected Grizzly to OOM
>>>>
>>>>
>>>> Avertissement: Exception during FilterChain execution
>>>> java.lang.OutOfMemoryError: Java heap space
>>>> at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
>>>> at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
>>>> at
>>>> org.glassfish.grizzly.ssl.SSLUtils.allocateOutputBuffer(SSLUtils.java:342)
>>>> at
>>>> org.glassfish.grizzly.ssl.SSLBaseFilter$2.grow(SSLBaseFilter.java:117)
>>>> at
>>>> org.glassfish.grizzly.ssl.SSLConnectionContext.ensureBufferSize(SSLConnectionContext.java:392)
>>>> at
>>>> org.glassfish.grizzly.ssl.SSLConnectionContext.wrap(SSLConnectionContext.java:272)
>>>> at
>>>> org.glassfish.grizzly.ssl.SSLConnectionContext.wrapAll(SSLConnectionContext.java:238)
>>>> at
>>>> org.glassfish.grizzly.ssl.SSLBaseFilter.wrapAll(SSLBaseFilter.java:405)
>>>> at
>>>> org.glassfish.grizzly.ssl.SSLBaseFilter.handleWrite(SSLBaseFilter.java:320)
>>>> at
>>>> org.glassfish.grizzly.ssl.SSLFilter.accurateWrite(SSLFilter.java:263)
>>>> at org.glassfish.grizzly.ssl.SSLFilter.handleWrite(SSLFilter.java:143)
>>>> at
>>>> com.ning.http.client.providers.grizzly.GrizzlyAsyncHttpProvider$SwitchingSSLFilter.handleWrite(GrizzlyAsyncHttpProvider.java:2500)
>>>> at
>>>> org.glassfish.grizzly.filterchain.ExecutorResolver$8.execute(ExecutorResolver.java:111)
>>>> at
>>>> org.glassfish.grizzly.filterchain.DefaultFilterChain.executeFilter(DefaultFilterChain.java:288)
>>>> at
>>>> org.glassfish.grizzly.filterchain.DefaultFilterChain.executeChainPart(DefaultFilterChain.java:206)
>>>> at
>>>> org.glassfish.grizzly.filterchain.DefaultFilterChain.execute(DefaultFilterChain.java:136)
>>>> at
>>>> org.glassfish.grizzly.filterchain.DefaultFilterChain.process(DefaultFilterChain.java:114)
>>>> at
>>>> org.glassfish.grizzly.ProcessorExecutor.execute(ProcessorExecutor.java:77)
>>>> at
>>>> org.glassfish.grizzly.filterchain.FilterChainContext$1.run(FilterChainContext.java:196)
>>>> at
>>>> org.glassfish.grizzly.filterchain.FilterChainContext.resume(FilterChainContext.java:220)
>>>> at
>>>> org.glassfish.grizzly.ssl.SSLFilter$SSLHandshakeContext.completed(SSLFilter.java:383)
>>>> at
>>>> org.glassfish.grizzly.ssl.SSLFilter.notifyHandshakeComplete(SSLFilter.java:278)
>>>> at
>>>> org.glassfish.grizzly.ssl.SSLBaseFilter.handleRead(SSLBaseFilter.java:275)
>>>> at
>>>> com.ning.http.client.providers.grizzly.GrizzlyAsyncHttpProvider$SwitchingSSLFilter.handleRead(GrizzlyAsyncHttpProvider.java:2490)
>>>> at
>>>> org.glassfish.grizzly.filterchain.ExecutorResolver$9.execute(ExecutorResolver.java:119)
>>>> at
>>>> org.glassfish.grizzly.filterchain.DefaultFilterChain.executeFilter(DefaultFilterChain.java:288)
>>>> at
>>>> org.glassfish.grizzly.filterchain.DefaultFilterChain.executeChainPart(DefaultFilterChain.java:206)
>>>> at
>>>> org.glassfish.grizzly.filterchain.DefaultFilterChain.execute(DefaultFilterChain.java:136)
>>>> at
>>>> org.glassfish.grizzly.filterchain.DefaultFilterChain.process(DefaultFilterChain.java:114)
>>>> at
>>>> org.glassfish.grizzly.ProcessorExecutor.execute(ProcessorExecutor.java:77)
>>>> at
>>>> org.glassfish.grizzly.nio.transport.TCPNIOTransport.fireIOEvent(TCPNIOTransport.java:546)
>>>> at
>>>> org.glassfish.grizzly.strategies.AbstractIOStrategy.fireIOEvent(AbstractIOStrategy.java:113)
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> Caused by: java.util.concurrent.TimeoutException: null
>>>> at
>>>> org.glassfish.grizzly.impl.SafeFutureImpl$Sync.innerGet(SafeFutureImpl.java:367)
>>>> ~[grizzly-framework-2.3.5.jar:2.3.5]
>>>> at
>>>> org.glassfish.grizzly.impl.SafeFutureImpl.get(SafeFutureImpl.java:274)
>>>> ~[grizzly-framework-2.3.5.jar:2.3.5]
>>>> at
>>>> com.ning.http.client.providers.grizzly.FeedableBodyGenerator.block(FeedableBodyGenerator.java:177)
>>>> ~[async-http-client-1.7.20-204092c.jar:na]
>>>> at
>>>> com.ning.http.client.providers.grizzly.FeedableBodyGenerator.blockUntilQueueFree(FeedableBodyGenerator.java:167)
>>>> ~[async-http-client-1.7.20-204092c.jar:na]
>>>> at
>>>> com.ning.http.client.providers.grizzly.FeedableBodyGenerator.flushQueue(FeedableBodyGenerator.java:124)
>>>> ~[async-http-client-1.7.20-204092c.jar:na]
>>>> at
>>>> com.ning.http.client.providers.grizzly.FeedableBodyGenerator.feed(FeedableBodyGenerator.java:94)
>>>> ~[async-http-client-1.7.20-204092c.jar:na]
>>>>
>>>>
>>>>
>>>> multipart.body.generator.feeder.buffer=100000 -> size of each Buffer
>>>> sent to the FeedableBodyGenerator
>>>> transport.max.pending.bytes=1000000
>>>> (I tried other settings, including AUTO_SIZE)
>>>>
>>>>
>>>>
>>>> Do you have any idea why is there an OOM with these settings?
>>>>
>>>>
>>>>
>>>>
>>>> Perhaps it is because the feed() method of FeedableBodyGenerator
>>>> doesn't block until the context is initialized.
>>>> I guess the initializeAsynchronousTransfer is called only once the
>>>> connection is established, and perhaps a lot of Buffer are added to the
>>>> queue...
>>>>
>>>> Yes, it's invoked once the request has been dispatched, so if the
>>>> generator is fed a lot before the request, I could see this happening.
>>>> I'll see what I can do to alleviate that case.
>>>>
>>>>
>>>> But I'm not sure at all because the session code is transmitted as a
>>>> BodyPart and I get the same problem if i put it as the first or last
>>>> multipart.
>>>>
>>>> It's not a big deal, perhaps I should always use a different session
>>>> code for concurrent operations but I'd like to be sure that we won't have
>>>> this issue in production...
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> 2013/9/3 Ryan Lubke <ryan.lubke_at_oracle.com>
>>>>
>>>>> Good catch. Fixed.
>>>>>
>>>>>
>>>>> Sébastien Lorber wrote:
>>>>>
>>>>>
>>>>>
>>>>> Hello,
>>>>>
>>>>>
>>>>> There's a little mistake in the grizzly ahc provider relative to the
>>>>> write queue size.
>>>>>
>>>>>
>>>>>
>>>>> https://github.com/AsyncHttpClient/async-http-client/blob/b5d97efe9fe14113ea92fb1f7db192a2d090fad7/src/main/java/com/ning/http/client/providers/grizzly/GrizzlyAsyncHttpProvider.java#L419
>>>>>
>>>>> As you can see, the TransportCustomizer is called, and then the write
>>>>> queue size (among other things) is set to AUTO_SIZE (instead of previously
>>>>> UNLIMITED)
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> clientTransport.getAsyncQueueIO().getWriter()
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> .setMaxPendingBytesPerConnection(AsyncQueueWriter.AUTO_SIZE);
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> I think the default settings like this AUTO_SIZE attribute should be
>>>>> set before the customization of the transport, or they would override the
>>>>> value we customized.
>>>>>
>>>>>
>>>>> This is actually my case, since I can't reproduce my "bug" which is
>>>>> "high memory consumption", even when using -1 / UNLIMITED in the
>>>>> TransportCustomizer.
>>>>>
>>>>>
>>>>> This could work fine for me with AUTO_SIZE, but I'd rather be able to
>>>>> tune this parameter during load tests to see the effect.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> 2013/8/31 Sebastien Lorber <lorber.sebastien_at_gmail.com>
>>>>>
>>>>>> Thanks i will ckeck that on monday. I can now upload a 500m file with
>>>>>> 40m heap size ;)
>>>>>>
>>>>>> Envoyé de mon iPhone
>>>>>>
>>>>>> Le 30 août 2013 à 20:49, Ryan Lubke <ryan.lubke_at_oracle.com> a écrit :
>>>>>>
>>>>>> I'm going to be updating the Grizzly provider such that AUTO_SIZE
>>>>>> (not AUTO_TUNE) is the default, so you can avoid the use of the
>>>>>> TransportCustomizer.
>>>>>>
>>>>>> Ryan Lubke wrote:
>>>>>>
>>>>>> Regarding your tuning question, I would probably set the value to
>>>>>> AsyncQueueWriter.AUTO_TUNE (this will be four times the socket write
>>>>>> buffer) and see how that works.
>>>>>>
>>>>>> Ryan Lubke wrote:
>>>>>>
>>>>>> A question first. With these changes, your memory usage is more
>>>>>> inline with what you were looking for?
>>>>>>
>>>>>> Sébastien Lorber wrote:
>>>>>>
>>>>>> By the way, do you have any idea when the 1.7.20 will be released
>>>>>> (with these new improvements?)
>>>>>>
>>>>>> We would like to know if we wait for a release or if we install our
>>>>>> own temp release on Nexus :)
>>>>>>
>>>>>>
>>>>>> 2013/8/30 Sébastien Lorber <lorber.sebastien_at_gmail.com>
>>>>>>
>>>>>>> Thank you, it works fine!
>>>>>>>
>>>>>>>
>>>>>>> I just had to modify a single line after your commit.
>>>>>>>
>>>>>>>
>>>>>>> com.ning.http.client.providers.grizzly.GrizzlyAsyncHttpProvider#initializeTransport
>>>>>>> ->
>>>>>>> clientTransport.getAsyncQueueIO().getWriter().setMaxPendingBytesPerConnection(10000);
>>>>>>>
>>>>>>>
>>>>>>> If I let the initial value (-1) it won't block, canWrite always
>>>>>>> returns true
>>>>>>>
>>>>>>>
>>>>>>> Btw, on AHC I didn't find any way to pass this value as a config
>>>>>>> attribute, neither the size of the write buffer you talked about.
>>>>>>>
>>>>>>> So in the end, is there a way with current AHC code to use this
>>>>>>> "canWrite = false" behavior?
>>>>>>> If not, can you please provide a way to set this behavior on v1.7.20
>>>>>>> ? thanks
>>>>>>>
>>>>>>>
>>>>>>> PS: does it make sens to use the same number of bytes un the
>>>>>>> feed(Buffer) method and in the setMaxPendingBytesPerConnection(10000);
>>>>>>> ? do you have any tuning recommandation?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> 2013/8/29 Ryan Lubke <ryan.lubke_at_oracle.com>
>>>>>>>
>>>>>>>> Please disregard.
>>>>>>>>
>>>>>>>>
>>>>>>>> Ryan Lubke wrote:
>>>>>>>>
>>>>>>>> Sébastien,
>>>>>>>>
>>>>>>>> Could you also provide a sample of how you're performing your
>>>>>>>> feed?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> -rl
>>>>>>>>
>>>>>>>> Ryan Lubke wrote:
>>>>>>>>
>>>>>>>> Sébastien,
>>>>>>>>
>>>>>>>> I'd recommend looking at Connection.canWrite() [1] and
>>>>>>>> Connection.notifyCanWrite(WriteListener) [1]
>>>>>>>>
>>>>>>>> By default, Grizzly will configure the async write queue length to
>>>>>>>> be four times the write buffer size (which is based off the socket write
>>>>>>>> buffer).
>>>>>>>> When this queue exceeds this value, canWrite() will return false.
>>>>>>>>
>>>>>>>> When this occurs, you can register a WriteListener to be notified
>>>>>>>> when the queue length is below the configured max and then simulate blocking
>>>>>>>> until the onWritePossible() callback has been invoked.
>>>>>>>>
>>>>>>>> ----------------------------------------------------------------
>>>>>>>>
>>>>>>>> final FutureImpl<Boolean> future =
>>>>>>>> Futures.createSafeFuture();
>>>>>>>>
>>>>>>>> // Connection may be obtained by calling
>>>>>>>> FilterChainContext.getConnection().
>>>>>>>> connection.notifyCanWrite(new WriteHandler() {
>>>>>>>>
>>>>>>>> @Override
>>>>>>>> public void onWritePossible() throws Exception {
>>>>>>>> future.result(Boolean.TRUE);
>>>>>>>> }
>>>>>>>>
>>>>>>>> @Override
>>>>>>>> public void onError(Throwable t) {
>>>>>>>> future.failure(Exceptions.makeIOException(t));
>>>>>>>> }
>>>>>>>> });
>>>>>>>>
>>>>>>>> try {
>>>>>>>> final long writeTimeout = 30;
>>>>>>>> future.get(writeTimeout, TimeUnit.MILLISECONDS);
>>>>>>>> } catch (ExecutionException e) {
>>>>>>>> HttpTransactionContext httpCtx =
>>>>>>>> HttpTransactionContext.get(connection);
>>>>>>>> httpCtx.abort(e.getCause());
>>>>>>>> } catch (Exception e) {
>>>>>>>> HttpTransactionContext httpCtx =
>>>>>>>> HttpTransactionContext.get(connection);
>>>>>>>> httpCtx.abort(e);
>>>>>>>> }
>>>>>>>>
>>>>>>>>
>>>>>>>> ---------------------------------------------------------------------
>>>>>>>>
>>>>>>>> [1]
>>>>>>>> http://grizzly.java.net/docs/2.3/apidocs/org/glassfish/grizzly/OutputSink.html
>>>>>>>> .
>>>>>>>>
>>>>>>>> Sébastien Lorber wrote:
>>>>>>>>
>>>>>>>> Ryan, I've did some other tests.
>>>>>>>>
>>>>>>>>
>>>>>>>> It seems that using a blocking queue in the FeedableBodyGenerator
>>>>>>>> is totally useless because the thread consuming it is not blocking and the
>>>>>>>> queue never blocks the feeding, which was my intention in the beginning.
>>>>>>>> Maybe it depends on the IO strategy used?
>>>>>>>> I use AHC default which seems to use SameThreadIOStrategy so I
>>>>>>>> don't think it's related to the IO strategy.
>>>>>>>>
>>>>>>>>
>>>>>>>> So in the end I can upload a 70m file with a heap of 50m, but I
>>>>>>>> have to put a Thread.sleep(30) between each 100k Buffer send to the
>>>>>>>> FeedableBodyGenerator
>>>>>>>>
>>>>>>>> The connection with the server is not good here, but in production
>>>>>>>> it is normally a lot better as far as I know.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> I've tried things
>>>>>>>> like clientTransport.getAsyncQueueIO().getWriter().setMaxPendingBytesPerConnection(100000);
>>>>>>>> but it doesn't seem to work for me.
>>>>>>>>
>>>>>>>> I'd like the Grizzly internals to block when there are too much
>>>>>>>> pending bytes to send. Is it possible?
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> PS: I've just been able to send a 500mo file with 100mo heap, but
>>>>>>>> it needed a sleep of 100ms between each 100k Buffer sent to the
>>>>>>>> bodygenerator
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> 2013/8/29 Sébastien Lorber <lorber.sebastien_at_gmail.com>
>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> By chance do you if I can remove the MessageCloner used in the SSL
>>>>>>>>> filter?
>>>>>>>>> SSLBaseFilter$OnWriteCopyCloner
>>>>>>>>>
>>>>>>>>> It seems to allocate a lot of memory.
>>>>>>>>> I don't really understand why messages have to be cloned, can I
>>>>>>>>> remove this? How?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> 2013/8/29 Sébastien Lorber <lorber.sebastien_at_gmail.com>
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> I'm trying to send a 500m file for my tests with a heap of 400m.
>>>>>>>>>>
>>>>>>>>>> In our real use cases we would probably have files under 20mo but
>>>>>>>>>> we want to reduce the memory consumption because we can have x parallel
>>>>>>>>>> uploads on the same server according to the user activity.
>>>>>>>>>>
>>>>>>>>>> I'll try to check if using this BodyGenerator reduced the memory
>>>>>>>>>> footprint or if it's almost like before.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> 2013/8/28 Ryan Lubke <ryan.lubke_at_oracle.com>
>>>>>>>>>>
>>>>>>>>>>> At this point in time, as far as the SSL buffer allocation is
>>>>>>>>>>> concerned, it's untunable.
>>>>>>>>>>>
>>>>>>>>>>> That said, feel free to open a feature request.
>>>>>>>>>>>
>>>>>>>>>>> As to your second question, there is no suggested size. This is
>>>>>>>>>>> all very application specific.
>>>>>>>>>>>
>>>>>>>>>>> I'm curious, how large of a file are you sending?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Sébastien Lorber wrote:
>>>>>>>>>>>
>>>>>>>>>>> I have seen a lot of buffers which have a size of 33842 and it
>>>>>>>>>>> seems the limit is near half the capacity.
>>>>>>>>>>>
>>>>>>>>>>> Perhaps there's a way to tune that buffer size so that it
>>>>>>>>>>> consumes less memory?
>>>>>>>>>>> Is there an ideal Buffer size to send to the feed method?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> 2013/8/28 Ryan Lubke <ryan.lubke_at_oracle.com>
>>>>>>>>>>>
>>>>>>>>>>>> I'll be reviewing the PR today, thanks again!
>>>>>>>>>>>>
>>>>>>>>>>>> Regarding the OOM: as it stands now, for each new buffer that
>>>>>>>>>>>> is passed to the SSLFilter, we allocate a buffer twice the size in order to
>>>>>>>>>>>> accommodate the encrypted result. So there's an increase.
>>>>>>>>>>>>
>>>>>>>>>>>> Depending on the socket configurations of both endpoints, and
>>>>>>>>>>>> how fast the remote is reading data, it could
>>>>>>>>>>>> be the write queue is becoming too large. We do have a way to
>>>>>>>>>>>> detect this situation, but I'm pretty sure
>>>>>>>>>>>> the Grizzly internals are currently shielded here. I will see
>>>>>>>>>>>> what I can do to allow users to leverage this.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Sébastien Lorber wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Hello,
>>>>>>>>>>>>
>>>>>>>>>>>> I've made my pull request.
>>>>>>>>>>>> https://github.com/AsyncHttpClient/async-http-client/pull/367
>>>>>>>>>>>>
>>>>>>>>>>>> With my usecase it works, the file is uploaded like before.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> But I didn't notice a big memory improvement.
>>>>>>>>>>>>
>>>>>>>>>>>> Is it possible that SSL doesn't allow to stream the body or
>>>>>>>>>>>> something like that?
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> In memory, I have a lot of:
>>>>>>>>>>>> - HeapByteBuffer
>>>>>>>>>>>> Which are hold by SSLUtils$3
>>>>>>>>>>>> Which are hold by BufferBuffers
>>>>>>>>>>>> Which are hold by WriteResult
>>>>>>>>>>>> Which are hold by AsyncWriteQueueRecord
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Here is an exemple of the OOM stacktrace:
>>>>>>>>>>>>
>>>>>>>>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>>>>>>>>> at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
>>>>>>>>>>>> at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
>>>>>>>>>>>> at
>>>>>>>>>>>> org.glassfish.grizzly.ssl.SSLUtils.allocateOutputBuffer(SSLUtils.java:342)
>>>>>>>>>>>> at
>>>>>>>>>>>> org.glassfish.grizzly.ssl.SSLBaseFilter$2.grow(SSLBaseFilter.java:117)
>>>>>>>>>>>> at
>>>>>>>>>>>> org.glassfish.grizzly.ssl.SSLConnectionContext.ensureBufferSize(SSLConnectionContext.java:392)
>>>>>>>>>>>> at
>>>>>>>>>>>> org.glassfish.grizzly.ssl.SSLConnectionContext.wrap(SSLConnectionContext.java:272)
>>>>>>>>>>>> at
>>>>>>>>>>>> org.glassfish.grizzly.ssl.SSLConnectionContext.wrapAll(SSLConnectionContext.java:227)
>>>>>>>>>>>> at
>>>>>>>>>>>> org.glassfish.grizzly.ssl.SSLBaseFilter.wrapAll(SSLBaseFilter.java:404)
>>>>>>>>>>>> at
>>>>>>>>>>>> org.glassfish.grizzly.ssl.SSLBaseFilter.handleWrite(SSLBaseFilter.java:319)
>>>>>>>>>>>> at
>>>>>>>>>>>> org.glassfish.grizzly.ssl.SSLFilter.accurateWrite(SSLFilter.java:255)
>>>>>>>>>>>> at
>>>>>>>>>>>> org.glassfish.grizzly.ssl.SSLFilter.handleWrite(SSLFilter.java:143)
>>>>>>>>>>>> at
>>>>>>>>>>>> com.ning.http.client.providers.grizzly.GrizzlyAsyncHttpProvider$SwitchingSSLFilter.handleWrite(GrizzlyAsyncHttpProvider.java:2503)
>>>>>>>>>>>> at
>>>>>>>>>>>> org.glassfish.grizzly.filterchain.ExecutorResolver$8.execute(ExecutorResolver.java:111)
>>>>>>>>>>>> at
>>>>>>>>>>>> org.glassfish.grizzly.filterchain.DefaultFilterChain.executeFilter(DefaultFilterChain.java:288)
>>>>>>>>>>>> at
>>>>>>>>>>>> org.glassfish.grizzly.filterchain.DefaultFilterChain.executeChainPart(DefaultFilterChain.java:206)
>>>>>>>>>>>> at
>>>>>>>>>>>> org.glassfish.grizzly.filterchain.DefaultFilterChain.execute(DefaultFilterChain.java:136)
>>>>>>>>>>>> at
>>>>>>>>>>>> org.glassfish.grizzly.filterchain.DefaultFilterChain.process(DefaultFilterChain.java:114)
>>>>>>>>>>>> at
>>>>>>>>>>>> org.glassfish.grizzly.ProcessorExecutor.execute(ProcessorExecutor.java:77)
>>>>>>>>>>>> at
>>>>>>>>>>>> org.glassfish.grizzly.filterchain.FilterChainContext.write(FilterChainContext.java:853)
>>>>>>>>>>>> at
>>>>>>>>>>>> org.glassfish.grizzly.filterchain.FilterChainContext.write(FilterChainContext.java:720)
>>>>>>>>>>>> at
>>>>>>>>>>>> com.ning.http.client.providers.grizzly.FeedableBodyGenerator.flushQueue(FeedableBodyGenerator.java:132)
>>>>>>>>>>>> at
>>>>>>>>>>>> com.ning.http.client.providers.grizzly.FeedableBodyGenerator.feed(FeedableBodyGenerator.java:101)
>>>>>>>>>>>> at
>>>>>>>>>>>> com.ning.http.client.providers.grizzly.MultipartBodyGeneratorFeeder$FeedBodyGeneratorOutputStream.write(MultipartBodyGeneratorFeeder.java:222)
>>>>>>>>>>>> at
>>>>>>>>>>>> java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
>>>>>>>>>>>> at
>>>>>>>>>>>> java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
>>>>>>>>>>>> at
>>>>>>>>>>>> com.ning.http.multipart.FilePart.sendData(FilePart.java:179)
>>>>>>>>>>>> at com.ning.http.multipart.Part.send(Part.java:331)
>>>>>>>>>>>> at com.ning.http.multipart.Part.sendParts(Part.java:397)
>>>>>>>>>>>> at
>>>>>>>>>>>> com.ning.http.client.providers.grizzly.MultipartBodyGeneratorFeeder.feed(MultipartBodyGeneratorFeeder.java:144)
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Any idea?
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> 2013/8/27 Ryan Lubke <ryan.lubke_at_oracle.com>
>>>>>>>>>>>>
>>>>>>>>>>>>> Excellent! Looking forward to the pull request!
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Sébastien Lorber wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> Ryan thanks, it works fine, I'll make a pull request on AHC
>>>>>>>>>>>>> tomorrow with a better code using the same Part classes that already exist.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I created an OutputStream that redirects to the BodyGenerator
>>>>>>>>>>>>> feeder.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The problem I currently have is that the feeder feeds the
>>>>>>>>>>>>> queue faster than the async thread polling it :)
>>>>>>>>>>>>> I need to expose a limit to that queue size or something, will
>>>>>>>>>>>>> work on that, it will be better than a thread sleep to slow down the
>>>>>>>>>>>>> filepart reading
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> 2013/8/27 Ryan Lubke <ryan.lubke_at_oracle.com>
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Yes, something like that. I was going to tackle adding
>>>>>>>>>>>>>> something like this today. I'll follow up with something you can test out.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Sébastien Lorber wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Ok thanks!
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I think I see what I could do, probably something like that:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> FeedableBodyGenerator bodyGenerator = new
>>>>>>>>>>>>>> FeedableBodyGenerator();
>>>>>>>>>>>>>> MultipartBodyGeneratorFeeder bodyGeneratorFeeder = new
>>>>>>>>>>>>>> MultipartBodyGeneratorFeeder(bodyGenerator);
>>>>>>>>>>>>>> Request uploadRequest1 = new RequestBuilder("POST")
>>>>>>>>>>>>>> .setUrl("url")
>>>>>>>>>>>>>> .setBody(bodyGenerator)
>>>>>>>>>>>>>> .build();
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> ListenableFuture<Response> asyncRes = asyncHttpClient
>>>>>>>>>>>>>> .prepareRequest(uploadRequest1)
>>>>>>>>>>>>>> .execute(new AsyncCompletionHandlerBase());
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> bodyGeneratorFeeder.append("param1","value1");
>>>>>>>>>>>>>> bodyGeneratorFeeder.append("param2","value2");
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> bodyGeneratorFeeder.append("fileToUpload",fileInputStream);
>>>>>>>>>>>>>> bodyGeneratorFeeder.end();
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Response uploadResponse = asyncRes.get();
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Does it seem ok to you?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I guess it could be interesting to provide that
>>>>>>>>>>>>>> MultipartBodyGeneratorFeeder class to AHC or Grizzly since some other
>>>>>>>>>>>>>> people may want to achieve the same thing
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2013/8/26 Ryan Lubke <ryan.lubke_at_oracle.com>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Sébastien Lorber wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hello,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I would like to know if it's possible to upload a file with
>>>>>>>>>>>>>>>> AHC / Grizzly in streaming, I mean without loading the whole file bytes in
>>>>>>>>>>>>>>>> memory.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The default behavior seems to allocate a byte[] which
>>>>>>>>>>>>>>>> contans the whole file, so it means that my server can be OOM if too many
>>>>>>>>>>>>>>>> users upload a large file in the same time.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I've tryied with a Heap and ByteBuffer memory managers,
>>>>>>>>>>>>>>>> with reallocate=true/false but no more success.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> It seems the whole file content is appended wto the
>>>>>>>>>>>>>>>> BufferOutputStream, and then the underlying buffer is written.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> At least this seems to be the case with AHC integration:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> https://github.com/AsyncHttpClient/async-http-client/blob/6faf1f316e5546110b0779a5a42fd9d03ba6bc15/providers/grizzly/src/main/java/org/asynchttpclient/providers/grizzly/bodyhandler/PartsBodyHandler.java
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> So, is there a way to patch AHC to stream the file so that
>>>>>>>>>>>>>>>> I could eventually consume only 20mo of heap while uploading a 500mo file?
>>>>>>>>>>>>>>>> Or is this simply impossible with Grizzly?
>>>>>>>>>>>>>>>> I didn't notice anything related to that in the
>>>>>>>>>>>>>>>> documentation.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> It's possible with the FeedableBodyGenerator. But if you're
>>>>>>>>>>>>>>> tied to using Multipart uploads, you'd have to convert the multipart data
>>>>>>>>>>>>>>> to Buffers manually and send using the FeedableBodyGenerator.
>>>>>>>>>>>>>>> I'll take a closer look to see if this area can be improved.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Btw in my case it is a file upload. I receive a file with
>>>>>>>>>>>>>>>> CXF and have to transmit it to a storage server (like S3). CXF doesn't
>>>>>>>>>>>>>>>> consume memory bevause it is streaming the large fle uploads to the file
>>>>>>>>>>>>>>>> system, and then provides an input stream on that file.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>