Hi Markus,
it seems you did miss it, but this one is easy to miss - for now it is
only a single method, called "addProcessor".
Please see
https://github.com/pavelbucek/jax-rs/blob/nio/examples/src/main/java/jaxrs/examples/nio/ServerSideProcessing.java
Several methods were added to filters and interceptors; they provide
functionality similar to what was added for resource methods and body
readers/writers.
Regards,
Pavel
On 10/03/2017 18:16, Markus KARG wrote:
>
> Pavel,
>
> maybe I missed it, but is it planned to also provide a non-blocking
> replacement for the InputStream / OutputStream methods of Filters and
> other streaming components?
>
> Thanks
>
> -Markus
>
> *From:*Pavel Bucek [mailto:pavel.bucek_at_oracle.com]
> *Sent:* Tuesday, 7 March 2017 15:26
> *To:* jsr370-experts_at_jax-rs-spec.java.net
> *Subject:* NIO API review / request for feedback
>
> Dear EG members,
>
> please allow me to share the direction of what we are thinking about
> JAX-RS NIO support.
>
> As stated before, SSE wasn't the only place where Flow APIs should be
> utilized - NIO is another area where they can be used quite heavily.
> That was the main idea - to see where it makes sense to use them.
>
> Similarly to SSE, there is a plan to minimize / extract the Flow to
> different classes, but there should always be a clear path how to
> convert an instance into Flow.* (or org.reactivestreams.*) interfaces.
> It is not done yet, to keep things as clear as possible.
>
> One of the bigger concerns is backwards compatibility. There are
> interfaces that work directly with InputStream/OutputStream, which is
> always a problem for reactive processing, since these streams are
> blocking by design. The specification will need to say something like
> "The runtime is non-blocking as long as the application code doesn't
> read from a provided InputStream or doesn't set an OutputStream."
>
> The motivation for doing this is to allow JAX-RS apps to be
> non-blocking and reactive. JAX-RS 2.0 entity handling is designed
> around reading/producing an InputStream/OutputStream, which is blocking
> by design. A non-blocking approach should result in higher throughput
> and better resource utilization of the server. Also, integration and
> coexistence with modern reactive frameworks should be possible
> without losing the advantage of having that framework (which was
> almost completely lost when dealing with blocking inputs/outputs).
>
> Let's jump into code snippets.
>
> *Server - EX1 (byte handling):*
>
> The snippet below shows how to process the request entity body and
> return the response entity body using Publisher<ByteBuffer> - no
> MessageBodyReader/Writer is involved. This can be used as a low-level
> integration point for other frameworks that are also reactive and will
> do the processing, such as serializing/deserializing (mapping) to some
> Java type, filtering, etc. Returning a Publisher<ByteBuffer> is a
> reactive/NIO alternative to javax.ws.rs.core.StreamingOutput; consuming
> an entity using Publisher<ByteBuffer> is a reactive/NIO alternative to
> consuming the entity as an InputStream.
>
> @POST
> @Path("/ex1")
> public Flow.Publisher<ByteBuffer> ex1(Flow.Publisher<ByteBuffer> entity) {
>     Ex1Processor processor = new Ex1Processor();
>     entity.subscribe(processor);
>     return processor;
> }
>
> // ex1 processor
> public static class Ex1Processor implements Flow.Processor<ByteBuffer, ByteBuffer> {
>     // ...
> }
>
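For illustration, here is a minimal sketch of what a processor like Ex1Processor might look like. It is not the code from the linked repository: the class name UppercaseProcessor, the upper-casing logic, and the demo method are assumptions made for this example, and it builds on the JDK's SubmissionPublisher, whose submit() can block when downstream buffers are full, so it is not strictly non-blocking.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Locale;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for Ex1Processor: upper-cases ASCII bytes passing through.
class UppercaseProcessor extends SubmissionPublisher<ByteBuffer>
        implements Flow.Processor<ByteBuffer, ByteBuffer> {

    private Flow.Subscription upstream;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        upstream = subscription;
        upstream.request(1); // ask for one chunk at a time
    }

    @Override
    public void onNext(ByteBuffer chunk) {
        String text = StandardCharsets.US_ASCII.decode(chunk).toString();
        byte[] upper = text.toUpperCase(Locale.ROOT).getBytes(StandardCharsets.US_ASCII);
        submit(ByteBuffer.wrap(upper)); // may block if subscribers lag behind
        upstream.request(1);
    }

    @Override
    public void onError(Throwable error) {
        closeExceptionally(error);
    }

    @Override
    public void onComplete() {
        close();
    }

    // Feeds one chunk through the processor and returns what comes out.
    static String demo(String input) {
        SubmissionPublisher<ByteBuffer> request = new SubmissionPublisher<>();
        UppercaseProcessor processor = new UppercaseProcessor();
        StringBuilder out = new StringBuilder();
        // The downstream consumer subscribes first; SubmissionPublisher does not
        // retain items submitted while it has no subscribers.
        CompletableFuture<Void> done =
                processor.consume(b -> out.append(StandardCharsets.US_ASCII.decode(b)));
        request.subscribe(processor);
        request.submit(ByteBuffer.wrap(input.getBytes(StandardCharsets.US_ASCII)));
        request.close();
        done.orTimeout(5, TimeUnit.SECONDS).join();
        return out.toString();
    }
}
```

Calling UppercaseProcessor.demo("abc") pushes a single chunk through the processor. Note that the demo has to attach the downstream consumer before any data is submitted.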
> And there is already an issue, which is not clearly solved.
>
> Returning a Publisher instance from the resource method puts some
> constraints on the Publisher itself - it needs to cache all events
> emitted prior to the subscription of the JAX-RS implementation's
> Subscriber instance (subscribing is the only way to get the data from
> a Publisher).
>
> This can be solved by stating that the *request* entity publisher won't
> produce any events until the resource method has been invoked and the
> implementation Subscriber has subscribed to the returned *response*
> entity publisher. Alternatively, the resource method can return
> Consumer<Flow.Subscriber<ByteBuffer>>, which would effectively grant
> control over the implementation's subscription process. Any comments or
> suggestions are welcome *[ref Q1]*.
>
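The Consumer<Flow.Subscriber<ByteBuffer>> alternative can be sketched roughly as follows. This is an assumption about how such a signature might be used, not part of the proposed API: the names DeferredSubscription, echo, CollectingSubscriber and roundTrip are invented for the example, and CollectingSubscriber merely stands in for the implementation's subscriber. The point is that nothing is wired until the subscriber is handed over, so no events need to be cached.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class DeferredSubscription {

    // Hypothetical resource method shape: instead of returning a Publisher,
    // return a callback that receives the implementation's subscriber.
    static Consumer<Flow.Subscriber<ByteBuffer>> echo(Flow.Publisher<ByteBuffer> entity) {
        return runtimeSubscriber -> entity.subscribe(runtimeSubscriber);
    }

    // Hypothetical stand-in for the JAX-RS implementation's subscriber.
    // Assumes chunk boundaries never split a multi-byte character.
    static class CollectingSubscriber implements Flow.Subscriber<ByteBuffer> {
        final CompletableFuture<String> result = new CompletableFuture<>();
        private final StringBuilder text = new StringBuilder();

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            subscription.request(Long.MAX_VALUE);
        }

        @Override
        public void onNext(ByteBuffer chunk) {
            text.append(StandardCharsets.UTF_8.decode(chunk));
        }

        @Override
        public void onError(Throwable error) {
            result.completeExceptionally(error);
        }

        @Override
        public void onComplete() {
            result.complete(text.toString());
        }
    }

    // Simulates the runtime: invoke the resource method, hand over the
    // subscriber, and only then let the request entity produce data.
    static String roundTrip(String body) {
        SubmissionPublisher<ByteBuffer> request = new SubmissionPublisher<>();
        Consumer<Flow.Subscriber<ByteBuffer>> response = echo(request);
        CollectingSubscriber runtime = new CollectingSubscriber();
        response.accept(runtime); // the subscription is established here
        request.submit(ByteBuffer.wrap(body.getBytes(StandardCharsets.UTF_8)));
        request.close();
        return runtime.result.orTimeout(5, TimeUnit.SECONDS).join();
    }
}
```

In this shape the application code still decides how the subscriber is attached, but it can only do so once the implementation has supplied it.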
> *Server - EX2 (consuming stream of pojos):*
>
> The next example should be slightly more straightforward. It shows how
> to process a Publisher of custom type, in this case called "POJO".
>
> The only limitation or rule here would be that the subscriber for the
> request entity must be subscribed before the resource method "ex2"
> invocation ends.
>
> A new interface is introduced here - NioBodyReader. It has exactly the
> same responsibility as the good old MessageBodyReader, but without using
> a blocking InputStream to read the entity. Note that the "core" type
> is Publisher<ByteBuffer>, which in this case is mapped (or
> converted) into a Publisher of POJOs.
>
> @POST
> @Path("/ex2")
> @Consumes(MediaType.APPLICATION_JSON)
> public void ex2(Flow.Publisher<POJO> entity,
>                 @Suspended AsyncResponse response) {
>     // TODO: introduce a helper or modify AsyncResponse to support this
>     // pattern directly?
>     entity.subscribe(
>         // POJO subscriber - consumer
>         new Flow.Subscriber<POJO>() {
>             @Override
>             public void onSubscribe(Flow.Subscription subscription) {
>                 // ...
>             }
>
>             @Override
>             public void onNext(POJO item) {
>                 // ...
>             }
>
>             @Override
>             public void onError(Throwable throwable) {
>                 response.resume(throwable);
>             }
>
>             @Override
>             public void onComplete() {
>                 response.resume(Response.ok().build());
>             }
>         }
>     );
> }
>
> @Provider
> @Consumes(MediaType.APPLICATION_JSON)
> public static class Ex2NioBodyReader implements NioBodyReader<POJO> {
>     @Override
>     public boolean isReadable(Class<?> type, Type genericType,
>                               Annotation[] annotations, MediaType mediaType) {
>         return true;
>     }
>
>     @Override
>     public Flow.Publisher<POJO> readFrom(Flow.Publisher<ByteBuffer> entity,
>                                          Class<POJO> type,
>                                          Type genericType,
>                                          Annotation[] annotations,
>                                          MediaType mediaType,
>                                          MultivaluedMap<String, String> httpHeaders) {
>         Ex2MappingProcessor mappingProcessor = new Ex2MappingProcessor();
>         entity.subscribe(mappingProcessor);
>         return mappingProcessor;
>     }
> }
>
> // mapping Publisher<ByteBuffer> to Publisher<POJO>
> // ByteBuffers are expected to contain JSON (indicated by @Consumes on
> // the resource method and NioBodyReader).
> public static class Ex2MappingProcessor implements Flow.Subscriber<ByteBuffer>, Flow.Publisher<POJO> {
>     // ...
> }
>
> Same issue as *[Q1]* is valid for this as well - same solution will
> need to be applied for "readFrom" method.
>
> Another issue is about what should be passed to "isReadable" method as
> "type" parameter. I'm not exactly sure whether we can safely obtain
> generic type of a parameter from the resource method (public void
> ex2(Flow.Publisher< *POJO*> entity, ..)). Any comments/suggestions
> welcomed *[ref Q2]*.
>
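On the reflection side of [Q2], the element type of a declared parameter such as Flow.Publisher<POJO> is available through java.lang.reflect when the type argument is a concrete class. The following is a minimal sketch; the names GenericParameterLookup, Pojo, Resource and elementType are made up for the example, and it does not cover type variables or wildcards, for which getActualTypeArguments() returns a TypeVariable or WildcardType instead of a Class.

```java
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.concurrent.Flow;

class GenericParameterLookup {

    // Hypothetical entity type and resource class used only for this example.
    static class Pojo {
    }

    static class Resource {
        public void ex2(Flow.Publisher<Pojo> entity) {
            // resource logic omitted
        }
    }

    // Returns the first type argument of the method's first parameter,
    // or null when that parameter is not parameterized (e.g. a raw type).
    static Type elementType(Method method) {
        Type parameter = method.getGenericParameterTypes()[0];
        if (parameter instanceof ParameterizedType) {
            return ((ParameterizedType) parameter).getActualTypeArguments()[0];
        }
        return null;
    }

    // Looks up ex2(Flow.Publisher<Pojo>) and extracts its element type.
    static Type demo() {
        try {
            Method ex2 = Resource.class.getMethod("ex2", Flow.Publisher.class);
            return elementType(ex2);
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Here demo() yields Pojo.class, which is the kind of value that could be passed as "type", with the full parameterized type passed as "genericType". Whether that is the right contract for isReadable is exactly the open question.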
> Note that using @Suspended shouldn't be enforced here; it should be
> possible to return a Response directly and still be able to consume
> the *request* entity.
>
> *Server - EX3 (producing list of POJOs):*
>
> The last (for now) example shows how we can produce and write POJOs.
> The resource method doesn't take any parameters and provides a Publisher
> of POJO objects, which will be converted to JSON in NioBodyWriter.
> NioBodyWriter is a reactive alternative to MessageBodyWriter from
> older versions of the specification.
>
> @GET
> @Path("/ex3")
> @Produces(MediaType.APPLICATION_JSON)
> public Flow.Publisher<POJO> ex3() {
>     Flow.Publisher<POJO> pojoPublisher = null;
>     // source of the POJO "stream" can be anything - database call, client
>     // call to another service, ...
>     //
>     // DB
>     //     .getEmployees(department) // StreamPublisher<EmployeeDbModel> -- reactive stream
>     //     .map((Function<EmployeeDbModel, EmployeeToReturn>) employeeDbModel -> {
>     //         // ...
>     //     });
>
>     return pojoPublisher;
> }
>
> @Provider
> @Produces(MediaType.APPLICATION_JSON)
> public static class Ex3NioBodyWriter implements NioBodyWriter<POJO> {
>     @Override
>     public boolean isWriteable(Class<?> type, Type genericType,
>                                Annotation[] annotations, MediaType mediaType) {
>         return true;
>     }
>
>     @Override
>     public void writeTo(Flow.Publisher<POJO> entityObjectPublisher,
>                         Flow.Subscriber<ByteBuffer> subscriber,
>                         Class<?> type,
>                         Type genericType,
>                         Annotation[] annotations,
>                         MediaType mediaType,
>                         MultivaluedMap<String, Object> httpHeaders) {
>         // map Publisher<POJO> to Publisher<ByteBuffer> and subscribe
>         // Flow.Subscriber<ByteBuffer> to it.
>     }
> }
>
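The body of writeTo is left as a comment above. One way to fill it in, sketched under assumptions, is a small mapping publisher that forwards the downstream subscription unchanged. WriterSketch, Pojo, toJson, map and demo are hypothetical names for this example only. The mapper omits the error handling a compliant Reactive Streams operator needs (for instance cancelling upstream when the mapping function throws), and it emits the JSON objects back to back without the array framing a real writer would have to add.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

class WriterSketch {

    // Hypothetical entity type with a hand-rolled JSON form.
    static class Pojo {
        final String name;

        Pojo(String name) {
            this.name = name;
        }

        String toJson() {
            return "{\"name\":\"" + name + "\"}";
        }
    }

    // Simplified map operator: demand and cancellation pass straight through
    // because the upstream subscription is handed to the downstream subscriber.
    static <T, R> Flow.Publisher<R> map(Flow.Publisher<T> source, Function<T, R> fn) {
        return downstream -> source.subscribe(new Flow.Subscriber<T>() {
            @Override
            public void onSubscribe(Flow.Subscription s) { downstream.onSubscribe(s); }

            @Override
            public void onNext(T item) { downstream.onNext(fn.apply(item)); }

            @Override
            public void onError(Throwable t) { downstream.onError(t); }

            @Override
            public void onComplete() { downstream.onComplete(); }
        });
    }

    // Mirrors the shape of NioBodyWriter#writeTo, minus the metadata parameters.
    static void writeTo(Flow.Publisher<Pojo> entities, Flow.Subscriber<ByteBuffer> subscriber) {
        Flow.Publisher<ByteBuffer> bytes =
                map(entities, p -> ByteBuffer.wrap(p.toJson().getBytes(StandardCharsets.UTF_8)));
        bytes.subscribe(subscriber);
    }

    // Publishes two entities and collects the bytes the subscriber receives.
    static String demo() {
        SubmissionPublisher<Pojo> source = new SubmissionPublisher<>();
        CompletableFuture<String> body = new CompletableFuture<>();
        writeTo(source, new Flow.Subscriber<ByteBuffer>() {
            private final StringBuilder text = new StringBuilder();

            @Override
            public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }

            @Override
            public void onNext(ByteBuffer b) { text.append(StandardCharsets.UTF_8.decode(b)); }

            @Override
            public void onError(Throwable t) { body.completeExceptionally(t); }

            @Override
            public void onComplete() { body.complete(text.toString()); }
        });
        source.submit(new Pojo("a"));
        source.submit(new Pojo("b"));
        source.close();
        return body.orTimeout(5, TimeUnit.SECONDS).join();
    }
}
```

Because writeTo receives the subscriber rather than returning a publisher, the wiring happens inside the method and nothing has to be buffered before subscription.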
> The resource method is minimalistic; *[Q1]* applies here as well.
>
> The example introduces NioBodyWriter, and its isWriteable method is
> subject to *[Q2]*, similarly to NioBodyReader. #writeTo doesn't have any
> issues - *[Q1]* is mitigated there because the implementation passes a
> subscriber to the writer, so nothing needs to be returned. Something
> similar might be possible for NioBodyReader as well.
>
> Comment to writing multiple POJO instances:
> https://github.com/pavelbucek/jax-rs/blob/bfc5b3d6caecab2f6304f92ac7b44a7ad6a5fdff/jaxrs-api/src/main/java/javax/ws/rs/ext/NioBodyWriter.java#L82
> <https://github.com/pavelbucek/jax-rs/blob/8304c658c923b520ed34201b1f1e6741660daa0b/jaxrs-api/src/main/java/javax/ws/rs/ext/NioBodyWriter.java#L82>
>
> An important point to mention is that even when producing multiple
> instances, the intention here is still to return a single HTTP response.
>
> ===
>
> We have more, but this email is already too long - I will post more
> after there is some feedback on the presented concepts and issues.
> Please let us know if this format is OK or if you'd prefer something
> else - I guess I could do a screencast, hangout or something similar.
>
> Source links:
>
> - complete server example:
> https://github.com/pavelbucek/jax-rs/blob/bfc5b3d6caecab2f6304f92ac7b44a7ad6a5fdff/examples/src/main/java/jaxrs/examples/nio/NioResource.java
> <https://github.com/pavelbucek/jax-rs/blob/8304c658c923b520ed34201b1f1e6741660daa0b/examples/src/main/java/jaxrs/examples/nio/NioResource.java>
> - client (to be discussed):
> https://github.com/pavelbucek/jax-rs/blob/bfc5b3d6caecab2f6304f92ac7b44a7ad6a5fdff/examples/src/main/java/jaxrs/examples/nio/NioClient.java
> <https://github.com/pavelbucek/jax-rs/blob/8304c658c923b520ed34201b1f1e6741660daa0b/examples/src/main/java/jaxrs/examples/nio/NioClient.java>
> - server side processing (including interceptors):
> https://github.com/pavelbucek/jax-rs/blob/bfc5b3d6caecab2f6304f92ac7b44a7ad6a5fdff/examples/src/main/java/jaxrs/examples/nio/ServerSideProcessing.java
>
> (Not including direct link to individual examples, since we will
> continue working on them...)
>
> Looking forward to your feedback!
>
> Thanks and regards,
> Pavel & Santiago
>