# This patch file was generated by NetBeans IDE
# Following Index: paths are relative to: C:\Projects\grizzly\trunk\samples\framework-samples\src\main\java
# This patch can be applied using context Tools: Patch action on respective folder.
# It uses platform neutral UTF-8 encoding and \n newlines.
# Above lines and this line are ignored by the patching process.
Index: com/sun/grizzly/filter/BytesTrafficListener.java
--- com/sun/grizzly/filter/BytesTrafficListener.java Locally New
+++ com/sun/grizzly/filter/BytesTrafficListener.java Locally New
@@ -0,0 +1,53 @@
+/*
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
+ *
+ * Copyright 2007-2008 Sun Microsystems, Inc. All rights reserved.
+ *
+ * The contents of this file are subject to the terms of either the GNU
+ * General Public License Version 2 only ("GPL") or the Common Development
+ * and Distribution License("CDDL") (collectively, the "License"). You
+ * may not use this file except in compliance with the License. You can obtain
+ * a copy of the License at https://glassfish.dev.java.net/public/CDDL+GPL.html
+ * or glassfish/bootstrap/legal/LICENSE.txt. See the License for the specific
+ * language governing permissions and limitations under the License.
+ *
+ * When distributing the software, include this License Header Notice in each
+ * file and include the License file at glassfish/bootstrap/legal/LICENSE.txt.
+ * Sun designates this particular file as subject to the "Classpath" exception
+ * as provided by Sun in the GPL Version 2 section of the License file that
+ * accompanied this code. If applicable, add the following below the License
+ * Header, with the fields enclosed by brackets [] replaced by your own
+ * identifying information: "Portions Copyrighted [year]
+ * [name of copyright owner]"
+ *
+ * Contributor(s):
+ *
+ * If you wish your version of this file to be governed by only the CDDL or
+ * only the GPL Version 2, indicate your decision by adding "[Contributor]
+ * elects to include this software in this distribution under the [CDDL or GPL
+ * Version 2] license." If you don't indicate a single choice of license, a
+ * recipient has the option to distribute your version of this file under
+ * either the CDDL, the GPL Version 2 or to extend the choice of license to
+ * its licensees as provided above. However, if you add GPL Version 2 code
+ * and therefore, elected the GPL Version 2 license, then the option applies
+ * only if the new code is made subject to such option by the copyright
+ * holder.
+ *
+ */
+package com.sun.grizzly.filter;
+
+/**
+ * This could be used by Progressbars to show an indication that bytes go over the wire
+ * @author John Vieten 30.06.2008
+ * @version 1.0
+ */
+public interface BytesTrafficListener {
+ /**
+ * Approximate indication that bytes have been send or received
+ * Please b carefull not to do any timeconsuming operation in here since
+ * it be called on Grizzly Workerthread
+ */
+ public void traffic();
+
+}
Index: com/sun/grizzly/filter/ClosedMessage.java
--- com/sun/grizzly/filter/ClosedMessage.java Locally New
+++ com/sun/grizzly/filter/ClosedMessage.java Locally New
@@ -0,0 +1,9 @@
+package com.sun.grizzly.filter;
+
+/**
+ * @author John Vieten 03.07.2008
+ * @version 1.0
+ */
+public class ClosedMessage extends MessageBase{
+
+}
Index: com/sun/grizzly/filter/CustomProtocolClient.java
--- com/sun/grizzly/filter/CustomProtocolClient.java Base (BASE)
+++ com/sun/grizzly/filter/CustomProtocolClient.java Locally Modified (Based On LOCAL)
@@ -37,29 +37,21 @@
*/
package com.sun.grizzly.filter;
-import com.sun.grizzly.BaseSelectionKeyHandler;
-import com.sun.grizzly.CallbackHandler;
-import com.sun.grizzly.Context;
-import com.sun.grizzly.Controller;
-import com.sun.grizzly.ControllerStateListenerAdapter;
-import com.sun.grizzly.DefaultPipeline;
-import com.sun.grizzly.DefaultProtocolChain;
-import com.sun.grizzly.DefaultProtocolChainInstanceHandler;
-import com.sun.grizzly.IOEvent;
-import com.sun.grizzly.ProtocolChain;
-import com.sun.grizzly.ProtocolChainInstanceHandler;
-import com.sun.grizzly.ProtocolFilter;
-import com.sun.grizzly.TCPConnectorHandler;
-import com.sun.grizzly.TCPSelectorHandler;
+import com.sun.grizzly.*;
+
+
import java.util.concurrent.CountDownLatch;
import java.util.logging.Level;
import java.util.List;
import java.util.ArrayList;
+
+import java.nio.channels.SelectionKey;
+
import java.net.InetSocketAddress;
import java.io.IOException;
-import java.io.InputStream;
+import java.io.OutputStream;
-import java.nio.channels.SelectionKey;
+
import static com.sun.grizzly.filter.CustomProtocolHelper.logger;
import static com.sun.grizzly.filter.CustomProtocolHelper.log;
@@ -71,15 +63,14 @@
* client.init(new YourMessageDispatcher());
* client.start();
* client.connect(new InetSocketAddress("localhost", 8087));
- *
- *
+ *
+ *
*
*
* @author John Vieten 28.06.2008
* @version 1.0
*/
public class CustomProtocolClient {
-
private Controller controller;
private TCPConnectorHandler connectorHandler;
private CallbackHandler callbackHandler;
@@ -87,55 +78,74 @@
private DefaultProtocolChain protocolChain = new DefaultProtocolChain();
private List beforeParserList = new ArrayList();
private List afterParserList = new ArrayList();
+ private BytesTrafficListener bytesArrivedListener;
private int requestId;
private int session = 0;
private boolean gzip;
private MessageDispatcher dispatcher;
private CountDownLatch started;
- private int minWorkerThreads = 1;
- private int maxWorkerThreads = 1;
+ private IOExceptionHandler ioExceptionHandler;
+
+
+ private int minWorkerThreads = 5;
+ private int maxWorkerThreads = 20;
+
/**
- * If bytes send to server are gzipped
+ * If bytes send to server are gzipped
+ *
* @return zipped
*/
public boolean isGzip() {
+ // if(true) return false;
return gzip;
}
/**
- * If bytes send to server should be gzipped
- * @param gzip true if they should
+ * If bytes send to server should be gzipped
+ *
+ * @param gzip true if they should
*/
public void setGzip(boolean gzip) {
this.gzip = gzip;
}
+
+ public void setBytesArrivedListener(BytesTrafficListener bytesArrivedListener) {
+ this.bytesArrivedListener = bytesArrivedListener;
+ }
+
/**
* Giving the {@link com.sun.grizzly.TCPConnectorHandler} to the Client
- * @param connectorHandler TCP Connector Handler
+ *
*/
- public void setConnectorHandler(TCPConnectorHandler connectorHandler) {
- this.connectorHandler = connectorHandler;
- }
+
+
public TCPConnectorHandler getConnectorHandler() {
+ if(connectorHandler==null) {
+ connectorHandler=(TCPConnectorHandler) controller.acquireConnectorHandler(Controller.Protocol.TCP);
+ }
+
return connectorHandler;
}
/**
* User can override the default Callbackhandler.
* For Example an Proxy Callbackhandler could be plugged here in.
+ *
* @param callbackHandler for non blocking client operations
*/
+
public void setCallbackHandler(CallbackHandler callbackHandler) {
this.callbackHandler = callbackHandler;
}
/**
* Set up the Grizzly Worker Threads.
- * @param minWorkerThreads min count of grizzly workers
- * @param maxWorkerThreads max count of grizzly workers
+ *
+ * @param minWorkerThreads min count of grizzly workers
+ * @param maxWorkerThreads max count of grizzly workers
*/
public void setThreadSizes(int minWorkerThreads, int maxWorkerThreads) {
this.minWorkerThreads = minWorkerThreads;
@@ -146,8 +156,9 @@
* Users can add addtional ProtocolFilters to the Reading Chain.
* These filter here will be called before or after the CustomProtocol Filters will execute
* (Remark have to check if after makes sense...)
+ *
* @param protocolFilter Filter for chain
- * @param beforeParser before or after
+ * @param beforeParser before or after
*/
public void addProtocolFilter(ProtocolFilter protocolFilter, boolean beforeParser) {
if (beforeParser) {
@@ -158,8 +169,9 @@
}
/**
- * Adds a Filter before the CustomProtocol Filters will execute.
- * @param protocolFilter Filter for chain
+ * Adds a Filter before the CustomProtocol Filters will execute.
+ *
+ * @param protocolFilter Filter for chain
*/
public void addProtocolFilter(ProtocolFilter protocolFilter) {
addProtocolFilter(protocolFilter, true);
@@ -176,31 +188,24 @@
controller = new Controller();
DefaultPipeline defp = new DefaultPipeline();
+
defp.setMinThreads(minWorkerThreads);
defp.setMaxThreads(maxWorkerThreads);
defp.setInitialByteBufferSize(com.sun.grizzly.filter.Message.MessageMaxLength);
-
controller.setPipeline(defp);
selectorHandler = new TCPSelectorHandler(true);
BaseSelectionKeyHandler keyHandler = new BaseSelectionKeyHandler();
- if (connectorHandler == null) {
- connectorHandler = new TCPConnectorHandler();
- }
- connectorHandler.setController(controller);
selectorHandler.setSelectionKeyHandler(keyHandler);
controller.addSelectorHandler(selectorHandler);
started = new CountDownLatch(1);
- controller.addStateListener(new ControllerStateListenerAdapter() {
- @Override
+ controller.addStateListener(new ControllerStateListenerAdapter() {
public void onException(Throwable e) {
logger().log(Level.SEVERE, "Grizzly controller exception", e);
}
-
- @Override
public void onReady() {
started.countDown();
}
@@ -208,10 +213,12 @@
for (ProtocolFilter protocolFilter : beforeParserList) {
protocolChain.addFilter(protocolFilter);
}
- protocolChain.addFilter(CustomProtocolParser.createParserProtocolFilter());
- if (dispatcher != null) {
+
+ ProtocolOutputStream.setBytesTrafficListener(bytesArrivedListener);
+ ProtocolOutputStream.setExceptionHandler(ioExceptionHandler);
+ protocolChain.addFilter(CustomProtocolParser.createParserProtocolFilter( bytesArrivedListener));
+ if (dispatcher != null)
protocolChain.addFilter(dispatcher);
- }
for (ProtocolFilter protocolFilter : afterParserList) {
protocolChain.addFilter(protocolFilter);
@@ -220,7 +227,6 @@
controller.setProtocolChainInstanceHandler(
new ProtocolChainInstanceHandler() {
-
public ProtocolChain poll() {
return protocolChain;
}
@@ -233,12 +239,10 @@
ProtocolChainInstanceHandler pciHandler = new DefaultProtocolChainInstanceHandler() {
- @Override
public ProtocolChain poll() {
return protocolChain;
}
- @Override
public boolean offer(ProtocolChain protocolChain) {
return false;
}
@@ -251,41 +255,43 @@
/**
* Starts the Grizzly Framework
*/
+
public void start() {
Thread t = new Thread(controller);
- t.setDaemon(true);
t.start();
+ try {
+ started.await();
+ log("controller.start() done");
+
+ } catch (Exception e) {
+ logger().log(Level.SEVERE, "Timeout in wait", e);
}
+ }
+
/**
* Stops the Grizzly Framework
+ *
* @throws Exception Exception
*/
public void stop() throws Exception {
controller.stop();
- if (dispatcher != null) {
- dispatcher.stop();
+ if (dispatcher != null) dispatcher.stop();
}
- }
/**
* Connects to an Server.
* Methods {@link #init} and {@link #start} must have been called before.
- * @param address host and port
+ *
+ * @param address host and port
* @throws IOException Exception
*/
+
public void connect(InetSocketAddress address) throws IOException {
- try {
- started.await();
- } catch (Exception e) {
- logger().log(Level.SEVERE, "Timeout in wait", e);
- }
if (callbackHandler == null) {
callbackHandler = getDefaultCallbackHandler();
}
-
-
- connectorHandler.connect(address, callbackHandler, selectorHandler);
+ getConnectorHandler().connect(address, callbackHandler, selectorHandler);
}
/**
@@ -296,20 +302,26 @@
*/
public CallbackHandler getDefaultCallbackHandler() {
return new CallbackHandler() {
-
public void onConnect(IOEvent ioEvent) {
SelectionKey k = ioEvent.attachment().getSelectionKey();
try {
- connectorHandler.finishConnect(k);
+ getConnectorHandler().finishConnect(k);
log("finishConnect");
- } catch (Exception ex) {
- logger().log(Level.SEVERE, "CallbackHandler", ex);
+ } catch (java.io.IOException ex) {
+ ioExceptionHandler.handle(ex);
+ return;
}
+ catch (Throwable ex) {
+ logger().log(Level.SEVERE,"onConnect",ex);
+ return;
+ }
+
ioEvent.attachment().getSelectorHandler().register(k, SelectionKey.OP_READ);
}
+
public void onRead(IOEvent ioEvent) {
try {
Context ctx = ioEvent.attachment();
@@ -335,58 +347,60 @@
/**
* Sends data to server. The caller receives an InputStream with
- * which the server streams an reply back to the client.
+ * which the server streams an reply back to the client.
*
- *
- *
- * @param data the data to be send to the server
- * @return an InputStream that blocks until data arrives
- * @throws Exception ex
+ * @return an RequestReplyHolder that blocks until data arrives
+ * @throws Exception ex
*/
- public InputStream sendRequest(final Object data) throws Exception {
+ public RequestReplyHolder prepareRequest() throws Exception {
final int id = getNextRequestId();
- InputStream future = ReplyMessage.getInputStream(id);
- writeRequest(data, id);
- return future;
+ final BlockingBuffersInputStream future = ReplyMessage.getInputStream(id);
+ final OutputStream outputStream=getRequestOutputStream(id,future);
+ RequestReplyHolder holder=new RequestReplyHolder(future,outputStream);
+ return holder;
}
- /**
- * Sends data to server without wanting an reply.
- * @param data the data to be send to the server
- * @throws Exception ex
- */
- public void sendOneWayRequest(final Object data) throws Exception {
- final int id = getNextRequestId();
- writeRequest(data, id);
- }
+
/**
* Every message send to an server can have a session id attached so that
* server can keep track of its clients.
- * @param session an id
+ *
+ * @param session an id
*/
+
public void setSession(int session) {
this.session = session;
}
- private int writeRequest(Object data, final int requestId) throws Exception {
- ProtocolWriter.write(requestId,
+ public OutputStream getRequestOutputStream() throws IOException {
+ return getRequestOutputStream(getNextRequestId(),null);
+ }
+
+ private OutputStream getRequestOutputStream(final int requestId,final BlockingBuffersInputStream inputStream) throws IOException {
+ return ProtocolOutputStream.getOutputStream(requestId,
session,
Message.Message_Request,
- data,
isGzip(),
- connectorHandler);
+ getConnectorHandler(),
+ inputStream
+ );
- return requestId;
}
+
/**
* Every Request Message gets an unique request id so that replies can be assigned back to
* their requests.
+ *
* @return unique id
*/
private int getNextRequestId() {
return requestId++;
}
+
+ public void setIoExceptionHandler(IOExceptionHandler ioExceptionHandler) {
+ this.ioExceptionHandler = ioExceptionHandler;
}
+}
Index: com/sun/grizzly/filter/FragmentMessage.java
--- com/sun/grizzly/filter/FragmentMessage.java Base (BASE)
+++ com/sun/grizzly/filter/FragmentMessage.java Locally Modified (Based On LOCAL)
@@ -54,7 +54,6 @@
*/
public class FragmentMessage extends MessageBase{
private List byteBufferList= new ArrayList();
- @Override
public void addByteBuffer(ByteBuffer byteBuffer) {
super.addByteBuffer(byteBuffer);
byteBufferList.add(byteBuffer);
Index: com/sun/grizzly/filter/IOExceptionHandler.java
--- com/sun/grizzly/filter/IOExceptionHandler.java Locally New
+++ com/sun/grizzly/filter/IOExceptionHandler.java Locally New
@@ -0,0 +1,11 @@
+package com.sun.grizzly.filter;
+
+import java.io.IOException;
+
+/**
+ * @author John Vieten 03.07.2008
+ * @version 1.0
+ */
+public interface IOExceptionHandler {
+ void handle(IOException ex);
+}
Index: com/sun/grizzly/filter/Message.java
--- com/sun/grizzly/filter/Message.java Base (BASE)
+++ com/sun/grizzly/filter/Message.java Locally Modified (Based On LOCAL)
@@ -46,8 +46,12 @@
* @version 1.0
*/
public interface Message {
- static final int Magic = 0x98784F50;
static final int HeaderLength = 23;
+
+
+ static final int Magic = 0x98784F50;
+
+ static final int MagicByteLength = 4;
static final int MessageMaxLength = 8192;
static final byte CurrentVersion = 1;
static final byte Message_Request = 0;
@@ -64,4 +68,11 @@
static final byte MORE_FRAGMENTS_BIT = 0x02;
static final byte GZIP_BIT = 0x04;
+ static public enum ErrorCode {
+ ERROR_CODE_MAGIC,
+ ERROR_CODE_HEADER_FORMAT,
+ ERROR_CODE_UNKOWN_MESSAGE_TYPE,
+ ERROR_CODE_NO_CONNECTION
}
+
+}
Index: com/sun/grizzly/filter/MessageBase.java
--- com/sun/grizzly/filter/MessageBase.java Base (BASE)
+++ com/sun/grizzly/filter/MessageBase.java Locally Modified (Based On LOCAL)
@@ -38,6 +38,8 @@
package com.sun.grizzly.filter;
import java.nio.ByteBuffer;
+import java.util.logging.Logger;
+
import static com.sun.grizzly.filter.CustomProtocolHelper.log;
/**
* Note idea taken from {@link com.sun.corba.se.impl.protocol.giopmsgheaders.MessageBase}
@@ -60,7 +62,7 @@
/**
- * Parsers the Header of the Custom Protocol.
+ * Parses the Header of the Custom Protocol.
* The Protocol has the following format:
*
* byte 1 Magic :Used to identify Protocol
@@ -82,7 +84,7 @@
* byte 17 Request ID
* byte 18 Request ID
* byte 19 Request ID
- * byte 20 Session ID : Session Id of Message (Used to authenticate Client)
+ * byte 20 Session ID : Session Id of Message (Used to authenticate Client).
* byte 21 Session ID
* byte 22 Session ID
* byte 23 Session ID
@@ -91,9 +93,11 @@
* @param buf Bytebuffer with read in bytes
* @param startPosition the message beginning
* @return a Message based on the Header bytes
+ * @throws MessageParseException Indicating a parse Exception
*/
- public static MessageBase parseHeader(ByteBuffer buf, int startPosition) {
- MessageBase result = null;
+
+ public static MessageBase parseHeader(ByteBuffer buf, int startPosition) throws MessageParseException{
+ MessageBase result;
try {
byte[] it = new byte[HeaderLength];
@@ -101,21 +105,10 @@
buf.position(startPosition);
buf.get(it);
buf.position(restorePosition);
- int b1, b2, b3, b4;
- b1 = (it[0] << 24) & 0xFF000000;
- b2 = (it[1] << 16) & 0x00FF0000;
- b3 = (it[2] << 8) & 0x0000FF00;
- b4 = (it[3] << 0) & 0x000000FF;
- int magic = (b1 | b2 | b3 | b4);
+ checkMagic(buf,startPosition);
- if (magic != Magic) {
- result = new MessageError(getErrorMsg(buf, startPosition, "Unknown Magic"),
- MessageError.ErrorCode.ERROR_CODE_MAGIC);
- return result;
-
- }
if (it[4] > CurrentVersion) {
}
@@ -133,33 +126,64 @@
case Message_Error:
result = new MessageError();
break;
+ default: throw new MessageParseException(
+ getErrorMsg(buf, startPosition, "Unknown Message Type"),
+ MessageError.ErrorCode.ERROR_CODE_UNKOWN_MESSAGE_TYPE);
}
-
result.messageType = it[5];
result.flags = it[6];
result.uniqueMessageId = readInt(it[7], it[8], it[9], it[10]);
result.messageSize = readInt(it[11], it[12], it[13], it[14]);
- result.unmarshalRequestID(buf);
- result.unmarshalSessionID(buf);
+ result.unmarshalRequestID(buf,startPosition);
+ result.unmarshalSessionID(buf,startPosition);
} catch (Exception e) {
- e.printStackTrace();
-
- result = new MessageError(getErrorMsg(buf, startPosition, " Bad Header Format"),
+ //e.printStackTrace();
+ System.out.println(CustomProtocolHelper.printBuffer("bas",buf));
+ throw new MessageParseException(
+ getErrorMsg(buf, startPosition, "Bad Header Format"),
MessageError.ErrorCode.ERROR_CODE_HEADER_FORMAT);
+
}
return result;
}
+ /**
+ * Checks if the current byteBuffer has a message starting with the
+ * correct <@link Message#Magic>
+ * @param buf current buffer containing message bytes
+ * @param startPosition start of current message
+ * @throws MessageParseException
+ */
+ public static void checkMagic(ByteBuffer buf, int startPosition) throws MessageParseException{
+ int b1, b2, b3, b4;
+ b1 = (buf.get(startPosition) << 24) & 0xFF000000;
+ b2 = (buf.get(startPosition+1) << 16) & 0x00FF0000;
+ b3 = (buf.get(startPosition+2)<< 8) & 0x0000FF00;
+ b4 = (buf.get(startPosition+3) << 0) & 0x000000FF;
+
+ int magic = (b1 | b2 | b3 | b4);
+
+ if (magic != Magic) {
+ System.out.println(CustomProtocolHelper.printBuffer("", buf));
+ throw new MessageParseException(
+ getErrorMsg(buf, startPosition, "Unknown Magic"),
+ MessageError.ErrorCode.ERROR_CODE_MAGIC);
+ }
+
+ }
+
+
+
private static String getErrorMsg(ByteBuffer buf, int startPosition, String hint) {
StringBuffer sb = new StringBuffer();
sb.append("Server Protocol Error. Could not parse Header").append(hint).append("\n");
sb.append("Position:").append(buf.position()).append("\n");
sb.append("Limit:").append(buf.limit()).append("\n");
sb.append("Parse Position:").append(startPosition).append("\n");
- sb.append(CustomProtocolHelper.printBuffer("", buf));
+ // sb.append(CustomProtocolHelper.printBuffer("", buf));
return sb.toString();
}
@@ -202,10 +226,16 @@
return messageSize;
}
+
public int getNeededBytesSize() {
- int missingMessageSize = messageSize - bytesRemaining;
- return missingMessageSize;
+ return messageSize - bytesRemaining;
}
+ /**Signals that more Messages will follow all belonging to
+ * the first Message with all the same uniqueId.
+ * The last Message will return on {@link #moreFragmentsToFollow} false.
+ *
+ * @return if more fragments are expected
+ */
public boolean moreFragmentsToFollow() {
return ((this.flags & MORE_FRAGMENTS_BIT) == MORE_FRAGMENTS_BIT);
@@ -222,14 +252,12 @@
this.requestId = requestId;
}
- private void unmarshalRequestID(ByteBuffer byteBuffer) {
+ private void unmarshalRequestID(final ByteBuffer byteBuffer,final int s) {
int b1, b2, b3, b4;
- b1 = (byteBuffer.get(15) << 24) & 0xFF000000;
- b2 = (byteBuffer.get(16) << 16) & 0x00FF0000;
- b3 = (byteBuffer.get(17) << 8) & 0x0000FF00;
- b4 = (byteBuffer.get(18) << 0) & 0x000000FF;
-
-
+ b1 = (byteBuffer.get(s+15) << 24) & 0xFF000000;
+ b2 = (byteBuffer.get(s+16) << 16) & 0x00FF0000;
+ b3 = (byteBuffer.get(s+17) << 8) & 0x0000FF00;
+ b4 = (byteBuffer.get(s+18) << 0) & 0x000000FF;
setRequestId (b1 | b2 | b3 | b4);
}
@@ -237,17 +265,20 @@
return sessionId;
}
- private void unmarshalSessionID(ByteBuffer byteBuffer) {
+ private void unmarshalSessionID(final ByteBuffer byteBuffer,final int s) {
int b1, b2, b3, b4;
- b1 = (byteBuffer.get(19) << 24) & 0xFF000000;
- b2 = (byteBuffer.get(20) << 16) & 0x00FF0000;
- b3 = (byteBuffer.get(21) << 8) & 0x0000FF00;
- b4 = (byteBuffer.get(22) << 0) & 0x000000FF;
+ b1 = (byteBuffer.get(s+19) << 24) & 0xFF000000;
+ b2 = (byteBuffer.get(s+20) << 16) & 0x00FF0000;
+ b3 = (byteBuffer.get(s+21) << 8) & 0x0000FF00;
+ b4 = (byteBuffer.get(s+22) << 0) & 0x000000FF;
this.sessionId = (b1 | b2 | b3 | b4);
}
+
+
+
private static int readInt(byte b1, byte b2, byte b3, byte b4) {
int a1, a2, a3, a4;
a1 = (b1 << 24) & 0xFF000000;
Index: com/sun/grizzly/filter/MessageContext.java
--- com/sun/grizzly/filter/MessageContext.java Base (BASE)
+++ com/sun/grizzly/filter/MessageContext.java Locally Deleted
@@ -1,69 +0,0 @@
-/*
- *
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
- *
- * Copyright 2007-2008 Sun Microsystems, Inc. All rights reserved.
- *
- * The contents of this file are subject to the terms of either the GNU
- * General Public License Version 2 only ("GPL") or the Common Development
- * and Distribution License("CDDL") (collectively, the "License"). You
- * may not use this file except in compliance with the License. You can obtain
- * a copy of the License at https://glassfish.dev.java.net/public/CDDL+GPL.html
- * or glassfish/bootstrap/legal/LICENSE.txt. See the License for the specific
- * language governing permissions and limitations under the License.
- *
- * When distributing the software, include this License Header Notice in each
- * file and include the License file at glassfish/bootstrap/legal/LICENSE.txt.
- * Sun designates this particular file as subject to the "Classpath" exception
- * as provided by Sun in the GPL Version 2 section of the License file that
- * accompanied this code. If applicable, add the following below the License
- * Header, with the fields enclosed by brackets [] replaced by your own
- * identifying information: "Portions Copyrighted [year]
- * [name of copyright owner]"
- *
- * Contributor(s):
- *
- * If you wish your version of this file to be governed by only the CDDL or
- * only the GPL Version 2, indicate your decision by adding "[Contributor]
- * elects to include this software in this distribution under the [CDDL or GPL
- * Version 2] license." If you don't indicate a single choice of license, a
- * recipient has the option to distribute your version of this file under
- * either the CDDL, the GPL Version 2 or to extend the choice of license to
- * its licensees as provided above. However, if you add GPL Version 2 code
- * and therefore, elected the GPL Version 2 license, then the option applies
- * only if the new code is made subject to such option by the copyright
- * holder.
- *
- */
-package com.sun.grizzly.filter;
-
-import com.sun.grizzly.async.AsyncQueueWriter;
-import com.sun.grizzly.util.AttributeHolder;
-
-import java.nio.channels.SelectionKey;
-import java.nio.ByteBuffer;
-import java.util.Set;
-import java.io.IOException;
-
-/**
- * Gives Client a way to write, query and store an Connection.
- *
- * @author John Vieten 28.06.2008
- * @version 1.0
- */
-public interface MessageContext {
- public AsyncQueueWriter getAsyncQueueWriter();
-
- public void writeToAsyncQueue(ByteBuffer b) throws IOException;
-
- public Set keys();
-
- public SelectionKey getSelectionKey();
-
- public void closeConnection(SelectionKey key);
-
- public AttributeHolder getAttributeHolder();
-
-
-
-}
Index: com/sun/grizzly/filter/MessageContextImpl.java
--- com/sun/grizzly/filter/MessageContextImpl.java Base (BASE)
+++ com/sun/grizzly/filter/MessageContextImpl.java Locally Deleted
@@ -1,104 +0,0 @@
-/*
- *
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
- *
- * Copyright 2007-2008 Sun Microsystems, Inc. All rights reserved.
- *
- * The contents of this file are subject to the terms of either the GNU
- * General Public License Version 2 only ("GPL") or the Common Development
- * and Distribution License("CDDL") (collectively, the "License"). You
- * may not use this file except in compliance with the License. You can obtain
- * a copy of the License at https://glassfish.dev.java.net/public/CDDL+GPL.html
- * or glassfish/bootstrap/legal/LICENSE.txt. See the License for the specific
- * language governing permissions and limitations under the License.
- *
- * When distributing the software, include this License Header Notice in each
- * file and include the License file at glassfish/bootstrap/legal/LICENSE.txt.
- * Sun designates this particular file as subject to the "Classpath" exception
- * as provided by Sun in the GPL Version 2 section of the License file that
- * accompanied this code. If applicable, add the following below the License
- * Header, with the fields enclosed by brackets [] replaced by your own
- * identifying information: "Portions Copyrighted [year]
- * [name of copyright owner]"
- *
- * Contributor(s):
- *
- * If you wish your version of this file to be governed by only the CDDL or
- * only the GPL Version 2, indicate your decision by adding "[Contributor]
- * elects to include this software in this distribution under the [CDDL or GPL
- * Version 2] license." If you don't indicate a single choice of license, a
- * recipient has the option to distribute your version of this file under
- * either the CDDL, the GPL Version 2 or to extend the choice of license to
- * its licensees as provided above. However, if you add GPL Version 2 code
- * and therefore, elected the GPL Version 2 license, then the option applies
- * only if the new code is made subject to such option by the copyright
- * holder.
- *
- */
-package com.sun.grizzly.filter;
-
-import com.sun.grizzly.async.AsyncQueueWriter;
-import com.sun.grizzly.util.AttributeHolder;
-import com.sun.grizzly.util.ThreadAttachment;
-import com.sun.grizzly.Context;
-import com.sun.grizzly.SelectorHandler;
-
-import java.nio.channels.SelectionKey;
-import java.nio.ByteBuffer;
-import java.util.Set;
-import java.io.IOException;
-
-/**
- * Gives Client a way to write, query and store an Connection.
- *
- * @author John Vieten 28.06.2008
- * @version 1.0
- */
-public class MessageContextImpl implements MessageContext {
- private AsyncQueueWriter asyncQueueWriter;
-
- private SelectionKey currentKey;
- private SelectorHandler selectorHandler;
-
-
- public MessageContextImpl(final Context context) {
- this.asyncQueueWriter = context.getSelectorHandler().getAsyncQueueWriter();
- currentKey = context.getSelectionKey();
- selectorHandler = context.getSelectorHandler();
-
- }
-
- public AsyncQueueWriter getAsyncQueueWriter() {
- return asyncQueueWriter;
- }
-
- public void writeToAsyncQueue(ByteBuffer b) throws IOException {
- asyncQueueWriter.write(currentKey,b);
- }
-
- public AttributeHolder getAttributeHolder() {
-
- Object attachment = getSelectionKey().attachment();
- if (attachment instanceof AttributeHolder) {
- return (AttributeHolder) attachment;
- } else {
- ThreadAttachment threadAttachment = new ThreadAttachment();
- currentKey.attach(threadAttachment);
- return threadAttachment;
- }
-
-
- }
-
- public SelectionKey getSelectionKey() {
- return currentKey;
- }
-
- public Set keys() {
- return selectorHandler.keys();
- }
-
- public void closeConnection(SelectionKey key) {
- selectorHandler.getSelectionKeyHandler().cancel(key);
- }
-}
Index: com/sun/grizzly/filter/MessageDispatcher.java
--- com/sun/grizzly/filter/MessageDispatcher.java Base (BASE)
+++ com/sun/grizzly/filter/MessageDispatcher.java Locally Modified (Based On LOCAL)
@@ -89,22 +89,22 @@
if(executorService!=null) executorService.shutdown();
}
-
public boolean execute(Context ctx) throws IOException {
+
if(executorService==null) {
executorService=Executors.newFixedThreadPool(threadPoolSize);
}
final MessageBase incomingMessage = (MessageBase) ctx.removeAttribute(ProtocolParser.MESSAGE);
switch (incomingMessage.getMessageType()) {
+ case Message.Message_Error:
+ dispatch(incomingMessage, ctx);
+ break;
case Message.Message_Reply:
case Message.Message_Request:
-
- case Message.Message_Error:
if (incomingMessage.moreFragmentsToFollow()) {
attachToConnection(incomingMessage, ctx);
} else {
-
incomingMessage.allDataParsed();
}
dispatch(incomingMessage, ctx);
@@ -148,7 +148,6 @@
WorkerThread workerThread = (WorkerThread) Thread.currentThread();
AttributeHolder connectionAttrs = workerThread.getAttachment();
return (Map) connectionAttrs.getAttribute(needMoreDataMessageMapKey);
-
}
private BlockingBuffersMessage getFromMessageMap(int uniqueId) {
@@ -156,43 +155,45 @@
}
- private void dispatch(final MessageBase msg, final Context workerCtx) {
- final MessageContext msgContext=new MessageContextImpl(workerCtx);
+ private void dispatch(final Message msg, final Context workerCtx) {
+ workerCtx.suspend();
+ // workerCtx.blockRecycle(true);
+ workerCtx.setKeyRegistrationState(Context.KeyRegistrationState.REGISTER);
+
+
executorService.execute(new Runnable() {
public void run() {
switch (msg.getMessageType()) {
case Message.Message_Request:
- onRequestMessage((RequestMessage) msg, msgContext);
+ onRequestMessage((RequestMessage) msg, workerCtx);
break;
case Message.Message_Reply:
-
-
- //hadled by
break;
case Message.Message_Error:
- onMessageError((MessageError) msg, msgContext);
+ onMessageError((MessageError) msg, workerCtx);
break;
case Message.Message_Fragment:
CustomProtocolHelper.logger().log(Level.SEVERE, "Cannot dispatch Fragment");
-
}
+ workerCtx.resume();
}
});
}
+
+
/**
* Pluginpoint to handle an Message received from some Endpoint
*
* @param msg RequestMessage the message received from an Endpoint
* @param ctx MessageContext Simple API for writing,quering the connection.
*/
- abstract public void onRequestMessage(RequestMessage msg, MessageContext ctx);
+ abstract public void onRequestMessage(RequestMessage msg, Context ctx);
+ abstract public void onMessageError(MessageError msg, Context ctx);
- abstract public void onMessageError(MessageError msg, MessageContext ctx);
-
}
Index: com/sun/grizzly/filter/MessageParseException.java
--- com/sun/grizzly/filter/MessageParseException.java Locally New
+++ com/sun/grizzly/filter/MessageParseException.java Locally New
@@ -0,0 +1,30 @@
+package com.sun.grizzly.filter;
+
+/**
+ * If something went wrong during parsing like a Bad Magic or bad Message Format
+ * usually a <@link MessageParseException> is thrown.
+ *
+ * @author John Vieten 01.07.2008
+ * @version 1.0
+ */
+public class MessageParseException extends Exception{
+ private MessageError.ErrorCode errorCode;
+ public MessageParseException(String message,MessageError.ErrorCode errorCode) {
+ super(message);
+ this.errorCode=errorCode;
+ }
+
+ /**
+ *
+ * @return MessageBase {@link com.sun.grizzly.filter.MessageError}
+ * since {@link com.sun.grizzly.filter.CustomProtocolParser} can not throw an Exception
+ * a {@link com.sun.grizzly.filter.MessageBase} must be given to the next Filter
+ */
+ public MessageBase convertToMessage() {
+ MessageError messageError= new MessageError(getMessage(),errorCode);
+ messageError.setErrorHappendHere(true);
+ return messageError;
+ }
+
+
+}
Index: com/sun/grizzly/filter/ProtocolOutputStream.java
--- com/sun/grizzly/filter/ProtocolOutputStream.java Base (BASE)
+++ com/sun/grizzly/filter/ProtocolOutputStream.java Locally Modified (Based On LOCAL)
@@ -37,8 +37,15 @@
*/
package com.sun.grizzly.filter;
+import com.sun.grizzly.TCPConnectorHandler;
+import com.sun.grizzly.async.AsyncWriteCallbackHandler;
+import com.sun.grizzly.async.AsyncWriteQueueRecord;
+
import java.io.*;
import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.util.zip.GZIPOutputStream;
+import java.util.Queue;
/**
* A Stream which wrapp bytes into Custom Protocol.
@@ -49,7 +56,18 @@
* @version 1.0
*/
public class ProtocolOutputStream extends OutputStream {
+ private static BytesTrafficListener bytesTrafficListener;
+ private static IOExceptionHandler exceptionHandler;
+ public static void setBytesTrafficListener(BytesTrafficListener listener) {
+ bytesTrafficListener = listener;
+ }
+
+ public static void setExceptionHandler(IOExceptionHandler handler) {
+ exceptionHandler = handler;
+ }
+
+
public interface AsyncWrite {
public void writeToAsyncQueue(java.nio.ByteBuffer byteBuffer) throws java.io.IOException;
}
@@ -72,6 +90,7 @@
private AsyncWrite out;
private int capacity = 0;
private int uniqueMessageId = 0;
+ private BytesTrafficListener trafficListener;
private void setHeaderMask() {
@@ -83,8 +102,8 @@
buf[4] = (byte) 1;
buf[5] = (messageCount == 0) ? messageType : Message.Message_Fragment;
buf[6] = closedFlag ? 0 : hasMoreFragments;
- if(gzip) {
- buf[6]=(byte)((int)buf[6]|(int)Message.GZIP_BIT);
+ if (gzip) {
+ buf[6] = (byte) ((int) buf[6] | (int) Message.GZIP_BIT);
}
@@ -92,7 +111,7 @@
if (messageCount == 0) {
uniqueMessageId = uniqueGlobalMessageId++;
// just in case
- if(uniqueGlobalMessageId==Integer.MAX_VALUE)uniqueMessageId=0;
+ if (uniqueGlobalMessageId == Integer.MAX_VALUE) uniqueMessageId = 0;
}
}
@@ -126,8 +145,23 @@
public ProtocolOutputStream(AsyncWrite writer,
byte messageType,
Integer requestId,
+ Integer sessionId) {
+ buf = new byte[Message.MessageMaxLength];
+ this.out = writer;
+ this.position = Message.HeaderLength;
+ this.requestId = requestId;
+ this.sessionId = sessionId;
+ this.messageType = messageType;
+ this.capacity = Message.MessageMaxLength;
+ }
+
+
+ public ProtocolOutputStream(AsyncWrite writer,
+ byte messageType,
+ Integer requestId,
Integer sessionId,
- boolean gzip) {
+ boolean gzip,
+ BytesTrafficListener trafficListener) {
buf = new byte[Message.MessageMaxLength];
this.out = writer;
this.position = Message.HeaderLength;
@@ -135,7 +169,8 @@
this.sessionId = sessionId;
this.messageType = messageType;
this.capacity = Message.MessageMaxLength;
- this.gzip=gzip;
+ this.gzip = gzip;
+ this.trafficListener = trafficListener;
}
@@ -146,7 +181,10 @@
out.writeToAsyncQueue(byteBuffer);
position = Message.HeaderLength;
buf = new byte[Message.MessageMaxLength];
+ if (trafficListener != null) {
+ trafficListener.traffic();
}
+ }
public synchronized void write(int b) throws IOException {
@@ -175,6 +213,9 @@
position += len;
}
+ public Integer getRequestId() {
+ return requestId;
+ }
public synchronized void flush() throws IOException {
@@ -189,5 +230,100 @@
super.close();
}
+ //-------------------------------- Helpers------------------------------------------------------
+ /**
+ * @param requestId unique number within a connection that identifies a message
+ * @param session session which keeps track of a logical connection
+ * @param messageType see @link com.sun.grizzly.filter.Message
+ * @param data the data which will be wrapped by this protocol
+ * @param compress enable gzip
+ * @param writer look at @link com.sun.grizzly.filter.Message.AsyncWrite for contract
+ */
+ public static void write(
+ int requestId,
+ int session,
+ byte messageType,
+ byte[] data,
+ boolean compress,
+ ProtocolOutputStream.AsyncWrite writer
+ ) {
+
+ OutputStream byteArrayOutputStream = new ProtocolOutputStream(writer,
+ messageType,
+ requestId,
+ session,
+ compress,
+ bytesTrafficListener);
+ OutputStream oos = null;
+ GZIPOutputStream gzipOutputStream = null;
+ try {
+
+ if (compress) {
+ gzipOutputStream = new GZIPOutputStream(byteArrayOutputStream, Message.MessageMaxLength);
+ } else {
+ oos = new ObjectOutputStream(byteArrayOutputStream);
}
+ byteArrayOutputStream.write(data);
+ } catch (IOException e) {
+ if (exceptionHandler != null)exceptionHandler.handle(e);
+ }
+
+ finally {
+ try {
+ if (oos != null) oos.close();
+ if (gzipOutputStream != null) gzipOutputStream.close();
+ } catch (IOException e) {
+
+ e.printStackTrace();
+
+ }
+ }
+ }
+
+ public static OutputStream getOutputStream(
+ final int requestId,
+ final int session,
+ final byte messageType,
+ final boolean compress,
+ final TCPConnectorHandler tcpConnectorHandler,
+ final BlockingBuffersInputStream inputStream) {
+
+ AsyncWrite writer = new AsyncWrite() {
+ public void writeToAsyncQueue(ByteBuffer byteBuffer) throws IOException {
+ try {
+ tcpConnectorHandler.writeToAsyncQueue(byteBuffer, new AsyncWriteCallbackHandler() {
+ public void onWriteCompleted(SelectionKey selectionKey, ByteBuffer byteBuffer) {
+
+ }
+
+ public void onIOException(IOException e,
+ SelectionKey selectionKey,
+ ByteBuffer byteBuffer, Queue asyncWriteQueueRecords) {
+ if(inputStream!=null) {
+ inputStream.setException(e);
+ inputStream.close();
+ }
+ if (exceptionHandler != null) exceptionHandler.handle(e);
+ }
+ });
+ } catch (IOException e) {
+ if (exceptionHandler != null) exceptionHandler.handle(e);
+ }
+ }
+ };
+
+
+ OutputStream outputStream = new ProtocolOutputStream(writer,
+ messageType,
+ requestId,
+ session
+ ,compress,
+ bytesTrafficListener);
+ return outputStream;
+
+
+ }
+
+
+}
Index: com/sun/grizzly/filter/RequestReplyHolder.java
--- com/sun/grizzly/filter/RequestReplyHolder.java Locally New
+++ com/sun/grizzly/filter/RequestReplyHolder.java Locally New
@@ -0,0 +1,34 @@
+package com.sun.grizzly.filter;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * @author John Vieten 03.07.2008
+ * @version 1.0
+ */
+public class RequestReplyHolder {
+ private InputStream inputStream;
+ private OutputStream outputStream;
+
+ public RequestReplyHolder(InputStream inputStream, OutputStream outputStream) {
+ this.inputStream = inputStream;
+ this.outputStream = outputStream;
+ }
+
+ public InputStream getInputStream() {
+ return inputStream;
+ }
+
+ public void setInputStream(InputStream inputStream) {
+ this.inputStream = inputStream;
+ }
+
+ public OutputStream getOutputStream() {
+ return outputStream;
+ }
+
+ public void setOutputStream(OutputStream outputStream) {
+ this.outputStream = outputStream;
+ }
+}
Index: com/sun/grizzly/standalone/framework/Example_1_Client.java
--- com/sun/grizzly/standalone/framework/Example_1_Client.java Base (BASE)
+++ com/sun/grizzly/standalone/framework/Example_1_Client.java Locally Modified (Based On LOCAL)
@@ -38,15 +38,16 @@
package com.sun.grizzly.standalone.framework;
import com.sun.grizzly.filter.CustomProtocolClient;
-import java.io.InputStream;
-import java.io.ObjectInputStream;
+import com.sun.grizzly.filter.RequestReplyHolder;
+
+import java.io.*;
import java.net.InetSocketAddress;
/**
* An example of How-To write a "Custom Protocol" with Grizzly.
* Just shows an simple Example of how the CustomProtocol Layer can be used.
- * This Example_1_Server goes in pair with Example_1_Client
+ * This Example_1_Server goes in pair with Example_1_Client
* @author John Vieten 21.06.2008
* @version 1.0
*/
@@ -55,6 +56,7 @@
public static void main(String[] args) throws Exception {
client = new CustomProtocolClient();
+
client.init(new Example_1_Client_Dispatcher());
client.start();
client.connect(new InetSocketAddress("localhost", Example_1_Server.PORT));
@@ -72,7 +74,8 @@
System.out.println("------- useCaseEchoLargeMessages ---------------------------");
useCaseEchoLargeMessages();
- client.sendOneWayRequest(new Object[]{"bye",""});
+ write(new Object[]{"bye",""},client.getRequestOutputStream());
+
client.stop();
System.out.println("stop");
@@ -123,12 +126,27 @@
}
static private Object sendMessage(Object... data) throws Exception {
- InputStream stream = client.sendRequest(data);
- ObjectInputStream ois = new ObjectInputStream(stream);
+ RequestReplyHolder holder= client.prepareRequest();
+ write(data,holder.getOutputStream());
+ ObjectInputStream ois = new ObjectInputStream(holder.getInputStream());
Object result = ois.readObject();
ois.close();
return result;
}
+ public static void write(final Object params[], OutputStream stream) throws IOException {
+ ObjectOutputStream oos = null;
+ try {
+ oos = new ObjectOutputStream(stream);
+ oos.writeObject(params);
+ } finally {
+
+ if (oos != null) oos.close();
+
+
}
+ }
+
+
+}
Index: com/sun/grizzly/standalone/framework/Example_1_Client_Dispatcher.java
--- com/sun/grizzly/standalone/framework/Example_1_Client_Dispatcher.java Base (BASE)
+++ com/sun/grizzly/standalone/framework/Example_1_Client_Dispatcher.java Locally Modified (Based On LOCAL)
@@ -1,10 +1,11 @@
package com.sun.grizzly.standalone.framework;
-import com.sun.grizzly.filter.CustomProtocolHelper;
-import com.sun.grizzly.filter.MessageContext;
+import com.sun.grizzly.Context;
import com.sun.grizzly.filter.MessageDispatcher;
import com.sun.grizzly.filter.MessageError;
+import com.sun.grizzly.filter.CustomProtocolHelper;
import com.sun.grizzly.filter.RequestMessage;
+
import java.util.logging.Level;
import java.io.ObjectInputStream;
@@ -15,15 +16,15 @@
* @version 1.0
*/
public class Example_1_Client_Dispatcher extends MessageDispatcher {
- public void onMessageError(MessageError msg, MessageContext ctx) {
+ public void onMessageError(MessageError msg, Context ctx) {
CustomProtocolHelper.logger().log(Level.SEVERE, "onMessageError() " + msg.getMessage());
- System.exit(1);
+ //System.exit(1);
}
- public void onRequestMessage(RequestMessage msg, MessageContext ctx) {
+ public void onRequestMessage(RequestMessage msg, Context ctx) {
try {
ObjectInputStream ois =
new ObjectInputStream(msg.getInputStream());
Index: com/sun/grizzly/standalone/framework/Example_1_Server.java
--- com/sun/grizzly/standalone/framework/Example_1_Server.java Base (BASE)
+++ com/sun/grizzly/standalone/framework/Example_1_Server.java Locally Modified (Based On LOCAL)
@@ -37,21 +37,16 @@
*/
package com.sun.grizzly.standalone.framework;
-import com.sun.grizzly.BaseSelectionKeyHandler;
-import com.sun.grizzly.Controller;
-import com.sun.grizzly.DefaultPipeline;
-import com.sun.grizzly.DefaultProtocolChain;
-import com.sun.grizzly.ProtocolChain;
-import com.sun.grizzly.ProtocolChainInstanceHandler;
-import com.sun.grizzly.TCPSelectorHandler;
-import com.sun.grizzly.filter.CustomProtocolParser;
+import com.sun.grizzly.*;
import com.sun.grizzly.filter.Message;
+import com.sun.grizzly.filter.CustomProtocolParser;
+
import java.io.IOException;
/**
* An example of How-To write a "Custom Protocol" with Grizzly.
* Just shows an simple Example of how the CustomProtocol Layer can be used.
- * This Example_1_Server goes in pair with Example_1_Client
+ * The Example_1_Server goes in pair with Example_1_Client
* @author John Vieten 21.06.2008
* @version 1.0
*/
@@ -72,7 +67,7 @@
tcpSelectorHandler.setPort(PORT);
controller.addSelectorHandler(tcpSelectorHandler);
final DefaultProtocolChain protocolChain = new DefaultProtocolChain();
- protocolChain.addFilter(CustomProtocolParser.createParserProtocolFilter());
+ protocolChain.addFilter(CustomProtocolParser.createParserProtocolFilter(null));
protocolChain.addFilter(new Example_1_Server_Dispatcher());
protocolChain.setContinuousExecution(true);
Index: com/sun/grizzly/standalone/framework/Example_1_Server_Dispatcher.java
--- com/sun/grizzly/standalone/framework/Example_1_Server_Dispatcher.java Base (BASE)
+++ com/sun/grizzly/standalone/framework/Example_1_Server_Dispatcher.java Locally Modified (Based On LOCAL)
@@ -38,17 +38,14 @@
package com.sun.grizzly.standalone.framework;
-import com.sun.grizzly.filter.Message;
-import com.sun.grizzly.filter.MessageContext;
-import com.sun.grizzly.filter.MessageDispatcher;
-import com.sun.grizzly.filter.MessageError;
-import com.sun.grizzly.filter.ProtocolWriter;
-import com.sun.grizzly.filter.ReplyMessage;
-import com.sun.grizzly.filter.RequestMessage;
-import java.io.ObjectInputStream;
+import com.sun.grizzly.Context;
+import com.sun.grizzly.filter.*;
+
+import java.io.*;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
+import java.nio.ByteBuffer;
/**
* @author John Vieten 28.06.2008
@@ -57,15 +54,15 @@
public class Example_1_Server_Dispatcher extends MessageDispatcher {
private ConcurrentHashMap sessionMap = new ConcurrentHashMap();
- public void onMessageError(MessageError msg, MessageContext ctx) {
+ public void onMessageError(MessageError msg, Context ctx) {
System.out.println("Msg error " + msg.getMessage());
}
- public void onReplyMessage(ReplyMessage msg, MessageContext ctx) {
+ public void onReplyMessage(ReplyMessage msg, Context ctx) {
}
- public void onRequestMessage(RequestMessage msg, final MessageContext ctx) {
+ public void onRequestMessage(RequestMessage msg, final Context ctx) {
final int requestId = msg.getRequestId();
final int session = msg.getSessionId();
try {
@@ -91,7 +88,6 @@
} else if ("bye".equals(methodName)) {
sessionMap.remove(session);
System.out.println("User logged of");
- ctx.closeConnection(ctx.getSelectionKey());
return;
} else if ("echo".equals(methodName)) {
result = data[1];
@@ -110,7 +106,13 @@
ProtocolWriter.write(requestId,
session,
Message.Message_Request,
- feedStr, false, ctx);
+ feedStr, false,
+ new ProtocolOutputStream.AsyncWrite() {
+ public void writeToAsyncQueue(ByteBuffer byteBuffer) throws IOException {
+ ctx.getAsyncQueueWritable().writeToAsyncQueue(byteBuffer);
+ }
+ }
+ );
if (n == feedcount) {
System.out.println("stopping feed");
memoryTimer.cancel();
@@ -125,16 +127,31 @@
}
}
+ ProtocolOutputStream.AsyncWrite writer =new ProtocolOutputStream.AsyncWrite() {
+ public void writeToAsyncQueue(ByteBuffer byteBuffer) throws IOException {
+ ctx.getAsyncQueueWritable().writeToAsyncQueue (byteBuffer);
+ }
+ };
- ProtocolWriter.write(requestId, session, Message.Message_Reply, result, false, ctx);
+ OutputStream byteArrayOutputStream = new ProtocolOutputStream(writer,
+ Message.Message_Reply,
+ requestId,
+ session);
+ ObjectOutputStream objectOutputStream= new ObjectOutputStream(byteArrayOutputStream);
+ objectOutputStream.writeObject(result);
+ objectOutputStream.close();
+
} catch (Exception e) {
e.printStackTrace();
}
}
+
+
+
private boolean badSession(int s) {
if (s == 0) return false;
return sessionMap.contains(s);