# This patch file was generated by NetBeans IDE # Following Index: paths are relative to: C:\Projects\grizzly\trunk\samples\framework-samples\src\main\java # This patch can be applied using context Tools: Patch action on respective folder. # It uses platform neutral UTF-8 encoding and \n newlines. # Above lines and this line are ignored by the patching process. Index: com/sun/grizzly/filter/BytesTrafficListener.java --- com/sun/grizzly/filter/BytesTrafficListener.java Locally New +++ com/sun/grizzly/filter/BytesTrafficListener.java Locally New @@ -0,0 +1,53 @@ +/* + * + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER. + * + * Copyright 2007-2008 Sun Microsystems, Inc. All rights reserved. + * + * The contents of this file are subject to the terms of either the GNU + * General Public License Version 2 only ("GPL") or the Common Development + * and Distribution License("CDDL") (collectively, the "License"). You + * may not use this file except in compliance with the License. You can obtain + * a copy of the License at https://glassfish.dev.java.net/public/CDDL+GPL.html + * or glassfish/bootstrap/legal/LICENSE.txt. See the License for the specific + * language governing permissions and limitations under the License. + * + * When distributing the software, include this License Header Notice in each + * file and include the License file at glassfish/bootstrap/legal/LICENSE.txt. + * Sun designates this particular file as subject to the "Classpath" exception + * as provided by Sun in the GPL Version 2 section of the License file that + * accompanied this code. If applicable, add the following below the License + * Header, with the fields enclosed by brackets [] replaced by your own + * identifying information: "Portions Copyrighted [year] + * [name of copyright owner]" + * + * Contributor(s): + * + * If you wish your version of this file to be governed by only the CDDL or + * only the GPL Version 2, indicate your decision by adding "[Contributor] + * elects to include this software in this distribution under the [CDDL or GPL + * Version 2] license." If you don't indicate a single choice of license, a + * recipient has the option to distribute your version of this file under + * either the CDDL, the GPL Version 2 or to extend the choice of license to + * its licensees as provided above. However, if you add GPL Version 2 code + * and therefore, elected the GPL Version 2 license, then the option applies + * only if the new code is made subject to such option by the copyright + * holder. + * + */ +package com.sun.grizzly.filter; + +/** + * This could be used by Progressbars to show an indication that bytes go over the wire + * @author John Vieten 30.06.2008 + * @version 1.0 + */ +public interface BytesTrafficListener { + /** + * Approximate indication that bytes have been send or received + * Please b carefull not to do any timeconsuming operation in here since + * it be called on Grizzly Workerthread + */ + public void traffic(); + +} Index: com/sun/grizzly/filter/ClosedMessage.java --- com/sun/grizzly/filter/ClosedMessage.java Locally New +++ com/sun/grizzly/filter/ClosedMessage.java Locally New @@ -0,0 +1,9 @@ +package com.sun.grizzly.filter; + +/** + * @author John Vieten 03.07.2008 + * @version 1.0 + */ +public class ClosedMessage extends MessageBase{ + +} Index: com/sun/grizzly/filter/CustomProtocolClient.java --- com/sun/grizzly/filter/CustomProtocolClient.java Base (BASE) +++ com/sun/grizzly/filter/CustomProtocolClient.java Locally Modified (Based On LOCAL) @@ -37,29 +37,21 @@ */ package com.sun.grizzly.filter; -import com.sun.grizzly.BaseSelectionKeyHandler; -import com.sun.grizzly.CallbackHandler; -import com.sun.grizzly.Context; -import com.sun.grizzly.Controller; -import com.sun.grizzly.ControllerStateListenerAdapter; -import com.sun.grizzly.DefaultPipeline; -import com.sun.grizzly.DefaultProtocolChain; -import com.sun.grizzly.DefaultProtocolChainInstanceHandler; -import com.sun.grizzly.IOEvent; -import com.sun.grizzly.ProtocolChain; -import com.sun.grizzly.ProtocolChainInstanceHandler; -import com.sun.grizzly.ProtocolFilter; -import com.sun.grizzly.TCPConnectorHandler; -import com.sun.grizzly.TCPSelectorHandler; +import com.sun.grizzly.*; + + import java.util.concurrent.CountDownLatch; import java.util.logging.Level; import java.util.List; import java.util.ArrayList; + +import java.nio.channels.SelectionKey; + import java.net.InetSocketAddress; import java.io.IOException; -import java.io.InputStream; +import java.io.OutputStream; -import java.nio.channels.SelectionKey; + import static com.sun.grizzly.filter.CustomProtocolHelper.logger; import static com.sun.grizzly.filter.CustomProtocolHelper.log; @@ -71,15 +63,14 @@ * client.init(new YourMessageDispatcher()); * client.start(); * client.connect(new InetSocketAddress("localhost", 8087)); - * - * + *

+ *

*

