Property changes on: . ___________________________________________________________________ Name: svn:ignore + .BayeuxCometHandler.java.swp .CometdNotificationHandler.java.swp .DataHandler.java.swp .BayeuxCometHandler.java.swo .BayeuxCometHandler.java.orig.swp Index: CometdNotificationHandler.java =================================================================== --- CometdNotificationHandler.java (revision 728) +++ CometdNotificationHandler.java (working copy) @@ -39,7 +39,7 @@ import com.sun.grizzly.comet.CometEvent; import com.sun.grizzly.comet.CometHandler; import com.sun.grizzly.comet.DefaultNotificationHandler; -import com.sun.grizzly.cometd.bayeux.Data; +import com.sun.grizzly.cometd.bayeux.DeliverResponse; import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; @@ -64,8 +64,8 @@ String activeChannel = ""; String channel = ""; - if (o instanceof Data){ - activeChannel = ((Data)o).getChannel(); + if (o instanceof DeliverResponse){ + activeChannel = ((DeliverResponse)o).getChannel(); } else if (o instanceof DataHandler){ activeChannel = ((DataHandler)o).getChannel(); } Index: BayeuxCometHandler.java =================================================================== --- BayeuxCometHandler.java (revision 728) +++ BayeuxCometHandler.java (working copy) @@ -42,10 +42,13 @@ import com.sun.grizzly.cometd.bayeux.ConnectResponse; import com.sun.grizzly.cometd.bayeux.Advice; import com.sun.grizzly.cometd.bayeux.Data; +import com.sun.grizzly.cometd.bayeux.DeliverResponse; import com.sun.grizzly.cometd.bayeux.DisconnectRequest; import com.sun.grizzly.cometd.bayeux.DisconnectResponse; import com.sun.grizzly.cometd.bayeux.HandshakeRequest; import com.sun.grizzly.cometd.bayeux.HandshakeResponse; +import com.sun.grizzly.cometd.bayeux.PublishRequest; +import com.sun.grizzly.cometd.bayeux.PublishResponse; import com.sun.grizzly.cometd.bayeux.ReconnectRequest; import com.sun.grizzly.cometd.bayeux.ReconnectResponse; import com.sun.grizzly.cometd.bayeux.SubscribeRequest; @@ -195,12 +198,6 @@ if (!subscribeReq.isValid()) { subscribeRes.setError("501::invalid subscribe"); } - Data data = subscribeReq.getData(); - if (data != null){ - subscribeRes.setData(data); - subscribeReq.setDataId(data.getId()); - event.getCometContext().notify(data); - } activeChannels.put(subscribeReq.getClientId(),subscribeReq.getSubscription()); res.setContentType(DEFAULT_CONTENT_TYPE); @@ -220,13 +217,6 @@ if (!unsubscribeReq.isValid()) { unsubscribeRes.setError("501::invalid unsubscribe"); } - Data data = unsubscribeReq.getData(); - if (data != null){ - unsubscribeRes.setData(data); - unsubscribeRes.setDataId(data.getId()); - activeChannels.remove(unsubscribeReq.getClientId()); - event.getCometContext().notify(data); - } res.setContentType(DEFAULT_CONTENT_TYPE); res.write(unsubscribeRes.toJSON()); res.flush(); @@ -234,18 +224,45 @@ @SuppressWarnings("unchecked") - public void onData(CometEvent event) throws IOException { + public void onPublish(CometEvent event) throws IOException { CometdContext cometdContext = (CometdContext)event.attachment(); CometdRequest req = cometdContext.getRequest(); CometdResponse res = cometdContext.getResponse(); - Data data = (Data)cometdContext.getVerb(); + PublishRequest publishReq = (PublishRequest)cometdContext.getVerb(); + PublishResponse publishRes = new PublishResponse(publishReq); + DeliverResponse deliverRes = null; + if (publishReq.isValid()) { + publishRes.setSuccessful(true); + Data data = publishReq.getData(); + if (data != null) { + deliverRes = new DeliverResponse(publishReq); + if (publishReq.isFirst()) { + deliverRes.setFirst(false); + deliverRes.setFollow(true); + } + if (publishReq.isLast()) { + publishRes.setLast(false); + deliverRes.setFollow(true); + deliverRes.setLast(true); + } + } + } else { + publishRes.setSuccessful(false); + publishRes.setError("501:: invalid publish"); + } + res.setContentType(DEFAULT_CONTENT_TYPE); - res.write(data.toJSON()); + res.write(publishRes.toJSON()); + if (deliverRes != null) { + res.write(deliverRes.toJSON()); + } res.flush(); - event.getCometContext().notify(data); + if (deliverRes != null) { + event.getCometContext().notify(deliverRes); + } } Index: EventRouterImpl.java =================================================================== --- EventRouterImpl.java (revision 728) +++ EventRouterImpl.java (working copy) @@ -42,6 +42,7 @@ import com.sun.grizzly.cometd.bayeux.VerbUtils; import com.sun.grizzly.cometd.util.JSONParser; import java.io.IOException; +import java.util.List; /** * From the Spec, an EventRouter is: @@ -88,13 +89,15 @@ String[] messages = req.getParameterValues(JSON_MESSAGES); if (messages != null && messages.length > 0){ for(String message: messages){ - final Verb verb = VerbUtils.parse(JSONParser.parse(message)); + List verbs = VerbUtils.parseRequest(JSONParser.parse(message)); // Notify our listener; - getCometContext().notify(BayeuxCometHandler.newCometdContext + for (Verb verb : verbs) { + getCometContext().notify(BayeuxCometHandler.newCometdContext (req,res,verb),CometEvent.NOTIFY, (Integer)cometContext.getAttribute( - BayeuxCometHandler.BAYEUX_COMET_HANDLER)); + BayeuxCometHandler.BAYEUX_COMET_HANDLER)); + } } } } Index: DataHandler.java =================================================================== --- DataHandler.java (revision 728) +++ DataHandler.java (working copy) @@ -40,11 +40,12 @@ import com.sun.grizzly.comet.CometEvent; import com.sun.grizzly.comet.CometHandler; import com.sun.grizzly.comet.CometInputStream; -import com.sun.grizzly.cometd.bayeux.Data; +import com.sun.grizzly.cometd.bayeux.DeliverResponse; import com.sun.grizzly.cometd.bayeux.Verb; import com.sun.grizzly.cometd.bayeux.VerbUtils; import com.sun.grizzly.cometd.util.JSONParser; import java.io.IOException; +import java.util.List; import java.util.logging.Level; import java.util.logging.Logger; @@ -77,14 +78,12 @@ public void onEvent(CometEvent event) throws IOException{ Object obj = event.attachment(); try{ - if (obj instanceof Data){ - Data data = (Data)obj; - if (data.getClientId().equals(getClientId())){ + if (obj instanceof DeliverResponse){ + DeliverResponse deliverRes = (DeliverResponse)obj; + if (deliverRes.getClientId().equals(getClientId())){ return; } - - String sdata = "," + data.toData() + "]*/"; - res.write(sdata); + res.write(deliverRes.toJSON()); res.flush(); event.getCometContext().resumeCometHandler(this); } @@ -105,15 +104,17 @@ if (sdata.length() <=1) return; try{ - final Verb verb = - VerbUtils.parse(JSONParser.parse(sdata)); + List verbs = + VerbUtils.parseRequest(JSONParser.parse(sdata)); // Notify our listener; CometContext cometContext = event.getCometContext(); - cometContext.notify( + for (Verb verb : verbs) { + cometContext.notify( BayeuxCometHandler.newCometdContext(req,res,verb), CometEvent.NOTIFY, (Integer)cometContext.getAttribute( BayeuxCometHandler.BAYEUX_COMET_HANDLER)); + } event.getCometContext().removeAttribute(this); } catch (Throwable t){ logger.log(Level.SEVERE,"Data.onEvent",t); @@ -149,4 +150,4 @@ public void setClientId(String clientId) { this.clientId = clientId; } -} \ No newline at end of file +} Index: CometdHandler.java =================================================================== --- CometdHandler.java (revision 728) +++ CometdHandler.java (working copy) @@ -65,7 +65,7 @@ public void onUnsubscribe(CometEvent event) throws IOException; - public void onData(CometEvent event) throws IOException; + public void onPublish(CometEvent event) throws IOException; public abstract String getChannel(); Index: bayeux/ReconnectResponse.java =================================================================== --- bayeux/ReconnectResponse.java (revision 728) +++ bayeux/ReconnectResponse.java (working copy) @@ -36,11 +36,14 @@ package com.sun.grizzly.cometd.bayeux; +import com.sun.grizzly.util.http.FastHttpDateFormat; + /** * @author Jeanfrancois Arcand * @author Shing Wai Chan */ public class ReconnectResponse extends Reconnect { + private String timestamp = FastHttpDateFormat.getCurrentDate(); public ReconnectResponse() { super(); @@ -51,6 +54,9 @@ channel = req.getChannel(); clientId = req.getClientId(); id = req.getId(); + first = req.isFirst(); + follow = req.isFollow(); + last = req.isLast(); } public Boolean getSuccessful() { @@ -68,4 +74,16 @@ public void setError(String error) { this.error = error; } + + public String getTimestamp() { + return timestamp; + } + + public void setTimestamp(String timestamp) { + this.timestamp = timestamp; + } + + public String toJSON() { + return toJSON(timestamp); + } } Index: bayeux/SubscribeResponse.java =================================================================== --- bayeux/SubscribeResponse.java (revision 728) +++ bayeux/SubscribeResponse.java (working copy) @@ -52,6 +52,9 @@ subscription = req.getSubscription(); clientId = req.getClientId(); id = req.getId(); + first = req.isFirst(); + follow = req.isFollow(); + last = req.isLast(); } public Boolean getSuccessful() { @@ -70,7 +73,11 @@ this.error = error; } + public boolean isValid() { + return hasValidAdvice() && super.isValid(); + } + public String toJSON() { - return getBody(true, false); + return getBody(true, null); } } Index: bayeux/Ext.java =================================================================== --- bayeux/Ext.java (revision 728) +++ bayeux/Ext.java (working copy) @@ -40,7 +40,7 @@ } - public Map getExtensionMap() { + public Map getExtensionMap() { return extensionMap; } @@ -48,8 +48,11 @@ public void setExtensionMap(Map extensionMap) { this.extensionMap = extensionMap; } + + public boolean isValid() { + return true; + } - public String toJSON() { StringBuffer buf = new StringBuffer(); Iterator iterator = extensionMap.keySet().iterator(); Index: bayeux/DisconnectRequest.java =================================================================== --- bayeux/DisconnectRequest.java (revision 728) +++ bayeux/DisconnectRequest.java (working copy) @@ -56,7 +56,7 @@ public String toJSON() { StringBuilder sb = new StringBuilder( - "/*[{" + getJSONPrefix() + "{" + "\"channel\":\"" + channel + "\"" + ",\"clientId\":\"" + clientId + "\"" ); @@ -66,7 +66,7 @@ if (id != null) { sb.append(",\"id\":\"" + id + "\""); } - sb.append("}]*/\n"); + sb.append("}" + getJSONPostfix()); return sb.toString(); } } Index: bayeux/ConnectRequest.java =================================================================== --- bayeux/ConnectRequest.java (revision 728) +++ bayeux/ConnectRequest.java (working copy) @@ -61,7 +61,7 @@ public String toJSON() { StringBuilder sb = new StringBuilder( - "/*[{" + getJSONPrefix() + "{" + "\"channel\":\"" + channel + "\"" + ",\"clientId\":\"" + clientId + "\"" + ",\"connectionType\":\"" + connectionType + "\"" @@ -72,7 +72,7 @@ if (id != null) { sb.append(",\"id\":\"" + id + "\""); } - sb.append("}]*/\n"); + sb.append("}" + getJSONPostfix()); return sb.toString(); } } Index: bayeux/UnsubscribeResponse.java =================================================================== --- bayeux/UnsubscribeResponse.java (revision 728) +++ bayeux/UnsubscribeResponse.java (working copy) @@ -36,11 +36,14 @@ package com.sun.grizzly.cometd.bayeux; +import com.sun.grizzly.util.http.FastHttpDateFormat; + /** * @author Jeanfrancois Arcand * @author Shing Wai Chan */ public class UnsubscribeResponse extends Unsubscribe { + private String timestamp = FastHttpDateFormat.getCurrentDate(); public UnsubscribeResponse() { super(); @@ -52,6 +55,9 @@ subscription = req.getSubscription(); clientId = req.getClientId(); id = req.getId(); + first = req.isFirst(); + follow = req.isFollow(); + last = req.isLast(); } public Boolean getSuccessful() { @@ -70,7 +76,19 @@ this.error = error; } + public String getTimestamp() { + return timestamp; + } + + public void setTimestamp(String timestamp) { + this.timestamp = timestamp; + } + + public boolean isValid() { + return hasValidAdvice() && super.isValid(); + } + public String toJSON() { - return getBody(false, true); + return getBody(false, timestamp); } } Index: bayeux/HandshakeResponse.java =================================================================== --- bayeux/HandshakeResponse.java (revision 728) +++ bayeux/HandshakeResponse.java (working copy) @@ -51,6 +51,9 @@ super(); channel = req.getChannel(); id = req.getId(); + first = req.isFirst(); + follow = req.isFollow(); + last = req.isLast(); } public String getClientId() { @@ -87,7 +90,8 @@ @Override public boolean isValid() { - return (clientId != null || error != null) && super.isValid(); + return (clientId != null || error != null) && hasValidAdvice() && + super.isValid(); } public String toJSON() { Index: bayeux/ConnectResponse.java =================================================================== --- bayeux/ConnectResponse.java (revision 728) +++ bayeux/ConnectResponse.java (working copy) @@ -43,6 +43,7 @@ * @author Shing Wai Chan */ public class ConnectResponse extends Connect { + private String timestamp = FastHttpDateFormat.getCurrentDate(); public ConnectResponse() { super(); @@ -53,6 +54,9 @@ channel = req.getChannel(); clientId = req.getClientId(); id = req.getId(); + first = req.isFirst(); + follow = req.isFollow(); + last = req.isLast(); } public Boolean getSuccessful() { @@ -71,8 +75,20 @@ this.error = error; } + public String getTimestamp() { + return timestamp; + } + + public void setTimestamp(String timestamp) { + this.timestamp = timestamp; + } + + public boolean isValid() { + return hasValidAdvice() && super.isValid(); + } + public String toJSON() { - return getBody().append("}]*/\n").toString(); + return getBody().append("}" + getJSONPostfix()).toString(); } public String toLongPolledJSON() { @@ -81,7 +97,7 @@ private StringBuilder getBody() { StringBuilder sb = new StringBuilder( - "/*[{" + getJSONPrefix() + "{" + "\"channel\":\"" + channel + "\"" + ",\"clientId\":\"" + clientId + "\"" + ",\"successful\":" + successful @@ -98,7 +114,7 @@ if (advice != null) { sb.append("," + advice.toJSON()); } - sb.append(",\"timestamp\":\"" + FastHttpDateFormat.getCurrentDate() + "\""); + sb.append(",\"timestamp\":\"" + getTimestamp() + "\""); return sb; } } Index: bayeux/DisconnectResponse.java =================================================================== --- bayeux/DisconnectResponse.java (revision 728) +++ bayeux/DisconnectResponse.java (working copy) @@ -52,6 +52,9 @@ channel = req.getChannel(); clientId = req.getClientId(); id = req.getId(); + first = req.isFirst(); + follow = req.isFollow(); + last = req.isLast(); } public Boolean getSuccessful() { @@ -72,7 +75,7 @@ public String toJSON() { StringBuilder sb = new StringBuilder( - "/*[{" + getJSONPrefix() + "{" + "\"channel\":\"" + channel + "\"" + ",\"clientId\":\"" + clientId + "\"" + ",\"successful\":\"" + successful + "\"" @@ -86,7 +89,7 @@ if (error != null) { sb.append(",\"error\":\"" + error + "\""); } - sb.append("}]*/\n"); + sb.append("}" + getJSONPostfix()); return sb.toString(); } } Index: bayeux/Data.java =================================================================== --- bayeux/Data.java (revision 728) +++ bayeux/Data.java (working copy) @@ -36,7 +36,7 @@ package com.sun.grizzly.cometd.bayeux; -import java.util.HashMap; +import java.util.Map; import java.util.Iterator; import java.util.Random; import com.sun.grizzly.util.http.FastHttpDateFormat; @@ -78,7 +78,7 @@ private Random random = new Random(); - private HashMap data; + private Map data; private String connectionId; @@ -90,32 +90,15 @@ type = Verb.Type.DATA; } - public String toData() { - return "{\"id\":\"" + id + "\"," - + "\"timestamp\":\"" + FastHttpDateFormat.getCurrentDate() + "\"," - + getJSONData() + "," - + "\"channel\":\"" + channel + "\"}"; - } - - public String toJSON(){ - return "/*[{" - + "\"successful\":" + successful + "," - + "\"channel\":\"" + channel + "\"," - + "\"timestamp\":\"" + FastHttpDateFormat.getCurrentDate() + "\"," - + "\"id\":\"" + id + "\"}," - + toData() - + "]*/\n" ; - } - - public HashMap getMapData() { + public Map getMapData() { return data; } - public void setMapData(HashMap data) { + public void setMapData(Map data) { this.data = data; } - public String getJSONData(){ + public String toJSON(){ StringBuffer response = new StringBuffer(); response.append("\"data\":{"); Index: bayeux/VerbUtils.java =================================================================== --- bayeux/VerbUtils.java (revision 728) +++ bayeux/VerbUtils.java (working copy) @@ -37,7 +37,9 @@ package com.sun.grizzly.cometd.bayeux; import java.lang.reflect.Array; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; /** @@ -59,157 +61,265 @@ public VerbUtils() { } + + public static List parseRequest(Object verb) { + return parse(verb, true); + } + + public static List parseResponse(Object verb) { + return parse(verb, false); + } - - public static Verb parse(Object verb){ + private static List parse(Object verb, boolean isRequest){ + List verbs = new ArrayList(); VerbBase wellFormedVerb = null; if (verb.getClass().isArray()){ int length = Array.getLength(verb); for (int i=0; i < length; i++){ wellFormedVerb = - parseMap((Map)Array.get(verb,i), wellFormedVerb); - } + parseMap((Map)Array.get(verb,i), isRequest); + if (wellFormedVerb == null){ + throw new RuntimeException("Wrong type"); + } + if (i == 0) { + wellFormedVerb.setFirst(true); + } else { + wellFormedVerb.setFollow(true); + } + verbs.add(wellFormedVerb); + } + wellFormedVerb.setLast(true); + } else { // single JSON object case + wellFormedVerb = parseMap((Map)verb, isRequest); + if (wellFormedVerb == null){ + throw new RuntimeException("Wrong type"); + } + wellFormedVerb.setFirst(true); + wellFormedVerb.setLast(true); + verbs.add(wellFormedVerb); } - if (wellFormedVerb == null){ - throw new RuntimeException("Wrong type"); - } - return wellFormedVerb; + return verbs; } - protected static VerbBase parseMap(Map map, VerbBase wellFormedVerb){ - + protected static VerbBase parseMap(Map map, boolean isRequest) { String channel = (String)map.get("channel"); - //System.out.println("Map: " + map); - if (map.get(DATA) != null){ - Data data = newData(map); - if (wellFormedVerb != null) { - wellFormedVerb.setData(data); - return wellFormedVerb; - } else { - return data; - } - } - VerbBase vb = null; if (channel.indexOf(HANDSHAKE) != -1){ - vb = newHandshake(map); + vb = (isRequest)? newHandshakeRequest(map) : newHandshakeResponse(map); } else if (channel.indexOf(CONNECT) != -1){ - vb = newConnect(map); + vb = (isRequest)? newConnectRequest(map) : newConnectResponse(map); } else if (channel.indexOf(DISCONNECT) != -1){ - vb = newDisconnect(map); + vb = (isRequest)? newDisconnectRequest(map) : newDisconnectResponse(map); } else if (channel.indexOf(RECONNECT) != -1){ - vb = newReconnect(map); + vb = (isRequest)? newReconnectRequest(map) : newReconnectResponse(map); } else if (channel.indexOf(SUBSCRIBE) != -1){ - vb = newSubscribe(map); + vb = (isRequest)? newSubscribeRequest(map) : newSubscribeResponse(map); } else if (channel.indexOf(UNSUBSCRIBE) != -1){ - vb = newUnsubscribe(map); + vb = (isRequest)? newUnsubscribeRequest(map) : newUnsubscribeResponse(map); } else if (channel.indexOf(PING) != -1){ vb = newPing(map); } else if (channel.indexOf(STATUS) != -1){ vb = newStatus(map); + } else { // publish request, publish response, deliver message + if (isRequest) { + vb = newPublishRequest(map); + } else if (map.get(DATA) != null) { + vb = newDeliverResponse(map); + } else { + vb = newPublishResponse(map); + } } configureExt(vb,map); return vb; } - private final static HandshakeRequest newHandshake(Map map){ - HandshakeRequest handshake = new HandshakeRequest(); + private final static HandshakeRequest newHandshakeRequest(Map map){ + HandshakeRequest handshakeReq = new HandshakeRequest(); - handshake.setAuthScheme((String)map.get("authScheme")); - handshake.setAuthUser((String)map.get("authUser")); - handshake.setAuthToken((String)map.get("authToken")); - handshake.setChannel((String)map.get("channel")); - handshake.setVersion((String)map.get("version")); - handshake.setMinimumVersion((String)map.get("minimumVersion")); - handshake.setId((String)map.get("id")); - handshake.setSupportedConnectionTypes((String[])map.get("supportedConnectionTypes")); - handshake.setAdvice(new Advice()); + handshakeReq.setAuthScheme((String)map.get("authScheme")); + handshakeReq.setAuthUser((String)map.get("authUser")); + handshakeReq.setAuthToken((String)map.get("authToken")); + handshakeReq.setChannel((String)map.get("channel")); + handshakeReq.setVersion((String)map.get("version")); + handshakeReq.setMinimumVersion((String)map.get("minimumVersion")); + handshakeReq.setId((String)map.get("id")); + handshakeReq.setSupportedConnectionTypes(getSupportedConnectionTypes(map)); - return handshake; + + return handshakeReq; } + + + private final static HandshakeResponse newHandshakeResponse(Map map) { + HandshakeResponse handshakeRes = new HandshakeResponse(); + + handshakeRes.setAuthScheme((String)map.get("authScheme")); + handshakeRes.setAuthUser((String)map.get("authUser")); + handshakeRes.setAuthToken((String)map.get("authToken")); + handshakeRes.setChannel((String)map.get("channel")); + handshakeRes.setVersion((String)map.get("version")); + handshakeRes.setMinimumVersion((String)map.get("minimumVersion")); + handshakeRes.setId((String)map.get("id")); + handshakeRes.setSupportedConnectionTypes(getSupportedConnectionTypes(map)); + + handshakeRes.setClientId((String)map.get("clientId")); + handshakeRes.setSuccessful((Boolean)map.get("successful")); + handshakeRes.setAuthSuccessful((Boolean)map.get("authSuccessful")); + handshakeRes.setError((String)map.get("error")); + configureAdvice(handshakeRes, map); + + return handshakeRes; + } - private final static ConnectRequest newConnect(Map map){ - ConnectRequest connect = new ConnectRequest(); + private final static ConnectRequest newConnectRequest(Map map){ + ConnectRequest connectReq = new ConnectRequest(); - connect.setAuthToken((String)map.get("authToken")); - connect.setChannel((String)map.get("channel")); - connect.setClientId((String)map.get("clientId")); - connect.setConnectionType((String)map.get("connectionType")); - connect.setId((String)map.get("id")); - connect.setAdvice(new Advice()); + connectReq.setAuthToken((String)map.get("authToken")); + connectReq.setChannel((String)map.get("channel")); + connectReq.setClientId((String)map.get("clientId")); + connectReq.setConnectionType((String)map.get("connectionType")); + connectReq.setId((String)map.get("id")); + + return connectReq; + } + + + private final static ConnectResponse newConnectResponse(Map map) { + ConnectResponse connectRes = new ConnectResponse(); - return connect; + connectRes.setAuthToken((String)map.get("authToken")); + connectRes.setChannel((String)map.get("channel")); + connectRes.setClientId((String)map.get("clientId")); + connectRes.setId((String)map.get("id")); + + connectRes.setSuccessful((Boolean)map.get("successful")); + connectRes.setError((String)map.get("error")); + connectRes.setTimestamp((String)map.get("timestamp")); + configureAdvice(connectRes, map); + return connectRes; } - private final static DisconnectRequest newDisconnect(Map map){ - DisconnectRequest disconnect = new DisconnectRequest(); + private final static DisconnectRequest newDisconnectRequest(Map map){ + DisconnectRequest disconnectReq = new DisconnectRequest(); - disconnect.setAuthToken((String)map.get("authToken")); - disconnect.setChannel((String)map.get("channel")); - disconnect.setClientId((String)map.get("clientId")); - disconnect.setId((String)map.get("id")); - disconnect.setConnectionType((String)map.get("connectionType")); + disconnectReq.setAuthToken((String)map.get("authToken")); + disconnectReq.setChannel((String)map.get("channel")); + disconnectReq.setClientId((String)map.get("clientId")); + disconnectReq.setId((String)map.get("id")); + - return disconnect; + return disconnectReq; } + + + private final static DisconnectResponse newDisconnectResponse(Map map) { + DisconnectResponse disconnectRes = new DisconnectResponse(); + + disconnectRes.setAuthToken((String)map.get("authToken")); + disconnectRes.setChannel((String)map.get("channel")); + disconnectRes.setClientId((String)map.get("clientId")); + disconnectRes.setId((String)map.get("id")); + + disconnectRes.setSuccessful((Boolean)map.get("successful")); + disconnectRes.setError((String)map.get("error")); + configureAdvice(disconnectRes, map); + + return disconnectRes; + } - private final static ReconnectRequest newReconnect(Map map){ - ReconnectRequest reconnect = new ReconnectRequest(); + private final static ReconnectRequest newReconnectRequest(Map map){ + ReconnectRequest reconnectReq = new ReconnectRequest(); - reconnect.setAuthToken((String)map.get("authToken")); - reconnect.setChannel((String)map.get("channel")); - reconnect.setClientId((String)map.get("clientId")); - reconnect.setId((String)map.get("id")); - reconnect.setConnectionType((String)map.get("connectionType")); + reconnectReq.setAuthToken((String)map.get("authToken")); + reconnectReq.setChannel((String)map.get("channel")); + reconnectReq.setClientId((String)map.get("clientId")); + reconnectReq.setId((String)map.get("id")); + - return reconnect; + return reconnectReq; } - - - @SuppressWarnings("unchecked") - private final static Data newData(Map map){ - Data data = new Data(); + + + private final static ReconnectResponse newReconnectResponse(Map map) { + ReconnectResponse reconnectRes = new ReconnectResponse(); - data.setChannel((String)map.get("channel")); - data.setClientId((String)map.get("clientId")); - data.setMapData((HashMap)map.get("data")); - data.setId((String)map.get("id")); + reconnectRes.setAuthToken((String)map.get("authToken")); + reconnectRes.setChannel((String)map.get("channel")); + reconnectRes.setClientId((String)map.get("clientId")); + reconnectRes.setId((String)map.get("id")); + + reconnectRes.setSuccessful((Boolean)map.get("successful")); + reconnectRes.setError((String)map.get("error")); - return data; + return reconnectRes; } - private final static SubscribeRequest newSubscribe(Map map){ - SubscribeRequest subscribe = new SubscribeRequest(); + private final static SubscribeRequest newSubscribeRequest(Map map){ + SubscribeRequest subscribeReq = new SubscribeRequest(); - subscribe.setChannel((String)map.get("channel")); - subscribe.setAuthToken((String)map.get("authToken")); - subscribe.setSubscription((String)map.get("subscription")); - subscribe.setClientId((String)map.get("clientId")); - subscribe.setId((String)map.get("id")); + subscribeReq.setChannel((String)map.get("channel")); + subscribeReq.setAuthToken((String)map.get("authToken")); + subscribeReq.setSubscription((String)map.get("subscription")); + subscribeReq.setClientId((String)map.get("clientId")); + subscribeReq.setId((String)map.get("id")); - return subscribe; + return subscribeReq; } + + + private final static SubscribeResponse newSubscribeResponse(Map map) { + SubscribeResponse subscribeRes = new SubscribeResponse(); + + subscribeRes.setChannel((String)map.get("channel")); + subscribeRes.setAuthToken((String)map.get("authToken")); + subscribeRes.setSubscription((String)map.get("subscription")); + subscribeRes.setClientId((String)map.get("clientId")); + subscribeRes.setId((String)map.get("id")); + + subscribeRes.setSuccessful((Boolean)map.get("successful")); + subscribeRes.setError((String)map.get("error")); + configureAdvice(subscribeRes, map); + + return subscribeRes; + } - private final static UnsubscribeRequest newUnsubscribe(Map map){ - UnsubscribeRequest unsubscribe = new UnsubscribeRequest(); + private final static UnsubscribeRequest newUnsubscribeRequest(Map map){ + UnsubscribeRequest unsubscribeReq = new UnsubscribeRequest(); - unsubscribe.setChannel((String)map.get("channel")); - unsubscribe.setAuthToken((String)map.get("authToken")); - unsubscribe.setSubscription((String)map.get("subscription")); - unsubscribe.setClientId((String)map.get("clientId")); - unsubscribe.setId((String)map.get("id")); - return unsubscribe; + unsubscribeReq.setChannel((String)map.get("channel")); + unsubscribeReq.setAuthToken((String)map.get("authToken")); + unsubscribeReq.setSubscription((String)map.get("subscription")); + unsubscribeReq.setClientId((String)map.get("clientId")); + unsubscribeReq.setId((String)map.get("id")); + return unsubscribeReq; } + private final static UnsubscribeResponse newUnsubscribeResponse(Map map){ + UnsubscribeResponse unsubscribeRes = new UnsubscribeResponse(); + + unsubscribeRes.setChannel((String)map.get("channel")); + unsubscribeRes.setAuthToken((String)map.get("authToken")); + unsubscribeRes.setSubscription((String)map.get("subscription")); + unsubscribeRes.setClientId((String)map.get("clientId")); + unsubscribeRes.setId((String)map.get("id")); + + unsubscribeRes.setSuccessful((Boolean)map.get("successful")); + unsubscribeRes.setError((String)map.get("error")); + unsubscribeRes.setTimestamp((String)map.get("timestamp")); + configureAdvice(unsubscribeRes, map); + return unsubscribeRes; + } + + private final static Ping newPing(Map map){ Ping ping = new Ping(); @@ -224,16 +334,90 @@ status.setChannel((String)map.get("channel")); return status; } + + + private final static PublishRequest newPublishRequest(Map map) { + PublishRequest publishReq = new PublishRequest(); + Map mapData = (Map)map.get("data"); + Data data = new Data(); + data.setMapData(mapData); + + publishReq.setChannel((String)map.get("channel")); + publishReq.setData(data); + publishReq.setClientId((String)map.get("clientId")); + publishReq.setId((String)map.get("id")); + return publishReq; + } + + private final static PublishResponse newPublishResponse(Map map) { + PublishResponse publishRes = new PublishResponse(); + publishRes.setChannel((String)map.get("channel")); + publishRes.setClientId((String)map.get("clientId")); + publishRes.setId((String)map.get("id")); + publishRes.setError((String)map.get("error")); + publishRes.setSuccessful((Boolean)map.get("successful")); + return publishRes; + } + + private final static DeliverResponse newDeliverResponse(Map map) { + DeliverResponse deliverRes = new DeliverResponse(); + Map mapData = (Map)map.get("data"); + Data data = new Data(); + data.setMapData(mapData); + + deliverRes.setChannel((String)map.get("channel")); + deliverRes.setData(data); + deliverRes.setClientId((String)map.get("clientId")); + deliverRes.setId((String)map.get("id")); + configureAdvice(deliverRes, map); + return deliverRes; + } @SuppressWarnings("unchecked") private static void configureExt(VerbBase vb, Map map){ - Map extMap = (Map)map.get("ext"); + Map extMap = (Map)map.get("ext"); if (extMap == null) return; Ext ext = new Ext(); ext.setExtensionMap(extMap); vb.setExt(ext); } - + + private static void configureAdvice(VerbBase vb, Map map) { + Map adviceMap = (Map)map.get("advice"); + if (adviceMap == null) return; + + Advice advice = new Advice(); + String reconnect = (String)adviceMap.get("reconnect"); + if (reconnect != null) { + advice.setReconnect(reconnect); + } + Integer interval = new Integer(((Number)adviceMap.get("interval")).intValue()); + if (interval != null) { + advice.setInterval(interval); + } + Boolean multipleClients = (Boolean)adviceMap.get("multiple-clients"); + if (multipleClients != null) { + advice.setMultipleClients(multipleClients); + } + String[] hosts = (String[])adviceMap.get("hosts"); + if (hosts != null) { + advice.setHosts(hosts); + } + + vb.setAdvice(advice); + } + + private static String[] getSupportedConnectionTypes(Map map) { + String[] types = null; + Object[] typeObjs = (Object[])map.get("supportedConnectionTypes"); + if (typeObjs != null) { + types = new String[typeObjs.length]; + for (int i = 0; i < typeObjs.length; i++) { + types[i] = (String)typeObjs[i]; + } + } + return types; + } } Index: bayeux/Reconnect.java =================================================================== --- bayeux/Reconnect.java (revision 728) +++ bayeux/Reconnect.java (working copy) @@ -99,14 +99,25 @@ type = Verb.Type.RECONNECT; } - public String toJSON() { - return "/*[{" - + "\"timestamp\":\"" + FastHttpDateFormat.getCurrentDate() + "\"," - + "\"error\":\"" + error + "\"," + public String toJSON() { + return toJSON(null); + } + + public String toJSON(String timestamp) { + StringBuilder sb = new StringBuilder( + getJSONPrefix() + "{" + "\"successful\":" + successful + "," + "\"channel\":\"" + channel + "\"" - + "}]*/\n" ; - } + ); + if (error != null) { + sb.append(",\"error\":\"" + error + "\""); + } + if (timestamp != null) { + sb.append(",\"timestamp\":\"" + timestamp + "\""); + } + sb.append("}" + getJSONPostfix()); + return sb.toString(); + } @Override protected String getMetaChannel() { Index: bayeux/Subscribe.java =================================================================== --- bayeux/Subscribe.java (revision 728) +++ bayeux/Subscribe.java (working copy) @@ -124,9 +124,9 @@ * @param isResponse * @param printTimestamp for UnsubscribeResponse */ - protected String getBody(boolean isResponse, boolean printTimestamp) { + protected String getBody(boolean isResponse, String timestamp) { StringBuilder sb = new StringBuilder( - "/*[{" + getJSONPrefix() + "{" + "\"channel\":\"" + channel + "\"" ); if (isResponse) { @@ -146,14 +146,10 @@ if (id != null) { sb.append(",\"id\":\"" + id + "\""); } - if (printTimestamp) { - sb.append(",\"timestamp\":\"" + FastHttpDateFormat.getCurrentDate() + "\""); + if (timestamp != null) { + sb.append(",\"timestamp\":\"" + timestamp + "\""); } - sb.append("}"); - if (data != null) { - sb.append("," + data.toData()); - } - sb.append("]*/\n"); + sb.append("}" + getJSONPostfix()); return sb.toString(); } } Index: bayeux/Handshake.java =================================================================== --- bayeux/Handshake.java (revision 728) +++ bayeux/Handshake.java (working copy) @@ -180,7 +180,7 @@ protected String toJSON(boolean isResponse) { StringBuilder sb = new StringBuilder( - "/*[{" + getJSONPrefix() + "{" + "\"channel\":\"" + channel + "\"" + ",\"version\":\"" + version + "\"" ); @@ -219,13 +219,13 @@ } } - sb.append("}]*/\n"); + sb.append("}" + getJSONPostfix()); return sb.toString(); } protected String toErrorResponseJSON() { StringBuilder sb = new StringBuilder( - "/*[{" + getJSONPrefix() + "{" + "\"channel\":\"" + channel + "\"" + ",\"successful\":" + successful + ",\"error\":\"" + error + "\"" @@ -262,7 +262,7 @@ sb.append(",\"authSuccessful\":" + authSuccessful); } - sb.append("}]*/\n"); + sb.append("}" + getJSONPostfix()); return sb.toString(); } } Index: bayeux/VerbBase.java =================================================================== --- bayeux/VerbBase.java (revision 728) +++ bayeux/VerbBase.java (working copy) @@ -69,6 +69,11 @@ protected Ext ext; + + // this is used to writing appropriate JSON format + protected boolean first = false; + protected boolean follow = false; + protected boolean last = false; public VerbBase() { @@ -120,6 +125,13 @@ public void setAdvice(Advice advice) { this.advice = advice; } + + /** + * Since advice is optional, null advice is valid. + */ + public boolean hasValidAdvice() { + return ((getAdvice() != null)? getAdvice().isValid() : true); + } public void setId(String id){ @@ -149,7 +161,47 @@ this.dataId = dataId; } + public boolean isFirst() { + return first; + } + + public void setFirst(boolean first) { + this.first = first; + } + + public boolean isFollow() { + return follow; + } + + public void setFollow(boolean follow) { + this.follow = follow; + } + + public boolean isLast() { + return last; + } + + public void setLast(boolean last) { + this.last = last; + } + public boolean isValid() { return true; } + + protected String getJSONPrefix() { + String prefix = null; + if (first) { + prefix = "/*["; + } else if (follow) { + prefix = ", "; + } else { + prefix = ""; + } + return prefix; + } + + protected String getJSONPostfix() { + return ((last)? "]*/" : ""); + } } Index: bayeux/ReconnectRequest.java =================================================================== --- bayeux/ReconnectRequest.java (revision 728) +++ bayeux/ReconnectRequest.java (working copy) @@ -43,14 +43,6 @@ public class ReconnectRequest extends Reconnect { public ReconnectRequest() { - type = Verb.Type.RECONNECT; + super(); } - - public String getConnectionType() { - return connectionType; - } - - public void setConnectionType(String connectionType) { - this.connectionType = connectionType; - } } Index: bayeux/SubscribeRequest.java =================================================================== --- bayeux/SubscribeRequest.java (revision 728) +++ bayeux/SubscribeRequest.java (working copy) @@ -47,6 +47,6 @@ } public String toJSON() { - return getBody(false, false); + return getBody(false, null); } } Index: bayeux/Advice.java =================================================================== --- bayeux/Advice.java (revision 728) +++ bayeux/Advice.java (working copy) @@ -55,6 +55,9 @@ public class Advice extends VerbBase{ // "none", "retry", "handshake", "recover" (deprecated) + private static final String[] VALID_RECONNECT = + new String[] { "none", "retry", "handshake", "recover" }; + private String reconnect = "retry"; @@ -146,4 +149,12 @@ this.transport = transport; } + public boolean isValid() { + for (String rc : VALID_RECONNECT) { + if (rc.equals(reconnect)) { + return true; + } + } + return false; + } } Index: bayeux/UnsubscribeRequest.java =================================================================== --- bayeux/UnsubscribeRequest.java (revision 728) +++ bayeux/UnsubscribeRequest.java (working copy) @@ -85,6 +85,6 @@ } public String toJSON() { - return getBody(false, false); + return getBody(false, null); } } Index: bayeux/Verb.java =================================================================== --- bayeux/Verb.java (revision 728) +++ bayeux/Verb.java (working copy) @@ -59,6 +59,8 @@ RECONNECT, // deprecated SUBSCRIBE, UNSUBSCRIBE, + PUBLISH, + DELIVER, STATUS, // deprecated PING, // deprecated Index: util/JSONParser.java =================================================================== --- util/JSONParser.java (revision 728) +++ util/JSONParser.java (working copy) @@ -504,6 +504,12 @@ private int index; Source(String s) { + // support JSON comment filtered format + int firstInd = s.indexOf("/*"); + int lastInd = s.lastIndexOf("*/"); + if (firstInd != -1 && lastInd != -1) { + s = s.substring(firstInd + 2, lastInd); + } string=s; } Index: BayeuxCometHandlerBase.java =================================================================== --- BayeuxCometHandlerBase.java (revision 728) +++ BayeuxCometHandlerBase.java (working copy) @@ -75,8 +75,8 @@ case UNSUBSCRIBE: onUnsubscribe(event); break; - case DATA: - onData(event); + case PUBLISH: + onPublish(event); break; case PING: onPing(event);