Salut,
Oleksiy Stashok wrote:
> Hello Jeanfrancois,
>
> by "UDP problem" do you mean some specific scenario that doesn't work?
> I ask because I wrote a simple UDPConnectorHandlerTest (similar to the
> TCP one) and it works fine.
LOL! I was trying to write such a test and failed. So go ahead and
commit it if you haven't yet. I guess my UDP skills are bad :-)
Thanks a lot!
-- Jeanfrancois
>
> WBR,
> Alexey.
>
>
> ------------------------------------------------------------------------
>
> /*
> * The contents of this file are subject to the terms
> * of the Common Development and Distribution License
> * (the License). You may not use this file except in
> * compliance with the License.
> *
> * You can obtain a copy of the license at
> * https://glassfish.dev.java.net/public/CDDLv1.0.html or
> * glassfish/bootstrap/legal/CDDLv1.0.txt.
> * See the License for the specific language governing
> * permissions and limitations under the License.
> *
> * When distributing Covered Code, include this CDDL
> * Header Notice in each file and include the License file
> * at glassfish/bootstrap/legal/CDDLv1.0.txt.
> * If applicable, add the following below the CDDL Header,
> * with the fields enclosed by brackets [] replaced by
> * you own identifying information:
> * "Portions Copyrighted [year] [name of copyright owner]"
> *
> * Copyright 2006 Sun Microsystems, Inc. All rights reserved.
> */
> package com.sun.grizzly;
>
> import com.sun.grizzly.filter.UDPReadFilter;
> import com.sun.grizzly.utils.ControllerUtils;
> import com.sun.grizzly.utils.EchoFilter;
> import java.io.IOException;
> import java.net.InetSocketAddress;
> import java.nio.ByteBuffer;
> import java.nio.channels.DatagramChannel;
> import java.nio.channels.SelectionKey;
> import java.util.Arrays;
> import java.util.concurrent.CountDownLatch;
> import java.util.concurrent.TimeUnit;
> import junit.framework.TestCase;
>
> /**
> * Tests the default <code>UDPConnectorHandler</code>
> *
> * @author Jeanfrancois Arcand
> */
> public class UDPConnectorHandlerTest extends TestCase {
> public static final int PORT = 18888;
> public static final int PACKETS_COUNT = 100;
>
> /**
> * A <code>CallbackHandler</code> handler invoked by the UDPSelectorHandler
> * when a non blocking operation is ready to be processed.
> */
> private CallbackHandler callbackHandler;
>
> public void testSimplePacket() throws IOException {
> final Controller controller = createController();
> ControllerUtils.startController(controller);
> final ConnectorHandler udpConnector =
> controller.acquireConnectorHandler(Controller.Protocol.UDP);
>
> try {
> final byte[] testData = "Hello".getBytes();
> final byte[] response = new byte[testData.length];
>
> final ByteBuffer writeBB = ByteBuffer.wrap(testData);
> final ByteBuffer readBB = ByteBuffer.wrap(response);
> final CountDownLatch responseArrivedLatch = new CountDownLatch(1);
>
> callbackHandler = new CallbackHandler<SelectionKey>(){
>
> private int readTry;
>
> public void onConnect(IOEvent<SelectionKey> ioEvent) {
> SelectionKey key = ioEvent.attachment();
> udpConnector.finishConnect(key);
> controller.registerKey(key,
> SelectionKey.OP_WRITE|SelectionKey.OP_READ);
> }
>
> public void onRead(IOEvent<SelectionKey> ioEvent) {
> SelectionKey key = ioEvent.attachment();
> DatagramChannel datagramChannel = (DatagramChannel)key.channel();
>
> try {
> int nRead = datagramChannel.read(readBB);
> if (nRead == 0 && readTry++ < 2){
> key.attach(callbackHandler);
> controller.getSelectorHandler(Controller.Protocol.UDP)
> .register(key,SelectionKey.OP_READ);
> } else {
> responseArrivedLatch.countDown();
> }
> } catch (IOException ex){
> ex.printStackTrace();
> controller.cancelKey(key);
> }
> }
>
> public void onWrite(IOEvent<SelectionKey> ioEvent) {
> SelectionKey key = ioEvent.attachment();
> DatagramChannel datagramChannel = (DatagramChannel)key.channel();
> try{
> while(writeBB.hasRemaining()){
> int nWrite = datagramChannel.write(writeBB);
>
> if (nWrite == 0){
> key.attach(callbackHandler);
> controller.getSelectorHandler(Controller.Protocol.UDP)
> .register(key,SelectionKey.OP_WRITE);
> return;
> }
> }
>
> udpConnector.read(readBB,false);
> } catch (IOException ex){
> ex.printStackTrace();
> controller.cancelKey(key);
> }
>
> }
> };
>
> try{
> udpConnector.connect(new InetSocketAddress("localhost",PORT)
> ,callbackHandler);
> } catch (Throwable t){
> t.printStackTrace();
> }
>
> long nWrite = udpConnector.write(writeBB,false);
>
> long nRead = -1;
>
> // All bytes written
> if (nWrite == testData.length){
> nRead = udpConnector.read(readBB,false);
> }
>
> if (nRead != nWrite){
> try {
> responseArrivedLatch.await(5,TimeUnit.SECONDS);
> } catch (InterruptedException ex) {
> ex.printStackTrace();
> }
> }
> assertTrue(Arrays.equals(testData, readBB.array()));
> } finally {
> try{
> udpConnector.close();
> controller.releaseConnectorHandler(udpConnector);
> controller.stop();
> } catch (Throwable t){
> t.printStackTrace();
> }
> }
> }
>
>
> private Controller createController() {
> final ProtocolFilter readFilter = new UDPReadFilter();
> final ProtocolFilter echoFilter = new EchoFilter();
>
> UDPSelectorHandler selectorHandler = new UDPSelectorHandler();
> selectorHandler.setPort(PORT);
>
> final Controller controller = new Controller();
>
> controller.setSelectorHandler(selectorHandler);
>
> controller.setProtocolChainInstanceHandler(
> new DefaultProtocolChainInstanceHandler(){
> public ProtocolChain poll() {
> ProtocolChain protocolChain = protocolChains.poll();
> if (protocolChain == null){
> protocolChain = new DefaultProtocolChain();
> protocolChain.addFilter(readFilter);
> protocolChain.addFilter(echoFilter);
> }
> return protocolChain;
> }
> });
>
> return controller;
> }
> }
>
>
> ------------------------------------------------------------------------
>
> Subject:
> svn commit: r181 - trunk/modules: grizzly/src/main/java/com/sun/grizzly
> grizzly/src/main/java/com/sun/grizzly/util
> grizzly/src/test/java/com/sun/gr...
> From:
> jfarcand_at_dev.java.net
> Date:
> Wed, 16 May 2007 21:47:37 +0000
> To:
> commits_at_grizzly.dev.java.net
>
> To:
> commits_at_grizzly.dev.java.net
>
>
> Author: jfarcand
> Date: 2007-05-16 21:47:36+0000
> New Revision: 181
>
> Added:
> trunk/modules/grizzly/src/main/java/com/sun/grizzly/UDPConnectorHandler.java
> trunk/modules/grizzly/src/main/java/com/sun/grizzly/UDPConnectorInstanceHandler.java
> trunk/modules/grizzly/src/test/java/com/sun/grizzly/utils/NonBlockingUDPIOClient.java
> Modified:
> trunk/modules/grizzly/src/main/java/com/sun/grizzly/ConnectorHandler.java
> trunk/modules/grizzly/src/main/java/com/sun/grizzly/Controller.java
> trunk/modules/grizzly/src/main/java/com/sun/grizzly/ReadController.java
> trunk/modules/grizzly/src/main/java/com/sun/grizzly/SelectorHandler.java
> trunk/modules/grizzly/src/main/java/com/sun/grizzly/SslTCPSelectorHandler.java
> trunk/modules/grizzly/src/main/java/com/sun/grizzly/TCPConnectorHandler.java
> trunk/modules/grizzly/src/main/java/com/sun/grizzly/TCPSelectorHandler.java
> trunk/modules/grizzly/src/main/java/com/sun/grizzly/UDPSelectorHandler.java
> trunk/modules/grizzly/src/main/java/com/sun/grizzly/util/ByteBufferInputStream.java
> trunk/modules/grizzly/src/test/java/com/sun/grizzly/TCPConnectorHandlerTest.java
> trunk/modules/grizzly/src/test/java/com/sun/grizzly/utils/UDPIOClient.java
> trunk/modules/http/src/main/java/com/sun/grizzly/http/SelectorThread.java
>
> Log:
> Add UDP client side support (still not fully working...need a UDP expert :-)). Also tighten the coupling between the SelectorHandler and the ConnectorHandler so that a client instance can be retrieved using the Protocol.
>
> Hopefully I will figure out what's wrong with the UDP client (receive doesn't work).
>
>
> Modified: trunk/modules/grizzly/src/main/java/com/sun/grizzly/ConnectorHandler.java
> Url: https://grizzly.dev.java.net/source/browse/grizzly/trunk/modules/grizzly/src/main/java/com/sun/grizzly/ConnectorHandler.java?view=diff&rev=181&p1=trunk/modules/grizzly/src/main/java/com/sun/grizzly/ConnectorHandler.java&p2=trunk/modules/grizzly/src/main/java/com/sun/grizzly/ConnectorHandler.java&r1=180&r2=181
> ==============================================================================
> --- trunk/modules/grizzly/src/main/java/com/sun/grizzly/ConnectorHandler.java (original)
> +++ trunk/modules/grizzly/src/main/java/com/sun/grizzly/ConnectorHandler.java 2007-05-16 21:47:36+0000
> @@ -39,7 +39,15 @@
> * @author Jeanfrancois Arcand
> */
> public interface ConnectorHandler<E extends SelectorHandler> extends Handler{
> -
> +
> +
> + /**
> + * A token describing the protocol supported by an implementation of this
> + * interface
> + */
> + public Controller.Protocol protocol();
> +
> +
> /**
> * Connect to hostname:port. When an aysnchronous event happens (e.g
> * OP_READ or OP_WRITE), the <code>Controller</code> will invoke
>
> Modified: trunk/modules/grizzly/src/main/java/com/sun/grizzly/Controller.java
> Url: https://grizzly.dev.java.net/source/browse/grizzly/trunk/modules/grizzly/src/main/java/com/sun/grizzly/Controller.java?view=diff&rev=181&p1=trunk/modules/grizzly/src/main/java/com/sun/grizzly/Controller.java&p2=trunk/modules/grizzly/src/main/java/com/sun/grizzly/Controller.java&r1=180&r2=181
> ==============================================================================
> --- trunk/modules/grizzly/src/main/java/com/sun/grizzly/Controller.java (original)
> +++ trunk/modules/grizzly/src/main/java/com/sun/grizzly/Controller.java 2007-05-16 21:47:36+0000
> @@ -113,6 +113,9 @@
> */
> public class Controller implements Runnable, Lifecycle, Copyable {
>
> + public enum Protocol { UDP, TCP , TLS, CUSTOM }
> +
> +
> /**
> * A cached list of Context. Context are by default stateless.
> */
> @@ -138,13 +141,6 @@
> * of the TCPSelectorHandler will be created.
> */
> protected ConcurrentLinkedQueue<SelectorHandler> selectorHandlers;
> -
> -
> - /**
> - * The ConnectorInstanceHandler used to return a new or pooled
> - * ConnectorHandler
> - */
> - private ConnectorInstanceHandler connectorInstanceHandler;
>
>
> /**
> @@ -408,10 +404,15 @@
>
>
> /**
> - * Return the first <code>SelectorHandler</code>
> + * Return the <code>SelectorHandler</code> associated with the protocol.
> */
> - public SelectorHandler getSelectorHandler(){
> - return selectorHandlers.peek();
> + public SelectorHandler getSelectorHandler(Protocol protocol){
> + for (SelectorHandler selectorHandler: selectorHandlers){
> + if (selectorHandler.protocol() == protocol){
> + return selectorHandler;
> + }
> + }
> + return null;
> }
>
>
> @@ -492,11 +493,7 @@
> if (selectionKeyHandler == null){
> selectionKeyHandler = new DefaultSelectionKeyHandler();
> }
> -
> - if (connectorInstanceHandler == null){
> - connectorInstanceHandler = new TCPConnectorInstanceHandler();
> - }
> -
> +
> if (readThreadsCount > 1) {
> initReadThreads();
> selectorHandlers.clear();
> @@ -599,22 +596,28 @@
> * Return an instance of the default <code>ConnectorHandler</code>,
> * which is the <code>TCPConnectorHandler</code>
> */
> - public ConnectorHandler acquireConnectorHandler(){
> - if (state != state.STARTED){
> - throw new IllegalStateException("Controller not yet started");
> + public ConnectorHandler acquireConnectorHandler(Protocol protocol){
> + ConnectorHandler ch = null;
> + for (SelectorHandler selectorHandler: selectorHandlers){
> + if (selectorHandler.protocol() == protocol){
> + ch = selectorHandler.acquireConnectorHandler();
> + ch.setController(this);
> + break;
> + }
> }
> -
> - ConnectorHandler connectorHandler = connectorInstanceHandler.acquire();
> - connectorHandler.setController(this);
> - return connectorHandler;
> + return ch;
> }
>
>
> /**
> * Release a ConnectorHandler.
> */
> - public void releaseConnectorHandler(ConnectorHandler connectorHandler){
> - connectorInstanceHandler.release(connectorHandler);
> + public void releaseConnectorHandler(ConnectorHandler connectorHandler){
> + for (SelectorHandler selectorHandler: selectorHandlers){
> + if (selectorHandler.protocol() == connectorHandler.protocol()){
> + selectorHandler.releaseConnectorHandler(connectorHandler);
> + }
> + }
> }
>
>
> @@ -646,12 +649,4 @@
> logger.log(Level.SEVERE,"Controller.start()",ex);
> }
> }
> -
> - public ConnectorInstanceHandler getConnectorInstanceHandler() {
> - return connectorInstanceHandler;
> - }
> -
> - public void setConnectorInstanceHandler(ConnectorInstanceHandler connectorInstanceHandler) {
> - this.connectorInstanceHandler = connectorInstanceHandler;
> - }
> }
>
> Modified: trunk/modules/grizzly/src/main/java/com/sun/grizzly/ReadController.java
> Url: https://grizzly.dev.java.net/source/browse/grizzly/trunk/modules/grizzly/src/main/java/com/sun/grizzly/ReadController.java?view=diff&rev=181&p1=trunk/modules/grizzly/src/main/java/com/sun/grizzly/ReadController.java&p2=trunk/modules/grizzly/src/main/java/com/sun/grizzly/ReadController.java&r1=180&r2=181
> ==============================================================================
> --- trunk/modules/grizzly/src/main/java/com/sun/grizzly/ReadController.java (original)
> +++ trunk/modules/grizzly/src/main/java/com/sun/grizzly/ReadController.java 2007-05-16 21:47:36+0000
> @@ -53,7 +53,7 @@
> channels.add(channel);
> }
>
> - getSelectorHandler().getSelector().wakeup();
> + getSelectorHandler(Protocol.TCP).getSelector().wakeup();
> }
>
> /**
> @@ -62,7 +62,8 @@
> private void registerNewChannels() throws IOException {
> synchronized(channels) {
> int size = channels.size();
> - TCPSelectorHandler selectorHandler = (TCPSelectorHandler) getSelectorHandler();
> + TCPSelectorHandler selectorHandler =
> + (TCPSelectorHandler) getSelectorHandler(Protocol.TCP);
> Selector auxSelector = selectorHandler.getSelector();
> for (int i = 0; i < size; i++) {
> SocketChannel channel = channels.get(i);
>
> Modified: trunk/modules/grizzly/src/main/java/com/sun/grizzly/SelectorHandler.java
> Url: https://grizzly.dev.java.net/source/browse/grizzly/trunk/modules/grizzly/src/main/java/com/sun/grizzly/SelectorHandler.java?view=diff&rev=181&p1=trunk/modules/grizzly/src/main/java/com/sun/grizzly/SelectorHandler.java&p2=trunk/modules/grizzly/src/main/java/com/sun/grizzly/SelectorHandler.java&r1=180&r2=181
> ==============================================================================
> --- trunk/modules/grizzly/src/main/java/com/sun/grizzly/SelectorHandler.java (original)
> +++ trunk/modules/grizzly/src/main/java/com/sun/grizzly/SelectorHandler.java 2007-05-16 21:47:36+0000
> @@ -39,17 +39,26 @@
> public interface SelectorHandler extends Handler, Copyable {
>
> /**
> + * A token describing the protocol supported by an implementation of this
> + * interface
> + */
> + public Controller.Protocol protocol();
> +
> +
> + /**
> * Gets the underlying selector.
> * @return underlying <code>Selector</code>
> */
> public Selector getSelector();
>
> +
> /**
> * Sets the underlying <code>Selector</code>
> * @param selector underlying <code>Selector</code>
> */
> public void setSelector(Selector selector);
>
> +
> /**
> * The SelectionKey that has been registered.
> * @return <code>Set</code> of <code>SelectionKey</code>
> @@ -149,4 +158,16 @@
> */
> public boolean onConnectInterest(SelectionKey key,Context controllerCtx)
> throws IOException;
> +
> +
> + /**
> + * Return an instance of the <code>ConnectorHandler</code>
> + */
> + public ConnectorHandler acquireConnectorHandler();
> +
> +
> + /**
> + * Release a ConnectorHandler.
> + */
> + public void releaseConnectorHandler(ConnectorHandler connectorHandler);
> }
>
> Modified: trunk/modules/grizzly/src/main/java/com/sun/grizzly/SslTCPSelectorHandler.java
> Url: https://grizzly.dev.java.net/source/browse/grizzly/trunk/modules/grizzly/src/main/java/com/sun/grizzly/SslTCPSelectorHandler.java?view=diff&rev=181&p1=trunk/modules/grizzly/src/main/java/com/sun/grizzly/SslTCPSelectorHandler.java&p2=trunk/modules/grizzly/src/main/java/com/sun/grizzly/SslTCPSelectorHandler.java&r1=180&r2=181
> ==============================================================================
> --- trunk/modules/grizzly/src/main/java/com/sun/grizzly/SslTCPSelectorHandler.java (original)
> +++ trunk/modules/grizzly/src/main/java/com/sun/grizzly/SslTCPSelectorHandler.java 2007-05-16 21:47:36+0000
> @@ -179,6 +179,15 @@
>
>
> /**
> + * A token describing the protocol supported by an implementation of this
> + * interface
> + */
> + public Controller.Protocol protocol(){
> + return Controller.Protocol.TLS;
> + }
> +
> +
> + /**
> * Set the SSLContext required to support SSL over NIO.
> * @param sslContext <code>SSLContext</code>
> */
>
> Modified: trunk/modules/grizzly/src/main/java/com/sun/grizzly/TCPConnectorHandler.java
> Url: https://grizzly.dev.java.net/source/browse/grizzly/trunk/modules/grizzly/src/main/java/com/sun/grizzly/TCPConnectorHandler.java?view=diff&rev=181&p1=trunk/modules/grizzly/src/main/java/com/sun/grizzly/TCPConnectorHandler.java&p2=trunk/modules/grizzly/src/main/java/com/sun/grizzly/TCPConnectorHandler.java&r1=180&r2=181
> ==============================================================================
> --- trunk/modules/grizzly/src/main/java/com/sun/grizzly/TCPConnectorHandler.java (original)
> +++ trunk/modules/grizzly/src/main/java/com/sun/grizzly/TCPConnectorHandler.java 2007-05-16 21:47:36+0000
> @@ -146,7 +146,7 @@
> }
>
> connect(remoteAddress,localAddress,callbackHandler,
> - (TCPSelectorHandler)controller.getSelectorHandler());
> + (TCPSelectorHandler)controller.getSelectorHandler(protocol()));
> }
>
>
> @@ -281,7 +281,7 @@
> }
>
> connect(remoteAddress,localAddress,callbackHandler,
> - (TCPSelectorHandler)controller.getSelectorHandler());
> + (TCPSelectorHandler)controller.getSelectorHandler(protocol()));
> }
>
>
> @@ -402,6 +402,15 @@
>
>
> /**
> + * A token describing the protocol supported by an implementation of this
> + * interface
> + */
> + public Controller.Protocol protocol(){
> + return Controller.Protocol.TCP;
> + }
> +
> +
> + /**
> * Is the underlying SocketChannel connected.
> */
> public boolean isConnected(){
>
> Modified: trunk/modules/grizzly/src/main/java/com/sun/grizzly/TCPSelectorHandler.java
> Url: https://grizzly.dev.java.net/source/browse/grizzly/trunk/modules/grizzly/src/main/java/com/sun/grizzly/TCPSelectorHandler.java?view=diff&rev=181&p1=trunk/modules/grizzly/src/main/java/com/sun/grizzly/TCPSelectorHandler.java&p2=trunk/modules/grizzly/src/main/java/com/sun/grizzly/TCPSelectorHandler.java&r1=180&r2=181
> ==============================================================================
> --- trunk/modules/grizzly/src/main/java/com/sun/grizzly/TCPSelectorHandler.java (original)
> +++ trunk/modules/grizzly/src/main/java/com/sun/grizzly/TCPSelectorHandler.java 2007-05-16 21:47:36+0000
> @@ -56,7 +56,15 @@
> * @author Jeanfrancois Arcand
> */
> public class TCPSelectorHandler implements SelectorHandler {
> -
> +
> +
> + /**
> + * The ConnectorInstanceHandler used to return a new or pooled
> + * ConnectorHandler
> + */
> + protected ConnectorInstanceHandler connectorInstanceHandler;
> +
> +
> /**
> * The list of SelectionKey to register next time the Selector.select is
> * invoked.
> @@ -181,6 +189,7 @@
> copyHandler.socketTimeout = socketTimeout;
> copyHandler.logger = logger;
> copyHandler.reuseAddress = reuseAddress;
> + copyHandler.connectorInstanceHandler = connectorInstanceHandler;
> }
>
>
> @@ -215,6 +224,9 @@
> public void preSelect(Context ctx) throws IOException {
> if (selector == null){
> try{
> +
> + connectorInstanceHandler = new TCPConnectorInstanceHandler();
> +
> // Create the socket listener
> selector = Selector.open();
>
> @@ -358,7 +370,8 @@
> if (opConnectToRegister== null){
> opConnectToRegister = new ConcurrentHashMap
> <SocketAddress[],CallbackHandler>();
> - }
> + }
> +
> opConnectToRegister.put(new SocketAddress[]{remoteAddress,localAddress},
> callBackHandler);
> selector.wakeup();
> @@ -491,7 +504,37 @@
> }
> return false;
> }
> -
> +
> +
> + /**
> + * Return an instance of the default <code>ConnectorHandler</code>,
> + * which is the <code>TCPConnectorHandler</code>
> + */
> + public ConnectorHandler acquireConnectorHandler(){
> + if (selector == null && !selector.isOpen()){
> + throw new IllegalStateException("SelectorHandler not yet started");
> + }
> +
> + ConnectorHandler connectorHandler = connectorInstanceHandler.acquire();
> + return connectorHandler;
> + }
> +
> +
> + /**
> + * Release a ConnectorHandler.
> + */
> + public void releaseConnectorHandler(ConnectorHandler connectorHandler){
> + connectorInstanceHandler.release(connectorHandler);
> + }
> +
> +
> + /**
> + * A token describing the protocol supported by an implementation of this
> + * interface
> + */
> + public Controller.Protocol protocol(){
> + return Controller.Protocol.TCP;
> + }
> // ------------------------------------------------------ Utils ----------//
>
>
>
> Added: trunk/modules/grizzly/src/main/java/com/sun/grizzly/UDPConnectorHandler.java
> Url: https://grizzly.dev.java.net/source/browse/grizzly/trunk/modules/grizzly/src/main/java/com/sun/grizzly/UDPConnectorHandler.java?view=auto&rev=181
> ==============================================================================
> --- (empty file)
> +++ trunk/modules/grizzly/src/main/java/com/sun/grizzly/UDPConnectorHandler.java 2007-05-16 21:47:36+0000
> @@ -0,0 +1,449 @@
> +/*
> + * The contents of this file are subject to the terms
> + * of the Common Development and Distribution License
> + * (the License). You may not use this file except in
> + * compliance with the License.
> + *
> + * You can obtain a copy of the license at
> + * https://glassfish.dev.java.net/public/CDDLv1.0.html or
> + * glassfish/bootstrap/legal/CDDLv1.0.txt.
> + * See the License for the specific language governing
> + * permissions and limitations under the License.
> + *
> + * When distributing Covered Code, include this CDDL
> + * Header Notice in each file and include the License file
> + * at glassfish/bootstrap/legal/CDDLv1.0.txt.
> + * If applicable, add the following below the CDDL Header,
> + * with the fields enclosed by brackets [] replaced by
> + * you own identifying information:
> + * "Portions Copyrighted [year] [name of copyright owner]"
> + *
> + * Copyright 2007 Sun Microsystems, Inc. All rights reserved.
> + */
> +
> +package com.sun.grizzly;
> +
> +import com.sun.grizzly.util.ByteBufferInputStream;
> +import com.sun.grizzly.util.OutputWriter;
> +import java.io.IOException;
> +import java.net.SocketAddress;
> +import java.nio.ByteBuffer;
> +import java.nio.channels.AlreadyConnectedException;
> +import java.nio.channels.DatagramChannel;
> +import java.nio.channels.NotYetConnectedException;
> +import java.nio.channels.SelectionKey;
> +import java.nio.channels.DatagramChannel;
> +import java.util.concurrent.CountDownLatch;
> +
> +/**
> + * Client side interface used to implement non blocking client operation.
> + * Implementation of this class must make sure the following methods are
> + * invoked in that order:
> + *
> + * (1) connect()
> + * (2) read() or write().
> + *
> + * @author Jeanfrancois Arcand
> + */
> +public class UDPConnectorHandler implements ConnectorHandler<UDPSelectorHandler>{
> +
> + /**
> + * The underlying UDPSelectorHandler used to manage SelectionKeys.
> + */
> + private UDPSelectorHandler selectorHandler;
> +
> +
> + /**
> + * A <code>CallbackHandler</code> handler invoked by the UDPSelectorHandler
> + * when a non blocking operation is ready to be processed.
> + */
> + private CallbackHandler callbackHandler;
> +
> +
> + /**
> + * The connection's DatagramChannel.
> + */
> + private DatagramChannel datagramChannel;
> +
> +
> + /**
> + * Is the connection established.
> + */
> + private volatile boolean isConnected;
> +
> +
> + /**
> + * The internal Controller used (in case not specified).
> + */
> + private Controller controller;
> +
> +
> + /**
> + * IsConnected Latch related
> + */
> + private CountDownLatch isConnectedLatch;
> +
> +
> + /**
> + * Are we creating a controller every run.
> + */
> + private boolean isStandalone = false;
> +
> +
> + /**
> + * A blocking <code>InputStream</code> that use a pool of Selector
> + * to execute a blocking read operation.
> + */
> + private ByteBufferInputStream inputStream;
> +
> +
> + /**
> + * Connect to hostname:port. When an aysnchronous event happens (e.g
> + * OP_READ or OP_WRITE), the <code>Controller</code> will invoke
> + * the CallBackHandler.
> + * @param remoteAddress remote address to connect
> + * @param callbackHandler the handler invoked by the Controller when
> + * an non blocking operation is ready to be handled.
> + */
> + public void connect(SocketAddress remoteAddress,
> + CallbackHandler callbackHandler) throws IOException {
> +
> + connect(remoteAddress,null,callbackHandler);
> + }
> +
> +
> + /**
> + * Connect to hostname:port. When an aysnchronous event happens (e.g
> + * OP_READ or OP_WRITE), the <code>Controller</code> will invoke
> + * the CallBackHandler.
> + * @param remoteAddress remote address to connect
> + * @param localAddress local address to bind
> + * @param callbackHandler the handler invoked by the Controller when
> + * an non blocking operation is ready to be handled.
> + */
> + public void connect(SocketAddress remoteAddress, SocketAddress localAddress,
> + CallbackHandler callbackHandler) throws IOException {
> +
> + if (controller == null){
> + throw new IllegalStateException("Controller cannot be null");
> + }
> +
> + connect(remoteAddress,localAddress,callbackHandler,
> + (UDPSelectorHandler)controller.getSelectorHandler(protocol()));
> + }
> +
> +
> + /**
> + * Connect to hostname:port. When an aysnchronous event happens (e.g
> + * OP_READ or OP_WRITE), the <code>Controller</code> will invoke
> + * the CallBackHandler.
> + * @param remoteAddress remote address to connect
> + * @param callbackHandler the handler invoked by the Controller when
> + * an non blocking operation is ready to be handled.
> + * @param selectorHandler an instance of SelectorHandler.
> + */
> + public void connect(SocketAddress remoteAddress,
> + CallbackHandler callbackHandler,
> + UDPSelectorHandler selectorHandler) throws IOException {
> +
> + connect(remoteAddress,null,callbackHandler,selectorHandler);
> + }
> +
> + /**
> + * Connect to hostname:port. When an aysnchronous event happens (e.g
> + * OP_READ or OP_WRITE), the <code>Controller</code> will invoke
> + * the CallBackHandler.
> + * @param remoteAddress remote address to connect
> + * @param localAddress local address to bind
> + * @param callbackHandler the handler invoked by the Controller when
> + * an non blocking operation is ready to be handled.
> + * @param selectorHandler an instance of SelectorHandler.
> + */
> + public void connect(SocketAddress remoteAddress, SocketAddress localAddress,
> + CallbackHandler callbackHandler,
> + UDPSelectorHandler selectorHandler) throws IOException {
> +
> + if (isConnected){
> + throw new AlreadyConnectedException();
> + }
> +
> + if (controller == null){
> + throw new IllegalStateException("Controller cannot be null");
> + }
> +
> + if (selectorHandler == null){
> + throw new IllegalStateException("Controller cannot be null");
> + }
> +
> + this.selectorHandler = selectorHandler;
> + this.callbackHandler = callbackHandler;
> +
> + // Wait for the onConnect to be invoked.
> + isConnectedLatch = new CountDownLatch(1);
> +
> + selectorHandler.connect(remoteAddress,localAddress,callbackHandler);
> + inputStream = new ByteBufferInputStream();
> +
> + try {
> + isConnectedLatch.await();
> + } catch (InterruptedException ex) {
> + ; // LOG ME
> + }
> + }
> +
> +
> + /**
> + * Connect to hostname:port. Internally an instance of Controller and
> + * its default SelectorHandler will be created everytime this method is
> + * called. This method should be used only and only if no external
> + * Controller has been initialized.
> + * @param remoteAddress remote address to connect
> + */
> + public void connect(SocketAddress remoteAddress)
> + throws IOException {
> + connect(remoteAddress,(SocketAddress)null);
> + }
> +
> +
> + /**
> + * Connect to hostname:port. Internally an instance of Controller and
> + * its default SelectorHandler will be created everytime this method is
> + * called. This method should be used only and only if no external
> + * Controller has been initialized.
> + * @param remoteAddress remote address to connect
> + * @param localAddress local address to bind
> + */
> + public void connect(SocketAddress remoteAddress, SocketAddress localAddress)
> + throws IOException {
> +
> + if (isConnected){
> + throw new AlreadyConnectedException();
> + }
> +
> + if (controller == null){
> + isStandalone = true;
> + controller = new Controller();
> + controller.setSelectorHandler(new UDPSelectorHandler(true));
> + DefaultPipeline pipeline = new DefaultPipeline();
> + pipeline.setMaxThreads(1);
> + pipeline.initPipeline();
> + pipeline.startPipeline();
> + controller.setPipeline(pipeline);
> +
> + callbackHandler = new CallbackHandler<SelectionKey>(){
> + public void onConnect(IOEvent<SelectionKey> ioEvent) {
> + SelectionKey key = ioEvent.attachment();
> + finishConnect(key);
> + }
> + public void onRead(IOEvent<SelectionKey> ioEvent) {
> + }
> + public void onWrite(IOEvent<SelectionKey> ioEvent) {
> + }
> + };
> +
> + final CountDownLatch latch = new CountDownLatch(1);
> + try{
> + pipeline.execute(new Context(){
> + public Object call() throws Exception {
> + latch.countDown();
> + controller.start();
> + return null;
> + }
> + });
> + } catch (PipelineFullException ex){
> + throw new IOException(ex.getMessage());
> + }
> +
> + try {
> + latch.await();
> + Thread.sleep(1000);
> + } catch (InterruptedException ex) {
> + }
> + }
> +
> + connect(remoteAddress,localAddress,callbackHandler,
> + (UDPSelectorHandler)controller.getSelectorHandler(protocol()));
> + }
> +
> +
> + /**
> + * Register the <code>CallbackHandler</code> so when an asynchronous
> + * event occurs, the <code>CallbackHandler</code> will be invoked.
> + * @param callbackHandler the handler invoked by the Controller when
> + * an non blocking operation is ready to be handled.
> + */
> + public void register(CallbackHandler callbackHandler){
> + this.callbackHandler = callbackHandler;
> + }
> +
> +
> + /**
> + * Read bytes. If blocking is set to <tt>true</tt>, a pool of temporary
> + * <code>Selector</code> will be used to read bytes.
> + * @param byteBuffer The byteBuffer to store bytes.
> + * @param blocking <tt>true</tt> if a a pool of temporary Selector
> + * is required to handle a blocking read.
> + */
> + public long read(ByteBuffer byteBuffer, boolean blocking) throws IOException {
> + if (!isConnected){
> + throw new NotYetConnectedException();
> + }
> +
> + SelectionKey key = datagramChannel.keyFor(selectorHandler.getSelector());
> + if (blocking){
> + inputStream.setSelectionKey(key);
> + inputStream.setChannelType(
> + ByteBufferInputStream.ChannelType.DatagramChannel);
> + int nRead = inputStream.read(byteBuffer);
> + return nRead;
> + } else {
> + if (callbackHandler == null){
> + throw new IllegalStateException
> + ("Non blocking read needs a CallbackHandler");
> + }
> + int nRead = datagramChannel.read(byteBuffer);
> +
> + if (nRead == 0){
> + key.attach(callbackHandler);
> + selectorHandler.register(key,
> + SelectionKey.OP_READ|SelectionKey.OP_WRITE);
> + }
> + return nRead;
> + }
> + }
> +
> +
> + /**
> + * Writes bytes. If blocking is set to <tt>true</tt>, a pool of temporary
> + * <code>Selector</code> will be used to writes bytes.
> + * @param byteBuffer The byteBuffer to write.
> + * @param blocking <tt>true</tt> if a a pool of temporary Selector
> + * is required to handle a blocking write.
> + */
> + public long write(ByteBuffer byteBuffer, boolean blocking) throws IOException {
> + if (!isConnected){
> + throw new NotYetConnectedException();
> + }
> +
> + SelectionKey key = datagramChannel.keyFor(selectorHandler.getSelector());
> + if (blocking){
> + throw new IllegalStateException("Blocking mode not supported");
> + } else {
> + if (callbackHandler == null){
> + throw new IllegalStateException
> + ("Non blocking write needs a CallbackHandler");
> + }
> + int nWrite = datagramChannel.write(byteBuffer);
> +
> + if (nWrite == 0){
> + key.attach(callbackHandler);
> + selectorHandler.register(key,
> + SelectionKey.OP_READ|SelectionKey.OP_WRITE);
> + }
> + return nWrite;
> + }
> + }
> +
> +
> + /**
> + * Send bytes to the given address.
> + * @param byteBuffer The byteBuffer containing the bytes to send.
> + * @param socketAddress The address the bytes are sent to.
> + */
> + public long send(ByteBuffer byteBuffer, SocketAddress socketAddress)
> + throws IOException {
> + if (!isConnected){
> + throw new NotYetConnectedException();
> + }
> +
> + if (callbackHandler == null){
> + throw new IllegalStateException
> + ("Non blocking read needs a CallbackHandler");
> + }
> +
> + return datagramChannel.send(byteBuffer,socketAddress);
> + }
> +
> +
> + /**
> + * Receive bytes.
> + * @param byteBuffer The byteBuffer to store bytes.
> + */
> + public SocketAddress receive(ByteBuffer byteBuffer) throws IOException {
> + if (!isConnected){
> + throw new NotYetConnectedException();
> + }
> +
> + SelectionKey key = datagramChannel.keyFor(selectorHandler.getSelector());
> +
> + if (callbackHandler == null){
> + throw new IllegalStateException
> + ("Non blocking read needs a CallbackHandler");
> + }
> +
> + SocketAddress socketAddress = datagramChannel.receive(byteBuffer);
> + return socketAddress;
> + }
> +
> +
> + /**
> + * Close the underlying connection.
> + */
> + public void close() throws IOException{
> + if (datagramChannel != null){
> + if (selectorHandler != null){
> + SelectionKey key =
> + datagramChannel.keyFor(selectorHandler.getSelector());
> +
> + if (key == null) return;
> +
> + key.cancel();
> + key.attach(null);
> + }
> + datagramChannel.close();
> + }
> +
> + if (controller != null && isStandalone){
> + controller.stop();
> + controller = null;
> + }
> + isStandalone = false;
> + }
> +
> +
> + /**
> + * Finish handling the OP_CONNECT interest ops.
> + */
> + public void finishConnect(SelectionKey key){
> + datagramChannel = (DatagramChannel)key.channel();
> + isConnected = datagramChannel.isConnected();
> + isConnectedLatch.countDown();
> + }
> +
> +
> + /**
> + * A token describing the protocol supported by an implementation of this
> + * interface
> + */
> + public Controller.Protocol protocol(){
> + return Controller.Protocol.UDP;
> + }
> +
> +
> + /**
> + * Is the underlying DatagramChannel connected.
> + */
> + public boolean isConnected(){
> + return isConnected;
> + }
> +
> +
> + public Controller getController() {
> + return controller;
> + }
> +
> +
> + public void setController(Controller controller) {
> + this.controller = controller;
> + }
> +
> +}
>
> Added: trunk/modules/grizzly/src/main/java/com/sun/grizzly/UDPConnectorInstanceHandler.java
> Url: https://grizzly.dev.java.net/source/browse/grizzly/trunk/modules/grizzly/src/main/java/com/sun/grizzly/UDPConnectorInstanceHandler.java?view=auto&rev=181
> ==============================================================================
> --- (empty file)
> +++ trunk/modules/grizzly/src/main/java/com/sun/grizzly/UDPConnectorInstanceHandler.java 2007-05-16 21:47:36+0000
> @@ -0,0 +1,66 @@
> +/*
> + * The contents of this file are subject to the terms
> + * of the Common Development and Distribution License
> + * (the License). You may not use this file except in
> + * compliance with the License.
> + *
> + * You can obtain a copy of the license at
> + * https://glassfish.dev.java.net/public/CDDLv1.0.html or
> + * glassfish/bootstrap/legal/CDDLv1.0.txt.
> + * See the License for the specific language governing
> + * permissions and limitations under the License.
> + *
> + * When distributing Covered Code, include this CDDL
> + * Header Notice in each file and include the License file
> + * at glassfish/bootstrap/legal/CDDLv1.0.txt.
> + * If applicable, add the following below the CDDL Header,
> + * with the fields enclosed by brackets [] replaced by
> + * you own identifying information:
> + * "Portions Copyrighted [year] [name of copyright owner]"
> + *
> + * Copyright 2007 Sun Microsystems, Inc. All rights reserved.
> + */
> +
> +package com.sun.grizzly;
> +
> +import java.util.concurrent.ConcurrentLinkedQueue;
> +
> +/**
> + * Default <code>ConnectorInstanceHandler</code> which uses a
> + * <code>ConcurrentLinkedQueue</code> to pool <code>ConnectorHandler</code>
> + *
> + * @author Jeanfrancois
> + */
> +public class UDPConnectorInstanceHandler implements
> + ConnectorInstanceHandler<UDPConnectorHandler> {
> +
> + /**
> + * Simple queue used to pool <code>UDPConnectorHandler</code>
> + */
> + private ConcurrentLinkedQueue<UDPConnectorHandler> pool;
> +
> +
> + public UDPConnectorInstanceHandler(){
> + pool = new ConcurrentLinkedQueue<UDPConnectorHandler>();
> + }
> +
> + /**
> + * Acquire a <code>ConnectorHandler</code>
> + */
> + public UDPConnectorHandler acquire(){
> + UDPConnectorHandler UDPConnectorHandler = pool.poll();
> + if (UDPConnectorHandler == null){
> + UDPConnectorHandler = new UDPConnectorHandler();
> + }
> + return UDPConnectorHandler;
> + }
> +
> +
> + /**
> + * Release a <code>ConnectorHandler</code>
> + */
> + public void release(UDPConnectorHandler connectorHandler){
> + pool.offer(connectorHandler);
> + }
> +
> +}
>
> Modified: trunk/modules/grizzly/src/main/java/com/sun/grizzly/UDPSelectorHandler.java
> Url: https://grizzly.dev.java.net/source/browse/grizzly/trunk/modules/grizzly/src/main/java/com/sun/grizzly/UDPSelectorHandler.java?view=diff&rev=181&p1=trunk/modules/grizzly/src/main/java/com/sun/grizzly/UDPSelectorHandler.java&p2=trunk/modules/grizzly/src/main/java/com/sun/grizzly/UDPSelectorHandler.java&r1=180&r2=181
> ==============================================================================
> --- trunk/modules/grizzly/src/main/java/com/sun/grizzly/UDPSelectorHandler.java (original)
> +++ trunk/modules/grizzly/src/main/java/com/sun/grizzly/UDPSelectorHandler.java 2007-05-16 21:47:36+0000
> @@ -28,10 +28,15 @@
> import java.net.BindException;
> import java.net.DatagramSocket;
> import java.net.InetSocketAddress;
> +import java.net.SocketAddress;
> import java.net.SocketException;
> import java.nio.channels.DatagramChannel;
> +import java.nio.channels.SelectableChannel;
> import java.nio.channels.SelectionKey;
> import java.nio.channels.Selector;
> +import java.nio.channels.SocketChannel;
> +import java.util.Iterator;
> +import java.util.concurrent.ConcurrentHashMap;
> import java.util.concurrent.ConcurrentLinkedQueue;
> import java.util.logging.Level;
>
> @@ -66,6 +71,11 @@
>
> public UDPSelectorHandler() {
> }
> +
> +
> + public UDPSelectorHandler(boolean isClient) {
> + super(isClient);
> + }
>
>
> @Override
> @@ -84,6 +94,7 @@
> public void preSelect(Context ctx) throws IOException {
> if (selector == null){
> try{
> + connectorInstanceHandler = new UDPConnectorInstanceHandler();
> datagramChannel = DatagramChannel.open();
> datagramChannel.configureBlocking(false);
> selector = Selector.open();
> @@ -103,7 +114,76 @@
>
> datagramSocket.setSoTimeout(serverTimeout);
> } else {
> - super.preSelect(ctx);
> + if (opReadToRegister == null){
> + opReadToRegister = new ConcurrentLinkedQueue<SelectionKey>();
> + }
> +
> + SelectionKey key;
> + int size = opReadToRegister.size();
> + long currentTime = 0L;
> + if (size > 0){
> + currentTime = System.currentTimeMillis();
> + }
> +
> + for (int i=0; i < size; i++) {
> + key = opReadToRegister.poll();
> +
> + key.interestOps(key.interestOps() | SelectionKey.OP_READ);
> +
> + if (key.attachment() == null)
> + key.attach(currentTime);
> + }
> +
> + if (opWriteToRegister== null){
> + opWriteToRegister = new ConcurrentLinkedQueue<SelectionKey>();
> + }
> +
> + size = opWriteToRegister.size();
> + currentTime = 0L;
> + if (size > 0){
> + currentTime = System.currentTimeMillis();
> + }
> +
> + for (int i=0; i < size; i++) {
> + key = opWriteToRegister.poll();
> +
> + key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
> +
> + if (key.attachment() == null)
> + key.attach(currentTime);
> + }
> +
> + if (opConnectToRegister== null){
> + opConnectToRegister = new ConcurrentHashMap
> + <SocketAddress[],CallbackHandler>();
> + }
> +
> + size = opConnectToRegister.size();
> + currentTime = 0L;
> + if (size > 0){
> + currentTime = System.currentTimeMillis();
> + } else {
> + return;
> + }
> +
> + CallbackHandler<SocketChannel> callbackHandler;
> + Iterator<SocketAddress[]> iterator =
> + opConnectToRegister.keySet().iterator();
> + SocketAddress[] remoteLocal;
> + while(iterator.hasNext()){
> + remoteLocal = iterator.next();
> + final DatagramChannel datagramChannel = DatagramChannel.open();
> + datagramChannel.socket().setReuseAddress(true);
> + if (remoteLocal[1] != null){
> + datagramChannel.socket().bind(remoteLocal[1]);
> + }
> + datagramChannel.configureBlocking(false);
> + datagramChannel.connect(remoteLocal[0]);
> + key = datagramChannel.register(selector,
> + SelectionKey.OP_READ|SelectionKey.OP_WRITE,
> + opConnectToRegister.remove(remoteLocal));
> + onConnectInterest(key, ctx);
> + }
> }
> }
>
> @@ -153,28 +233,88 @@
> /**
> * Handle OP_READ.
> */
> - @Override
> - public boolean onReadInterest(SelectionKey key,Context ctx)
> + public boolean onReadInterest(final SelectionKey key,Context ctx)
> throws IOException{
> -
> // disable OP_READ on key before doing anything else
> key.interestOps(key.interestOps() & (~SelectionKey.OP_READ));
> - return true;
> + if (key.attachment() instanceof CallbackHandler){
> + ((CallbackHandler)key.attachment())
> + .onRead(new IOEvent<SelectionKey>(){
> +
> + public SelectionKey attach(SelectionKey sc){
> + return key;
> + }
> +
> + public SelectionKey attachment(){
> + return key;
> + }
> + });
> + return false;
> + } else {
> + return true;
> + }
> }
>
>
> /**
> * Handle OP_WRITE.
> + *
> */
> - @Override
> - public boolean onWriteInterest(SelectionKey key, Context ctx)
> + public boolean onWriteInterest(final SelectionKey key, Context ctx)
> throws IOException{
> -
> + // disable OP_WRITE on key before doing anything else
> key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
> - return true;
> + if (key.attachment() instanceof CallbackHandler){
> + ((CallbackHandler)key.attachment())
> + .onWrite(new IOEvent<SelectionKey>(){
> +
> + public SelectionKey attach(SelectionKey sc){
> + return key;
> + }
> +
> + public SelectionKey attachment(){
> + return key;
> + }
> + });
> + return false;
> + } else {
> + return true;
> + }
> + }
> +
> +
> + /**
> + * Handle OP_CONNECT.
> + */
> + public boolean onConnectInterest(final SelectionKey key, Context ctx)
> + throws IOException{
> + if (key.attachment() instanceof CallbackHandler){
> + ((CallbackHandler)key.attachment())
> + .onConnect(new IOEvent<SelectionKey>(){
> +
> + public SelectionKey attach(SelectionKey sc){
> + return key;
> + }
> +
> + public SelectionKey attachment(){
> + return key;
> + }
> + });
> +
> + }
> + return false;
> }
>
>
> + /**
> + * A token describing the protocol supported by an implementation of this
> + * interface
> + */
> + public Controller.Protocol protocol(){
> + return Controller.Protocol.UDP;
> + }
> +
> +
> public int getSsBackLog() {
> throw new IllegalStateException(NOT_SUPPORTED);
> }
>
> Modified: trunk/modules/grizzly/src/main/java/com/sun/grizzly/util/ByteBufferInputStream.java
> Url: https://grizzly.dev.java.net/source/browse/grizzly/trunk/modules/grizzly/src/main/java/com/sun/grizzly/util/ByteBufferInputStream.java?view=diff&rev=181&p1=trunk/modules/grizzly/src/main/java/com/sun/grizzly/util/ByteBufferInputStream.java&p2=trunk/modules/grizzly/src/main/java/com/sun/grizzly/util/ByteBufferInputStream.java&r1=180&r2=181
> ==============================================================================
> --- trunk/modules/grizzly/src/main/java/com/sun/grizzly/util/ByteBufferInputStream.java (original)
> +++ trunk/modules/grizzly/src/main/java/com/sun/grizzly/util/ByteBufferInputStream.java 2007-05-16 21:47:36+0000
> @@ -26,8 +26,10 @@
> import java.io.InputStream;
> import java.io.IOException;
> import java.nio.ByteBuffer;
> +import java.nio.channels.DatagramChannel;
> import java.nio.channels.SelectionKey;
> import java.nio.channels.Selector;
> +import java.nio.channels.ReadableByteChannel;
> import java.nio.channels.SocketChannel;
>
> /**
> @@ -39,6 +41,12 @@
> */
> public class ByteBufferInputStream extends InputStream {
>
> + public enum ChannelType { SocketChannel, DatagramChannel }
> +
> +
> + private ChannelType defaultChannelType = ChannelType.SocketChannel;
> +
> +
> private static int defaultReadTimeout = 15000;
>
> /**
> @@ -238,9 +246,9 @@
> SelectionKey tmpKey = null;
>
> try{
> - SocketChannel socketChannel = (SocketChannel)key.channel();
> + ReadableByteChannel readableChannel = (ReadableByteChannel)key.channel();
> while (count > 0){
> - count = socketChannel.read(byteBuffer);
> + count = readableChannel.read(byteBuffer);
> if ( count > -1 )
> byteRead += count;
> else
> @@ -254,8 +262,15 @@
> return 0;
> }
> count = 1;
> - tmpKey = socketChannel
> - .register(readSelector,SelectionKey.OP_READ);
> +
> + tmpKey = null;
> + if (defaultChannelType == ChannelType.SocketChannel){
> + tmpKey = ((SocketChannel)readableChannel)
> + .register(readSelector,SelectionKey.OP_READ);
> + } else {
> + tmpKey = ((DatagramChannel)readableChannel)
> + .register(readSelector,SelectionKey.OP_READ);
> + }
> tmpKey.interestOps(tmpKey.interestOps() | SelectionKey.OP_READ);
> int code = readSelector.select(readTimeout);
> tmpKey.interestOps(
> @@ -266,7 +281,7 @@
> }
>
> while (count > 0){
> - count = socketChannel.read(byteBuffer);
> + count = readableChannel.read(byteBuffer);
> if ( count > -1 )
> byteRead += count;
> else
> @@ -318,5 +333,15 @@
> public static void setDefaultReadTimeout(int aDefaultReadTimeout) {
> defaultReadTimeout = aDefaultReadTimeout;
> }
> +
> +
> + public ChannelType getChannelType() {
> + return defaultChannelType;
> + }
> +
> +
> + public void setChannelType(ChannelType channelType) {
> + this.defaultChannelType = channelType;
> + }
> }
>
>
> Modified: trunk/modules/grizzly/src/test/java/com/sun/grizzly/TCPConnectorHandlerTest.java
> Url: https://grizzly.dev.java.net/source/browse/grizzly/trunk/modules/grizzly/src/test/java/com/sun/grizzly/TCPConnectorHandlerTest.java?view=diff&rev=181&p1=trunk/modules/grizzly/src/test/java/com/sun/grizzly/TCPConnectorHandlerTest.java&p2=trunk/modules/grizzly/src/test/java/com/sun/grizzly/TCPConnectorHandlerTest.java&r1=180&r2=181
> ==============================================================================
> --- trunk/modules/grizzly/src/test/java/com/sun/grizzly/TCPConnectorHandlerTest.java (original)
> +++ trunk/modules/grizzly/src/test/java/com/sun/grizzly/TCPConnectorHandlerTest.java 2007-05-16 21:47:36+0000
> @@ -53,7 +53,8 @@
> public void testSimplePacket() throws IOException {
> final Controller controller = createController();
> ControllerUtils.startController(controller);
> - final ConnectorHandler tcpConnector = controller.acquireConnectorHandler();
> + final ConnectorHandler tcpConnector =
> + controller.acquireConnectorHandler(Controller.Protocol.TCP);
>
> try {
> final byte[] testData = "Hello".getBytes();
> @@ -82,7 +83,7 @@
> int nRead = socketChannel.read(readBB);
> if (nRead == 0 && readTry++ < 2){
> key.attach(callbackHandler);
> - controller.getSelectorHandler()
> + controller.getSelectorHandler(Controller.Protocol.TCP)
> .register(key,SelectionKey.OP_READ);
> } else {
> responseArrivedLatch.countDown();
> @@ -102,7 +103,7 @@
>
> if (nWrite == 0){
> key.attach(callbackHandler);
> - controller.getSelectorHandler()
> + controller.getSelectorHandler(Controller.Protocol.TCP)
> .register(key,SelectionKey.OP_WRITE);
> return;
> }
> @@ -144,6 +145,7 @@
> } finally {
> try{
> tcpConnector.close();
> + controller.releaseConnectorHandler(tcpConnector);
> controller.stop();
> } catch (Throwable t){
> t.printStackTrace();
>
> Added: trunk/modules/grizzly/src/test/java/com/sun/grizzly/utils/NonBlockingUDPIOClient.java
> Url: https://grizzly.dev.java.net/source/browse/grizzly/trunk/modules/grizzly/src/test/java/com/sun/grizzly/utils/NonBlockingUDPIOClient.java?view=auto&rev=181
> ==============================================================================
> --- (empty file)
> +++ trunk/modules/grizzly/src/test/java/com/sun/grizzly/utils/NonBlockingUDPIOClient.java 2007-05-16 21:47:36+0000
> @@ -0,0 +1,75 @@
> +/*
> + * The contents of this file are subject to the terms
> + * of the Common Development and Distribution License
> + * (the License). You may not use this file except in
> + * compliance with the License.
> + *
> + * You can obtain a copy of the license at
> + * https://glassfish.dev.java.net/public/CDDLv1.0.html or
> + * glassfish/bootstrap/legal/CDDLv1.0.txt.
> + * See the License for the specific language governing
> + * permissions and limitations under the License.
> + *
> + * When distributing Covered Code, include this CDDL
> + * Header Notice in each file and include the License file
> + * at glassfish/bootstrap/legal/CDDLv1.0.txt.
> + * If applicable, add the following below the CDDL Header,
> + * with the fields enclosed by brackets [] replaced by
> + * you own identifying information:
> + * "Portions Copyrighted [year] [name of copyright owner]"
> + *
> + * Copyright 2006 Sun Microsystems, Inc. All rights reserved.
> + */
> +
> +package com.sun.grizzly.utils;
> +
> +import com.sun.grizzly.UDPConnectorHandler;
> +import java.io.IOException;
> +import java.net.InetSocketAddress;
> +import java.net.SocketAddress;
> +import java.nio.ByteBuffer;
> +
> +/**
> + * UDP client that exercises the non blocking ConnectorHandler.
> + *
> + * @author Jeanfrancois Arcand
> + */
> +public class NonBlockingUDPIOClient {
> + private String host;
> + private int port;
> +
> + private UDPConnectorHandler udpConnectorHandler;
> +
> + public NonBlockingUDPIOClient(String host, int port) {
> + this.host = host;
> + this.port = port;
> + }
> +
> + public void connect() throws IOException {
> + udpConnectorHandler = new UDPConnectorHandler();
> + udpConnectorHandler.connect(new InetSocketAddress(host,port));
> + }
> +
> + public void send(byte[] msg) throws IOException {
> + udpConnectorHandler.send(ByteBuffer.wrap(msg),
> + new InetSocketAddress(host,port));
> + }
> +
> + public int receive(byte[] buf) throws IOException {
> + return receive(buf, buf.length);
> + }
> +
> + public int receive(byte[] buf, int length) throws IOException {
> + ByteBuffer byteBuffer = ByteBuffer.wrap(buf,0,length);
> + SocketAddress sa = udpConnectorHandler.receive(byteBuffer);
> + System.out.println("sa: " + sa);
> + System.out.println("nRead: " + byteBuffer.position());
> + return byteBuffer.position();
> + }
> +
> + public void close() throws IOException {
> + if (udpConnectorHandler != null) {
> + udpConnectorHandler.close();
> + }
> + }
> +}
>
> Modified: trunk/modules/grizzly/src/test/java/com/sun/grizzly/utils/UDPIOClient.java
> Url: https://grizzly.dev.java.net/source/browse/grizzly/trunk/modules/grizzly/src/test/java/com/sun/grizzly/utils/UDPIOClient.java?view=diff&rev=181&p1=trunk/modules/grizzly/src/test/java/com/sun/grizzly/utils/UDPIOClient.java&p2=trunk/modules/grizzly/src/test/java/com/sun/grizzly/utils/UDPIOClient.java&r1=180&r2=181
> ==============================================================================
> --- trunk/modules/grizzly/src/test/java/com/sun/grizzly/utils/UDPIOClient.java (original)
> +++ trunk/modules/grizzly/src/test/java/com/sun/grizzly/utils/UDPIOClient.java 2007-05-16 21:47:36+0000
> @@ -45,6 +45,8 @@
> public void connect() throws IOException {
> socket = new DatagramSocket();
> socket.connect(InetAddress.getByName(host), port);
> +
> +
> }
>
> public void send(byte[] msg) throws IOException {
>
> Modified: trunk/modules/http/src/main/java/com/sun/grizzly/http/SelectorThread.java
> Url: https://grizzly.dev.java.net/source/browse/grizzly/trunk/modules/http/src/main/java/com/sun/grizzly/http/SelectorThread.java?view=diff&rev=181&p1=trunk/modules/http/src/main/java/com/sun/grizzly/http/SelectorThread.java&p2=trunk/modules/http/src/main/java/com/sun/grizzly/http/SelectorThread.java&r1=180&r2=181
> ==============================================================================
> --- trunk/modules/http/src/main/java/com/sun/grizzly/http/SelectorThread.java (original)
> +++ trunk/modules/http/src/main/java/com/sun/grizzly/http/SelectorThread.java 2007-05-16 21:47:36+0000
> @@ -597,7 +597,8 @@
> * running of this thread.
> */
> public void registerKey(SelectionKey key){
> - controller.getSelectorHandler().register(key,SelectionKey.OP_READ);
> + controller.getSelectorHandler(Controller.Protocol.TCP)
> + .register(key,SelectionKey.OP_READ);
> }
>
> // -------------------------------------------------------------- Init //
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: commits-unsubscribe_at_grizzly.dev.java.net
> For additional commands, e-mail: commits-help_at_grizzly.dev.java.net
>
>
>
>
> ------------------------------------------------------------------------
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: dev-unsubscribe_at_grizzly.dev.java.net
> For additional commands, e-mail: dev-help_at_grizzly.dev.java.net