users@grizzly.java.net

Re: Upload a large file without oom with Grizzly

From: Ryan Lubke <ryan.lubke_at_oracle.com>
Date: Thu, 29 Aug 2013 14:47:24 -0700

Please disregard.

Ryan Lubke wrote:
> Sébastien,
>
> Could you also provide a sample of how you're performing your feed?
>
> Thanks,
> -rl
>
> Ryan Lubke wrote:
>> Sébastien,
>>
>> I'd recommend looking at Connection.canWrite() [1] and
>> Connection.notifyCanWrite(WriteListener) [1]
>>
>> By default, Grizzly will configure the async write queue length to be
>> four times the write buffer size (which is based off the socket write
>> buffer).
>> When this queue exceeds this value, canWrite() will return false.
>>
>> When this occurs, you can register a WriteListener to be notified
>> when the queue length is below the configured max and then simulate
>> blocking
>> until the onWritePossible() callback has been invoked.
>>
>> ----------------------------------------------------------------
>>
>> final FutureImpl<Boolean> future = Futures.createSafeFuture();
>>
>> // Connection may be obtained by calling
>> FilterChainContext.getConnection().
>> connection.notifyCanWrite(new WriteHandler() {
>>
>> @Override
>> public void onWritePossible() throws Exception {
>> future.result(Boolean.TRUE);
>> }
>>
>> @Override
>> public void onError(Throwable t) {
>> future.failure(Exceptions.makeIOException(t));
>> }
>> });
>>
>> try {
>> final long writeTimeout = 30;
>> future.get(writeTimeout, TimeUnit.MILLISECONDS);
>> } catch (ExecutionException e) {
>> HttpTransactionContext httpCtx =
>> HttpTransactionContext.get(connection);
>> httpCtx.abort(e.getCause());
>> } catch (Exception e) {
>> HttpTransactionContext httpCtx = HttpTransactionContext.get(connection);
>> httpCtx.abort(e);
>> }
>>
>> ---------------------------------------------------------------------
>>
>> [1]
>> http://grizzly.java.net/docs/2.3/apidocs/org/glassfish/grizzly/OutputSink.html.
>>
>> Sébastien Lorber wrote:
>>> Ryan, I've did some other tests.
>>>
>>>
>>> It seems that using a blocking queue in the FeedableBodyGenerator is
>>> totally useless because the thread consuming it is not blocking and
>>> the queue never blocks the feeding, which was my intention in the
>>> beginning. Maybe it depends on the IO strategy used?
>>> I use AHC default which seems to use SameThreadIOStrategy so I don't
>>> think it's related to the IO strategy.
>>>
>>>
>>> So in the end I can upload a 70m file with a heap of 50m, but I have
>>> to put a Thread.sleep(30) between each 100k Buffer send to the
>>> FeedableBodyGenerator
>>>
>>> The connection with the server is not good here, but in production
>>> it is normally a lot better as far as I know.
>>>
>>>
>>>
>>> I've tried things
>>> like clientTransport.getAsyncQueueIO().getWriter().setMaxPendingBytesPerConnection(100000);
>>> but it doesn't seem to work for me.
>>>
>>> I'd like the Grizzly internals to block when there are too much
>>> pending bytes to send. Is it possible?
>>>
>>>
>>>
>>>
>>> PS: I've just been able to send a 500mo file with 100mo heap, but it
>>> needed a sleep of 100ms between each 100k Buffer sent to the
>>> bodygenerator
>>>
>>>
>>>
>>>
>>> 2013/8/29 Sébastien Lorber <lorber.sebastien_at_gmail.com
>>> <mailto:lorber.sebastien_at_gmail.com>>
>>>
>>>
>>>
>>> By chance do you if I can remove the MessageCloner used in the
>>> SSL filter?
>>> SSLBaseFilter$OnWriteCopyCloner
>>>
>>> It seems to allocate a lot of memory.
>>> I don't really understand why messages have to be cloned, can I
>>> remove this? How?
>>>
>>>
>>> 2013/8/29 Sébastien Lorber <lorber.sebastien_at_gmail.com
>>> <mailto:lorber.sebastien_at_gmail.com>>
>>>
>>>
>>> I'm trying to send a 500m file for my tests with a heap of 400m.
>>>
>>> In our real use cases we would probably have files under
>>> 20mo but we want to reduce the memory consumption because we
>>> can have x parallel uploads on the same server according to
>>> the user activity.
>>>
>>> I'll try to check if using this BodyGenerator reduced the
>>> memory footprint or if it's almost like before.
>>>
>>>
>>> 2013/8/28 Ryan Lubke <ryan.lubke_at_oracle.com
>>> <mailto:ryan.lubke_at_oracle.com>>
>>>
>>> At this point in time, as far as the SSL buffer
>>> allocation is concerned, it's untunable.
>>>
>>> That said, feel free to open a feature request.
>>>
>>> As to your second question, there is no suggested size.
>>> This is all very application specific.
>>>
>>> I'm curious, how large of a file are you sending?
>>>
>>>
>>>
>>> Sébastien Lorber wrote:
>>>> I have seen a lot of buffers which have a size of 33842
>>>> and it seems the limit is near half the capacity.
>>>>
>>>> Perhaps there's a way to tune that buffer size so that
>>>> it consumes less memory?
>>>> Is there an ideal Buffer size to send to the feed method?
>>>>
>>>>
>>>> 2013/8/28 Ryan Lubke <ryan.lubke_at_oracle.com
>>>> <mailto:ryan.lubke_at_oracle.com>>
>>>>
>>>> I'll be reviewing the PR today, thanks again!
>>>>
>>>> Regarding the OOM: as it stands now, for each new
>>>> buffer that is passed to the SSLFilter, we allocate
>>>> a buffer twice the size in order to
>>>> accommodate the encrypted result. So there's an
>>>> increase.
>>>>
>>>> Depending on the socket configurations of both
>>>> endpoints, and how fast the remote is reading data,
>>>> it could
>>>> be the write queue is becoming too large. We do
>>>> have a way to detect this situation, but I'm pretty
>>>> sure
>>>> the Grizzly internals are currently shielded here.
>>>> I will see what I can do to allow users to leverage
>>>> this.
>>>>
>>>>
>>>>
>>>>
>>>> Sébastien Lorber wrote:
>>>>> Hello,
>>>>>
>>>>> I've made my pull request.
>>>>> https://github.com/AsyncHttpClient/async-http-client/pull/367
>>>>>
>>>>> With my usecase it works, the file is uploaded
>>>>> like before.
>>>>>
>>>>>
>>>>>
>>>>> But I didn't notice a big memory improvement.
>>>>>
>>>>> Is it possible that SSL doesn't allow to stream
>>>>> the body or something like that?
>>>>>
>>>>>
>>>>>
>>>>> In memory, I have a lot of:
>>>>> - HeapByteBuffer
>>>>> Which are hold by SSLUtils$3
>>>>> Which are hold by BufferBuffers
>>>>> Which are hold by WriteResult
>>>>> Which are hold by AsyncWriteQueueRecord
>>>>>
>>>>>
>>>>> Here is an exemple of the OOM stacktrace:
>>>>>
>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>> at
>>>>> java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
>>>>> at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
>>>>> at
>>>>> org.glassfish.grizzly.ssl.SSLUtils.allocateOutputBuffer(SSLUtils.java:342)
>>>>> at
>>>>> org.glassfish.grizzly.ssl.SSLBaseFilter$2.grow(SSLBaseFilter.java:117)
>>>>> at
>>>>> org.glassfish.grizzly.ssl.SSLConnectionContext.ensureBufferSize(SSLConnectionContext.java:392)
>>>>> at
>>>>> org.glassfish.grizzly.ssl.SSLConnectionContext.wrap(SSLConnectionContext.java:272)
>>>>> at
>>>>> org.glassfish.grizzly.ssl.SSLConnectionContext.wrapAll(SSLConnectionContext.java:227)
>>>>> at
>>>>> org.glassfish.grizzly.ssl.SSLBaseFilter.wrapAll(SSLBaseFilter.java:404)
>>>>> at
>>>>> org.glassfish.grizzly.ssl.SSLBaseFilter.handleWrite(SSLBaseFilter.java:319)
>>>>> at
>>>>> org.glassfish.grizzly.ssl.SSLFilter.accurateWrite(SSLFilter.java:255)
>>>>> at
>>>>> org.glassfish.grizzly.ssl.SSLFilter.handleWrite(SSLFilter.java:143)
>>>>> at
>>>>> com.ning.http.client.providers.grizzly.GrizzlyAsyncHttpProvider$SwitchingSSLFilter.handleWrite(GrizzlyAsyncHttpProvider.java:2503)
>>>>> at
>>>>> org.glassfish.grizzly.filterchain.ExecutorResolver$8.execute(ExecutorResolver.java:111)
>>>>> at
>>>>> org.glassfish.grizzly.filterchain.DefaultFilterChain.executeFilter(DefaultFilterChain.java:288)
>>>>> at
>>>>> org.glassfish.grizzly.filterchain.DefaultFilterChain.executeChainPart(DefaultFilterChain.java:206)
>>>>> at
>>>>> org.glassfish.grizzly.filterchain.DefaultFilterChain.execute(DefaultFilterChain.java:136)
>>>>> at
>>>>> org.glassfish.grizzly.filterchain.DefaultFilterChain.process(DefaultFilterChain.java:114)
>>>>> at
>>>>> org.glassfish.grizzly.ProcessorExecutor.execute(ProcessorExecutor.java:77)
>>>>> at
>>>>> org.glassfish.grizzly.filterchain.FilterChainContext.write(FilterChainContext.java:853)
>>>>> at
>>>>> org.glassfish.grizzly.filterchain.FilterChainContext.write(FilterChainContext.java:720)
>>>>> at
>>>>> com.ning.http.client.providers.grizzly.FeedableBodyGenerator.flushQueue(FeedableBodyGenerator.java:132)
>>>>> at
>>>>> com.ning.http.client.providers.grizzly.FeedableBodyGenerator.feed(FeedableBodyGenerator.java:101)
>>>>> at
>>>>> com.ning.http.client.providers.grizzly.MultipartBodyGeneratorFeeder$FeedBodyGeneratorOutputStream.write(MultipartBodyGeneratorFeeder.java:222)
>>>>> at
>>>>> java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
>>>>> at
>>>>> java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
>>>>> at
>>>>> com.ning.http.multipart.FilePart.sendData(FilePart.java:179)
>>>>> at com.ning.http.multipart.Part.send(Part.java:331)
>>>>> at
>>>>> com.ning.http.multipart.Part.sendParts(Part.java:397)
>>>>> at
>>>>> com.ning.http.client.providers.grizzly.MultipartBodyGeneratorFeeder.feed(MultipartBodyGeneratorFeeder.java:144)
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> Any idea?
>>>>>
>>>>>
>>>>>
>>>>> 2013/8/27 Ryan Lubke <ryan.lubke_at_oracle.com
>>>>> <mailto:ryan.lubke_at_oracle.com>>
>>>>>
>>>>> Excellent! Looking forward to the pull request!
>>>>>
>>>>>
>>>>> Sébastien Lorber wrote:
>>>>>> Ryan thanks, it works fine, I'll make a pull
>>>>>> request on AHC tomorrow with a better code
>>>>>> using the same Part classes that already exist.
>>>>>>
>>>>>> I created an OutputStream that redirects to
>>>>>> the BodyGenerator feeder.
>>>>>>
>>>>>> The problem I currently have is that the
>>>>>> feeder feeds the queue faster than the async
>>>>>> thread polling it :)
>>>>>> I need to expose a limit to that queue size
>>>>>> or something, will work on that, it will be
>>>>>> better than a thread sleep to slow down the
>>>>>> filepart reading
>>>>>>
>>>>>>
>>>>>> 2013/8/27 Ryan Lubke <ryan.lubke_at_oracle.com
>>>>>> <mailto:ryan.lubke_at_oracle.com>>
>>>>>>
>>>>>> Yes, something like that. I was going to
>>>>>> tackle adding something like this today.
>>>>>> I'll follow up with something you can
>>>>>> test out.
>>>>>>
>>>>>>
>>>>>> Sébastien Lorber wrote:
>>>>>>> Ok thanks!
>>>>>>>
>>>>>>> I think I see what I could do, probably
>>>>>>> something like that:
>>>>>>>
>>>>>>>
>>>>>>> FeedableBodyGenerator bodyGenerator
>>>>>>> = new FeedableBodyGenerator();
>>>>>>> MultipartBodyGeneratorFeeder
>>>>>>> bodyGeneratorFeeder = new
>>>>>>> MultipartBodyGeneratorFeeder(bodyGenerator);
>>>>>>> Request uploadRequest1 = new
>>>>>>> RequestBuilder("POST")
>>>>>>> .setUrl("url")
>>>>>>> .setBody(bodyGenerator)
>>>>>>> .build();
>>>>>>>
>>>>>>> ListenableFuture<Response> asyncRes
>>>>>>> = asyncHttpClient
>>>>>>> .prepareRequest(uploadRequest1)
>>>>>>> .execute(new
>>>>>>> AsyncCompletionHandlerBase());
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> bodyGeneratorFeeder.append("param1","value1");
>>>>>>>
>>>>>>> bodyGeneratorFeeder.append("param2","value2");
>>>>>>>
>>>>>>> bodyGeneratorFeeder.append("fileToUpload",fileInputStream);
>>>>>>> bodyGeneratorFeeder.end();
>>>>>>>
>>>>>>> Response uploadResponse =
>>>>>>> asyncRes.get();
>>>>>>>
>>>>>>>
>>>>>>> Does it seem ok to you?
>>>>>>>
>>>>>>> I guess it could be interesting to
>>>>>>> provide that
>>>>>>> MultipartBodyGeneratorFeeder class to
>>>>>>> AHC or Grizzly since some other people
>>>>>>> may want to achieve the same thing
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> 2013/8/26 Ryan Lubke
>>>>>>> <ryan.lubke_at_oracle.com
>>>>>>> <mailto:ryan.lubke_at_oracle.com>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Sébastien Lorber wrote:
>>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> I would like to know if it's
>>>>>>> possible to upload a file with
>>>>>>> AHC / Grizzly in streaming, I
>>>>>>> mean without loading the whole
>>>>>>> file bytes in memory.
>>>>>>>
>>>>>>> The default behavior seems to
>>>>>>> allocate a byte[] which contans
>>>>>>> the whole file, so it means that
>>>>>>> my server can be OOM if too many
>>>>>>> users upload a large file in the
>>>>>>> same time.
>>>>>>>
>>>>>>>
>>>>>>> I've tryied with a Heap and
>>>>>>> ByteBuffer memory managers, with
>>>>>>> reallocate=true/false but no
>>>>>>> more success.
>>>>>>>
>>>>>>> It seems the whole file content
>>>>>>> is appended wto the
>>>>>>> BufferOutputStream, and then the
>>>>>>> underlying buffer is written.
>>>>>>>
>>>>>>> At least this seems to be the
>>>>>>> case with AHC integration:
>>>>>>> https://github.com/AsyncHttpClient/async-http-client/blob/6faf1f316e5546110b0779a5a42fd9d03ba6bc15/providers/grizzly/src/main/java/org/asynchttpclient/providers/grizzly/bodyhandler/PartsBodyHandler.java
>>>>>>>
>>>>>>>
>>>>>>> So, is there a way to patch AHC
>>>>>>> to stream the file so that I
>>>>>>> could eventually consume only
>>>>>>> 20mo of heap while uploading a
>>>>>>> 500mo file?
>>>>>>> Or is this simply impossible
>>>>>>> with Grizzly?
>>>>>>> I didn't notice anything related
>>>>>>> to that in the documentation.
>>>>>>>
>>>>>>> It's possible with the
>>>>>>> FeedableBodyGenerator. But if
>>>>>>> you're tied to using Multipart
>>>>>>> uploads, you'd have to convert the
>>>>>>> multipart data to Buffers manually
>>>>>>> and send using the
>>>>>>> FeedableBodyGenerator.
>>>>>>> I'll take a closer look to see if
>>>>>>> this area can be improved.
>>>>>>>
>>>>>>>
>>>>>>> Btw in my case it is a file
>>>>>>> upload. I receive a file with
>>>>>>> CXF and have to transmit it to a
>>>>>>> storage server (like S3). CXF
>>>>>>> doesn't consume memory bevause
>>>>>>> it is streaming the large fle
>>>>>>> uploads to the file system, and
>>>>>>> then provides an input stream on
>>>>>>> that file.
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>>
>>>