users@jax-rs-spec.java.net

[jax-rs-spec users] Re: Non blocking IO: Publishers and Subscribers

From: Pavel Bucek <pavel.bucek_at_oracle.com>
Date: Fri, 24 Mar 2017 12:52:08 +0100

That's useful, thanks.

In JAX-RS resources, there is no requirement (and most likely won't be
one) that the publisher must be used as you described; I consider the
following snippet a valid example:

@POST @Path("/ex2")
@Consumes(MediaType.APPLICATION_JSON)
public void ex2(Flow.Source<POJO> entity,
                @Suspended AsyncResponse response) {

    entity.subscribe(
            // POJO subscriber - consumer
            new Flow.Sink<POJO>() {
                @Override public void onSubscribe(Flow.Subscription subscription) {
                    // ...
                }

                @Override public void onNext(POJO item) {
                    // ...
                }

                @Override public void onError(Throwable throwable) {
                    response.resume(throwable);
                }

                @Override public void onComplete() {
                    response.resume(Response.ok().build());
                }
            }
    );
}
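
For the elided onSubscribe/onNext bodies, the part that matters is demand:
nothing is delivered until the subscriber calls Subscription.request(n).
Below is a rough, standalone sketch of that using only the JDK 9
java.util.concurrent.Flow types, not the proposed JAX-RS Flow.Source /
Flow.Sink. SubmissionPublisher stands in for the request entity, a
CompletableFuture stands in for AsyncResponse, and the class and method
names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical name; illustrates a subscriber that signals demand one item at a time.
class DemandDrivenConsumer {

    /** Publishes the given items and returns what the subscriber saw once onComplete fires. */
    static List<String> consume(List<String> items) {
        // Stand-in for AsyncResponse: completed from onComplete/onError.
        CompletableFuture<List<String>> response = new CompletableFuture<>();

        // Stand-in for the request entity.
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<String>() {
                private final List<String> received = new ArrayList<>();
                private Flow.Subscription subscription;

                @Override public void onSubscribe(Flow.Subscription subscription) {
                    this.subscription = subscription;
                    subscription.request(1); // no onNext happens before this call
                }

                @Override public void onNext(String item) {
                    received.add(item);
                    subscription.request(1); // ask for the next item
                }

                @Override public void onError(Throwable throwable) {
                    response.completeExceptionally(throwable);
                }

                @Override public void onComplete() {
                    response.complete(received);
                }
            });

            items.forEach(publisher::submit);
        } // close() issues onComplete once the buffered items have been delivered

        // Bounded wait so a missing terminal signal cannot hang the caller.
        return response.orTimeout(5, TimeUnit.SECONDS).join();
    }

    public static void main(String[] args) {
        System.out.println(consume(List.of("a", "b", "c")));
    }
}
```

Requesting one item at a time keeps the example easy to follow; a real
subscriber would more likely request in batches.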

Anyway, that does not conflict with what you've described. We just need
to be ready to handle both cases.
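
The other case, where the resource method takes a publisher and returns a
publisher without ever calling subscribe itself, can be sketched with
plain JDK types as well. This is only an illustration under my own
assumptions: PublisherMapping and map are made-up names, it is not Reactor
or RxJava, and it leaves out most of what a spec-compliant operator would
have to handle.

```java
import java.util.concurrent.Flow;
import java.util.function.Function;

// Hypothetical helper; a minimal publisher-to-publisher "map".
class PublisherMapping {

    static <T, R> Flow.Publisher<R> map(Flow.Publisher<T> source,
                                        Function<? super T, ? extends R> fn) {
        // Lazy: source.subscribe(...) runs only when somebody subscribes to the result.
        return downstream -> source.subscribe(new Flow.Subscriber<T>() {
            private Flow.Subscription upstream;
            private boolean done;

            @Override public void onSubscribe(Flow.Subscription subscription) {
                upstream = subscription;
                // Hand the subscription through, so downstream demand reaches the source.
                downstream.onSubscribe(subscription);
            }

            @Override public void onNext(T item) {
                if (done) {
                    return;
                }
                R mapped;
                try {
                    mapped = fn.apply(item);
                } catch (Throwable t) {
                    upstream.cancel(); // stop the source, then report the failure
                    onError(t);
                    return;
                }
                downstream.onNext(mapped);
            }

            @Override public void onError(Throwable throwable) {
                if (!done) {
                    done = true;
                    downstream.onError(throwable);
                }
            }

            @Override public void onComplete() {
                if (!done) {
                    done = true;
                    downstream.onComplete();
                }
            }
        });
    }
}
```

With something like this, a resource method could simply return
map(entity, ...) and leave the actual subscription to the runtime.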

Thanks for validating the statement about subscribers - we need to get
rid of them being exposed as an API.

Best regards,
Pavel


On 24/03/2017 11:48, mpaluch_at_paluch.biz wrote:
> There are two issues with the code below:
>
> 1. In a reactive flow, the user code never triggers subscription.
> That’s a responsibility of the resource management component
> (or simply the caller of the ex(Flow.Source) method).
> 2. A Publisher does not start doing anything before subscription
> and it does not start emitting before the subscriber requests
> data (Subscription.request(n)).
> Background: The Publisher may buffer incoming data before data
> is actually requested but that's an impl detail of a Publisher.
>
> A fixed version would look like:
> (I used Flow.Publisher to stick with Java 9's names)
>
> @POST @Path("/ex2")
> @Consumes(MediaType.APPLICATION_JSON)
> public Flow.Publisher<AnotherPOJO> ex(Flow.Publisher<POJO> entity) {
>
> Flow.Publisher<AnotherPOJO> result = …; // something like
> // result = Flux.fromPublisher(entity).map(it -> new AnotherPOJO(it));
> // result = Flowable.fromPublisher(entity).map(it -> new AnotherPOJO(it));
>
> return result;
> }
>
> From a framework API perspective, you work only with Publishers.
> Everything else (Processor) is up to what users do in their code
> or what the implementation does.
>
> Cheers,
> Mark