* * @author John Vieten 28.06.2008 * @version 1.0 */ public class CustomProtocolClient { - private Controller controller; private TCPConnectorHandler connectorHandler; private CallbackHandler callbackHandler; @@ -87,55 +78,74 @@ private DefaultProtocolChain protocolChain = new DefaultProtocolChain(); private List beforeParserList = new ArrayList(); private List afterParserList = new ArrayList(); + private BytesTrafficListener bytesArrivedListener; private int requestId; private int session = 0; private boolean gzip; private MessageDispatcher dispatcher; private CountDownLatch started; - private int minWorkerThreads = 1; - private int maxWorkerThreads = 1; + private IOExceptionHandler ioExceptionHandler; + + + private int minWorkerThreads = 5; + private int maxWorkerThreads = 20; + /** - * If bytes send to server are gzipped + * If bytes send to server are gzipped + * * @return zipped */ public boolean isGzip() { + // if(true) return false; return gzip; } /** - * If bytes send to server should be gzipped - * @param gzip true if they should + * If bytes send to server should be gzipped + * + * @param gzip true if they should */ public void setGzip(boolean gzip) { this.gzip = gzip; } + + public void setBytesArrivedListener(BytesTrafficListener bytesArrivedListener) { + this.bytesArrivedListener = bytesArrivedListener; + } + /** * Giving the {@link com.sun.grizzly.TCPConnectorHandler} to the Client - * @param connectorHandler TCP Connector Handler + * */ - public void setConnectorHandler(TCPConnectorHandler connectorHandler) { - this.connectorHandler = connectorHandler; - } + + public TCPConnectorHandler getConnectorHandler() { + if(connectorHandler==null) { + connectorHandler=(TCPConnectorHandler) controller.acquireConnectorHandler(Controller.Protocol.TCP); + } + return connectorHandler; } /** * User can override the default Callbackhandler. * For Example an Proxy Callbackhandler could be plugged here in. + * * @param callbackHandler for non blocking client operations */ + public void setCallbackHandler(CallbackHandler callbackHandler) { this.callbackHandler = callbackHandler; } /** * Set up the Grizzly Worker Threads. - * @param minWorkerThreads min count of grizzly workers - * @param maxWorkerThreads max count of grizzly workers + * + * @param minWorkerThreads min count of grizzly workers + * @param maxWorkerThreads max count of grizzly workers */ public void setThreadSizes(int minWorkerThreads, int maxWorkerThreads) { this.minWorkerThreads = minWorkerThreads; @@ -146,8 +156,9 @@ * Users can add addtional ProtocolFilters to the Reading Chain. * These filter here will be called before or after the CustomProtocol Filters will execute * (Remark have to check if after makes sense...) + * * @param protocolFilter Filter for chain - * @param beforeParser before or after + * @param beforeParser before or after */ public void addProtocolFilter(ProtocolFilter protocolFilter, boolean beforeParser) { if (beforeParser) { @@ -158,8 +169,9 @@ } /** - * Adds a Filter before the CustomProtocol Filters will execute. - * @param protocolFilter Filter for chain + * Adds a Filter before the CustomProtocol Filters will execute. + * + * @param protocolFilter Filter for chain */ public void addProtocolFilter(ProtocolFilter protocolFilter) { addProtocolFilter(protocolFilter, true); @@ -176,31 +188,24 @@ controller = new Controller(); DefaultPipeline defp = new DefaultPipeline(); + defp.setMinThreads(minWorkerThreads); defp.setMaxThreads(maxWorkerThreads); defp.setInitialByteBufferSize(com.sun.grizzly.filter.Message.MessageMaxLength); - controller.setPipeline(defp); selectorHandler = new TCPSelectorHandler(true); BaseSelectionKeyHandler keyHandler = new BaseSelectionKeyHandler(); - if (connectorHandler == null) { - connectorHandler = new TCPConnectorHandler(); - } - connectorHandler.setController(controller); selectorHandler.setSelectionKeyHandler(keyHandler); controller.addSelectorHandler(selectorHandler); started = new CountDownLatch(1); - controller.addStateListener(new ControllerStateListenerAdapter() { - @Override + controller.addStateListener(new ControllerStateListenerAdapter() { public void onException(Throwable e) { logger().log(Level.SEVERE, "Grizzly controller exception", e); } - - @Override public void onReady() { started.countDown(); } @@ -208,10 +213,12 @@ for (ProtocolFilter protocolFilter : beforeParserList) { protocolChain.addFilter(protocolFilter); } - protocolChain.addFilter(CustomProtocolParser.createParserProtocolFilter()); - if (dispatcher != null) { + + ProtocolOutputStream.setBytesTrafficListener(bytesArrivedListener); + ProtocolOutputStream.setExceptionHandler(ioExceptionHandler); + protocolChain.addFilter(CustomProtocolParser.createParserProtocolFilter( bytesArrivedListener)); + if (dispatcher != null) protocolChain.addFilter(dispatcher); - } for (ProtocolFilter protocolFilter : afterParserList) { protocolChain.addFilter(protocolFilter); @@ -220,7 +227,6 @@ controller.setProtocolChainInstanceHandler( new ProtocolChainInstanceHandler() { - public ProtocolChain poll() { return protocolChain; } @@ -233,12 +239,10 @@ ProtocolChainInstanceHandler pciHandler = new DefaultProtocolChainInstanceHandler() { - @Override public ProtocolChain poll() { return protocolChain; } - @Override public boolean offer(ProtocolChain protocolChain) { return false; } @@ -251,41 +255,43 @@ /** * Starts the Grizzly Framework */ + public void start() { Thread t = new Thread(controller); - t.setDaemon(true); t.start(); + try { + started.await(); + log("controller.start() done"); + + } catch (Exception e) { + logger().log(Level.SEVERE, "Timeout in wait", e); } + } + /** * Stops the Grizzly Framework + * * @throws Exception Exception */ public void stop() throws Exception { controller.stop(); - if (dispatcher != null) { - dispatcher.stop(); + if (dispatcher != null) dispatcher.stop(); } - } /** * Connects to an Server. * Methods {@link #init} and {@link #start} must have been called before. - * @param address host and port + * + * @param address host and port * @throws IOException Exception */ + public void connect(InetSocketAddress address) throws IOException { - try { - started.await(); - } catch (Exception e) { - logger().log(Level.SEVERE, "Timeout in wait", e); - } if (callbackHandler == null) { callbackHandler = getDefaultCallbackHandler(); } - - - connectorHandler.connect(address, callbackHandler, selectorHandler); + getConnectorHandler().connect(address, callbackHandler, selectorHandler); } /** @@ -296,20 +302,26 @@ */ public CallbackHandler getDefaultCallbackHandler() { return new CallbackHandler() { - public void onConnect(IOEvent ioEvent) { SelectionKey k = ioEvent.attachment().getSelectionKey(); try { - connectorHandler.finishConnect(k); + getConnectorHandler().finishConnect(k); log("finishConnect"); - } catch (Exception ex) { - logger().log(Level.SEVERE, "CallbackHandler", ex); + } catch (java.io.IOException ex) { + ioExceptionHandler.handle(ex); + return; } + catch (Throwable ex) { + logger().log(Level.SEVERE,"onConnect",ex); + return; + } + ioEvent.attachment().getSelectorHandler().register(k, SelectionKey.OP_READ); } + public void onRead(IOEvent ioEvent) { try { Context ctx = ioEvent.attachment(); @@ -335,58 +347,60 @@ /** * Sends data to server. The caller receives an InputStream with - * which the server streams an reply back to the client. + * which the server streams an reply back to the client. * - * - * - * @param data the data to be send to the server - * @return an InputStream that blocks until data arrives - * @throws Exception ex + * @return an RequestReplyHolder that blocks until data arrives + * @throws Exception ex */ - public InputStream sendRequest(final Object data) throws Exception { + public RequestReplyHolder prepareRequest() throws Exception { final int id = getNextRequestId(); - InputStream future = ReplyMessage.getInputStream(id); - writeRequest(data, id); - return future; + final BlockingBuffersInputStream future = ReplyMessage.getInputStream(id); + final OutputStream outputStream=getRequestOutputStream(id,future); + RequestReplyHolder holder=new RequestReplyHolder(future,outputStream); + return holder; } - /** - * Sends data to server without wanting an reply. - * @param data the data to be send to the server - * @throws Exception ex - */ - public void sendOneWayRequest(final Object data) throws Exception { - final int id = getNextRequestId(); - writeRequest(data, id); - } + /** * Every message send to an server can have a session id attached so that * server can keep track of its clients. - * @param session an id + * + * @param session an id */ + public void setSession(int session) { this.session = session; } - private int writeRequest(Object data, final int requestId) throws Exception { - ProtocolWriter.write(requestId, + public OutputStream getRequestOutputStream() throws IOException { + return getRequestOutputStream(getNextRequestId(),null); + } + + private OutputStream getRequestOutputStream(final int requestId,final BlockingBuffersInputStream inputStream) throws IOException { + return ProtocolOutputStream.getOutputStream(requestId, session, Message.Message_Request, - data, isGzip(), - connectorHandler); + getConnectorHandler(), + inputStream + ); - return requestId; } + /** * Every Request Message gets an unique request id so that replies can be assigned back to * their requests. + * * @return unique id */ private int getNextRequestId() { return requestId++; } + + public void setIoExceptionHandler(IOExceptionHandler ioExceptionHandler) { + this.ioExceptionHandler = ioExceptionHandler; } +} Index: com/sun/grizzly/filter/FragmentMessage.java --- com/sun/grizzly/filter/FragmentMessage.java Base (BASE) +++ com/sun/grizzly/filter/FragmentMessage.java Locally Modified (Based On LOCAL) @@ -54,7 +54,6 @@ */ public class FragmentMessage extends MessageBase{ private List byteBufferList= new ArrayList(); - @Override public void addByteBuffer(ByteBuffer byteBuffer) { super.addByteBuffer(byteBuffer); byteBufferList.add(byteBuffer); Index: com/sun/grizzly/filter/IOExceptionHandler.java --- com/sun/grizzly/filter/IOExceptionHandler.java Locally New +++ com/sun/grizzly/filter/IOExceptionHandler.java Locally New @@ -0,0 +1,11 @@ +package com.sun.grizzly.filter; + +import java.io.IOException; + +/** + * @author John Vieten 03.07.2008 + * @version 1.0 + */ +public interface IOExceptionHandler { + void handle(IOException ex); +} Index: com/sun/grizzly/filter/Message.java --- com/sun/grizzly/filter/Message.java Base (BASE) +++ com/sun/grizzly/filter/Message.java Locally Modified (Based On LOCAL) @@ -46,8 +46,12 @@ * @version 1.0 */ public interface Message { - static final int Magic = 0x98784F50; static final int HeaderLength = 23; + + + static final int Magic = 0x98784F50; + + static final int MagicByteLength = 4; static final int MessageMaxLength = 8192; static final byte CurrentVersion = 1; static final byte Message_Request = 0; @@ -64,4 +68,11 @@ static final byte MORE_FRAGMENTS_BIT = 0x02; static final byte GZIP_BIT = 0x04; + static public enum ErrorCode { + ERROR_CODE_MAGIC, + ERROR_CODE_HEADER_FORMAT, + ERROR_CODE_UNKOWN_MESSAGE_TYPE, + ERROR_CODE_NO_CONNECTION } + +} Index: com/sun/grizzly/filter/MessageBase.java --- com/sun/grizzly/filter/MessageBase.java Base (BASE) +++ com/sun/grizzly/filter/MessageBase.java Locally Modified (Based On LOCAL) @@ -38,6 +38,8 @@ package com.sun.grizzly.filter; import java.nio.ByteBuffer; +import java.util.logging.Logger; + import static com.sun.grizzly.filter.CustomProtocolHelper.log; /** * Note idea taken from {@link com.sun.corba.se.impl.protocol.giopmsgheaders.MessageBase} @@ -60,7 +62,7 @@ /** - * Parsers the Header of the Custom Protocol. + * Parses the Header of the Custom Protocol. * The Protocol has the following format: * * byte 1 Magic :Used to identify Protocol @@ -82,7 +84,7 @@ * byte 17 Request ID * byte 18 Request ID * byte 19 Request ID - * byte 20 Session ID : Session Id of Message (Used to authenticate Client) + * byte 20 Session ID : Session Id of Message (Used to authenticate Client). * byte 21 Session ID * byte 22 Session ID * byte 23 Session ID @@ -91,9 +93,11 @@ * @param buf Bytebuffer with read in bytes * @param startPosition the message beginning * @return a Message based on the Header bytes + * @throws MessageParseException Indicating a parse Exception */ - public static MessageBase parseHeader(ByteBuffer buf, int startPosition) { - MessageBase result = null; + + public static MessageBase parseHeader(ByteBuffer buf, int startPosition) throws MessageParseException{ + MessageBase result; try { byte[] it = new byte[HeaderLength]; @@ -101,21 +105,10 @@ buf.position(startPosition); buf.get(it); buf.position(restorePosition); - int b1, b2, b3, b4; - b1 = (it[0] << 24) & 0xFF000000; - b2 = (it[1] << 16) & 0x00FF0000; - b3 = (it[2] << 8) & 0x0000FF00; - b4 = (it[3] << 0) & 0x000000FF; - int magic = (b1 | b2 | b3 | b4); + checkMagic(buf,startPosition); - if (magic != Magic) { - result = new MessageError(getErrorMsg(buf, startPosition, "Unknown Magic"), - MessageError.ErrorCode.ERROR_CODE_MAGIC); - return result; - - } if (it[4] > CurrentVersion) { } @@ -133,33 +126,64 @@ case Message_Error: result = new MessageError(); break; + default: throw new MessageParseException( + getErrorMsg(buf, startPosition, "Unknown Message Type"), + MessageError.ErrorCode.ERROR_CODE_UNKOWN_MESSAGE_TYPE); } - result.messageType = it[5]; result.flags = it[6]; result.uniqueMessageId = readInt(it[7], it[8], it[9], it[10]); result.messageSize = readInt(it[11], it[12], it[13], it[14]); - result.unmarshalRequestID(buf); - result.unmarshalSessionID(buf); + result.unmarshalRequestID(buf,startPosition); + result.unmarshalSessionID(buf,startPosition); } catch (Exception e) { - e.printStackTrace(); - - result = new MessageError(getErrorMsg(buf, startPosition, " Bad Header Format"), + //e.printStackTrace(); + System.out.println(CustomProtocolHelper.printBuffer("bas",buf)); + throw new MessageParseException( + getErrorMsg(buf, startPosition, "Bad Header Format"), MessageError.ErrorCode.ERROR_CODE_HEADER_FORMAT); + } return result; } + /** + * Checks if the current byteBuffer has a message starting with the + * correct <@link Message#Magic> + * @param buf current buffer containing message bytes + * @param startPosition start of current message + * @throws MessageParseException + */ + public static void checkMagic(ByteBuffer buf, int startPosition) throws MessageParseException{ + int b1, b2, b3, b4; + b1 = (buf.get(startPosition) << 24) & 0xFF000000; + b2 = (buf.get(startPosition+1) << 16) & 0x00FF0000; + b3 = (buf.get(startPosition+2)<< 8) & 0x0000FF00; + b4 = (buf.get(startPosition+3) << 0) & 0x000000FF; + + int magic = (b1 | b2 | b3 | b4); + + if (magic != Magic) { + System.out.println(CustomProtocolHelper.printBuffer("", buf)); + throw new MessageParseException( + getErrorMsg(buf, startPosition, "Unknown Magic"), + MessageError.ErrorCode.ERROR_CODE_MAGIC); + } + + } + + + private static String getErrorMsg(ByteBuffer buf, int startPosition, String hint) { StringBuffer sb = new StringBuffer(); sb.append("Server Protocol Error. Could not parse Header").append(hint).append("\n"); sb.append("Position:").append(buf.position()).append("\n"); sb.append("Limit:").append(buf.limit()).append("\n"); sb.append("Parse Position:").append(startPosition).append("\n"); - sb.append(CustomProtocolHelper.printBuffer("", buf)); + // sb.append(CustomProtocolHelper.printBuffer("", buf)); return sb.toString(); } @@ -202,10 +226,16 @@ return messageSize; } + public int getNeededBytesSize() { - int missingMessageSize = messageSize - bytesRemaining; - return missingMessageSize; + return messageSize - bytesRemaining; } + /**Signals that more Messages will follow all belonging to + * the first Message with all the same uniqueId. + * The last Message will return on {@link #moreFragmentsToFollow} false. + * + * @return if more fragments are expected + */ public boolean moreFragmentsToFollow() { return ((this.flags & MORE_FRAGMENTS_BIT) == MORE_FRAGMENTS_BIT); @@ -222,14 +252,12 @@ this.requestId = requestId; } - private void unmarshalRequestID(ByteBuffer byteBuffer) { + private void unmarshalRequestID(final ByteBuffer byteBuffer,final int s) { int b1, b2, b3, b4; - b1 = (byteBuffer.get(15) << 24) & 0xFF000000; - b2 = (byteBuffer.get(16) << 16) & 0x00FF0000; - b3 = (byteBuffer.get(17) << 8) & 0x0000FF00; - b4 = (byteBuffer.get(18) << 0) & 0x000000FF; - - + b1 = (byteBuffer.get(s+15) << 24) & 0xFF000000; + b2 = (byteBuffer.get(s+16) << 16) & 0x00FF0000; + b3 = (byteBuffer.get(s+17) << 8) & 0x0000FF00; + b4 = (byteBuffer.get(s+18) << 0) & 0x000000FF; setRequestId (b1 | b2 | b3 | b4); } @@ -237,17 +265,20 @@ return sessionId; } - private void unmarshalSessionID(ByteBuffer byteBuffer) { + private void unmarshalSessionID(final ByteBuffer byteBuffer,final int s) { int b1, b2, b3, b4; - b1 = (byteBuffer.get(19) << 24) & 0xFF000000; - b2 = (byteBuffer.get(20) << 16) & 0x00FF0000; - b3 = (byteBuffer.get(21) << 8) & 0x0000FF00; - b4 = (byteBuffer.get(22) << 0) & 0x000000FF; + b1 = (byteBuffer.get(s+19) << 24) & 0xFF000000; + b2 = (byteBuffer.get(s+20) << 16) & 0x00FF0000; + b3 = (byteBuffer.get(s+21) << 8) & 0x0000FF00; + b4 = (byteBuffer.get(s+22) << 0) & 0x000000FF; this.sessionId = (b1 | b2 | b3 | b4); } + + + private static int readInt(byte b1, byte b2, byte b3, byte b4) { int a1, a2, a3, a4; a1 = (b1 << 24) & 0xFF000000; Index: com/sun/grizzly/filter/MessageContext.java --- com/sun/grizzly/filter/MessageContext.java Base (BASE) +++ com/sun/grizzly/filter/MessageContext.java Locally Deleted @@ -1,69 +0,0 @@ -/* - * - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER. - * - * Copyright 2007-2008 Sun Microsystems, Inc. All rights reserved. - * - * The contents of this file are subject to the terms of either the GNU - * General Public License Version 2 only ("GPL") or the Common Development - * and Distribution License("CDDL") (collectively, the "License"). You - * may not use this file except in compliance with the License. You can obtain - * a copy of the License at https://glassfish.dev.java.net/public/CDDL+GPL.html - * or glassfish/bootstrap/legal/LICENSE.txt. See the License for the specific - * language governing permissions and limitations under the License. - * - * When distributing the software, include this License Header Notice in each - * file and include the License file at glassfish/bootstrap/legal/LICENSE.txt. - * Sun designates this particular file as subject to the "Classpath" exception - * as provided by Sun in the GPL Version 2 section of the License file that - * accompanied this code. If applicable, add the following below the License - * Header, with the fields enclosed by brackets [] replaced by your own - * identifying information: "Portions Copyrighted [year] - * [name of copyright owner]" - * - * Contributor(s): - * - * If you wish your version of this file to be governed by only the CDDL or - * only the GPL Version 2, indicate your decision by adding "[Contributor] - * elects to include this software in this distribution under the [CDDL or GPL - * Version 2] license." If you don't indicate a single choice of license, a - * recipient has the option to distribute your version of this file under - * either the CDDL, the GPL Version 2 or to extend the choice of license to - * its licensees as provided above. However, if you add GPL Version 2 code - * and therefore, elected the GPL Version 2 license, then the option applies - * only if the new code is made subject to such option by the copyright - * holder. - * - */ -package com.sun.grizzly.filter; - -import com.sun.grizzly.async.AsyncQueueWriter; -import com.sun.grizzly.util.AttributeHolder; - -import java.nio.channels.SelectionKey; -import java.nio.ByteBuffer; -import java.util.Set; -import java.io.IOException; - -/** - * Gives Client a way to write, query and store an Connection. - * - * @author John Vieten 28.06.2008 - * @version 1.0 - */ -public interface MessageContext { - public AsyncQueueWriter getAsyncQueueWriter(); - - public void writeToAsyncQueue(ByteBuffer b) throws IOException; - - public Set keys(); - - public SelectionKey getSelectionKey(); - - public void closeConnection(SelectionKey key); - - public AttributeHolder getAttributeHolder(); - - - -} Index: com/sun/grizzly/filter/MessageContextImpl.java --- com/sun/grizzly/filter/MessageContextImpl.java Base (BASE) +++ com/sun/grizzly/filter/MessageContextImpl.java Locally Deleted @@ -1,104 +0,0 @@ -/* - * - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER. - * - * Copyright 2007-2008 Sun Microsystems, Inc. All rights reserved. - * - * The contents of this file are subject to the terms of either the GNU - * General Public License Version 2 only ("GPL") or the Common Development - * and Distribution License("CDDL") (collectively, the "License"). You - * may not use this file except in compliance with the License. You can obtain - * a copy of the License at https://glassfish.dev.java.net/public/CDDL+GPL.html - * or glassfish/bootstrap/legal/LICENSE.txt. See the License for the specific - * language governing permissions and limitations under the License. - * - * When distributing the software, include this License Header Notice in each - * file and include the License file at glassfish/bootstrap/legal/LICENSE.txt. - * Sun designates this particular file as subject to the "Classpath" exception - * as provided by Sun in the GPL Version 2 section of the License file that - * accompanied this code. If applicable, add the following below the License - * Header, with the fields enclosed by brackets [] replaced by your own - * identifying information: "Portions Copyrighted [year] - * [name of copyright owner]" - * - * Contributor(s): - * - * If you wish your version of this file to be governed by only the CDDL or - * only the GPL Version 2, indicate your decision by adding "[Contributor] - * elects to include this software in this distribution under the [CDDL or GPL - * Version 2] license." If you don't indicate a single choice of license, a - * recipient has the option to distribute your version of this file under - * either the CDDL, the GPL Version 2 or to extend the choice of license to - * its licensees as provided above. However, if you add GPL Version 2 code - * and therefore, elected the GPL Version 2 license, then the option applies - * only if the new code is made subject to such option by the copyright - * holder. - * - */ -package com.sun.grizzly.filter; - -import com.sun.grizzly.async.AsyncQueueWriter; -import com.sun.grizzly.util.AttributeHolder; -import com.sun.grizzly.util.ThreadAttachment; -import com.sun.grizzly.Context; -import com.sun.grizzly.SelectorHandler; - -import java.nio.channels.SelectionKey; -import java.nio.ByteBuffer; -import java.util.Set; -import java.io.IOException; - -/** - * Gives Client a way to write, query and store an Connection. - * - * @author John Vieten 28.06.2008 - * @version 1.0 - */ -public class MessageContextImpl implements MessageContext { - private AsyncQueueWriter asyncQueueWriter; - - private SelectionKey currentKey; - private SelectorHandler selectorHandler; - - - public MessageContextImpl(final Context context) { - this.asyncQueueWriter = context.getSelectorHandler().getAsyncQueueWriter(); - currentKey = context.getSelectionKey(); - selectorHandler = context.getSelectorHandler(); - - } - - public AsyncQueueWriter getAsyncQueueWriter() { - return asyncQueueWriter; - } - - public void writeToAsyncQueue(ByteBuffer b) throws IOException { - asyncQueueWriter.write(currentKey,b); - } - - public AttributeHolder getAttributeHolder() { - - Object attachment = getSelectionKey().attachment(); - if (attachment instanceof AttributeHolder) { - return (AttributeHolder) attachment; - } else { - ThreadAttachment threadAttachment = new ThreadAttachment(); - currentKey.attach(threadAttachment); - return threadAttachment; - } - - - } - - public SelectionKey getSelectionKey() { - return currentKey; - } - - public Set keys() { - return selectorHandler.keys(); - } - - public void closeConnection(SelectionKey key) { - selectorHandler.getSelectionKeyHandler().cancel(key); - } -} Index: com/sun/grizzly/filter/MessageDispatcher.java --- com/sun/grizzly/filter/MessageDispatcher.java Base (BASE) +++ com/sun/grizzly/filter/MessageDispatcher.java Locally Modified (Based On LOCAL) @@ -89,22 +89,22 @@ if(executorService!=null) executorService.shutdown(); } - public boolean execute(Context ctx) throws IOException { + if(executorService==null) { executorService=Executors.newFixedThreadPool(threadPoolSize); } final MessageBase incomingMessage = (MessageBase) ctx.removeAttribute(ProtocolParser.MESSAGE); switch (incomingMessage.getMessageType()) { + case Message.Message_Error: + dispatch(incomingMessage, ctx); + break; case Message.Message_Reply: case Message.Message_Request: - - case Message.Message_Error: if (incomingMessage.moreFragmentsToFollow()) { attachToConnection(incomingMessage, ctx); } else { - incomingMessage.allDataParsed(); } dispatch(incomingMessage, ctx); @@ -148,7 +148,6 @@ WorkerThread workerThread = (WorkerThread) Thread.currentThread(); AttributeHolder connectionAttrs = workerThread.getAttachment(); return (Map) connectionAttrs.getAttribute(needMoreDataMessageMapKey); - } private BlockingBuffersMessage getFromMessageMap(int uniqueId) { @@ -156,43 +155,45 @@ } - private void dispatch(final MessageBase msg, final Context workerCtx) { - final MessageContext msgContext=new MessageContextImpl(workerCtx); + private void dispatch(final Message msg, final Context workerCtx) { + workerCtx.suspend(); + // workerCtx.blockRecycle(true); + workerCtx.setKeyRegistrationState(Context.KeyRegistrationState.REGISTER); + + executorService.execute(new Runnable() { public void run() { switch (msg.getMessageType()) { case Message.Message_Request: - onRequestMessage((RequestMessage) msg, msgContext); + onRequestMessage((RequestMessage) msg, workerCtx); break; case Message.Message_Reply: - - - //hadled by break; case Message.Message_Error: - onMessageError((MessageError) msg, msgContext); + onMessageError((MessageError) msg, workerCtx); break; case Message.Message_Fragment: CustomProtocolHelper.logger().log(Level.SEVERE, "Cannot dispatch Fragment"); - } + workerCtx.resume(); } }); } + + /** * Pluginpoint to handle an Message received from some Endpoint * * @param msg RequestMessage the message received from an Endpoint * @param ctx MessageContext Simple API for writing,quering the connection. */ - abstract public void onRequestMessage(RequestMessage msg, MessageContext ctx); + abstract public void onRequestMessage(RequestMessage msg, Context ctx); + abstract public void onMessageError(MessageError msg, Context ctx); - abstract public void onMessageError(MessageError msg, MessageContext ctx); - } Index: com/sun/grizzly/filter/MessageParseException.java --- com/sun/grizzly/filter/MessageParseException.java Locally New +++ com/sun/grizzly/filter/MessageParseException.java Locally New @@ -0,0 +1,30 @@ +package com.sun.grizzly.filter; + +/** + * If something went wrong during parsing like a Bad Magic or bad Message Format + * usually a <@link MessageParseException> is thrown. + * + * @author John Vieten 01.07.2008 + * @version 1.0 + */ +public class MessageParseException extends Exception{ + private MessageError.ErrorCode errorCode; + public MessageParseException(String message,MessageError.ErrorCode errorCode) { + super(message); + this.errorCode=errorCode; + } + + /** + * + * @return MessageBase {@link com.sun.grizzly.filter.MessageError} + * since {@link com.sun.grizzly.filter.CustomProtocolParser} can not throw an Exception + * a {@link com.sun.grizzly.filter.MessageBase} must be given to the next Filter + */ + public MessageBase convertToMessage() { + MessageError messageError= new MessageError(getMessage(),errorCode); + messageError.setErrorHappendHere(true); + return messageError; + } + + +} Index: com/sun/grizzly/filter/ProtocolOutputStream.java --- com/sun/grizzly/filter/ProtocolOutputStream.java Base (BASE) +++ com/sun/grizzly/filter/ProtocolOutputStream.java Locally Modified (Based On LOCAL) @@ -37,8 +37,15 @@ */ package com.sun.grizzly.filter; +import com.sun.grizzly.TCPConnectorHandler; +import com.sun.grizzly.async.AsyncWriteCallbackHandler; +import com.sun.grizzly.async.AsyncWriteQueueRecord; + import java.io.*; import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.util.zip.GZIPOutputStream; +import java.util.Queue; /** * A Stream which wrapp bytes into Custom Protocol. @@ -49,7 +56,18 @@ * @version 1.0 */ public class ProtocolOutputStream extends OutputStream { + private static BytesTrafficListener bytesTrafficListener; + private static IOExceptionHandler exceptionHandler; + public static void setBytesTrafficListener(BytesTrafficListener listener) { + bytesTrafficListener = listener; + } + + public static void setExceptionHandler(IOExceptionHandler handler) { + exceptionHandler = handler; + } + + public interface AsyncWrite { public void writeToAsyncQueue(java.nio.ByteBuffer byteBuffer) throws java.io.IOException; } @@ -72,6 +90,7 @@ private AsyncWrite out; private int capacity = 0; private int uniqueMessageId = 0; + private BytesTrafficListener trafficListener; private void setHeaderMask() { @@ -83,8 +102,8 @@ buf[4] = (byte) 1; buf[5] = (messageCount == 0) ? messageType : Message.Message_Fragment; buf[6] = closedFlag ? 0 : hasMoreFragments; - if(gzip) { - buf[6]=(byte)((int)buf[6]|(int)Message.GZIP_BIT); + if (gzip) { + buf[6] = (byte) ((int) buf[6] | (int) Message.GZIP_BIT); } @@ -92,7 +111,7 @@ if (messageCount == 0) { uniqueMessageId = uniqueGlobalMessageId++; // just in case - if(uniqueGlobalMessageId==Integer.MAX_VALUE)uniqueMessageId=0; + if (uniqueGlobalMessageId == Integer.MAX_VALUE) uniqueMessageId = 0; } } @@ -126,8 +145,23 @@ public ProtocolOutputStream(AsyncWrite writer, byte messageType, Integer requestId, + Integer sessionId) { + buf = new byte[Message.MessageMaxLength]; + this.out = writer; + this.position = Message.HeaderLength; + this.requestId = requestId; + this.sessionId = sessionId; + this.messageType = messageType; + this.capacity = Message.MessageMaxLength; + } + + + public ProtocolOutputStream(AsyncWrite writer, + byte messageType, + Integer requestId, Integer sessionId, - boolean gzip) { + boolean gzip, + BytesTrafficListener trafficListener) { buf = new byte[Message.MessageMaxLength]; this.out = writer; this.position = Message.HeaderLength; @@ -135,7 +169,8 @@ this.sessionId = sessionId; this.messageType = messageType; this.capacity = Message.MessageMaxLength; - this.gzip=gzip; + this.gzip = gzip; + this.trafficListener = trafficListener; } @@ -146,7 +181,10 @@ out.writeToAsyncQueue(byteBuffer); position = Message.HeaderLength; buf = new byte[Message.MessageMaxLength]; + if (trafficListener != null) { + trafficListener.traffic(); } + } public synchronized void write(int b) throws IOException { @@ -175,6 +213,9 @@ position += len; } + public Integer getRequestId() { + return requestId; + } public synchronized void flush() throws IOException { @@ -189,5 +230,100 @@ super.close(); } + //-------------------------------- Helpers------------------------------------------------------ + /** + * @param requestId unique number within a connection that identifies a message + * @param session session which keeps track of a logical connection + * @param messageType see @link com.sun.grizzly.filter.Message + * @param data the data which will be wrapped by this protocol + * @param compress enable gzip + * @param writer look at @link com.sun.grizzly.filter.Message.AsyncWrite for contract + */ + public static void write( + int requestId, + int session, + byte messageType, + byte[] data, + boolean compress, + ProtocolOutputStream.AsyncWrite writer + ) { + + OutputStream byteArrayOutputStream = new ProtocolOutputStream(writer, + messageType, + requestId, + session, + compress, + bytesTrafficListener); + OutputStream oos = null; + GZIPOutputStream gzipOutputStream = null; + try { + + if (compress) { + gzipOutputStream = new GZIPOutputStream(byteArrayOutputStream, Message.MessageMaxLength); + } else { + oos = new ObjectOutputStream(byteArrayOutputStream); } + byteArrayOutputStream.write(data); + } catch (IOException e) { + if (exceptionHandler != null)exceptionHandler.handle(e); + } + + finally { + try { + if (oos != null) oos.close(); + if (gzipOutputStream != null) gzipOutputStream.close(); + } catch (IOException e) { + + e.printStackTrace(); + + } + } + } + + public static OutputStream getOutputStream( + final int requestId, + final int session, + final byte messageType, + final boolean compress, + final TCPConnectorHandler tcpConnectorHandler, + final BlockingBuffersInputStream inputStream) { + + AsyncWrite writer = new AsyncWrite() { + public void writeToAsyncQueue(ByteBuffer byteBuffer) throws IOException { + try { + tcpConnectorHandler.writeToAsyncQueue(byteBuffer, new AsyncWriteCallbackHandler() { + public void onWriteCompleted(SelectionKey selectionKey, ByteBuffer byteBuffer) { + + } + + public void onIOException(IOException e, + SelectionKey selectionKey, + ByteBuffer byteBuffer, Queue asyncWriteQueueRecords) { + if(inputStream!=null) { + inputStream.setException(e); + inputStream.close(); + } + if (exceptionHandler != null) exceptionHandler.handle(e); + } + }); + } catch (IOException e) { + if (exceptionHandler != null) exceptionHandler.handle(e); + } + } + }; + + + OutputStream outputStream = new ProtocolOutputStream(writer, + messageType, + requestId, + session + ,compress, + bytesTrafficListener); + return outputStream; + + + } + + +} Index: com/sun/grizzly/filter/RequestReplyHolder.java --- com/sun/grizzly/filter/RequestReplyHolder.java Locally New +++ com/sun/grizzly/filter/RequestReplyHolder.java Locally New @@ -0,0 +1,34 @@ +package com.sun.grizzly.filter; + +import java.io.InputStream; +import java.io.OutputStream; + +/** + * @author John Vieten 03.07.2008 + * @version 1.0 + */ +public class RequestReplyHolder { + private InputStream inputStream; + private OutputStream outputStream; + + public RequestReplyHolder(InputStream inputStream, OutputStream outputStream) { + this.inputStream = inputStream; + this.outputStream = outputStream; + } + + public InputStream getInputStream() { + return inputStream; + } + + public void setInputStream(InputStream inputStream) { + this.inputStream = inputStream; + } + + public OutputStream getOutputStream() { + return outputStream; + } + + public void setOutputStream(OutputStream outputStream) { + this.outputStream = outputStream; + } +} Index: com/sun/grizzly/standalone/framework/Example_1_Client.java --- com/sun/grizzly/standalone/framework/Example_1_Client.java Base (BASE) +++ com/sun/grizzly/standalone/framework/Example_1_Client.java Locally Modified (Based On LOCAL) @@ -38,15 +38,16 @@ package com.sun.grizzly.standalone.framework; import com.sun.grizzly.filter.CustomProtocolClient; -import java.io.InputStream; -import java.io.ObjectInputStream; +import com.sun.grizzly.filter.RequestReplyHolder; + +import java.io.*; import java.net.InetSocketAddress; /** * An example of How-To write a "Custom Protocol" with Grizzly. * Just shows an simple Example of how the CustomProtocol Layer can be used. - * This Example_1_Server goes in pair with Example_1_Client + * This Example_1_Server goes in pair with Example_1_Client * @author John Vieten 21.06.2008 * @version 1.0 */ @@ -55,6 +56,7 @@ public static void main(String[] args) throws Exception { client = new CustomProtocolClient(); + client.init(new Example_1_Client_Dispatcher()); client.start(); client.connect(new InetSocketAddress("localhost", Example_1_Server.PORT)); @@ -72,7 +74,8 @@ System.out.println("------- useCaseEchoLargeMessages ---------------------------"); useCaseEchoLargeMessages(); - client.sendOneWayRequest(new Object[]{"bye",""}); + write(new Object[]{"bye",""},client.getRequestOutputStream()); + client.stop(); System.out.println("stop"); @@ -123,12 +126,27 @@ } static private Object sendMessage(Object... data) throws Exception { - InputStream stream = client.sendRequest(data); - ObjectInputStream ois = new ObjectInputStream(stream); + RequestReplyHolder holder= client.prepareRequest(); + write(data,holder.getOutputStream()); + ObjectInputStream ois = new ObjectInputStream(holder.getInputStream()); Object result = ois.readObject(); ois.close(); return result; } + public static void write(final Object params[], OutputStream stream) throws IOException { + ObjectOutputStream oos = null; + try { + oos = new ObjectOutputStream(stream); + oos.writeObject(params); + } finally { + + if (oos != null) oos.close(); + + } + } + + +} Index: com/sun/grizzly/standalone/framework/Example_1_Client_Dispatcher.java --- com/sun/grizzly/standalone/framework/Example_1_Client_Dispatcher.java Base (BASE) +++ com/sun/grizzly/standalone/framework/Example_1_Client_Dispatcher.java Locally Modified (Based On LOCAL) @@ -1,10 +1,11 @@ package com.sun.grizzly.standalone.framework; -import com.sun.grizzly.filter.CustomProtocolHelper; -import com.sun.grizzly.filter.MessageContext; +import com.sun.grizzly.Context; import com.sun.grizzly.filter.MessageDispatcher; import com.sun.grizzly.filter.MessageError; +import com.sun.grizzly.filter.CustomProtocolHelper; import com.sun.grizzly.filter.RequestMessage; + import java.util.logging.Level; import java.io.ObjectInputStream; @@ -15,15 +16,15 @@ * @version 1.0 */ public class Example_1_Client_Dispatcher extends MessageDispatcher { - public void onMessageError(MessageError msg, MessageContext ctx) { + public void onMessageError(MessageError msg, Context ctx) { CustomProtocolHelper.logger().log(Level.SEVERE, "onMessageError() " + msg.getMessage()); - System.exit(1); + //System.exit(1); } - public void onRequestMessage(RequestMessage msg, MessageContext ctx) { + public void onRequestMessage(RequestMessage msg, Context ctx) { try { ObjectInputStream ois = new ObjectInputStream(msg.getInputStream()); Index: com/sun/grizzly/standalone/framework/Example_1_Server.java --- com/sun/grizzly/standalone/framework/Example_1_Server.java Base (BASE) +++ com/sun/grizzly/standalone/framework/Example_1_Server.java Locally Modified (Based On LOCAL) @@ -37,21 +37,16 @@ */ package com.sun.grizzly.standalone.framework; -import com.sun.grizzly.BaseSelectionKeyHandler; -import com.sun.grizzly.Controller; -import com.sun.grizzly.DefaultPipeline; -import com.sun.grizzly.DefaultProtocolChain; -import com.sun.grizzly.ProtocolChain; -import com.sun.grizzly.ProtocolChainInstanceHandler; -import com.sun.grizzly.TCPSelectorHandler; -import com.sun.grizzly.filter.CustomProtocolParser; +import com.sun.grizzly.*; import com.sun.grizzly.filter.Message; +import com.sun.grizzly.filter.CustomProtocolParser; + import java.io.IOException; /** * An example of How-To write a "Custom Protocol" with Grizzly. * Just shows an simple Example of how the CustomProtocol Layer can be used. - * This Example_1_Server goes in pair with Example_1_Client + * The Example_1_Server goes in pair with Example_1_Client * @author John Vieten 21.06.2008 * @version 1.0 */ @@ -72,7 +67,7 @@ tcpSelectorHandler.setPort(PORT); controller.addSelectorHandler(tcpSelectorHandler); final DefaultProtocolChain protocolChain = new DefaultProtocolChain(); - protocolChain.addFilter(CustomProtocolParser.createParserProtocolFilter()); + protocolChain.addFilter(CustomProtocolParser.createParserProtocolFilter(null)); protocolChain.addFilter(new Example_1_Server_Dispatcher()); protocolChain.setContinuousExecution(true); Index: com/sun/grizzly/standalone/framework/Example_1_Server_Dispatcher.java --- com/sun/grizzly/standalone/framework/Example_1_Server_Dispatcher.java Base (BASE) +++ com/sun/grizzly/standalone/framework/Example_1_Server_Dispatcher.java Locally Modified (Based On LOCAL) @@ -38,17 +38,14 @@ package com.sun.grizzly.standalone.framework; -import com.sun.grizzly.filter.Message; -import com.sun.grizzly.filter.MessageContext; -import com.sun.grizzly.filter.MessageDispatcher; -import com.sun.grizzly.filter.MessageError; -import com.sun.grizzly.filter.ProtocolWriter; -import com.sun.grizzly.filter.ReplyMessage; -import com.sun.grizzly.filter.RequestMessage; -import java.io.ObjectInputStream; +import com.sun.grizzly.Context; +import com.sun.grizzly.filter.*; + +import java.io.*; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; +import java.nio.ByteBuffer; /** * @author John Vieten 28.06.2008 @@ -57,15 +54,15 @@ public class Example_1_Server_Dispatcher extends MessageDispatcher { private ConcurrentHashMap sessionMap = new ConcurrentHashMap(); - public void onMessageError(MessageError msg, MessageContext ctx) { + public void onMessageError(MessageError msg, Context ctx) { System.out.println("Msg error " + msg.getMessage()); } - public void onReplyMessage(ReplyMessage msg, MessageContext ctx) { + public void onReplyMessage(ReplyMessage msg, Context ctx) { } - public void onRequestMessage(RequestMessage msg, final MessageContext ctx) { + public void onRequestMessage(RequestMessage msg, final Context ctx) { final int requestId = msg.getRequestId(); final int session = msg.getSessionId(); try { @@ -91,7 +88,6 @@ } else if ("bye".equals(methodName)) { sessionMap.remove(session); System.out.println("User logged of"); - ctx.closeConnection(ctx.getSelectionKey()); return; } else if ("echo".equals(methodName)) { result = data[1]; @@ -110,7 +106,13 @@ ProtocolWriter.write(requestId, session, Message.Message_Request, - feedStr, false, ctx); + feedStr, false, + new ProtocolOutputStream.AsyncWrite() { + public void writeToAsyncQueue(ByteBuffer byteBuffer) throws IOException { + ctx.getAsyncQueueWritable().writeToAsyncQueue(byteBuffer); + } + } + ); if (n == feedcount) { System.out.println("stopping feed"); memoryTimer.cancel(); @@ -125,16 +127,31 @@ } } + ProtocolOutputStream.AsyncWrite writer =new ProtocolOutputStream.AsyncWrite() { + public void writeToAsyncQueue(ByteBuffer byteBuffer) throws IOException { + ctx.getAsyncQueueWritable().writeToAsyncQueue (byteBuffer); + } + }; - ProtocolWriter.write(requestId, session, Message.Message_Reply, result, false, ctx); + OutputStream byteArrayOutputStream = new ProtocolOutputStream(writer, + Message.Message_Reply, + requestId, + session); + ObjectOutputStream objectOutputStream= new ObjectOutputStream(byteArrayOutputStream); + objectOutputStream.writeObject(result); + objectOutputStream.close(); + } catch (Exception e) { e.printStackTrace(); } } + + + private boolean badSession(int s) { if (s == 0) return false; return sessionMap.contains(s);