Salut,
gustav trede wrote:
>
>
> the problem is that people must remember to have a threadsafe
> boolean, ie volatile.
>
>
> I would think we can add an interface that extends the current
> CometHandler instead of a base class. But I do agree it will makes
> the things complicated.
>
>
> and what should we do with the old comethandler implementations then ?.
> users must remember to synchronize their onevent etc.
> so people who want concurrent IO should need to remember to implement
> the non trivial logic themself or to copy paste from our class ?
> is it icefaces and their demands for backward compability at any price
> again ?.
Not only ICEFaces. All Grizzly Comet users :-)
>
> When using executor, one slow client/or large IO can if the onEvent is
> synchronized block ALL worker threads for ALL cometcontexts.
> Its better to not use executors at all then having a design that allows
> for that.
Agree it might be problematic...but note that we have now support for
asynchronous http write in 1.9.2, which means the thread will never
block (the calling thread). The Async Write Queue mechanism will handle
the slow client.
>
> Hence there is need of queueing events not only in the executor but
> per comethandler.
> That internal queue and isactive is now done in comethandler , thats
> the fastest way, only a volatile boolean access =).
>
> i dont like the idea to break the interface design and have to use
> baseclass. problem is i fail to se viable options.
> please help me out on that.
Let me look at what you committed so far...
>
> one way to do it and still keep a pure ineterface design is to have a
> wrapper object instead of baseclass, then when the comethander is to
> be added to the cometcontext, we wrap it .... thats ugly , and can cause
> problems when user does want to get a comethandler back from the
> cometcontext, should we have 2x access methods then, one wrapped and one
> unwrapped.... thats not nice, but its an option.
Right, but the price is cheap :-) It is better than breaking
compatibility IMO.
>
> please give feedback on both my code and reasoning.
>
> here is my current comethandler. still needs some testing.
The idea I would like to pursue is to add your CometHandler impl and
still continue supporting the pure interface. I"m pretty sure people wil
l like a ready to use implementation (like the current
ReflectorCometHandler). If we can reduce the number of lines peoples
needs to write, that;s perfect :-)
>
>
> /**
> * only 1 thread at a time is allowed to write data to the outputstream,
> * other threads put messages in the messageQueue and return.
> * the enqueueEvent method calls the onEVent method in unsynchronized
> context.
> * if more then messageQueueLimit messages are buffered onQueueFull is
> called,
> * and we terminate
> */
> public abstract class CometHandler<E>{
Why not doing
public abstract class OptimizedCometHandler<E> implements CometHandler<E>
>
> protected final static Logger logger = Controller.logger();
>
> /**
> * true if this {_at_link CometHandler} is still active, e.g. there is
> * still a suspended connection associated with it.
> */
> private volatile boolean isactive = true;
>
> /**
> * used for preventing othe worker threads from the executor event
> queue from adding events
> * to the comethandlers local queue or starting IO logic after
> shuttdown.
> *
> * defaultnotificationhandler sets shuttingdown = true for
> onterminate .
> * this way we dont need subclasses to remember to do super calls
> in the onXX methods
> * todo: CometEvent.INTERRUPT should do cometHandler.shuttingdown =
> true; ?
> */
> protected volatile boolean shuttingdown;
>
> private int messageQueueLimit = 100;
>
> /**
> * true means that no thread is currently active on this
> comethandlers queue logic
> */
> private boolean isreadyforwork = true;
>
> /**
> * todo replace with non array copying list, using internal index
> to keep track of state
> */
> protected final List<CometEvent> messageQueue = new ArrayList(4);
>
> /**
> *
> */
> protected E attachment;
>
> /**
> *
> */
> public CometHandler() {
> }
>
> /**
> *
> * @param messageQueueLimit
> */
> public CometHandler(int messageQueueLimit){
> this.messageQueueLimit = messageQueueLimit;
> }
>
> /**
> * queues events, caller returns if another thread is allready
> * working on the queue. onEvent method performs the actual IO.
> * @param event
> */
> public void EnQueueEvent(CometEvent event){
> if (shuttingdown)
> return;
> synchronized(messageQueue){
> if (!isreadyforwork){
> messageQueue.add(event);
> return;
> }
> isreadyforwork = false;
> }
> boolean queuefull = false;
> while(event != null){
> if (!isactive){
> shuttingdown = true;
> return; //todo is this the desired logic ?
What are you proposing instead?
> }
> onEvent(event);
> event = null;
> synchronized(messageQueue){
> int size = messageQueue.size();
> if (size == messageQueueLimit){
> queuefull = true;
> }else
> if (size == 0){
> isreadyforwork = true;
> }else
> event = messageQueue.remove(0);
> }
> if (queuefull){
> shuttingdown = true;
> onQueueFull(event); //todo is onqeuefull needed ? or
> just terminate, it would simplify to just terminate =)
Here we resume the connection, but we don't allow people to recover
(write some content befor being resumed. I guess the onTerminate should
be invoked here.
> }
> }
> }
>
> /**
> * comethandler's local event queue is full.
> * called in unsynchronized context, not blocking other worker threads.
> * default impl. removes the comethandler from its cometcontext
> */
> protected void onQueueFull(CometEvent event){
> isactive = false;
> event.getCometContext().resumeCometHandler(this, true);
> }
>
> /**
> * Return true if this {_at_link CometHandler} is still active, e.g.
> there is
> * still a suspended connection associated with it.
> *
> * @return true if active, false if not.
> */
> public final boolean isActive() {
> return isactive;
> }
>
> /**
> *
> * @param isactive
> */
> public final void setActive(boolean isactive) {
> this.isactive = isactive;
> }
>
> /**
> *
> * @param attachment
> */
> public void attach(E attachment) {
> this.attachment = attachment;
> }
>
> /**
> * implement the IO here.
> * this method runs in unsynchronized context,
> * that is to ensure that IO does not block other worker threads.
> * @param event
> */
> public abstract void onEvent(CometEvent event);
>
> public abstract void onInitialize(CometEvent event) throws IOException;
>
> public abstract void onInterrupt(CometEvent event) throws IOException;
>
> public abstract void onTerminate(CometEvent event) throws IOException;
>
> }
>
>
>
> heres the notify0 from defaultnotificationhandler , it calls the
> enqueueevent method and sets the cometHandler.shuttingdown etc.
>
>
> protected void notify0(CometEvent cometEvent,CometHandler cometHandler) {
> try{
> switch (cometEvent.getType()) {
> case CometEvent.INTERRUPT:
> //todo cometHandler.shuttingdown = true; ?
> cometHandler.onInterrupt(cometEvent);
> break;
> case CometEvent.NOTIFY:
> cometHandler.EnQueueEvent(cometEvent);
> break;
> case CometEvent.READ:
> cometHandler.EnQueueEvent(cometEvent);
> break;
> case CometEvent.WRITE:
> cometHandler.EnQueueEvent(cometEvent);
> break;
> case CometEvent.INITIALIZE:
> cometHandler.onInitialize(cometEvent);
> break;
> case CometEvent.TERMINATE:
> cometHandler.shuttingdown = true;
> cometHandler.onTerminate(cometEvent);
> break;
> default:
> throw ISEempty;
> }
> } catch (Throwable ex) {
> try {
>
> cometEvent.getCometContext().resumeCometHandler(cometHandler, true);
> } catch (Throwable t) {
> logger.log(Level.FINE, "Resume phase failed: ", t);
> }
> logger.log(Level.WARNING, "Notification failed: ", ex);
> }
> }
That looks very promising. If we can continue supporting current users
and add your proposal, that would be perfect!
A+
-- Jeanfrancois
>
> regards
> gustav trede