Hi Santiago
Sounds good, thanks,
Sergey
On 07/03/17 19:08, Santiago Pericas-Geertsen wrote:
> Sergey,
>
> Got it, from an API’s perspective, this was using the extensions to
> Request and Response that were proposed originally.
>
> The new proposal has the following benefits:
>
> (1) It is not limited to using low-level byte streams
>
> (2) Can be extended to manage a stream of thingies (like Pojos, see
> examples by Pavel)
>
> (3) SSE could potentially be retrofitted as a special case where the
> thingies are messages
>
> (4) It better integrates with reactive frameworks that can produce
> Publisher<A>, for example:
>
> Publisher<Pokemon> findThem() {
>     return DB.query(Pokemon.class).filter(…);
> }
>
> — Santiago
>
>> On Mar 7, 2017, at 11:40 AM, Sergey Beryozkin
>> <sberyozkin_at_talend.com> wrote:
>>
>> Hi Santiago, yes, sure here are the details:
>>
>> 1. NIO MBW:
>>
>> https://github.com/apache/cxf/blob/master/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioMessageBodyWriter.java#L53
>>
>> there the first thing we do is obtain a CXF-specific 'Continuation', which
>> in the case of HTTP is a wrapper around the Servlet 3.1 AsyncContext and
>> enforces that the invocation is asynchronous-aware.
>>
>> then, the next 3 lines register a WriteListener with the
>> ServletOutputStream, with the WriteListener implemented as follows:
>> https://github.com/apache/cxf/blob/master/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioWriteListenerImpl.java
>>
>> though we continue interacting with the underlying Servlet API with
>> something like "cont.isReadyForWrite()"
>>
>> finally, after this listener is registered, we suspend and return the
>> current thread to the container:
>>
>> https://github.com/apache/cxf/blob/master/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioMessageBodyWriter.java#L58
>>
>> with the registered listener finishing the response on the callback
>> thread(s).
>>
>> 2. NIO MBR:
>>
>> Sorry I forgot we do not have an MBR, it is just done here:
>>
>> https://github.com/apache/cxf/blob/master/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/RequestImpl.java#L410
>>
>> with the actual listener being also straightforward:
>> https://github.com/apache/cxf/blob/master/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioReadListenerImpl.java
>>
>> which works OK with the samples you provided earlier on, where the NIO
>> read process depends on @Suspended.
>> Though I can imagine how one could similarly implement an MBR with the Flow.
>>
>> Looks like the newly proposed NIO writer/reader handlers can allow for
>> a much nicer looking implementation.
>> However, these are two new interfaces and we'd need to determine where
>> they sit in the chain relative to, for example,
>> WriterInterceptor and ContainerResponseFilter, their sorting order, etc.
>>
>> Can you please give another thought to the possibility of making
>> it all happen with the MBR/MBW?
>> Quite likely the new interfaces will make it simpler, I'd only like
>> to make sure we do not add more interfaces unless absolutely needed.
>>
>> Thanks, Sergey
>>
>>
>> On 07/03/17 15:16, Santiago Pericas-Geertsen wrote:
>>> Hi Sergey,
>>>
>>> Are you referring to the NIO proposal we discussed in this context a
>>> while back? If the integration is at the level of MBR/MBW, how does
>>> application code take advantage of NIO features? Perhaps you can
>>> share some links to the implementation that you are referring to.
>>>
>>> — Santiago
>>>
>>>> On Mar 7, 2017, at 10:02 AM, Sergey Beryozkin
>>>> <sberyozkin_at_talend.com> wrote:
>>>>
>>>> And just to clarify, these MBR/MBW we use to handle NIO obviously did
>>>> not start reading or writing immediately on the same thread
>>>> which entered these MBR/MBW. All we did was to make sure the same
>>>> existing providers are working without the core runtime doing
>>>> anything related to NIO.
>>>>
>>>> On 07/03/17 14:59, Sergey Beryozkin wrote:
>>>>> Hi
>>>>>
>>>>> The way we implemented the initial NIO draft was to provide a
>>>>> MessageBodyWriter/Reader which we would use
>>>>> to link the InputStream/OutputStream to the corresponding
>>>>> ServletInputStream/OutputStream callback handlers notifying our code
>>>>> when the write or read was possible.
>>>>>
>>>>> I.e. the fact that MBW or MBR accepts an Input/Output stream did not
>>>>> stop us from implementing the NIO draft.
>>>>>
>>>>> Maybe the introduction of the new NIO-specific MBW/MBR interfaces
>>>>> is justified, but these are internal providers that the users who
>>>>> link Flow with other high-level reactive APIs won't use directly,
>>>>> thus I'm wondering whether they are really needed?
>>>>>
>>>>> Thanks, Sergey
>>>>> On 07/03/17 14:25, Pavel Bucek wrote:
>>>>>>
>>>>>> Dear EG members,
>>>>>>
>>>>>> please allow me to share the direction of what we are thinking
>>>>>> about JAX-RS NIO support.
>>>>>>
>>>>>> As stated before, SSE wasn't the only place where Flow APIs should
>>>>>> be utilized - NIO is another area where it can be utilized quite
>>>>>> heavily. And that was mostly the main idea - to see where it does
>>>>>> make sense to use that.
>>>>>>
>>>>>> Similarly to SSE, there is a plan to minimize / extract the Flow
>>>>>> to different classes, but there should always be a clear path how
>>>>>> to convert an instance into Flow.* (or org.reactivestreams.*)
>>>>>> interfaces. It is not done yet, to keep things as clear as possible.
>>>>>>
>>>>>> One of the bigger concerns is backwards compatibility. There are
>>>>>> interfaces, which do work directly with Input/Output stream,
>>>>>> which is always a problem for reactive processing, since it is
>>>>>> blocking by design. The specification will need to say something
>>>>>> like "The runtime is non-blocking as long as the application code
>>>>>> doesn't read from a provided InputStream or doesn't set an
>>>>>> OutputStream."
>>>>>>
>>>>>> The motivation for doing this is to allow the JAX-RS apps to be
>>>>>> non-blocking and reactive. JAX-RS 2.0 entity handling is designed
>>>>>> around reading / producing Input / Output Stream, which is
>>>>>> blocking by design. Non-blocking approach should result in higher
>>>>>> throughput and better resource utilization of the server. Also,
>>>>>> integration and coexistence with modern reactive frameworks
>>>>>> should be possible to do without losing the advantage of having
>>>>>> that framework (which was almost completely lost when dealing
>>>>>> with blocking inputs/outputs).
>>>>>>
>>>>>> Let's jump into code snippets.
>>>>>>
>>>>>>
>>>>>> *Server - EX1 (byte handling):*
>>>>>>
>>>>>> The snippet below shows how to process the request entity body and
>>>>>> return the response entity body, using Publisher<ByteBuffer> - no
>>>>>> MessageBodyReader/Writer is involved. This can be used as a
>>>>>> low-level integration point for other frameworks, which are also
>>>>>> reactive and will do the processing, like
>>>>>> serializing/deserializing (mapping) to some java type, filtering,
>>>>>> etc. Returning a Publisher<ByteBuffer> is a reactive/nio
>>>>>> alternative to javax.ws.rs.core.StreamingOutput, consuming an
>>>>>> entity using Publisher<ByteBuffer> is a reactive/nio alternative
>>>>>> to consuming entity as an InputStream.
>>>>>>
>>>>>> @POST @Path("/ex1")
>>>>>> public Flow.Publisher<ByteBuffer> ex1(Flow.Publisher<ByteBuffer> entity) {
>>>>>>     Ex1Processor processor = new Ex1Processor();
>>>>>>
>>>>>>     entity.subscribe(processor);
>>>>>>     return processor;
>>>>>> }
>>>>>>
>>>>>> // ex1 processor
>>>>>> public static class Ex1Processor implements Flow.Processor<ByteBuffer, ByteBuffer> {
>>>>>>
>>>>>>     // ...
>>>>>> }
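
A minimal sketch of what a processor like Ex1Processor could look like, assuming it is built on the JDK's SubmissionPublisher. The class name UpperCaseProcessor and the transform helper are hypothetical and not part of the proposal; they only illustrate a Flow.Processor<ByteBuffer, ByteBuffer> that consumes the request entity and republishes transformed chunks.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

// Hypothetical stand-in for Ex1Processor: upper-cases ASCII bytes and
// republishes them to whoever subscribes to the response entity.
final class UpperCaseProcessor extends SubmissionPublisher<ByteBuffer>
        implements Flow.Processor<ByteBuffer, ByteBuffer> {

    private Flow.Subscription upstream;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        upstream = subscription;
        upstream.request(1); // pull one chunk at a time
    }

    @Override
    public void onNext(ByteBuffer chunk) {
        // submit() may block while a downstream buffer is full, and the item
        // is lost if nobody has subscribed to this processor yet.
        submit(transform(chunk));
        upstream.request(1);
    }

    @Override
    public void onError(Throwable throwable) {
        closeExceptionally(throwable);
    }

    @Override
    public void onComplete() {
        close();
    }

    // Copies the readable bytes of the chunk, upper-casing ASCII letters.
    static ByteBuffer transform(ByteBuffer in) {
        ByteBuffer out = ByteBuffer.allocate(in.remaining());
        while (in.hasRemaining()) {
            byte b = in.get();
            out.put((byte) (b >= 'a' && b <= 'z' ? b - 32 : b));
        }
        out.flip();
        return out;
    }
}
```

Because SubmissionPublisher only delivers to current subscribers, this sketch depends on the response side being subscribed before the request side starts emitting.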
>>>>>>
>>>>>> And there is already an issue, which is not clearly solved.
>>>>>>
>>>>>> Returning a Publisher instance from the resource method does put
>>>>>> some constraints on the Publisher itself - it needs to cache all
>>>>>> events emitted prior to the subscription of the JAX-RS
>>>>>> implementation's Subscriber instance (subscribing being the only way
>>>>>> to get the data from a Publisher).
>>>>>>
>>>>>> This can be solved by stating that the *request* entity publisher
>>>>>> won't produce any events until the resource method has been invoked
>>>>>> and the implementation Subscriber has subscribed to the returned
>>>>>> *response* entity publisher. Or the resource method can return
>>>>>> Consumer<Flow.Subscriber<ByteBuffer>>, which would effectively
>>>>>> hand control over the implementation's subscription process to the
>>>>>> application. Any comments or suggestions are welcome *[ref Q1]*.
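
To make the Consumer<Flow.Subscriber<ByteBuffer>> alternative concrete, here is a rough sketch under stated assumptions. DeferredWiring and coldPublisher are hypothetical names, and the cold publisher is a demo-only source that emits synchronously on request (it skips most Reactive Streams rule checks). The point is that nothing is subscribed, and so nothing can be emitted and lost, until the runtime hands over its own subscriber.

```java
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Consumer;

final class DeferredWiring {

    // Hypothetical resource-method shape: instead of returning a Publisher,
    // return a callback that the runtime invokes with its own Subscriber.
    static Consumer<Flow.Subscriber<ByteBuffer>> ex1(Flow.Publisher<ByteBuffer> entity) {
        // The pipeline is wired only when accept(...) is called, so no event
        // can be emitted before the runtime subscriber exists.
        return runtimeSubscriber -> entity.subscribe(runtimeSubscriber);
    }

    // Demo-only cold publisher: emits the given chunks synchronously, and only
    // as far as the subscriber's requested demand allows.
    static Flow.Publisher<ByteBuffer> coldPublisher(List<ByteBuffer> chunks) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int next;
            private boolean done;

            @Override
            public void request(long n) {
                while (n-- > 0 && !done && next < chunks.size()) {
                    subscriber.onNext(chunks.get(next++));
                }
                if (!done && next == chunks.size()) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }
}
```

A real resource method would insert its processing stages between the request entity and the runtime subscriber inside the callback; the identity wiring above only shows the timing.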
>>>>>>
>>>>>>
>>>>>> *Server - EX2 (consuming stream of pojos):*
>>>>>>
>>>>>> The next example should be slightly more straightforward. It
>>>>>> shows how to process a Publisher of custom type, in this case
>>>>>> called "POJO".
>>>>>>
>>>>>> The only limitation or rule here would be that the subscriber for
>>>>>> the request entity must be subscribed before the resource method
>>>>>> "ex2" invocation ends.
>>>>>>
>>>>>> A new interface is introduced here - NioBodyReader. It has exactly
>>>>>> the same responsibility as good old MessageBodyReader, but
>>>>>> without using a blocking InputStream to read the entity. Note
>>>>>> that the "core" type is the Publisher<ByteBuffer>, which is in
>>>>>> this case mapped (or converted) into Publisher of POJOs.
>>>>>>
>>>>>>
>>>>>> @POST @Path("/ex2")
>>>>>> @Consumes(MediaType.APPLICATION_JSON)
>>>>>> public void ex2(Flow.Publisher<POJO> entity,
>>>>>>                 @Suspended AsyncResponse response) {
>>>>>>
>>>>>>     // TODO: introduce a helper or modify AsyncResponse to support
>>>>>>     // this pattern directly?
>>>>>>     entity.subscribe(
>>>>>>         // POJO subscriber - consumer
>>>>>>         new Flow.Subscriber<POJO>() {
>>>>>>             @Override
>>>>>>             public void onSubscribe(Flow.Subscription subscription) {
>>>>>>                 // ...
>>>>>>             }
>>>>>>
>>>>>>             @Override
>>>>>>             public void onNext(POJO item) {
>>>>>>                 // ...
>>>>>>             }
>>>>>>
>>>>>>             @Override
>>>>>>             public void onError(Throwable throwable) {
>>>>>>                 response.resume(throwable);
>>>>>>             }
>>>>>>
>>>>>>             @Override
>>>>>>             public void onComplete() {
>>>>>>                 response.resume(Response.ok().build());
>>>>>>             }
>>>>>>         }
>>>>>>     );
>>>>>> }
>>>>>>
>>>>>> @Provider
>>>>>> @Consumes(MediaType.APPLICATION_JSON)
>>>>>> public static class Ex2NioBodyReader implements NioBodyReader<POJO> {
>>>>>>     @Override
>>>>>>     public boolean isReadable(Class<?> type, Type genericType, Annotation[] annotations, MediaType mediaType) {
>>>>>>         return true;
>>>>>>     }
>>>>>>
>>>>>>     @Override
>>>>>>     public Flow.Publisher<POJO> readFrom(Flow.Publisher<ByteBuffer> entity,
>>>>>>                                          Class<POJO> type,
>>>>>>                                          Type genericType,
>>>>>>                                          Annotation[] annotations,
>>>>>>                                          MediaType mediaType,
>>>>>>                                          MultivaluedMap<String, String> httpHeaders) {
>>>>>>         Ex2MappingProcessor mappingProcessor = new Ex2MappingProcessor();
>>>>>>         entity.subscribe(mappingProcessor);
>>>>>>         return mappingProcessor;
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>> // mapping Publisher<ByteBuffer> to Publisher<POJO>
>>>>>> // ByteBuffers are expected to contain JSON (indicated by @Consumes
>>>>>> // on the resource method and NioBodyReader).
>>>>>> public static class Ex2MappingProcessor implements Flow.Subscriber<ByteBuffer>, Flow.Publisher<POJO> {
>>>>>>     // ...
>>>>>> }
>>>>>>
>>>>>> The same issue as *[Q1]* applies here as well - the same solution
>>>>>> will need to be applied to the "readFrom" method.
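
One practical detail for a mapping processor of this kind: ByteBuffer chunks are not guaranteed to line up with object boundaries, so the mapper has to buffer partial input. The sketch below assumes, purely for illustration, that the entity is newline-delimited JSON (one object per line); that framing and the LineFramer class are hypothetical and not part of the proposal. It only splits the byte stream into complete records; turning each record into a POJO would be left to a JSON library.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: reassembles newline-delimited records from chunks
// that may end in the middle of a record.
final class LineFramer {

    // Bytes of the record that is still incomplete.
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

    // Consumes one chunk and returns every record completed by it.
    List<String> feed(ByteBuffer chunk) {
        List<String> records = new ArrayList<>();
        while (chunk.hasRemaining()) {
            byte b = chunk.get();
            if (b == '\n') {
                // Decode only complete records, so a multi-byte UTF-8
                // character split across chunks is never decoded in halves.
                records.add(new String(pending.toByteArray(), StandardCharsets.UTF_8));
                pending.reset();
            } else {
                pending.write(b);
            }
        }
        return records;
    }
}
```

Inside onNext(ByteBuffer), a processor could call feed(...) and publish one mapped item per returned record, requesting more input when a chunk completes no record.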
>>>>>>
>>>>>> Another issue is about what should be passed to "isReadable"
>>>>>> method as "type" parameter. I'm not exactly sure whether we can
>>>>>> safely obtain generic type of a parameter from the resource
>>>>>> method (public void ex2(Flow.Publisher< *POJO*> entity, ..)). Any
>>>>>> comments/suggestions welcomed *[ref Q2]*.
>>>>>>
>>>>>> Note that using @Suspended shouldn't be enforced here; it should
>>>>>> be possible to return a Response directly and still be able to
>>>>>> consume the *request* entity.
>>>>>>
>>>>>>
>>>>>> *Server - EX3 (producing list of POJOs):*
>>>>>>
>>>>>> The last (for now) example shows how we can produce and write
>>>>>> POJOs. The resource method doesn't take any parameters and provides
>>>>>> a Publisher of POJO objects, which will be converted to JSON in
>>>>>> NioBodyWriter. NioBodyWriter is a reactive alternative to
>>>>>> MessageBodyWriter from older versions of the specification.
>>>>>>
>>>>>> @GET @Path("/ex3")
>>>>>> @Produces(MediaType.APPLICATION_JSON)
>>>>>> public Flow.Publisher<POJO> ex3() {
>>>>>>
>>>>>>     Flow.Publisher<POJO> pojoPublisher = null;
>>>>>>
>>>>>>     // source of the POJO "stream" can be anything - database call,
>>>>>>     // client call to another service, ...
>>>>>>     //
>>>>>>     // DB
>>>>>>     //     .getEmployees(department) // StreamPublisher<EmployeeDbModel> -- reactive stream
>>>>>>     //     .map((Function<EmployeeDbModel, EmployeeToReturn>) employeeDbModel -> {
>>>>>>     //         // ...
>>>>>>     //     });
>>>>>>
>>>>>>     return pojoPublisher;
>>>>>> }
>>>>>>
>>>>>> @Provider
>>>>>> @Produces(MediaType.APPLICATION_JSON)
>>>>>> public static class Ex3NioBodyWriter implements NioBodyWriter<POJO> {
>>>>>>     @Override
>>>>>>     public boolean isWriteable(Class<?> type, Type genericType, Annotation[] annotations, MediaType mediaType) {
>>>>>>         return true;
>>>>>>     }
>>>>>>
>>>>>>     @Override
>>>>>>     public void writeTo(Flow.Publisher<POJO> entityObjectPublisher,
>>>>>>                         Flow.Subscriber<ByteBuffer> subscriber,
>>>>>>                         Class<?> type,
>>>>>>                         Type genericType,
>>>>>>                         Annotation[] annotations,
>>>>>>                         MediaType mediaType,
>>>>>>                         MultivaluedMap<String, Object> httpHeaders) {
>>>>>>         // map Publisher<POJO> to Publisher<ByteBuffer> and subscribe
>>>>>>         // Flow.Subscriber<ByteBuffer> to it.
>>>>>>     }
>>>>>> }
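
One way the body of such a writeTo could be approached, sketched under assumptions: rather than building an intermediate Publisher<ByteBuffer>, wrap the supplied byte subscriber in an adapter that encodes each item and subscribe the adapter to the entity publisher. EncodingSubscriber and the toJson function are hypothetical names; the sketch emits one JSON document per item and does not attempt to frame the items as a single JSON array.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Flow;
import java.util.function.Function;

// Hypothetical adapter: encodes every item and forwards the bytes downstream.
final class EncodingSubscriber<T> implements Flow.Subscriber<T> {

    private final Flow.Subscriber<? super ByteBuffer> downstream;
    private final Function<? super T, String> toJson;

    EncodingSubscriber(Flow.Subscriber<? super ByteBuffer> downstream,
                       Function<? super T, String> toJson) {
        this.downstream = downstream;
        this.toJson = toJson;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        // One item in, one buffer out: the downstream demand can be passed
        // to the upstream publisher unchanged.
        downstream.onSubscribe(subscription);
    }

    @Override
    public void onNext(T item) {
        byte[] bytes = toJson.apply(item).getBytes(StandardCharsets.UTF_8);
        downstream.onNext(ByteBuffer.wrap(bytes));
    }

    @Override
    public void onError(Throwable throwable) {
        downstream.onError(throwable);
    }

    @Override
    public void onComplete() {
        downstream.onComplete();
    }

    // What a writeTo(...) body could reduce to with this adapter.
    static <T> void writeTo(Flow.Publisher<T> entities,
                            Flow.Subscriber<ByteBuffer> subscriber,
                            Function<? super T, String> toJson) {
        entities.subscribe(new EncodingSubscriber<T>(subscriber, toJson));
    }
}
```

Since the adapter subscribes only when writeTo is called with the runtime's subscriber, no events have to be cached on this side.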
>>>>>>
>>>>>> Resource method is minimalistic, *[Q1]* applies here as well.
>>>>>>
>>>>>> The example introduces NioBodyWriter, and its isWriteable method
>>>>>> is subject to *[Q2]*, similarly to NioBodyReader. #writeTo doesn't
>>>>>> have any issues - *[Q1]* is mitigated there because the
>>>>>> implementation passes a subscriber to the method - nothing
>>>>>> needs to be returned. Something similar might be
>>>>>> possible for NioBodyReader as well.
>>>>>>
>>>>>>
>>>>>> Comment on writing multiple POJO instances:
>>>>>> https://github.com/pavelbucek/jax-rs/blob/bfc5b3d6caecab2f6304f92ac7b44a7ad6a5fdff/jaxrs-api/src/main/java/javax/ws/rs/ext/NioBodyWriter.java#L82
>>>>>>
>>>>>> An important point to mention is that even when producing multiple
>>>>>> instances, the intention here is still to return a single HTTP
>>>>>> response.
>>>>>>
>>>>>> ===
>>>>>>
>>>>>> We have more, but this email is already too long - I will post
>>>>>> more after there is some feedback on the presented concepts and
>>>>>> issues. Please let us know if this format is OK or if you'd
>>>>>> prefer something else - I guess I could do a screencast, hangout
>>>>>> or something similar.
>>>>>>
>>>>>> Source links:
>>>>>>
>>>>>> - complete server example:
>>>>>> https://github.com/pavelbucek/jax-rs/blob/bfc5b3d6caecab2f6304f92ac7b44a7ad6a5fdff/examples/src/main/java/jaxrs/examples/nio/NioResource.java
>>>>>> - client (to be discussed):
>>>>>> https://github.com/pavelbucek/jax-rs/blob/bfc5b3d6caecab2f6304f92ac7b44a7ad6a5fdff/examples/src/main/java/jaxrs/examples/nio/NioClient.java
>>>>>> - server side processing (including interceptors):
>>>>>> https://github.com/pavelbucek/jax-rs/blob/bfc5b3d6caecab2f6304f92ac7b44a7ad6a5fdff/examples/src/main/java/jaxrs/examples/nio/ServerSideProcessing.java
>>>>>>
>>>>>> (Not including direct link to individual examples, since we will
>>>>>> continue working on them...)
>>>>>>
>>>>>> Looking forward to your feedback!
>>>>>>
>>>>>> Thanks and regards,
>>>>>> Pavel & Santiago
>>>>>
>>>>>
>>>>
>>>
>>
>