You can use a TransportCustomizer [1] to set this on the transport that
will be used by the AHC instance.
The TransportCustomizer implementation may be set by:
-------------------------------------
final GrizzlyAsyncHttpProviderConfig providerConfig = new
GrizzlyAsyncHttpProviderConfig();
final TransportCustomizer transportCapture = new TransportCapture() {
public void customize(final TCPNIOTransport tcpnioTransport,
final FilterChainBuilder builder) {
tcpnioTransport.getAsyncQueueIO().getWriter().setMaxPendingBytesPerConnection(<your
value here>);
}
};
providerConfig.addProperty(Property.TRANSPORT_CUSTOMIZER, transportCapture);
final AsyncHttpClientConfig config = new AsyncHttpClientConfig
.Builder()
.setAsyncHttpClientProviderConfig(providerConfig)
.build();
final AsyncHttpClient client = new AsyncHttpClient(new
GrizzlyAsyncHttpProvider(config), config);
-------------------------------------
[1]
https://github.com/AsyncHttpClient/async-http-client/blob/ahc-1.7.x/src/main/java/com/ning/http/client/providers/grizzly/TransportCustomizer.java
Sébastien Lorber wrote:
> Thank you, it works fine!
>
>
> I just had to modify a single line after your commit.
>
> com.ning.http.client.providers.grizzly.GrizzlyAsyncHttpProvider#initializeTransport
> ->
> clientTransport.getAsyncQueueIO().getWriter().setMaxPendingBytesPerConnection(10000);
>
>
> If I let the initial value (-1) it won't block, canWrite always
> returns true
>
>
> Btw, on AHC I didn't find any way to pass this value as a config
> attribute, neither the size of the write buffer you talked about.
>
> So in the end, is there a way with current AHC code to use this
> "canWrite = false" behavior?
> If not, can you please provide a way to set this behavior on v1.7.20 ?
> thanks
>
>
> PS: does it make sens to use the same number of bytes un the
> feed(Buffer) method and in the setMaxPendingBytesPerConnection(10000);
> ? do you have any tuning recommandation?
>
>
>
> 2013/8/29 Ryan Lubke <ryan.lubke_at_oracle.com
> <mailto:ryan.lubke_at_oracle.com>>
>
> Please disregard.
>
>
> Ryan Lubke wrote:
>> Sébastien,
>>
>> Could you also provide a sample of how you're performing your feed?
>>
>> Thanks,
>> -rl
>>
>> Ryan Lubke wrote:
>>> Sébastien,
>>>
>>> I'd recommend looking at Connection.canWrite() [1] and
>>> Connection.notifyCanWrite(WriteListener) [1]
>>>
>>> By default, Grizzly will configure the async write queue length
>>> to be four times the write buffer size (which is based off the
>>> socket write buffer).
>>> When this queue exceeds this value, canWrite() will return false.
>>>
>>> When this occurs, you can register a WriteListener to be
>>> notified when the queue length is below the configured max and
>>> then simulate blocking
>>> until the onWritePossible() callback has been invoked.
>>>
>>> ----------------------------------------------------------------
>>>
>>> final FutureImpl<Boolean> future =
>>> Futures.createSafeFuture();
>>>
>>> // Connection may be obtained by calling
>>> FilterChainContext.getConnection().
>>> connection.notifyCanWrite(new WriteHandler() {
>>>
>>> @Override
>>> public void onWritePossible() throws Exception {
>>> future.result(Boolean.TRUE);
>>> }
>>>
>>> @Override
>>> public void onError(Throwable t) {
>>> future.failure(Exceptions.makeIOException(t));
>>> }
>>> });
>>>
>>> try {
>>> final long writeTimeout = 30;
>>> future.get(writeTimeout, TimeUnit.MILLISECONDS);
>>> } catch (ExecutionException e) {
>>> HttpTransactionContext httpCtx =
>>> HttpTransactionContext.get(connection);
>>> httpCtx.abort(e.getCause());
>>> } catch (Exception e) {
>>> HttpTransactionContext httpCtx =
>>> HttpTransactionContext.get(connection);
>>> httpCtx.abort(e);
>>> }
>>>
>>> ---------------------------------------------------------------------
>>>
>>> [1]
>>> http://grizzly.java.net/docs/2.3/apidocs/org/glassfish/grizzly/OutputSink.html.
>>>
>>> Sébastien Lorber wrote:
>>>> Ryan, I've did some other tests.
>>>>
>>>>
>>>> It seems that using a blocking queue in the
>>>> FeedableBodyGenerator is totally useless because the thread
>>>> consuming it is not blocking and the queue never blocks the
>>>> feeding, which was my intention in the beginning. Maybe it
>>>> depends on the IO strategy used?
>>>> I use AHC default which seems to use SameThreadIOStrategy so I
>>>> don't think it's related to the IO strategy.
>>>>
>>>>
>>>> So in the end I can upload a 70m file with a heap of 50m, but I
>>>> have to put a Thread.sleep(30) between each 100k Buffer send to
>>>> the FeedableBodyGenerator
>>>>
>>>> The connection with the server is not good here, but in
>>>> production it is normally a lot better as far as I know.
>>>>
>>>>
>>>>
>>>> I've tried things
>>>> like clientTransport.getAsyncQueueIO().getWriter().setMaxPendingBytesPerConnection(100000);
>>>> but it doesn't seem to work for me.
>>>>
>>>> I'd like the Grizzly internals to block when there are too much
>>>> pending bytes to send. Is it possible?
>>>>
>>>>
>>>>
>>>>
>>>> PS: I've just been able to send a 500mo file with 100mo heap,
>>>> but it needed a sleep of 100ms between each 100k Buffer sent to
>>>> the bodygenerator
>>>>
>>>>
>>>>
>>>>
>>>> 2013/8/29 Sébastien Lorber <lorber.sebastien_at_gmail.com
>>>> <mailto:lorber.sebastien_at_gmail.com>>
>>>>
>>>>
>>>>
>>>> By chance do you if I can remove the MessageCloner used in
>>>> the SSL filter?
>>>> SSLBaseFilter$OnWriteCopyCloner
>>>>
>>>> It seems to allocate a lot of memory.
>>>> I don't really understand why messages have to be cloned,
>>>> can I remove this? How?
>>>>
>>>>
>>>> 2013/8/29 Sébastien Lorber <lorber.sebastien_at_gmail.com
>>>> <mailto:lorber.sebastien_at_gmail.com>>
>>>>
>>>>
>>>> I'm trying to send a 500m file for my tests with a heap
>>>> of 400m.
>>>>
>>>> In our real use cases we would probably have files
>>>> under 20mo but we want to reduce the memory consumption
>>>> because we can have x parallel uploads on the same
>>>> server according to the user activity.
>>>>
>>>> I'll try to check if using this BodyGenerator reduced
>>>> the memory footprint or if it's almost like before.
>>>>
>>>>
>>>> 2013/8/28 Ryan Lubke <ryan.lubke_at_oracle.com
>>>> <mailto:ryan.lubke_at_oracle.com>>
>>>>
>>>> At this point in time, as far as the SSL buffer
>>>> allocation is concerned, it's untunable.
>>>>
>>>> That said, feel free to open a feature request.
>>>>
>>>> As to your second question, there is no suggested
>>>> size. This is all very application specific.
>>>>
>>>> I'm curious, how large of a file are you sending?
>>>>
>>>>
>>>>
>>>> Sébastien Lorber wrote:
>>>>> I have seen a lot of buffers which have a size of
>>>>> 33842 and it seems the limit is near half the
>>>>> capacity.
>>>>>
>>>>> Perhaps there's a way to tune that buffer size so
>>>>> that it consumes less memory?
>>>>> Is there an ideal Buffer size to send to the feed
>>>>> method?
>>>>>
>>>>>
>>>>> 2013/8/28 Ryan Lubke <ryan.lubke_at_oracle.com
>>>>> <mailto:ryan.lubke_at_oracle.com>>
>>>>>
>>>>> I'll be reviewing the PR today, thanks again!
>>>>>
>>>>> Regarding the OOM: as it stands now, for each
>>>>> new buffer that is passed to the SSLFilter, we
>>>>> allocate a buffer twice the size in order to
>>>>> accommodate the encrypted result. So there's
>>>>> an increase.
>>>>>
>>>>> Depending on the socket configurations of both
>>>>> endpoints, and how fast the remote is reading
>>>>> data, it could
>>>>> be the write queue is becoming too large. We
>>>>> do have a way to detect this situation, but
>>>>> I'm pretty sure
>>>>> the Grizzly internals are currently shielded
>>>>> here. I will see what I can do to allow users
>>>>> to leverage this.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> Sébastien Lorber wrote:
>>>>>> Hello,
>>>>>>
>>>>>> I've made my pull request.
>>>>>> https://github.com/AsyncHttpClient/async-http-client/pull/367
>>>>>>
>>>>>> With my usecase it works, the file is
>>>>>> uploaded like before.
>>>>>>
>>>>>>
>>>>>>
>>>>>> But I didn't notice a big memory improvement.
>>>>>>
>>>>>> Is it possible that SSL doesn't allow to
>>>>>> stream the body or something like that?
>>>>>>
>>>>>>
>>>>>>
>>>>>> In memory, I have a lot of:
>>>>>> - HeapByteBuffer
>>>>>> Which are hold by SSLUtils$3
>>>>>> Which are hold by BufferBuffers
>>>>>> Which are hold by WriteResult
>>>>>> Which are hold by AsyncWriteQueueRecord
>>>>>>
>>>>>>
>>>>>> Here is an exemple of the OOM stacktrace:
>>>>>>
>>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>>> at
>>>>>> java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
>>>>>> at
>>>>>> java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
>>>>>> at
>>>>>> org.glassfish.grizzly.ssl.SSLUtils.allocateOutputBuffer(SSLUtils.java:342)
>>>>>> at
>>>>>> org.glassfish.grizzly.ssl.SSLBaseFilter$2.grow(SSLBaseFilter.java:117)
>>>>>> at
>>>>>> org.glassfish.grizzly.ssl.SSLConnectionContext.ensureBufferSize(SSLConnectionContext.java:392)
>>>>>> at
>>>>>> org.glassfish.grizzly.ssl.SSLConnectionContext.wrap(SSLConnectionContext.java:272)
>>>>>> at
>>>>>> org.glassfish.grizzly.ssl.SSLConnectionContext.wrapAll(SSLConnectionContext.java:227)
>>>>>> at
>>>>>> org.glassfish.grizzly.ssl.SSLBaseFilter.wrapAll(SSLBaseFilter.java:404)
>>>>>> at
>>>>>> org.glassfish.grizzly.ssl.SSLBaseFilter.handleWrite(SSLBaseFilter.java:319)
>>>>>> at
>>>>>> org.glassfish.grizzly.ssl.SSLFilter.accurateWrite(SSLFilter.java:255)
>>>>>> at
>>>>>> org.glassfish.grizzly.ssl.SSLFilter.handleWrite(SSLFilter.java:143)
>>>>>> at
>>>>>> com.ning.http.client.providers.grizzly.GrizzlyAsyncHttpProvider$SwitchingSSLFilter.handleWrite(GrizzlyAsyncHttpProvider.java:2503)
>>>>>> at
>>>>>> org.glassfish.grizzly.filterchain.ExecutorResolver$8.execute(ExecutorResolver.java:111)
>>>>>> at
>>>>>> org.glassfish.grizzly.filterchain.DefaultFilterChain.executeFilter(DefaultFilterChain.java:288)
>>>>>> at
>>>>>> org.glassfish.grizzly.filterchain.DefaultFilterChain.executeChainPart(DefaultFilterChain.java:206)
>>>>>> at
>>>>>> org.glassfish.grizzly.filterchain.DefaultFilterChain.execute(DefaultFilterChain.java:136)
>>>>>> at
>>>>>> org.glassfish.grizzly.filterchain.DefaultFilterChain.process(DefaultFilterChain.java:114)
>>>>>> at
>>>>>> org.glassfish.grizzly.ProcessorExecutor.execute(ProcessorExecutor.java:77)
>>>>>> at
>>>>>> org.glassfish.grizzly.filterchain.FilterChainContext.write(FilterChainContext.java:853)
>>>>>> at
>>>>>> org.glassfish.grizzly.filterchain.FilterChainContext.write(FilterChainContext.java:720)
>>>>>> at
>>>>>> com.ning.http.client.providers.grizzly.FeedableBodyGenerator.flushQueue(FeedableBodyGenerator.java:132)
>>>>>> at
>>>>>> com.ning.http.client.providers.grizzly.FeedableBodyGenerator.feed(FeedableBodyGenerator.java:101)
>>>>>> at
>>>>>> com.ning.http.client.providers.grizzly.MultipartBodyGeneratorFeeder$FeedBodyGeneratorOutputStream.write(MultipartBodyGeneratorFeeder.java:222)
>>>>>> at
>>>>>> java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
>>>>>> at
>>>>>> java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
>>>>>> at
>>>>>> com.ning.http.multipart.FilePart.sendData(FilePart.java:179)
>>>>>> at
>>>>>> com.ning.http.multipart.Part.send(Part.java:331)
>>>>>> at
>>>>>> com.ning.http.multipart.Part.sendParts(Part.java:397)
>>>>>> at
>>>>>> com.ning.http.client.providers.grizzly.MultipartBodyGeneratorFeeder.feed(MultipartBodyGeneratorFeeder.java:144)
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> Any idea?
>>>>>>
>>>>>>
>>>>>>
>>>>>> 2013/8/27 Ryan Lubke <ryan.lubke_at_oracle.com
>>>>>> <mailto:ryan.lubke_at_oracle.com>>
>>>>>>
>>>>>> Excellent! Looking forward to the pull
>>>>>> request!
>>>>>>
>>>>>>
>>>>>> Sébastien Lorber wrote:
>>>>>>> Ryan thanks, it works fine, I'll make a
>>>>>>> pull request on AHC tomorrow with a
>>>>>>> better code using the same Part classes
>>>>>>> that already exist.
>>>>>>>
>>>>>>> I created an OutputStream that redirects
>>>>>>> to the BodyGenerator feeder.
>>>>>>>
>>>>>>> The problem I currently have is that the
>>>>>>> feeder feeds the queue faster than the
>>>>>>> async thread polling it :)
>>>>>>> I need to expose a limit to that queue
>>>>>>> size or something, will work on that, it
>>>>>>> will be better than a thread sleep to
>>>>>>> slow down the filepart reading
>>>>>>>
>>>>>>>
>>>>>>> 2013/8/27 Ryan Lubke
>>>>>>> <ryan.lubke_at_oracle.com
>>>>>>> <mailto:ryan.lubke_at_oracle.com>>
>>>>>>>
>>>>>>> Yes, something like that. I was
>>>>>>> going to tackle adding something
>>>>>>> like this today. I'll follow up
>>>>>>> with something you can test out.
>>>>>>>
>>>>>>>
>>>>>>> Sébastien Lorber wrote:
>>>>>>>> Ok thanks!
>>>>>>>>
>>>>>>>> I think I see what I could do,
>>>>>>>> probably something like that:
>>>>>>>>
>>>>>>>>
>>>>>>>> FeedableBodyGenerator
>>>>>>>> bodyGenerator = new
>>>>>>>> FeedableBodyGenerator();
>>>>>>>> MultipartBodyGeneratorFeeder
>>>>>>>> bodyGeneratorFeeder = new
>>>>>>>> MultipartBodyGeneratorFeeder(bodyGenerator);
>>>>>>>> Request uploadRequest1 = new
>>>>>>>> RequestBuilder("POST")
>>>>>>>> .setUrl("url")
>>>>>>>> .setBody(bodyGenerator)
>>>>>>>> .build();
>>>>>>>>
>>>>>>>> ListenableFuture<Response>
>>>>>>>> asyncRes = asyncHttpClient
>>>>>>>> .prepareRequest(uploadRequest1)
>>>>>>>> .execute(new
>>>>>>>> AsyncCompletionHandlerBase());
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> bodyGeneratorFeeder.append("param1","value1");
>>>>>>>>
>>>>>>>> bodyGeneratorFeeder.append("param2","value2");
>>>>>>>>
>>>>>>>> bodyGeneratorFeeder.append("fileToUpload",fileInputStream);
>>>>>>>> bodyGeneratorFeeder.end();
>>>>>>>>
>>>>>>>> Response uploadResponse =
>>>>>>>> asyncRes.get();
>>>>>>>>
>>>>>>>>
>>>>>>>> Does it seem ok to you?
>>>>>>>>
>>>>>>>> I guess it could be interesting to
>>>>>>>> provide that
>>>>>>>> MultipartBodyGeneratorFeeder class
>>>>>>>> to AHC or Grizzly since some other
>>>>>>>> people may want to achieve the same
>>>>>>>> thing
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> 2013/8/26 Ryan Lubke
>>>>>>>> <ryan.lubke_at_oracle.com
>>>>>>>> <mailto:ryan.lubke_at_oracle.com>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Sébastien Lorber wrote:
>>>>>>>>
>>>>>>>> Hello,
>>>>>>>>
>>>>>>>> I would like to know if
>>>>>>>> it's possible to upload a
>>>>>>>> file with AHC / Grizzly in
>>>>>>>> streaming, I mean without
>>>>>>>> loading the whole file
>>>>>>>> bytes in memory.
>>>>>>>>
>>>>>>>> The default behavior seems
>>>>>>>> to allocate a byte[] which
>>>>>>>> contans the whole file, so
>>>>>>>> it means that my server can
>>>>>>>> be OOM if too many users
>>>>>>>> upload a large file in the
>>>>>>>> same time.
>>>>>>>>
>>>>>>>>
>>>>>>>> I've tryied with a Heap and
>>>>>>>> ByteBuffer memory managers,
>>>>>>>> with reallocate=true/false
>>>>>>>> but no more success.
>>>>>>>>
>>>>>>>> It seems the whole file
>>>>>>>> content is appended wto the
>>>>>>>> BufferOutputStream, and
>>>>>>>> then the underlying buffer
>>>>>>>> is written.
>>>>>>>>
>>>>>>>> At least this seems to be
>>>>>>>> the case with AHC integration:
>>>>>>>> https://github.com/AsyncHttpClient/async-http-client/blob/6faf1f316e5546110b0779a5a42fd9d03ba6bc15/providers/grizzly/src/main/java/org/asynchttpclient/providers/grizzly/bodyhandler/PartsBodyHandler.java
>>>>>>>>
>>>>>>>>
>>>>>>>> So, is there a way to patch
>>>>>>>> AHC to stream the file so
>>>>>>>> that I could eventually
>>>>>>>> consume only 20mo of heap
>>>>>>>> while uploading a 500mo file?
>>>>>>>> Or is this simply
>>>>>>>> impossible with Grizzly?
>>>>>>>> I didn't notice anything
>>>>>>>> related to that in the
>>>>>>>> documentation.
>>>>>>>>
>>>>>>>> It's possible with the
>>>>>>>> FeedableBodyGenerator. But if
>>>>>>>> you're tied to using Multipart
>>>>>>>> uploads, you'd have to convert
>>>>>>>> the multipart data to Buffers
>>>>>>>> manually and send using the
>>>>>>>> FeedableBodyGenerator.
>>>>>>>> I'll take a closer look to see
>>>>>>>> if this area can be improved.
>>>>>>>>
>>>>>>>>
>>>>>>>> Btw in my case it is a file
>>>>>>>> upload. I receive a file
>>>>>>>> with CXF and have to
>>>>>>>> transmit it to a storage
>>>>>>>> server (like S3). CXF
>>>>>>>> doesn't consume memory
>>>>>>>> bevause it is streaming the
>>>>>>>> large fle uploads to the
>>>>>>>> file system, and then
>>>>>>>> provides an input stream on
>>>>>>>> that file.
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>>
>