users@jax-rs-spec.java.net

[jax-rs-spec users] Re: NIO API review / request for feedback

From: Sergey Beryozkin <sberyozkin_at_talend.com>
Date: Tue, 7 Mar 2017 15:02:27 +0000

And just to clarify, these MBR/MBW we use to handle NIO did not
obviously started reading or writing immediately on the same thread
which entered these MBR/MBW. All we did was to make sure the same
existing providers are working without the core runtime doing anything
related to NIO

On 07/03/17 14:59, Sergey Beryozkin wrote:
> Hi
>
> The way we implemented the initial NIO draft was to provide
> MessageBodyWriter/Reader which would use
> to link InputStream/OutputStream to the corresponding
> ServletInputStream/OutputStream callback handlers notifying our code
> when the write or read was possible.
>
> I.e the fact MBW or MBR accepts Input/Output stream did not stop us
> from implementing NIO draft.
>
> May be the introduction of the new NIO-specific MBW/MBR interfaces is
> justified but these are internal providers the users who would link
> Flow with other high-level reactive APIs won't use directly thus I'm
> wondering are they really needed ?
>
> Thanks, Sergey
> On 07/03/17 14:25, Pavel Bucek wrote:
>>
>> Dear EG members,
>>
>> please allow me to share the direction of what we are thinking about
>> JAX-RS NIO support.
>>
>> As stated before, SSE wasn't the only place were Flow APIs should be
>> utilized - NIO is another area where it can be utilized quite
>> heavily. And that was mostly the main idea - to see where it does
>> make sense to use that.
>>
>> Similarly to SSE, there is a plan to minimize / extract the Flow to
>> different classes, but there should always be a clear path how to
>> convert an instance into Flow.* (or org.reactivestreams.*)
>> interfaces. It is not done yet, to keep things as clear as possible.
>>
>> One of the bigger concerns is backwards compatibility. There are
>> interfaces, which do work directly with Input/Output stream, which is
>> always a problem for reactive processing, since it is blocking by
>> design. The specification will need to say something like "The
>> runtime is non-blocking as long as the application code doesn't read
>> from a provided InputStream or doesn't set an OutputStream."
>>
>> The motivation for doing this is to allow the JAX-RS apps to be
>> non-blocking and reactive. JAX-RS 2.0 entity handling is designed
>> around reading / producing Input / Output Stream, which is blocking
>> by design. Non-blocking approach should result in higher throughput
>> and better resource utilization of the server. Also, integration and
>> coexistence with modern reactive frameworks should be possible to do
>> without losing the advantage of having that framework (which was
>> almost completely lost when dealing with blocking inputs/outputs).
>>
>> Let's jump into code snippets.
>>
>>
>> *Server - EX1 (byte handling):*
>>
>> Snippet below shows how to process request entity body return
>> response entity body, using Publisher<ByteBuffer> - no
>> MessageBodyReader/Writer is involved. This can be used as a low-level
>> integration point for other frameworks, which are also reactive and
>> will do the processing, like serializing/deserializing (mapping) to
>> some java type, filtering, etc. Returning a Publisher<ByteBuffer> is
>> a reactive/nio alternative to javax.ws.rs.core.StreamingOutput,
>> consuming an entity using Publisher<ByteBuffer> is a reactive/nio
>> alternative to consuming entity as an InputStream.
>>
>> @POST @Path("/ex1")
>> public Flow.Publisher<ByteBuffer> ex1(Flow.Publisher<ByteBuffer> entity) {
>> Ex1Processor processor =new Ex1Processor();
>>
>> entity.subscribe(processor);
>> return processor;
>> }
>>
>> // ex1 processor public static class Ex1Processorimplements Flow.Processor<ByteBuffer, ByteBuffer> {
>>
>> // ... }
>>
>> And there is already an issue, which is not clearly solved.
>>
>> Returning a Publisher instance from the resource method does put some
>> constraints on the Publisher itself - it needs to cache all events
>> which will be emitted prior subscription of the jax-rs implementation
>> Subscriber instance (which is the only way how to get the data from a
>> Publisher).
>>
>> This can be solved by stating that *request* entity publisher won't
>> produce any events until the resource method is invoked and the
>> implementation Subscriber subscribed to the returned *response*
>> entity publisher. Or the resource method can return
>> Consumer<Flow.Subscriber<ByteBuffer>>, which would effectively grant
>> control of the implementation subscription process. Any comments or
>> suggestions welcomed *[ref Q1]*.
>>
>>
>> *Server - EX2 (consuming stream of pojos):*
>>
>> The next example should be slightly more straightforward. It shows
>> how to process a Publisher of custom type, in this case called "POJO".
>>
>> The only limitation or rule here would be that the subscriber for the
>> request entity must be subscribed before the resource method "ex2"
>> invocation ends.
>>
>> New interface is introduced here - NioBodyReader. It has exactly the
>> same responsibility as good old MessageBodyReader, but without using
>> a blocking OutputStream to write the entity. Note that the "core"
>> type is the Publisher<ByteBuffer>, which is in this case mapped (or
>> converted) into Publisher of POJOs.
>>
>> @POST @Path("/ex2")
>> @Consumes(MediaType.APPLICATION_JSON)public void ex2(Flow.Publisher<POJO> entity,
>> @Suspended AsyncResponse response) {
>>
>> // TODO: introduce a helper or modify AsyncResponse to support this
>> pattern directly? entity.subscribe(
>> // POJO subscriber - consumer new Flow.Subscriber<POJO>() {
>> @Override public void onSubscribe(Flow.Subscription subscription) {
>> // ... }
>>
>> @Override public void onNext(POJO item) {
>> // ... }
>>
>> @Override public void onError(Throwable throwable) {
>> response.resume(throwable);
>> }
>>
>> @Override public void onComplete() {
>> response.resume(Response.ok().build());
>> }
>> }
>> );
>> }
>>
>> @Provider
>> @Consumes(MediaType.APPLICATION_JSON)
>> public static class Ex2NioBodyReaderimplements NioBodyReader<POJO> {
>> @Override public boolean isReadable(Class<?> type, Type genericType, Annotation[] annotations, MediaType mediaType) {
>> return true;
>> }
>>
>> @Override public Flow.Publisher<POJO> readFrom(Flow.Publisher<ByteBuffer> entity,
>> Class<POJO> type,
>> Type genericType,
>> Annotation[] annotations,
>> MediaType mediaType,
>> MultivaluedMap<String, String> httpHeaders) {
>> Ex2MappingProcessor mappingProcessor =new Ex2MappingProcessor();
>> entity.subscribe(mappingProcessor);
>> return mappingProcessor;
>> }
>> }
>>
>> // mapping Publisher<ByteBuffer> to Publisher<POJO> // ByteBuffers
>> are expected to contain JSON (indicated by @Consumes on the resource
>> method and NioBodyReader). public static class Ex2MappingProcessorimplements Flow.Subscriber<ByteBuffer>, Flow.Publisher<POJO> {
>> // ... }
>> Same issue as *[Q1]* is valid for this as well - same solution will
>> need to be applied for "readFrom" method.
>>
>> Another issue is about what should be passed to "isReadable" method
>> as "type" parameter. I'm not exactly sure whether we can safely
>> obtain generic type of a parameter from the resource method (public
>> void ex2(Flow.Publisher< *POJO*> entity, ..)). Any
>> comments/suggestions welcomed *[ref Q2]*.
>>
>> Note that using @Suspended shouldn't be enforced here; it should be
>> possible to return a Response directly and still be able to consume
>> the *request* entity.
>>
>>
>> *Server - EX3 (producing list of POJOs):*
>>
>> The last (for now) example shows how we can produce and write POJOs.
>> Resource method doesn't take any parameters and provides a Publisher
>> of POJO objects, which will be converted to JSON in NioBodyWriter.
>> NioBodyReader is a reactive alternative to MessageBodyReader from
>> older version of the specification.
>>
>> @GET @Path("/ex3")@Produces(MediaType.APPLICATION_JSON)public Flow.Publisher<POJO> ex3() {
>>
>> Flow.Publisher<POJO> pojoPublisher =null;
>>
>> // source of the POJO "stream" can be anything - database call,
>> client call to // another service, ... // // DB //
>> .getEmployees(department) // StreamPublisher<EmployeeDbModel> --
>> reactive stream // .map((Function<EmployeeDbModel, EmployeeToReturn>)
>> employeeDbModel -> { // // ... // }); return pojoPublisher;
>> }
>>
>> @Provider @Produces(MediaType.APPLICATION_JSON)
>> public static class Ex3NioBodyWriterimplements NioBodyWriter<POJO> {
>> @Override public boolean isWriteable(Class<?> type, Type genericType, Annotation[] annotations, MediaType mediaType) {
>> return true;
>> }
>>
>> @Override public void writeTo(Flow.Publisher<POJO> entityObjectPublisher,
>> Flow.Subscriber<ByteBuffer> subscriber,
>> Class<?> type,
>> Type genericType,
>> Annotation[] annotations,
>> MediaType mediaType,
>> MultivaluedMap<String, Object> httpHeaders) {
>> // map Publisher<POJO> to Publisher<ByteBuffer> and subscribe
>> Flow.Subscriber<ByteBuffer> to it. }
>> }
>>
>> Resource method is minimalistic, *[Q1]* applies here as well.
>>
>> The example introduces NioBodyWriter and its isWriteable method does
>> have *[Q2]*, similarly to NioBodyReader. #writeTo doesn't have any
>> issues - *[Q1]* is mitigated there because the implementation passes
>> a supplier to the implementation - there doesn't need to be anything
>> returned. Something similar might be able to do for NioBodyWriter as
>> well.
>>
>>
>> Comment to writing multiple POJO instances:
>> https://github.com/pavelbucek/jax-rs/blob/bfc5b3d6caecab2f6304f92ac7b44a7ad6a5fdff/jaxrs-api/src/main/java/javax/ws/rs/ext/NioBodyWriter.java#L82
>>
>> Important point to mention is that even when producing multiple
>> instances, the intention here is still to return the single HTTP
>> response.
>>
>> ===
>>
>> We have more, but this email is already too long - I will post more
>> after there is some feedback on the presented concepts and issues.
>> Please let us know if this format is OK or if you'd prefer something
>> else - I guess I could do a screencast, hangout or something similar.
>>
>> Source links:
>>
>> - complete server example:
>> https://github.com/pavelbucek/jax-rs/blob/bfc5b3d6caecab2f6304f92ac7b44a7ad6a5fdff/examples/src/main/java/jaxrs/examples/nio/NioResource.java
>> - client (to be discussed):
>> https://github.com/pavelbucek/jax-rs/blob/bfc5b3d6caecab2f6304f92ac7b44a7ad6a5fdff/examples/src/main/java/jaxrs/examples/nio/NioClient.java
>> - server side processing (including interceptors):
>> https://github.com/pavelbucek/jax-rs/blob/bfc5b3d6caecab2f6304f92ac7b44a7ad6a5fdff/examples/src/main/java/jaxrs/examples/nio/ServerSideProcessing.java
>>
>> (Not including direct link to individual examples, since we will
>> continue working on them...)
>>
>> Looking forward to your feedback!
>>
>> Thanks and regards,
>> Pavel & Santiago
>
>