>
>> the problem is that people must remember to have a threadsafe boolean, ie
>> volatile.
>>
>
> I would think we can add an interface that extends the current CometHandler
> instead of a base class. But I do agree it will makes the things
> complicated.
>
and what should we do with the old comethandler implementations then ?.
users must remember to synchronize their onevent etc.
so people who want concurrent IO should need to remember to implement the
non trivial logic themself or to copy paste from our class ?
is it icefaces and their demands for backward compability at any price again
?.
When using executor, one slow client/or large IO can if the onEvent is
synchronized block ALL worker threads for ALL cometcontexts.
Its better to not use executors at all then having a design that allows for
that.
Hence there is need of queueing events not only in the executor but per
comethandler.
That internal queue and isactive is now done in comethandler , thats the
fastest way, only a volatile boolean access =).
i dont like the idea to break the interface design and have to use
baseclass. problem is i fail to se viable options.
please help me out on that.
one way to do it and still keep a pure ineterface design is to have a
wrapper object instead of baseclass, then when the comethander is to be
added to the cometcontext, we wrap it .... thats ugly , and can cause
problems when user does want to get a comethandler back from the
cometcontext, should we have 2x access methods then, one wrapped and one
unwrapped.... thats not nice, but its an option.
please give feedback on both my code and reasoning.
here is my current comethandler. still needs some testing.
/**
* only 1 thread at a time is allowed to write data to the outputstream,
* other threads put messages in the messageQueue and return.
* the enqueueEvent method calls the onEVent method in unsynchronized
context.
* if more then messageQueueLimit messages are buffered onQueueFull is
called,
* and we terminate
*/
public abstract class CometHandler<E>{
protected final static Logger logger = Controller.logger();
/**
* true if this {_at_link CometHandler} is still active, e.g. there is
* still a suspended connection associated with it.
*/
private volatile boolean isactive = true;
/**
* used for preventing othe worker threads from the executor event
queue from adding events
* to the comethandlers local queue or starting IO logic after
shuttdown.
*
* defaultnotificationhandler sets shuttingdown = true for onterminate
.
* this way we dont need subclasses to remember to do super calls in
the onXX methods
* todo: CometEvent.INTERRUPT should do cometHandler.shuttingdown =
true; ?
*/
protected volatile boolean shuttingdown;
private int messageQueueLimit = 100;
/**
* true means that no thread is currently active on this comethandlers
queue logic
*/
private boolean isreadyforwork = true;
/**
* todo replace with non array copying list, using internal index to
keep track of state
*/
protected final List<CometEvent> messageQueue = new ArrayList(4);
/**
*
*/
protected E attachment;
/**
*
*/
public CometHandler() {
}
/**
*
* @param messageQueueLimit
*/
public CometHandler(int messageQueueLimit){
this.messageQueueLimit = messageQueueLimit;
}
/**
* queues events, caller returns if another thread is allready
* working on the queue. onEvent method performs the actual IO.
* @param event
*/
public void EnQueueEvent(CometEvent event){
if (shuttingdown)
return;
synchronized(messageQueue){
if (!isreadyforwork){
messageQueue.add(event);
return;
}
isreadyforwork = false;
}
boolean queuefull = false;
while(event != null){
if (!isactive){
shuttingdown = true;
return; //todo is this the desired logic ?
}
onEvent(event);
event = null;
synchronized(messageQueue){
int size = messageQueue.size();
if (size == messageQueueLimit){
queuefull = true;
}else
if (size == 0){
isreadyforwork = true;
}else
event = messageQueue.remove(0);
}
if (queuefull){
shuttingdown = true;
onQueueFull(event); //todo is onqeuefull needed ? or just
terminate, it would simplify to just terminate =)
}
}
}
/**
* comethandler's local event queue is full.
* called in unsynchronized context, not blocking other worker threads.
* default impl. removes the comethandler from its cometcontext
*/
protected void onQueueFull(CometEvent event){
isactive = false;
event.getCometContext().resumeCometHandler(this, true);
}
/**
* Return true if this {_at_link CometHandler} is still active, e.g. there
is
* still a suspended connection associated with it.
*
* @return true if active, false if not.
*/
public final boolean isActive() {
return isactive;
}
/**
*
* @param isactive
*/
public final void setActive(boolean isactive) {
this.isactive = isactive;
}
/**
*
* @param attachment
*/
public void attach(E attachment) {
this.attachment = attachment;
}
/**
* implement the IO here.
* this method runs in unsynchronized context,
* that is to ensure that IO does not block other worker threads.
* @param event
*/
public abstract void onEvent(CometEvent event);
public abstract void onInitialize(CometEvent event) throws IOException;
public abstract void onInterrupt(CometEvent event) throws IOException;
public abstract void onTerminate(CometEvent event) throws IOException;
}
heres the notify0 from defaultnotificationhandler , it calls the
enqueueevent method and sets the cometHandler.shuttingdown etc.
protected void notify0(CometEvent cometEvent,CometHandler cometHandler) {
try{
switch (cometEvent.getType()) {
case CometEvent.INTERRUPT:
//todo cometHandler.shuttingdown = true; ?
cometHandler.onInterrupt(cometEvent);
break;
case CometEvent.NOTIFY:
cometHandler.EnQueueEvent(cometEvent);
break;
case CometEvent.READ:
cometHandler.EnQueueEvent(cometEvent);
break;
case CometEvent.WRITE:
cometHandler.EnQueueEvent(cometEvent);
break;
case CometEvent.INITIALIZE:
cometHandler.onInitialize(cometEvent);
break;
case CometEvent.TERMINATE:
cometHandler.shuttingdown = true;
cometHandler.onTerminate(cometEvent);
break;
default:
throw ISEempty;
}
} catch (Throwable ex) {
try {
cometEvent.getCometContext().resumeCometHandler(cometHandler, true);
} catch (Throwable t) {
logger.log(Level.FINE, "Resume phase failed: ", t);
}
logger.log(Level.WARNING, "Notification failed: ", ex);
}
}
regards
gustav trede