Hello Sergey,
ad synchronization / thread safety: SseBroadcaster must be thread safe
anyway. The field I put into my resource is final, so no synchronization
is necessary in that case.
ad param injection: the case you mentioned is indeed valid and doesn't
provide maybe expected behavior, but consider something like:
private AtomicReference<SseBroadcaster>broadcasterReference =new AtomicReference<>();
@GET @Path("eventStream")
@Produces(MediaType.SERVER_SENT_EVENTS)
public SseEventOutput eventStream(@Context SseContext sseContext) {
SseEventOutput sseEventOutput = sseContext.newOutput();
SseBroadcaster broadcaster = getBroadcaster(sseContext::newBroadcaster);
broadcaster.subscribe(sseEventOutput);
broadcaster.broadcast(sseContext.newDataEvent("new event stream created!"));
return sseEventOutput;
}
private SseBroadcaster getBroadcaster(Supplier<SseBroadcaster> supplier) {
SseBroadcaster result =broadcasterReference.get();
if (result ==null) {
result = supplier.get();
if (!broadcasterReference.compareAndSet(null, result)) {
return broadcasterReference.get();
}
}
return result;
}
I know it might seem little artificial, but I hope it conveys the
message that creating the broadcaster from context which is received as
a method parameter might be valid option, so forbidding that explicitly
would fix the usecase you mentioned, but disallow usecase above. (Also,
there is no way how to get a information about how the SseContext
instance was retrieved - there is no
"SseContext#canProvideBroadcaster()" method).
(note to a random reader - AtomicReference could really use
computeIfAbsent...)
Did this convince you? :)
Best regards, Pavel
On 08/02/2017 15:47, Sergey Beryozkin wrote:
> Hi Pavel Thanks for the fast feedback, SSEContext name is fine IMHO,
> nearly everything it creates is specific to a given client SSE
> connection, so I'd keep the name, the only thing that it creates and
> is not specific to a current SSEContext is SSEBroadcaster. The code
> you typed below is fine, which is what I was implying. The following
> code though looks strange: @GET public Response(@Context SSEContext
> sseContext) { SSEBroadcaster broadcaster =
> sseContext.newBroadcaster()
> broadcaster.subscribe(sseContext.newOutput()); } It is a valid JAXRS
> SSE code but it does not make sense given that the broadcaster is a
> method local property which indeed defeats the idea of the broadcaster
> and if it is made a class level property then it will have to be
> synchronized ? Thus the code you typed below is OK, but the code I
> typed above is not hence I suggested to make it invalid by having
> sseContext.newBroadcaster() throwing ISE if this method is called from
> a SSEContext injected as a method parameter (same as for example in
> your code below you'd try to subscribe a sseContext.newOutput to the
> broadcaster during the property injection time) Or may be introduce
> SSEBroadcaster context Thanks, Sergey On 08/02/17 13:37, Pavel Bucek
> wrote:
>>
>> Hi Sergey,
>>
>> thanks for digging in your email archive and bringing this up.
>>
>> I just tried to write minimal SseBroadcaster usecase and this was the
>> result:
>>
>> @Singleton @Path("/") public class SseResource { private final
>> SseContext sseContext; private final SseBroadcaster sseBroadcaster;
>> @Inject public SseResource(SseContext sseContext) { this.sseContext =
>> sseContext; this.sseBroadcaster = sseContext.newBroadcaster(); } @GET
>> @Path("eventStream") @Produces(MediaType.SERVER_SENT_EVENTS) public
>> SseEventOutput eventStream() { final SseEventOutput eventOutput =
>> sseContext.newOutput(); sseBroadcaster.subscribe(eventOutput); return
>> eventOutput; } public void onEvent(String event) {
>> sseBroadcaster.broadcast(sseContext.newDataEvent(event)); } }
>> I see what You and Andriy mean about broadcaster not being really
>> contextual, but .. I believe that's valid for all methods. Let's
>> review SseContext:
>> public interface SseContext { SseEventOutput
>> newOutput();OutboundSseEvent.Builder newEvent();SseBroadcaster
>> newBroadcaster(); // this is not pushed to the public repo yet :)
>> default OutboundSseEvent newDataEvent(String data) {}default
>> OutboundSseEvent newDataEvent(String data, String name) {}}
>> It's not really a context, it feels more like a factory. I don't see
>> anything related to current context - thread/request/application. I
>> can imagine an app (war) which would create new outbout event and
>> pass it to another war. And it would still work. Also, the naming of
>> the methods (new*) suggests that the "SseContext" always produces new
>> instances, it doesn't do a lookup in thread local or any other store,
>> it just creates instances. Would rename to SseFactory (or any better
>> name), help with the issue raised by Andriy? TBH, it doesn't feel
>> right to limit the API in a way you've proposed - a method with
>> SseContext as a parameter would work / not work based on from where
>> it was called. I'm not saying there are no schemes like that already,
>> but here it feels more artificial and doesn't really help (the error
>> is still visible in the runtime, not before). Also, I belive the sole
>> purpose of having SseContext is to not have builders for
>> OutboundEvent, Broadcaster and EventOutput. We might even consider
>> removing SseContext and introduce those builders, but it would make
>> the connection from the API to the implementation slightly more
>> difficult. Or we can improve that, make it a "broadcaster store" -
>> something like "SseContext#getOrCreateBroadcaster(String Id)"..
>> (that's just me thinking out loud :). What do you think? Thanks and
>> regards, Pavel
>> On 08/02/2017 12:47, Sergey Beryozkin wrote:
>>> Hi Pavel Can you review the thread text below please (see a note
>>> from Marek). SSEContexts can be passed as method parameters, so when
>>> one creates a broadcaster it gives the impression the broadcaster is
>>> the current SSEContext specific Would it make sense to enforce that
>>> SSEBroadcaster can only be created if SSEContext is injected as a
>>> thread local proxy as opposed to a case where the broadcaster is
>>> created from SSEContext passed as a parameter
>>> (SSEContext.newBroadcaster() would throw the IllegalStateException
>>> is this case) ?. Thanks, Sergey On 15/08/16 12:58, Sergey Beryozkin
>>> wrote:
>>>> Hi Marek, Andriy, All FYI I tried to 'justify' the current design
>>>> by the spec leads by the requirement to have an implementation
>>>> specific SSEBroadcaster introduced without having to introduce an
>>>> SSEBroadcaster Context - the latter would make it possible to pass
>>>> SSEBroadcaster as a method parameter which might not be ideal. I
>>>> agree with the Andriy's observation though that it does look like
>>>> it is a current SSEContext/event stream dependent if SSEContext is
>>>> passed as a method parameter. Perhaps a neater way to work with
>>>> broadcasters can be found :-) Sergey On 15/08/16 08:49, Marek
>>>> Potociar wrote:
>>>>> Hi Andriy,
>>>>> This is a good observation - thank you for spotting this. I’ll
>>>>> give it some thought and come up with a fix proposal.
>>>>> Cheers,
>>>>> Marek
>>>>>> On 14 Aug 2016, at 04:36, Andriy Redko <drreta_at_gmail.com
>>>>>> <mailto:drreta_at_gmail.com>> wrote:
>>>>>> Early feedback about JAX-RS 2.1 / SSE specification
>>>>>> Hey JAX-RS leads, As some of you may already know, the Apache CXF
>>>>>> project is one of the early adopters of the JAX-RS 2.1
>>>>>> specification. In particular, we are currently actively working
>>>>>> on implementing SSE support, based on drafted JAX-RS 2.1 API.
>>>>>> Along the way we stumbled upon some design decisions which may
>>>>>> require a bit of clarification or (hopefuly) a second thought.
>>>>>> The use case in question is related to SseBroadcaster.
>>>>>> Essentially, the idea sounds pretty simple: events into multiple
>>>>>> SSE streams could be easily broadcasted. F.e. here is a
>>>>>> pseudo-code for typical scenario: SseBroadcaster broadcaster =
>>>>>> new SseBroadcasterImlp(); broadcaster.register(stream1);
>>>>>> broadcaster.register(stream2); ... broadcaster.broadcast(event1);
>>>>>> broadcaster.broadcast(event2); In nutshell, current Jersey's SSE
>>>>>> implementation does exactly that. However, in JAX-RS 2.1 API
>>>>>> draft SseBroadcaster could be created only from SseContext, f.e.:
>>>>>> Response stream(@Context SseContext context) { SseBroadcaster
>>>>>> broadcaster = context.newBroadcaster();
>>>>>> broadcaster.register(context.newOutput()); ... } It kind of
>>>>>> works but totally defeats the idea of broadcaster as being
>>>>>> independent from SseContext or/and streams it serves. Currently,
>>>>>> every time new instance of the SseBroadcaster will be created,
>>>>>> with no straightforward way to share it across multiple SSE
>>>>>> streams. How it could work though may look like that (the
>>>>>> implementation details could vary, just trying to present the
>>>>>> idea): private SseBroadcaster broadcaster =
>>>>>> SseBroadcaster.newBroadcaster(); Response stream1(@Context
>>>>>> SseContext context) {
>>>>>> broadcaster.register(context.newOutput()); ... } Response
>>>>>> stream2(@Context SseContext context) {
>>>>>> broadcaster.register(context.newOutput()); ... } In this case,
>>>>>> the SseBroadcaster is truly context-independent, and serves it
>>>>>> purpose very well. We were trying to come up with some realistic
>>>>>> scenarios for SseBroadcaster utilizing the current specification
>>>>>> draft but did not really succeeded. Surely, there are a few way
>>>>>> to make it work but they do not look good. Does it make sense?
>>>>>> Whould you mind guys to give some clarifications/guidance on
>>>>>> that? Thank you a lot! Best Regards, Andriy Redko
>