Hi Santiago, yes, sure, here are the details:
1. NIO MBW:
https://github.com/apache/cxf/blob/master/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioMessageBodyWriter.java#L53
There, the first thing we do is obtain a CXF-specific 'Continuation', which
in the case of HTTP is a wrapper around the Servlet 3.1 AsyncContext and
enforces that the invocation is asynchronous-aware.
Then the next 3 lines register a WriteListener with the
ServletOutputStream, with the WriteListener implemented as follows:
https://github.com/apache/cxf/blob/master/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioWriteListenerImpl.java
though we continue interacting with the underlying Servlet API through
calls such as "cont.isReadyForWrite()".
Finally, after this listener is registered, we suspend and return the
current thread to the container:
https://github.com/apache/cxf/blob/master/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioMessageBodyWriter.java#L58
with the registered listener finishing the response on the callback
thread(s).
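
To illustrate the sequence above (register a listener, return the thread, finish the response from callbacks), here is a minimal, self-contained sketch. It is not the CXF code: FakeSink, WriteCallback and ChunkWriter are hypothetical stand-ins for the ServletOutputStream, the WriteListener and our listener implementation, and the loop in run() only simulates the container calling back.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;

class WriteListenerSketch {

    /** Hypothetical stand-in for the callback a container invokes when a write is possible. */
    interface WriteCallback {
        void onWritePossible();
    }

    /** Hypothetical non-blocking sink: it accepts one chunk, then is busy until flushed. */
    static final class FakeSink {
        private final ByteArrayOutputStream out = new ByteArrayOutputStream();
        private boolean ready = true;
        private WriteCallback callback;

        void setWriteCallback(WriteCallback callback) { this.callback = callback; }

        boolean isReady() { return ready; }

        void write(byte[] chunk) {
            if (!ready) {
                throw new IllegalStateException("write attempted while sink is busy");
            }
            out.write(chunk, 0, chunk.length);
            ready = false;
        }

        /** Simulates the container draining its buffer and calling the listener back. */
        void flushAndNotify() {
            ready = true;
            callback.onWritePossible();
        }

        String written() { return new String(out.toByteArray(), StandardCharsets.UTF_8); }
    }

    /** Writes pending chunks only while the sink is ready, then signals completion once. */
    static final class ChunkWriter implements WriteCallback {
        private final Queue<byte[]> pending = new ArrayDeque<>();
        private final FakeSink sink;
        private final Runnable onComplete;
        private boolean done;

        ChunkWriter(List<String> chunks, FakeSink sink, Runnable onComplete) {
            for (String chunk : chunks) {
                pending.add(chunk.getBytes(StandardCharsets.UTF_8));
            }
            this.sink = sink;
            this.onComplete = onComplete;
        }

        @Override
        public void onWritePossible() {
            while (sink.isReady() && !pending.isEmpty()) {
                sink.write(pending.poll());
            }
            if (pending.isEmpty() && !done) {
                done = true;
                onComplete.run(); // a real runtime would complete the async exchange here
            }
        }
    }

    static String run(List<String> chunks) {
        FakeSink sink = new FakeSink();
        boolean[] completed = {false};
        sink.setWriteCallback(new ChunkWriter(chunks, sink, () -> completed[0] = true));
        // In a real runtime the request thread is suspended and returned here;
        // the loop below plays the role of the container's callback thread(s).
        while (!completed[0]) {
            sink.flushAndNotify();
        }
        return sink.written();
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("Hello, ", "NIO ", "world")));
    }
}
```

The point of the sketch is only that the writer never blocks: it writes while the sink reports ready and otherwise waits for the next callback.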
2. NIO MBR:
Sorry, I forgot that we do not have an MBR; it is just done here:
https://github.com/apache/cxf/blob/master/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/RequestImpl.java#L410
with the actual listener also being straightforward:
https://github.com/apache/cxf/blob/master/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioReadListenerImpl.java
which works OK with the samples you provided earlier on, where the NIO
read process depends on @Suspended.
Though I can imagine how one could similarly implement an MBR with the Flow API.
It looks like the newly proposed NIO writer/reader handlers can allow for a
much nicer looking implementation.
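
As a rough idea of how the read side could be expressed with Flow, the sketch below pushes body chunks into a java.util.concurrent.SubmissionPublisher<ByteBuffer> and lets a subscriber collect them. It assumes that, in a real runtime, a ReadListener's onDataAvailable() would call submit(...) and onAllDataRead() would call close(); here a plain loop stands in for those callbacks, and FlowReadSketch and CollectingSubscriber are made-up names.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

class FlowReadSketch {

    /** Collects all published chunks and completes a future with the decoded text. */
    static final class CollectingSubscriber implements Flow.Subscriber<ByteBuffer> {
        final CompletableFuture<String> result = new CompletableFuture<>();
        private final StringBuilder text = new StringBuilder();
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // ask for one chunk at a time
        }

        @Override
        public void onNext(ByteBuffer chunk) {
            // Assumes each chunk holds whole UTF-8 characters, which is true in this demo only.
            text.append(StandardCharsets.UTF_8.decode(chunk));
            subscription.request(1);
        }

        @Override
        public void onError(Throwable error) { result.completeExceptionally(error); }

        @Override
        public void onComplete() { result.complete(text.toString()); }
    }

    static String readAll(List<String> chunks) {
        SubmissionPublisher<ByteBuffer> body = new SubmissionPublisher<>();
        CollectingSubscriber subscriber = new CollectingSubscriber();
        body.subscribe(subscriber);
        for (String chunk : chunks) {
            // Stand-in for "data available" callbacks. Note that submit() may block
            // when the buffer is full, so a real bridge would have to respect demand.
            body.submit(ByteBuffer.wrap(chunk.getBytes(StandardCharsets.UTF_8)));
        }
        body.close(); // stand-in for "all data read"
        return subscriber.result.join();
    }

    public static void main(String[] args) {
        System.out.println(readAll(List.of("{\"name\":", "\"value\"}")));
    }
}
```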
However, these are two new interfaces, and we'd need to determine where
they sit in the chain relative to, for example,
WriterInterceptor and ContainerResponseFilter, their sorting order, etc.
Could you please give another thought to the possibility of making it
all happen with the MBR/MBW?
Quite likely the new interfaces will make it simpler; I'd only like to
make sure we do not add more interfaces unless absolutely needed.
Thanks, Sergey
On 07/03/17 15:16, Santiago Pericas-Geertsen wrote:
> Hi Sergey,
>
> Are you referring to the NIO proposal we discussed in this context a
> while back? If the integration is at the level of MBR/MBW, how does
> application code take advantage of NIO features? Perhaps you can share
> some links to the implementation that you are referring to.
>
> — Santiago
>
>> On Mar 7, 2017, at 10:02 AM, Sergey Beryozkin <sberyozkin_at_talend.com
>> <mailto:sberyozkin_at_talend.com>> wrote:
>>
>> And just to clarify, the MBR/MBW we use to handle NIO obviously did
>> not start reading or writing immediately on the same thread
>> which entered these MBR/MBW. All we did was make sure the same
>> existing providers keep working without the core runtime doing
>> anything related to NIO.
>>
>> On 07/03/17 14:59, Sergey Beryozkin wrote:
>>> Hi
>>>
>>> The way we implemented the initial NIO draft was to provide a
>>> MessageBodyWriter/Reader which we would use
>>> to link InputStream/OutputStream to the corresponding
>>> ServletInputStream/OutputStream callback handlers notifying our code
>>> when the write or read was possible.
>>>
>>> I.e. the fact that an MBW or MBR accepts an Input/Output stream did not
>>> stop us from implementing the NIO draft.
>>>
>>> Maybe the introduction of the new NIO-specific MBW/MBR interfaces
>>> is justified, but these are internal providers that the users who would
>>> link Flow with other high-level reactive APIs won't use directly,
>>> so I'm wondering whether they are really needed.
>>>
>>> Thanks, Sergey
>>> On 07/03/17 14:25, Pavel Bucek wrote:
>>>>
>>>> Dear EG members,
>>>>
>>>> please allow me to share the direction of our thinking
>>>> about JAX-RS NIO support.
>>>>
>>>> As stated before, SSE isn't the only place where Flow APIs should
>>>> be utilized - NIO is another area where they can be used quite
>>>> heavily. That was the main idea - to see where it does
>>>> make sense to use them.
>>>>
>>>> Similarly to SSE, there is a plan to minimize / extract the Flow to
>>>> different classes, but there should always be a clear path for
>>>> converting an instance into Flow.* (or org.reactivestreams.*)
>>>> interfaces. This is not done yet, to keep things as clear as possible.
>>>>
>>>> One of the bigger concerns is backwards compatibility. There are
>>>> interfaces which work directly with Input/Output streams, which
>>>> is always a problem for reactive processing, since they are blocking
>>>> by design. The specification will need to say something like "The
>>>> runtime is non-blocking as long as the application code doesn't
>>>> read from a provided InputStream or doesn't set an OutputStream."
>>>>
>>>> The motivation for doing this is to allow JAX-RS apps to be
>>>> non-blocking and reactive. JAX-RS 2.0 entity handling is designed
>>>> around reading / producing Input / Output Streams, which are blocking
>>>> by design. A non-blocking approach should result in higher throughput
>>>> and better resource utilization on the server. Also, integration
>>>> and coexistence with modern reactive frameworks should be possible
>>>> without losing the advantages of those frameworks (which
>>>> are almost completely lost when dealing with blocking inputs/outputs).
>>>>
>>>> Let's jump into code snippets.
>>>>
>>>>
>>>> *Server - EX1 (byte handling):*
>>>>
>>>> The snippet below shows how to process a request entity body and return a
>>>> response entity body using Publisher<ByteBuffer> - no
>>>> MessageBodyReader/Writer is involved. This can be used as a
>>>> low-level integration point for other frameworks, which are also
>>>> reactive and will do the processing, like serializing/deserializing
>>>> (mapping) to some Java type, filtering, etc. Returning a
>>>> Publisher<ByteBuffer> is a reactive/NIO alternative to
>>>> javax.ws.rs.core.StreamingOutput; consuming an entity using
>>>> Publisher<ByteBuffer> is a reactive/NIO alternative to consuming
>>>> an entity as an InputStream.
>>>>
>>>> @POST
>>>> @Path("/ex1")
>>>> public Flow.Publisher<ByteBuffer> ex1(Flow.Publisher<ByteBuffer> entity) {
>>>>     Ex1Processor processor = new Ex1Processor();
>>>>
>>>>     entity.subscribe(processor);
>>>>     return processor;
>>>> }
>>>>
>>>> // ex1 processor
>>>> public static class Ex1Processor implements Flow.Processor<ByteBuffer, ByteBuffer> {
>>>>
>>>>     // ...
>>>> }
>>>>
>>>> And there is already an issue which is not clearly solved.
>>>>
>>>> Returning a Publisher instance from the resource method puts
>>>> some constraints on the Publisher itself - it needs to cache all
>>>> events emitted prior to the subscription of the JAX-RS
>>>> implementation's Subscriber instance (subscribing is the only way to
>>>> get the data from a Publisher).
>>>>
>>>> This can be solved by stating that the *request* entity publisher won't
>>>> produce any events until the resource method is invoked and the
>>>> implementation Subscriber has subscribed to the returned *response*
>>>> entity publisher. Or the resource method can return
>>>> Consumer<Flow.Subscriber<ByteBuffer>>, which would effectively
>>>> grant control of the implementation's subscription process. Any
>>>> comments or suggestions are welcome *[ref Q1]*.
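
The constraint behind *[Q1]* can be reproduced with the JDK alone. In the sketch below (hypothetical names, not part of the proposal), SubmissionPublisher acts as a "hot" source, so an item submitted before anyone subscribes never reaches the later subscriber; the second method defers production until subscribe() is called, which is roughly the first option described above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

class LateSubscriptionSketch {

    /** Records every item and completes a future at the end of the stream. */
    static final class Recorder implements Flow.Subscriber<String> {
        final CompletableFuture<List<String>> done = new CompletableFuture<>();
        private final List<String> items = new ArrayList<>();

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            subscription.request(Long.MAX_VALUE); // no back-pressure in this sketch
        }

        @Override
        public void onNext(String item) { items.add(item); }

        @Override
        public void onError(Throwable error) { done.completeExceptionally(error); }

        @Override
        public void onComplete() { done.complete(items); }
    }

    /** Hot source: an item submitted before anyone subscribes is lost. */
    static List<String> hotSource() {
        SubmissionPublisher<String> source = new SubmissionPublisher<>();
        source.submit("early"); // no subscriber yet, so nobody receives this item
        Recorder recorder = new Recorder();
        source.subscribe(recorder);
        source.submit("late");
        source.close();
        return recorder.done.join();
    }

    /** Deferred source: nothing is produced until a subscriber arrives. */
    static List<String> deferredSource() {
        Flow.Publisher<String> source = subscriber -> {
            SubmissionPublisher<String> inner = new SubmissionPublisher<>();
            inner.subscribe(subscriber);
            inner.submit("early");
            inner.submit("late");
            inner.close();
        };
        Recorder recorder = new Recorder();
        source.subscribe(recorder);
        return recorder.done.join();
    }

    public static void main(String[] args) {
        System.out.println("hot:      " + hotSource());
        System.out.println("deferred: " + deferredSource());
    }
}
```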
>>>>
>>>>
>>>> *Server - EX2 (consuming stream of pojos):*
>>>>
>>>> The next example should be slightly more straightforward. It shows
>>>> how to process a Publisher of custom type, in this case called "POJO".
>>>>
>>>> The only limitation or rule here would be that the subscriber for
>>>> the request entity must be subscribed before the resource method
>>>> "ex2" invocation ends.
>>>>
>>>> A new interface is introduced here - NioBodyReader. It has exactly
>>>> the same responsibility as good old MessageBodyReader, but without
>>>> using a blocking InputStream to read the entity. Note that the
>>>> "core" type is the Publisher<ByteBuffer>, which is in this case
>>>> mapped (or converted) into a Publisher of POJOs.
>>>>
>>>>
>>>> @POST
>>>> @Path("/ex2")
>>>> @Consumes(MediaType.APPLICATION_JSON)
>>>> public void ex2(Flow.Publisher<POJO> entity,
>>>>                 @Suspended AsyncResponse response) {
>>>>
>>>>     // TODO: introduce a helper or modify AsyncResponse to support this pattern directly?
>>>>     entity.subscribe(
>>>>             // POJO subscriber - consumer
>>>>             new Flow.Subscriber<POJO>() {
>>>>                 @Override
>>>>                 public void onSubscribe(Flow.Subscription subscription) {
>>>>                     // ...
>>>>                 }
>>>>
>>>>                 @Override
>>>>                 public void onNext(POJO item) {
>>>>                     // ...
>>>>                 }
>>>>
>>>>                 @Override
>>>>                 public void onError(Throwable throwable) {
>>>>                     response.resume(throwable);
>>>>                 }
>>>>
>>>>                 @Override
>>>>                 public void onComplete() {
>>>>                     response.resume(Response.ok().build());
>>>>                 }
>>>>             }
>>>>     );
>>>> }
>>>>
>>>> @Provider
>>>> @Consumes(MediaType.APPLICATION_JSON)
>>>> public static class Ex2NioBodyReader implements NioBodyReader<POJO> {
>>>>     @Override
>>>>     public boolean isReadable(Class<?> type, Type genericType,
>>>>                               Annotation[] annotations, MediaType mediaType) {
>>>>         return true;
>>>>     }
>>>>
>>>>     @Override
>>>>     public Flow.Publisher<POJO> readFrom(Flow.Publisher<ByteBuffer> entity,
>>>>                                          Class<POJO> type,
>>>>                                          Type genericType,
>>>>                                          Annotation[] annotations,
>>>>                                          MediaType mediaType,
>>>>                                          MultivaluedMap<String, String> httpHeaders) {
>>>>         Ex2MappingProcessor mappingProcessor = new Ex2MappingProcessor();
>>>>         entity.subscribe(mappingProcessor);
>>>>         return mappingProcessor;
>>>>     }
>>>> }
>>>>
>>>> // mapping Publisher<ByteBuffer> to Publisher<POJO>
>>>> // ByteBuffers are expected to contain JSON (indicated by @Consumes on the
>>>> // resource method and NioBodyReader).
>>>> public static class Ex2MappingProcessor implements Flow.Subscriber<ByteBuffer>, Flow.Publisher<POJO> {
>>>>     // ...
>>>> }
>>>>
>>>> The same issue as *[Q1]* applies here as well - the same solution will
>>>> need to be applied to the "readFrom" method.
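
A mapping processor such as Ex2MappingProcessor could look roughly like the sketch below, which follows the transform-processor pattern shown in the SubmissionPublisher Javadoc. It is only an assumption about the shape: MappingProcessor is a hypothetical name, String stands in for POJO, and UTF-8 decoding stands in for real JSON parsing. Note that the downstream consumer is attached before the upstream starts publishing, which is one way of sidestepping *[Q1]*.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Function;

class MappingProcessorSketch {

    /** Maps each upstream item with a function and republishes the result downstream. */
    static final class MappingProcessor<S, T> extends SubmissionPublisher<T>
            implements Flow.Processor<S, T> {
        private final Function<? super S, ? extends T> mapper;
        private Flow.Subscription upstream;

        MappingProcessor(Function<? super S, ? extends T> mapper) {
            this.mapper = mapper;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            upstream = subscription;
            subscription.request(1);
        }

        @Override
        public void onNext(S item) {
            submit(mapper.apply(item)); // publish the mapped item downstream
            upstream.request(1);
        }

        @Override
        public void onError(Throwable error) { closeExceptionally(error); }

        @Override
        public void onComplete() { close(); }
    }

    static List<String> decodeAll(List<String> chunks) {
        SubmissionPublisher<ByteBuffer> bytes = new SubmissionPublisher<>();
        MappingProcessor<ByteBuffer, String> mapping = new MappingProcessor<ByteBuffer, String>(
                buffer -> StandardCharsets.UTF_8.decode(buffer).toString());

        List<String> seen = new ArrayList<>();
        // Attach the downstream consumer first, then connect the upstream.
        CompletableFuture<Void> done = mapping.consume(seen::add);
        bytes.subscribe(mapping);

        for (String chunk : chunks) {
            bytes.submit(ByteBuffer.wrap(chunk.getBytes(StandardCharsets.UTF_8)));
        }
        bytes.close();
        done.join(); // wait until the mapped stream has completed
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(decodeAll(List.of("{\"id\":1}", "{\"id\":2}")));
    }
}
```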
>>>>
>>>> Another issue is what should be passed to the "isReadable" method
>>>> as the "type" parameter. I'm not exactly sure whether we can safely
>>>> obtain the generic type of a parameter from the resource method (public
>>>> void ex2(Flow.Publisher< *POJO*> entity, ..)). Any
>>>> comments/suggestions are welcome *[ref Q2]*.
>>>>
>>>> Note that using @Suspended shouldn't be enforced here; it should be
>>>> possible to return a Response directly and still be able to consume
>>>> the *request* entity.
>>>>
>>>>
>>>> *Server - EX3 (producing list of POJOs):*
>>>>
>>>> The last (for now) example shows how we can produce and write
>>>> POJOs. The resource method doesn't take any parameters and provides a
>>>> Publisher of POJO objects, which will be converted to JSON in
>>>> NioBodyWriter. NioBodyWriter is a reactive alternative to
>>>> MessageBodyWriter from the older version of the specification.
>>>>
>>>> @GET
>>>> @Path("/ex3")
>>>> @Produces(MediaType.APPLICATION_JSON)
>>>> public Flow.Publisher<POJO> ex3() {
>>>>
>>>>     Flow.Publisher<POJO> pojoPublisher = null;
>>>>
>>>>     // source of the POJO "stream" can be anything - database call,
>>>>     // client call to another service, ...
>>>>     //
>>>>     // DB
>>>>     //     .getEmployees(department) // StreamPublisher<EmployeeDbModel> -- reactive stream
>>>>     //     .map((Function<EmployeeDbModel, EmployeeToReturn>) employeeDbModel -> {
>>>>     //         // ...
>>>>     //     });
>>>>
>>>>     return pojoPublisher;
>>>> }
>>>>
>>>> @Provider
>>>> @Produces(MediaType.APPLICATION_JSON)
>>>> public static class Ex3NioBodyWriter implements NioBodyWriter<POJO> {
>>>>     @Override
>>>>     public boolean isWriteable(Class<?> type, Type genericType,
>>>>                                Annotation[] annotations, MediaType mediaType) {
>>>>         return true;
>>>>     }
>>>>
>>>>     @Override
>>>>     public void writeTo(Flow.Publisher<POJO> entityObjectPublisher,
>>>>                         Flow.Subscriber<ByteBuffer> subscriber,
>>>>                         Class<?> type,
>>>>                         Type genericType,
>>>>                         Annotation[] annotations,
>>>>                         MediaType mediaType,
>>>>                         MultivaluedMap<String, Object> httpHeaders) {
>>>>         // map Publisher<POJO> to Publisher<ByteBuffer> and subscribe
>>>>         // Flow.Subscriber<ByteBuffer> to it.
>>>>     }
>>>> }
>>>>
>>>> The resource method is minimalistic; *[Q1]* applies here as well.
>>>>
>>>> The example introduces NioBodyWriter, and its isWriteable method
>>>> is subject to *[Q2]*, similarly to NioBodyReader. #writeTo doesn't have
>>>> any issues - *[Q1]* is mitigated there because the JAX-RS implementation
>>>> passes a subscriber to the writer - nothing needs to be
>>>> returned. Something similar might be possible for
>>>> NioBodyReader as well.
>>>>
>>>>
>>>> Comment on writing multiple POJO instances:
>>>> https://github.com/pavelbucek/jax-rs/blob/bfc5b3d6caecab2f6304f92ac7b44a7ad6a5fdff/jaxrs-api/src/main/java/javax/ws/rs/ext/NioBodyWriter.java#L82
>>>>
>>>> An important point to mention is that even when producing multiple
>>>> instances, the intention here is still to return a single HTTP
>>>> response.
>>>>
>>>> ===
>>>>
>>>> We have more, but this email is already too long - I will post more
>>>> after there is some feedback on the presented concepts and issues.
>>>> Please let us know if this format is OK or if you'd prefer
>>>> something else - I guess I could do a screencast, hangout or
>>>> something similar.
>>>>
>>>> Source links:
>>>>
>>>> - complete server example:
>>>> https://github.com/pavelbucek/jax-rs/blob/bfc5b3d6caecab2f6304f92ac7b44a7ad6a5fdff/examples/src/main/java/jaxrs/examples/nio/NioResource.java
>>>> - client (to be discussed):
>>>> https://github.com/pavelbucek/jax-rs/blob/bfc5b3d6caecab2f6304f92ac7b44a7ad6a5fdff/examples/src/main/java/jaxrs/examples/nio/NioClient.java
>>>> - server side processing (including interceptors):
>>>> https://github.com/pavelbucek/jax-rs/blob/bfc5b3d6caecab2f6304f92ac7b44a7ad6a5fdff/examples/src/main/java/jaxrs/examples/nio/ServerSideProcessing.java
>>>>
>>>> (Not including direct link to individual examples, since we will
>>>> continue working on them...)
>>>>
>>>> Looking forward to your feedback!
>>>>
>>>> Thanks and regards,
>>>> Pavel & Santiago
>>>
>>>
>>
>