> LOL! I was trying to write such a test and failed. So go ahead and
> commit if you haven't yet. I guess my UDP skills are bad :-)
Well, I just took the TCP code and reworked it for UDP. I hope it will work
on more than just Windows XP :)
Alexey.
>
> Thanks a lot!
>
> -- Jeanfrancois
>
>
>>
>> WBR,
>> Alexey.
>>
>>
>> ------------------------------------------------------------------------
>>
>> /*
>> * The contents of this file are subject to the terms
>> * of the Common Development and Distribution License
>> * (the License). You may not use this file except in
>> * compliance with the License.
>> *
>> * You can obtain a copy of the license at
>> * https://glassfish.dev.java.net/public/CDDLv1.0.html or
>> * glassfish/bootstrap/legal/CDDLv1.0.txt.
>> * See the License for the specific language governing
>> * permissions and limitations under the License.
>> *
>> * When distributing Covered Code, include this CDDL
>> * Header Notice in each file and include the License file
>> * at glassfish/bootstrap/legal/CDDLv1.0.txt.
>> * If applicable, add the following below the CDDL Header,
>> * with the fields enclosed by brackets [] replaced by
>> * you own identifying information:
>> * "Portions Copyrighted [year] [name of copyright owner]"
>> *
>> * Copyright 2006 Sun Microsystems, Inc. All rights reserved.
>> */
>> package com.sun.grizzly;
>>
>> import com.sun.grizzly.filter.UDPReadFilter;
>> import com.sun.grizzly.utils.ControllerUtils;
>> import com.sun.grizzly.utils.EchoFilter;
>> import java.io.IOException;
>> import java.net.InetSocketAddress;
>> import java.nio.ByteBuffer;
>> import java.nio.channels.DatagramChannel;
>> import java.nio.channels.SelectionKey;
>> import java.util.Arrays;
>> import java.util.concurrent.CountDownLatch;
>> import java.util.concurrent.TimeUnit;
>> import junit.framework.TestCase;
>>
>> /**
>> * Tests the default <code>UDPConnectorHandler</code>
>> * * @author Jeanfrancois Arcand
>> */
>> public class UDPConnectorHandlerTest extends TestCase {
>> public static final int PORT = 18888;
>> public static final int PACKETS_COUNT = 100;
>>
>> /**
>> * A <code>CallbackHandler</code> handler invoked by the
>> UDPSelectorHandler
>> * when a non blocking operation is ready to be processed.
>> */
>> private CallbackHandler callbackHandler;
>> public void testSimplePacket() throws IOException {
>> final Controller controller = createController();
>> ControllerUtils.startController(controller);
>> final ConnectorHandler udpConnector =
>> controller.acquireConnectorHandler(Controller.Protocol.UDP);
>> try {
>> final byte[] testData = "Hello".getBytes();
>> final byte[] response = new byte[testData.length];
>>
>> final ByteBuffer writeBB = ByteBuffer.wrap(testData);
>> final ByteBuffer readBB = ByteBuffer.wrap(response);
>> final CountDownLatch responseArrivedLatch = new
>> CountDownLatch(1); callbackHandler = new
>> CallbackHandler<SelectionKey>(){
>> private int readTry;
>> public void
>> onConnect(IOEvent<SelectionKey> ioEvent) {
>> SelectionKey key = ioEvent.attachment();
>> udpConnector.finishConnect(key);
>> controller.registerKey(key,
>> SelectionKey.OP_WRITE|SelectionKey.OP_READ);
>> }
>> public void
>> onRead(IOEvent<SelectionKey> ioEvent) {
>> SelectionKey key = ioEvent.attachment();
>> DatagramChannel datagramChannel =
>> (DatagramChannel)key.channel();
>> try {
>> int nRead = datagramChannel.read(readBB);
>> if (nRead == 0 && readTry++ < 2){
>> key.attach(callbackHandler);
>>
>> controller.getSelectorHandler(Controller.Protocol.UDP)
>> .register(key,SelectionKey.OP_READ);
>> } else {
>> responseArrivedLatch.countDown();
>> } } catch (IOException ex){
>> ex.printStackTrace();
>> controller.cancelKey(key);
>> }
>> }
>> public void
>> onWrite(IOEvent<SelectionKey> ioEvent) {
>> SelectionKey key = ioEvent.attachment();
>> DatagramChannel datagramChannel =
>> (DatagramChannel)key.channel();
>> try{
>> while(writeBB.hasRemaining()){
>> int nWrite = datagramChannel.write(writeBB);
>>
>> if (nWrite == 0){
>> key.attach(callbackHandler);
>>
>> controller.getSelectorHandler(Controller.Protocol.UDP)
>>
>> .register(key,SelectionKey.OP_WRITE);
>> return;
>> } }
>>
>> udpConnector.read(readBB,false);
>> } catch (IOException ex){
>> ex.printStackTrace();
>> controller.cancelKey(key);
>> }
>> }
>> };
>>
>> try{
>> udpConnector.connect(new
>> InetSocketAddress("localhost",PORT)
>> ,callbackHandler);
>> } catch (Throwable t){
>> t.printStackTrace();
>> }
>>
>> long nWrite = udpConnector.write(writeBB,false);
>> long nRead = -1;
>>
>> // All bytes written
>> if (nWrite == testData.length){
>> nRead = udpConnector.read(readBB,false);
>> }
>> if (nRead != nWrite){
>> try {
>> responseArrivedLatch.await(5,TimeUnit.SECONDS);
>> } catch (InterruptedException ex) {
>> ex.printStackTrace();
>> }
>> }
>> assertTrue(Arrays.equals(testData, readBB.array()));
>> } finally { try{
>> udpConnector.close();
>> controller.releaseConnectorHandler(udpConnector);
>> controller.stop();
>> } catch (Throwable t){
>> t.printStackTrace();
>> }
>> }
>> }
>> private Controller createController() {
>> final ProtocolFilter readFilter = new UDPReadFilter();
>> final ProtocolFilter echoFilter = new EchoFilter();
>> UDPSelectorHandler selectorHandler = new
>> UDPSelectorHandler();
>> selectorHandler.setPort(PORT);
>> final Controller controller = new Controller();
>> controller.setSelectorHandler(selectorHandler);
>> controller.setProtocolChainInstanceHandler(
>> new DefaultProtocolChainInstanceHandler(){
>> public ProtocolChain poll() {
>> ProtocolChain protocolChain = protocolChains.poll();
>> if (protocolChain == null){
>> protocolChain = new DefaultProtocolChain();
>> protocolChain.addFilter(readFilter);
>> protocolChain.addFilter(echoFilter);
>> }
>> return protocolChain;
>> }
>> });
>> return controller;
>> }
>> }
>>
>>
>> ------------------------------------------------------------------------
>>
>> Subject:
>> svn commit: r181 - trunk/modules:
>> grizzly/src/main/java/com/sun/grizzly
>> grizzly/src/main/java/com/sun/grizzly/util
>> grizzly/src/test/java/com/sun/gr...
>> From:
>> jfarcand_at_dev.java.net
>> Date:
>> Wed, 16 May 2007 21:47:37 +0000
>> To:
>> commits_at_grizzly.dev.java.net
>>
>> To:
>> commits_at_grizzly.dev.java.net
>>
>>
>> Author: jfarcand
>> Date: 2007-05-16 21:47:36+0000
>> New Revision: 181
>>
>> Added:
>>
>> trunk/modules/grizzly/src/main/java/com/sun/grizzly/UDPConnectorHandler.java
>>
>>
>> trunk/modules/grizzly/src/main/java/com/sun/grizzly/UDPConnectorInstanceHandler.java
>>
>>
>> trunk/modules/grizzly/src/test/java/com/sun/grizzly/utils/NonBlockingUDPIOClient.java
>>
>> Modified:
>>
>> trunk/modules/grizzly/src/main/java/com/sun/grizzly/ConnectorHandler.java
>>
>> trunk/modules/grizzly/src/main/java/com/sun/grizzly/Controller.java
>>
>> trunk/modules/grizzly/src/main/java/com/sun/grizzly/ReadController.java
>>
>> trunk/modules/grizzly/src/main/java/com/sun/grizzly/SelectorHandler.java
>>
>> trunk/modules/grizzly/src/main/java/com/sun/grizzly/SslTCPSelectorHandler.java
>>
>>
>> trunk/modules/grizzly/src/main/java/com/sun/grizzly/TCPConnectorHandler.java
>>
>>
>> trunk/modules/grizzly/src/main/java/com/sun/grizzly/TCPSelectorHandler.java
>>
>>
>> trunk/modules/grizzly/src/main/java/com/sun/grizzly/UDPSelectorHandler.java
>>
>>
>> trunk/modules/grizzly/src/main/java/com/sun/grizzly/util/ByteBufferInputStream.java
>>
>>
>> trunk/modules/grizzly/src/test/java/com/sun/grizzly/TCPConnectorHandlerTest.java
>>
>>
>> trunk/modules/grizzly/src/test/java/com/sun/grizzly/utils/UDPIOClient.java
>>
>>
>> trunk/modules/http/src/main/java/com/sun/grizzly/http/SelectorThread.java
>>
>>
>> Log:
>> Add UDP client-side support (still not fully working...need a UDP
>> expert :-)). Also tie the SelectorHandler to the ConnectorHandler
>> so a client instance can be retrieved using the Protocol.
>>
>> Hopefully I will figure out what's wrong with the UDP client (receive
>> doesn't work).
>>
>>
>> Modified:
>> trunk/modules/grizzly/src/main/java/com/sun/grizzly/ConnectorHandler.java
>>
>> Url:
>> https://grizzly.dev.java.net/source/browse/grizzly/trunk/modules/grizzly/src/main/java/com/sun/grizzly/ConnectorHandler.java?view=diff&rev=181&p1=trunk/modules/grizzly/src/main/java/com/sun/grizzly/ConnectorHandler.java&p2=trunk/modules/grizzly/src/main/java/com/sun/grizzly/ConnectorHandler.java&r1=180&r2=181
>>
>> ==============================================================================
>>
>> ---
>> trunk/modules/grizzly/src/main/java/com/sun/grizzly/ConnectorHandler.java
>> (original)
>> +++
>> trunk/modules/grizzly/src/main/java/com/sun/grizzly/ConnectorHandler.java
>> 2007-05-16 21:47:36+0000
>> @@ -39,7 +39,15 @@
>> * @author Jeanfrancois Arcand
>> */
>> public interface ConnectorHandler<E extends SelectorHandler> extends
>> Handler{
>> - + + + /**
>> + * A token decribing the protocol supported by an implementation
>> of this
>> + * interface
>> + */
>> + public Controller.Protocol protocol(); + + /**
>> * Connect to hostname:port. When an aysnchronous event happens
>> (e.g * OP_READ or OP_WRITE), the <code>Controller</code> will
>> invoke
>> Modified:
>> trunk/modules/grizzly/src/main/java/com/sun/grizzly/Controller.java
>> Url:
>> https://grizzly.dev.java.net/source/browse/grizzly/trunk/modules/grizzly/src/main/java/com/sun/grizzly/Controller.java?view=diff&rev=181&p1=trunk/modules/grizzly/src/main/java/com/sun/grizzly/Controller.java&p2=trunk/modules/grizzly/src/main/java/com/sun/grizzly/Controller.java&r1=180&r2=181
>>
>> ==============================================================================
>>
>> ---
>> trunk/modules/grizzly/src/main/java/com/sun/grizzly/Controller.java
>> (original)
>> +++
>> trunk/modules/grizzly/src/main/java/com/sun/grizzly/Controller.java
>> 2007-05-16 21:47:36+0000
>> @@ -113,6 +113,9 @@
>> */
>> public class Controller implements Runnable, Lifecycle, Copyable {
>> + public enum Protocol { UDP, TCP , TLS, CUSTOM }
>> + + /**
>> * A cached list of Context. Context are by default stateless.
>> */
>> @@ -138,13 +141,6 @@
>> * of the TCPSelectorHandler will be created.
>> */
>> protected ConcurrentLinkedQueue<SelectorHandler> selectorHandlers;
>> -
>> - - /**
>> - * The ConnectorInstanceHandler used to return a new or pooled
>> - * ConnectorHandler
>> - */
>> - private ConnectorInstanceHandler connectorInstanceHandler;
>> /**
>> @@ -408,10 +404,15 @@
>> /**
>> - * Return the first <code>SelectorHandler</code>
>> + * Return the <code>SelectorHandler</code> associated with the
>> protocol.
>> */
>> - public SelectorHandler getSelectorHandler(){
>> - return selectorHandlers.peek();
>> + public SelectorHandler getSelectorHandler(Protocol protocol){
>> + for (SelectorHandler selectorHandler: selectorHandlers){
>> + if (selectorHandler.protocol() == protocol){
>> + return selectorHandler;
>> + }
>> + }
>> + return null;
>> }
>> @@ -492,11 +493,7 @@
>> if (selectionKeyHandler == null){
>> selectionKeyHandler = new DefaultSelectionKeyHandler();
>> }
>> - - if (connectorInstanceHandler == null){
>> - connectorInstanceHandler = new
>> TCPConnectorInstanceHandler();
>> - }
>> - +
>> if (readThreadsCount > 1) {
>> initReadThreads();
>> selectorHandlers.clear();
>> @@ -599,22 +596,28 @@
>> * Return an instance of the default
>> <code>ConnectorHandler</code>, * which is the
>> <code>TCPConnectorHandler</code>
>> */
>> - public ConnectorHandler acquireConnectorHandler(){
>> - if (state != state.STARTED){
>> - throw new IllegalStateException("Controller not yet
>> started");
>> + public ConnectorHandler acquireConnectorHandler(Protocol protocol){
>> + ConnectorHandler ch = null;
>> + for (SelectorHandler selectorHandler: selectorHandlers){
>> + if (selectorHandler.protocol() == protocol){
>> + ch = selectorHandler.acquireConnectorHandler();
>> + ch.setController(this);
>> + break; + }
>> }
>> - - ConnectorHandler connectorHandler =
>> connectorInstanceHandler.acquire();
>> - connectorHandler.setController(this);
>> - return connectorHandler;
>> + return ch;
>> }
>> /**
>> * Release a ConnectorHandler.
>> */
>> - public void releaseConnectorHandler(ConnectorHandler
>> connectorHandler){
>> - connectorInstanceHandler.release(connectorHandler);
>> + public void releaseConnectorHandler(ConnectorHandler
>> connectorHandler){ + for (SelectorHandler
>> selectorHandler: selectorHandlers){
>> + if (selectorHandler.protocol() ==
>> connectorHandler.protocol()){
>> +
>> selectorHandler.releaseConnectorHandler(connectorHandler);
>> + }
>> + } }
>> @@ -646,12 +649,4 @@
>> logger.log(Level.SEVERE,"Controller.start()",ex);
>> }
>> }
>> -
>> - public ConnectorInstanceHandler getConnectorInstanceHandler() {
>> - return connectorInstanceHandler;
>> - }
>> -
>> - public void setConnectorInstanceHandler(ConnectorInstanceHandler
>> connectorInstanceHandler) {
>> - this.connectorInstanceHandler = connectorInstanceHandler;
>> - }
>> }
>>
>> Modified:
>> trunk/modules/grizzly/src/main/java/com/sun/grizzly/ReadController.java
>> Url:
>> https://grizzly.dev.java.net/source/browse/grizzly/trunk/modules/grizzly/src/main/java/com/sun/grizzly/ReadController.java?view=diff&rev=181&p1=trunk/modules/grizzly/src/main/java/com/sun/grizzly/ReadController.java&p2=trunk/modules/grizzly/src/main/java/com/sun/grizzly/ReadController.java&r1=180&r2=181
>>
>> ==============================================================================
>>
>> ---
>> trunk/modules/grizzly/src/main/java/com/sun/grizzly/ReadController.java
>> (original)
>> +++
>> trunk/modules/grizzly/src/main/java/com/sun/grizzly/ReadController.java
>> 2007-05-16 21:47:36+0000
>> @@ -53,7 +53,7 @@
>> channels.add(channel);
>> }
>> - getSelectorHandler().getSelector().wakeup();
>> + getSelectorHandler(Protocol.TCP).getSelector().wakeup();
>> }
>> /**
>> @@ -62,7 +62,8 @@
>> private void registerNewChannels() throws IOException {
>> synchronized(channels) {
>> int size = channels.size();
>> - TCPSelectorHandler selectorHandler =
>> (TCPSelectorHandler) getSelectorHandler();
>> + TCPSelectorHandler selectorHandler =
>> + (TCPSelectorHandler)
>> getSelectorHandler(Protocol.TCP);
>> Selector auxSelector = selectorHandler.getSelector();
>> for (int i = 0; i < size; i++) {
>> SocketChannel channel = channels.get(i);
>>
>> Modified:
>> trunk/modules/grizzly/src/main/java/com/sun/grizzly/SelectorHandler.java
>> Url:
>> https://grizzly.dev.java.net/source/browse/grizzly/trunk/modules/grizzly/src/main/java/com/sun/grizzly/SelectorHandler.java?view=diff&rev=181&p1=trunk/modules/grizzly/src/main/java/com/sun/grizzly/SelectorHandler.java&p2=trunk/modules/grizzly/src/main/java/com/sun/grizzly/SelectorHandler.java&r1=180&r2=181
>>
>> ==============================================================================
>>
>> ---
>> trunk/modules/grizzly/src/main/java/com/sun/grizzly/SelectorHandler.java
>> (original)
>> +++
>> trunk/modules/grizzly/src/main/java/com/sun/grizzly/SelectorHandler.java
>> 2007-05-16 21:47:36+0000
>> @@ -39,17 +39,26 @@
>> public interface SelectorHandler extends Handler, Copyable {
>> /**
>> + * A token decribing the protocol supported by an implementation
>> of this
>> + * interface
>> + */
>> + public Controller.Protocol protocol();
>> + + + /**
>> * Gets the underlying selector.
>> * @return underlying <code>Selector</code>
>> */
>> public Selector getSelector();
>> + /**
>> * Sets the underlying <code>Selector</code>
>> * @param selector underlying <code>Selector</code>
>> */
>> public void setSelector(Selector selector);
>>
>> + /**
>> * The SelectionKey that has been registered.
>> * @return <code>Set</code> of <code>SelectionKey</code>
>> @@ -149,4 +158,16 @@
>> */ public boolean onConnectInterest(SelectionKey
>> key,Context controllerCtx)
>> throws IOException; + + + /**
>> + * Return an instance of the <code>ConnectorHandler</code>
>> + */
>> + public ConnectorHandler acquireConnectorHandler();
>> + + + /**
>> + * Release a ConnectorHandler.
>> + */
>> + public void releaseConnectorHandler(ConnectorHandler
>> connectorHandler);
>> }
>>
>> Modified:
>> trunk/modules/grizzly/src/main/java/com/sun/grizzly/SslTCPSelectorHandler.java
>>
>> Url:
>> https://grizzly.dev.java.net/source/browse/grizzly/trunk/modules/grizzly/src/main/java/com/sun/grizzly/SslTCPSelectorHandler.java?view=diff&rev=181&p1=trunk/modules/grizzly/src/main/java/com/sun/grizzly/SslTCPSelectorHandler.java&p2=trunk/modules/grizzly/src/main/java/com/sun/grizzly/SslTCPSelectorHandler.java&r1=180&r2=181
>>
>> ==============================================================================
>>
>> ---
>> trunk/modules/grizzly/src/main/java/com/sun/grizzly/SslTCPSelectorHandler.java
>> (original)
>> +++
>> trunk/modules/grizzly/src/main/java/com/sun/grizzly/SslTCPSelectorHandler.java
>> 2007-05-16 21:47:36+0000
>> @@ -179,6 +179,15 @@
>> /**
>> + * A token decribing the protocol supported by an implementation
>> of this
>> + * interface
>> + */
>> + public Controller.Protocol protocol(){
>> + return Controller.Protocol.TLS;
>> + } + + + /**
>> * Set the SSLContext required to support SSL over NIO.
>> * @param sslContext <code>SSLContext</code>
>> */
>>
>> Modified:
>> trunk/modules/grizzly/src/main/java/com/sun/grizzly/TCPConnectorHandler.java
>>
>> Url:
>> https://grizzly.dev.java.net/source/browse/grizzly/trunk/modules/grizzly/src/main/java/com/sun/grizzly/TCPConnectorHandler.java?view=diff&rev=181&p1=trunk/modules/grizzly/src/main/java/com/sun/grizzly/TCPConnectorHandler.java&p2=trunk/modules/grizzly/src/main/java/com/sun/grizzly/TCPConnectorHandler.java&r1=180&r2=181
>>
>> ==============================================================================
>>
>> ---
>> trunk/modules/grizzly/src/main/java/com/sun/grizzly/TCPConnectorHandler.java
>> (original)
>> +++
>> trunk/modules/grizzly/src/main/java/com/sun/grizzly/TCPConnectorHandler.java
>> 2007-05-16 21:47:36+0000
>> @@ -146,7 +146,7 @@
>> }
>> connect(remoteAddress,localAddress,callbackHandler,
>> -
>> (TCPSelectorHandler)controller.getSelectorHandler());
>> +
>> (TCPSelectorHandler)controller.getSelectorHandler(protocol()));
>> }
>> @@ -281,7 +281,7 @@
>> }
>> connect(remoteAddress,localAddress,callbackHandler,
>> - (TCPSelectorHandler)controller.getSelectorHandler());
>> +
>> (TCPSelectorHandler)controller.getSelectorHandler(protocol()));
>> }
>> @@ -402,6 +402,15 @@
>> /**
>> + * A token decribing the protocol supported by an implementation
>> of this
>> + * interface
>> + */
>> + public Controller.Protocol protocol(){
>> + return Controller.Protocol.TCP;
>> + }
>> + + + /**
>> * Is the underlying SocketChannel connected.
>> */
>> public boolean isConnected(){
>>
>> Modified:
>> trunk/modules/grizzly/src/main/java/com/sun/grizzly/TCPSelectorHandler.java
>>
>> Url:
>> https://grizzly.dev.java.net/source/browse/grizzly/trunk/modules/grizzly/src/main/java/com/sun/grizzly/TCPSelectorHandler.java?view=diff&rev=181&p1=trunk/modules/grizzly/src/main/java/com/sun/grizzly/TCPSelectorHandler.java&p2=trunk/modules/grizzly/src/main/java/com/sun/grizzly/TCPSelectorHandler.java&r1=180&r2=181
>>
>> ==============================================================================
>>
>> ---
>> trunk/modules/grizzly/src/main/java/com/sun/grizzly/TCPSelectorHandler.java
>> (original)
>> +++
>> trunk/modules/grizzly/src/main/java/com/sun/grizzly/TCPSelectorHandler.java
>> 2007-05-16 21:47:36+0000
>> @@ -56,7 +56,15 @@
>> * @author Jeanfrancois Arcand
>> */
>> public class TCPSelectorHandler implements SelectorHandler {
>> - +
>> + + /**
>> + * The ConnectorInstanceHandler used to return a new or pooled
>> + * ConnectorHandler
>> + */
>> + protected ConnectorInstanceHandler connectorInstanceHandler;
>> + + /**
>> * The list of SelectionKey to register next time the
>> Selector.select is
>> * invoked.
>> @@ -181,6 +189,7 @@
>> copyHandler.socketTimeout = socketTimeout;
>> copyHandler.logger = logger;
>> copyHandler.reuseAddress = reuseAddress;
>> + copyHandler.connectorInstanceHandler =
>> connectorInstanceHandler;
>> }
>>
>> @@ -215,6 +224,9 @@
>> public void preSelect(Context ctx) throws IOException {
>> if (selector == null){
>> try{
>> + +
>> connectorInstanceHandler = new TCPConnectorInstanceHandler();
>> + // Create the socket listener
>> selector = Selector.open();
>> @@ -358,7 +370,8 @@
>> if (opConnectToRegister== null){
>> opConnectToRegister = new ConcurrentHashMap
>> <SocketAddress[],CallbackHandler>();
>> - } + } +
>> opConnectToRegister.put(new SocketAddress[]{remoteAddress,localAddress},
>> callBackHandler);
>> selector.wakeup();
>> @@ -491,7 +504,37 @@
>> } return false;
>> }
>> -
>> + + + /**
>> + * Return an instance of the default
>> <code>ConnectorHandler</code>, + * which is the
>> <code>TCPConnectorHandler</code>
>> + */
>> + public ConnectorHandler acquireConnectorHandler(){
>> + if (selector == null && !selector.isOpen()){
>> + throw new IllegalStateException("SelectorHandler not yet
>> started");
>> + }
>> + + ConnectorHandler connectorHandler =
>> connectorInstanceHandler.acquire();
>> + return connectorHandler;
>> + }
>> + + + /**
>> + * Release a ConnectorHandler.
>> + */
>> + public void releaseConnectorHandler(ConnectorHandler
>> connectorHandler){
>> + connectorInstanceHandler.release(connectorHandler);
>> + }
>> + + + /**
>> + * A token decribing the protocol supported by an implementation
>> of this
>> + * interface
>> + */
>> + public Controller.Protocol protocol(){
>> + return Controller.Protocol.TCP;
>> + }
>> // ------------------------------------------------------ Utils
>> ----------//
>>
>> Added:
>> trunk/modules/grizzly/src/main/java/com/sun/grizzly/UDPConnectorHandler.java
>>
>> Url:
>> https://grizzly.dev.java.net/source/browse/grizzly/trunk/modules/grizzly/src/main/java/com/sun/grizzly/UDPConnectorHandler.java?view=auto&rev=181
>>
>> ==============================================================================
>>
>> --- (empty file)
>> +++
>> trunk/modules/grizzly/src/main/java/com/sun/grizzly/UDPConnectorHandler.java
>> 2007-05-16 21:47:36+0000
>> @@ -0,0 +1,449 @@
>> +/*
>> + * The contents of this file are subject to the terms
>> + * of the Common Development and Distribution License
>> + * (the License). You may not use this file except in
>> + * compliance with the License.
>> + *
>> + * You can obtain a copy of the license at
>> + * https://glassfish.dev.java.net/public/CDDLv1.0.html or
>> + * glassfish/bootstrap/legal/CDDLv1.0.txt.
>> + * See the License for the specific language governing
>> + * permissions and limitations under the License.
>> + *
>> + * When distributing Covered Code, include this CDDL
>> + * Header Notice in each file and include the License file
>> + * at glassfish/bootstrap/legal/CDDLv1.0.txt.
>> + * If applicable, add the following below the CDDL Header,
>> + * with the fields enclosed by brackets [] replaced by
>> + * you own identifying information:
>> + * "Portions Copyrighted [year] [name of copyright owner]"
>> + *
>> + * Copyright 2007 Sun Microsystems, Inc. All rights reserved.
>> + */
>> +
>> +package com.sun.grizzly;
>> +
>> +import com.sun.grizzly.util.ByteBufferInputStream;
>> +import com.sun.grizzly.util.OutputWriter;
>> +import java.io.IOException;
>> +import java.net.SocketAddress;
>> +import java.nio.ByteBuffer;
>> +import java.nio.channels.AlreadyConnectedException;
>> +import java.nio.channels.DatagramChannel;
>> +import java.nio.channels.NotYetConnectedException;
>> +import java.nio.channels.SelectionKey;
>> +import java.nio.channels.DatagramChannel;
>> +import java.util.concurrent.CountDownLatch;
>> +
>> +/**
>> + * Client side interface used to implement non blocking client
>> operation. + * Implementation of this class must make sure the
>> following methods are + * invoked in that order:
>> + * + * (1) connect()
>> + * (2) read() or write().
>> + * + * @author Jeanfrancois Arcand
>> + */
>> +public class UDPConnectorHandler implements
>> ConnectorHandler<UDPSelectorHandler>{
>> + + /**
>> + * The underlying UDPSelectorHandler used to mange SelectionKeys.
>> + */
>> + private UDPSelectorHandler selectorHandler;
>> + + + /**
>> + * A <code>CallbackHandler</code> handler invoked by the
>> UDPSelectorHandler
>> + * when a non blocking operation is ready to be processed.
>> + */
>> + private CallbackHandler callbackHandler;
>> +
>> + + /**
>> + * The connection's DatagramChannel.
>> + */
>> + private DatagramChannel datagramChannel;
>> + + + /**
>> + * Is the connection established.
>> + */
>> + private volatile boolean isConnected;
>> + + + /**
>> + * The internal Controller used (in case not specified).
>> + */
>> + private Controller controller;
>> +
>> + + /**
>> + * IsConnected Latch related
>> + */
>> + private CountDownLatch isConnectedLatch; + + + /**
>> + * Are we creating a controller every run.
>> + */
>> + private boolean isStandalone = false;
>> + + + /**
>> + * A blocking <code>InputStream</code> that use a pool of Selector
>> + * to execute a blocking read operation.
>> + */
>> + private ByteBufferInputStream inputStream; + + + /**
>> + * Connect to hostname:port. When an aysnchronous event happens
>> (e.g + * OP_READ or OP_WRITE), the <code>Controller</code> will
>> invoke + * the CallBackHandler.
>> + * @param remoteAddress remote address to connect
>> + * @param callbackHandler the handler invoked by the Controller
>> when + * an non blocking operation is ready to be handled.
>> + */
>> + public void connect(SocketAddress remoteAddress,
>> + CallbackHandler callbackHandler) throws
>> IOException { + +
>> connect(remoteAddress,null,callbackHandler); + } +
>> + + /**
>> + * Connect to hostname:port. When an aysnchronous event happens
>> (e.g + * OP_READ or OP_WRITE), the <code>Controller</code> will
>> invoke + * the CallBackHandler.
>> + * @param remoteAddress remote address to connect
>> + * @param localAddress local address to bind
>> + * @param callbackHandler the handler invoked by the Controller
>> when + * an non blocking operation is ready to be handled.
>> + */
>> + public void connect(SocketAddress remoteAddress, SocketAddress
>> localAddress,
>> + CallbackHandler callbackHandler) throws
>> IOException { + + if (controller == null){
>> + throw new IllegalStateException("Controller cannot be
>> null");
>> + }
>> + + connect(remoteAddress,localAddress,callbackHandler,
>> +
>> (UDPSelectorHandler)controller.getSelectorHandler(protocol()));
>> + }
>> + + + /**
>> + * Connect to hostname:port. When an aysnchronous event happens
>> (e.g + * OP_READ or OP_WRITE), the <code>Controller</code> will
>> invoke + * the CallBackHandler.
>> + * @param remoteAddress remote address to connect
>> + * @param callbackHandler the handler invoked by the Controller
>> when + * an non blocking operation is ready to be handled.
>> + * @param selectorHandler an instance of SelectorHandler.
>> + */
>> + public void connect(SocketAddress remoteAddress,
>> + CallbackHandler callbackHandler,
>> + UDPSelectorHandler selectorHandler) throws
>> IOException { + +
>> connect(remoteAddress,null,callbackHandler,selectorHandler); + }
>> + + /**
>> + * Connect to hostname:port. When an aysnchronous event happens
>> (e.g + * OP_READ or OP_WRITE), the <code>Controller</code> will
>> invoke + * the CallBackHandler.
>> + * @param remoteAddress remote address to connect
>> + * @param localAddress local address to bin
>> + * @param callbackHandler the handler invoked by the Controller
>> when + * an non blocking operation is ready to be handled.
>> + * @param selectorHandler an instance of SelectorHandler.
>> + */
>> + public void connect(SocketAddress remoteAddress, SocketAddress
>> localAddress,
>> + CallbackHandler callbackHandler,
>> + UDPSelectorHandler selectorHandler) throws
>> IOException {
>> + + if (isConnected){
>> + throw new AlreadyConnectedException();
>> + }
>> + + if (controller == null){
>> + throw new IllegalStateException("Controller cannot be
>> null");
>> + }
>> + + if (selectorHandler == null){
>> + throw new IllegalStateException("Controller cannot be
>> null");
>> + }
>> + + this.selectorHandler = selectorHandler;
>> + this.callbackHandler = callbackHandler; +
>> + // Wait for the onConnect to be invoked.
>> + isConnectedLatch = new CountDownLatch(1); + +
>> selectorHandler.connect(remoteAddress,localAddress,callbackHandler);
>> + inputStream = new ByteBufferInputStream();
>> + + try {
>> + isConnectedLatch.await();
>> + } catch (InterruptedException ex) {
>> + ; // LOG ME
>> + }
>> + }
>> +
>> + + /**
>> + * Connect to hostname:port. Internally an instance of
>> Controller and
>> + * its default SelectorHandler will be created everytime this
>> method is + * called. This method should be used only and only if
>> no external + * Controller has been initialized.
>> + * @param remoteAddress remote address to connect
>> + */
>> + public void connect(SocketAddress remoteAddress) +
>> throws IOException {
>> + connect(remoteAddress,(SocketAddress)null); + }
>> + + + /**
>> + * Connect to hostname:port. Internally an instance of
>> Controller and
>> + * its default SelectorHandler will be created everytime this
>> method is + * called. This method should be used only and only if
>> no external + * Controller has been initialized.
>> + * @param remoteAddress remote address to connect
>> + * @param localAddress local address to bin
>> + */
>> + public void connect(SocketAddress remoteAddress, SocketAddress
>> localAddress) + throws IOException {
>> + + if (isConnected){
>> + throw new AlreadyConnectedException();
>> + }
>> + + if (controller == null){ + isStandalone
>> = true;
>> + controller = new Controller();
>> + controller.setSelectorHandler(new
>> UDPSelectorHandler(true));
>> + DefaultPipeline pipeline = new DefaultPipeline();
>> + pipeline.setMaxThreads(1);
>> + pipeline.initPipeline();
>> + pipeline.startPipeline();
>> + controller.setPipeline(pipeline);
>> +
>> + callbackHandler = new CallbackHandler<SelectionKey>(){
>> + public void onConnect(IOEvent<SelectionKey> ioEvent) {
>> + SelectionKey key = ioEvent.attachment();
>> + finishConnect(key);
>> + }
>> + public void onRead(IOEvent<SelectionKey> ioEvent) {
>> + }
>> + public void onWrite(IOEvent<SelectionKey> ioEvent) {
>> + }
>> + };
>> +
>> + final CountDownLatch latch = new CountDownLatch(1);
>> + try{
>> + pipeline.execute(new Context(){
>> + public Object call() throws Exception {
>> + latch.countDown();
>> + controller.start();
>> + return null;
>> + } + });
>> + } catch (PipelineFullException ex){
>> + throw new IOException(ex.getMessage());
>> + }
>> + + try {
>> + latch.await(); + Thread.sleep(1000);
>> + } catch (InterruptedException ex) {
>> + }
>> + }
>> + + connect(remoteAddress,localAddress,callbackHandler,
>> +
>> (UDPSelectorHandler)controller.getSelectorHandler(protocol()));
>> + }
>> + + + /**
>> + * Register the <code>CallbackHandler</code> so when an asynchonous
>> + * event occurs, the <code>CallbackHandler</code> will be
>> invoked. + * @param callbackHandler the handler invoked by the
>> Controller when + * an non blocking operation is ready to
>> be handled.
>> + */
>> + public void register(CallbackHandler callbackHandler){
>> + this.callbackHandler = callbackHandler;
>> + }
>> + + + /**
>> + * Read bytes. If blocking is set to <tt>true</tt>, a pool of
>> temporary
>> + * <code>Selector</code> will be used to read bytes.
>> + * @param byteBuffer The byteBuffer to store bytes.
>> + * @param blocking <tt>true</tt> if a a pool of temporary Selector
>> + * is required to handle a blocking read.
>> + */
>> + public long read(ByteBuffer byteBuffer, boolean blocking) throws
>> IOException {
>> + if (!isConnected){
>> + throw new NotYetConnectedException();
>> + }
>> + + SelectionKey key =
>> datagramChannel.keyFor(selectorHandler.getSelector());
>> + if (blocking){
>> + inputStream.setSelectionKey(key);
>> + inputStream.setChannelType(
>> + ByteBufferInputStream.ChannelType.DatagramChannel);
>> + int nRead = inputStream.read(byteBuffer);
>> + return nRead;
>> + } else {
>> + if (callbackHandler == null){
>> + throw new IllegalStateException
>> + ("Non blocking read needs a CallbackHandler");
>> + }
>> + int nRead = datagramChannel.read(byteBuffer);
>> +
>> + if (nRead == 0){
>> + key.attach(callbackHandler);
>> + selectorHandler.register(key,
>> + SelectionKey.OP_READ|SelectionKey.OP_WRITE);
>> + }
>> + return nRead;
>> + }
>> + }
>> + + + /**
>> + * Writes bytes. If blocking is set to <tt>true</tt>, a pool of
>> temporary
>> + * <code>Selector</code> will be used to write bytes.
>> + * @param byteBuffer The byteBuffer to write.
>> + * @param blocking <tt>true</tt> if a pool of temporary Selector
>> + * is required to handle a blocking write.
>> + */ + public long write(ByteBuffer byteBuffer, boolean
>> blocking) throws IOException {
>> + if (!isConnected){
>> + throw new NotYetConnectedException();
>> + }
>> + + SelectionKey key =
>> datagramChannel.keyFor(selectorHandler.getSelector());
>> + if (blocking){
>> + throw new IllegalStateException("Blocking mode not
>> supported");
>> + } else {
>> + if (callbackHandler == null){
>> + throw new IllegalStateException
>> + ("Non blocking write needs a CallbackHandler");
>> + }
>> + int nWrite = datagramChannel.write(byteBuffer);
>> + + if (nWrite == 0){
>> + key.attach(callbackHandler);
>> + selectorHandler.register(key,
>> + SelectionKey.OP_READ|SelectionKey.OP_WRITE);
>> + } + return nWrite;
>> + }
>> + }
>> + + + /**
>> + * Send bytes to the given address.
>> + * @param byteBuffer The byteBuffer containing the bytes to send.
>> + * @param socketAddress The address the bytes are sent to.
>> + */
>> + public long send(ByteBuffer byteBuffer, SocketAddress
>> socketAddress) + throws IOException {
>> + if (!isConnected){
>> + throw new NotYetConnectedException();
>> + }
>> + + if (callbackHandler == null){
>> + throw new IllegalStateException
>> + ("Non blocking read needs a CallbackHandler");
>> + }
>> +
>> + return datagramChannel.send(byteBuffer,socketAddress);
>> + }
>> + + + /**
>> + * Receive bytes.
>> + * @param byteBuffer The byteBuffer to store bytes.
>> + */
>> + public SocketAddress receive(ByteBuffer byteBuffer) throws
>> IOException {
>> + if (!isConnected){
>> + throw new NotYetConnectedException();
>> + }
>> + + SelectionKey key =
>> datagramChannel.keyFor(selectorHandler.getSelector());
>> + + if (callbackHandler == null){
>> + throw new IllegalStateException
>> + ("Non blocking read needs a CallbackHandler");
>> + }
>> + + SocketAddress socketAddress =
>> datagramChannel.receive(byteBuffer);
>> + return socketAddress;
>> + }
>> + + + /**
>> + * Close the underlying connection.
>> + */
>> + public void close() throws IOException{
>> + if (datagramChannel != null){
>> + if (selectorHandler != null){
>> + SelectionKey key = +
>> datagramChannel.keyFor(selectorHandler.getSelector());
>> + + if (key == null) return;
>> + + key.cancel();
>> + key.attach(null);
>> + }
>> + datagramChannel.close(); + }
>> + + if (controller != null && isStandalone){
>> + controller.stop();
>> + controller = null;
>> + } + isStandalone = false;
>> + }
>> +
>> + + /**
>> + * Finish handling the OP_CONNECT interest ops.
>> + */
>> + public void finishConnect(SelectionKey key){
>> + datagramChannel = (DatagramChannel)key.channel();
>> + isConnected = datagramChannel.isConnected();
>> + isConnectedLatch.countDown();
>> + }
>> + + + /**
>> + * A token describing the protocol supported by an implementation
>> of this
>> + * interface
>> + */
>> + public Controller.Protocol protocol(){
>> + return Controller.Protocol.UDP;
>> + }
>> + + + /**
>> + * Is the underlying DatagramChannel connected.
>> + */
>> + public boolean isConnected(){
>> + return isConnected;
>> + } +
>> + + public Controller getController() {
>> + return controller;
>> + }
>> +
>> + + public void setController(Controller controller) {
>> + this.controller = controller;
>> + }
>> + +}
>>
>> Added:
>> trunk/modules/grizzly/src/main/java/com/sun/grizzly/UDPConnectorInstanceHandler.java
>>
>> Url:
>> https://grizzly.dev.java.net/source/browse/grizzly/trunk/modules/grizzly/src/main/java/com/sun/grizzly/UDPConnectorInstanceHandler.java?view=auto&rev=181
>>
>> ==============================================================================
>>
>> --- (empty file)
>> +++
>> trunk/modules/grizzly/src/main/java/com/sun/grizzly/UDPConnectorInstanceHandler.java
>> 2007-05-16 21:47:36+0000
>> @@ -0,0 +1,66 @@
>> +/*
>> + * The contents of this file are subject to the terms
>> + * of the Common Development and Distribution License
>> + * (the License). You may not use this file except in
>> + * compliance with the License.
>> + *
>> + * You can obtain a copy of the license at
>> + * https://glassfish.dev.java.net/public/CDDLv1.0.html or
>> + * glassfish/bootstrap/legal/CDDLv1.0.txt.
>> + * See the License for the specific language governing
>> + * permissions and limitations under the License.
>> + *
>> + * When distributing Covered Code, include this CDDL
>> + * Header Notice in each file and include the License file
>> + * at glassfish/bootstrap/legal/CDDLv1.0.txt.
>> + * If applicable, add the following below the CDDL Header,
>> + * with the fields enclosed by brackets [] replaced by
>> + * you own identifying information:
>> + * "Portions Copyrighted [year] [name of copyright owner]"
>> + *
>> + * Copyright 2007 Sun Microsystems, Inc. All rights reserved.
>> + */
>> +
>> +package com.sun.grizzly;
>> +
>> +import java.util.concurrent.ConcurrentLinkedQueue;
>> +
>> +/**
>> + * Default <code>ConnectorInstanceHandler</code> which uses a
>> + * <code>ConcurrentLinkedQueue</code> to pool <code>UDPConnectorHandler</code>
>> + *
>> + * @author Jeanfrancois
>> + */
>> +public class UDPConnectorInstanceHandler implements +
>> ConnectorInstanceHandler<UDPConnectorHandler> {
>> + + /**
>> + * Simple queue used to pool <code>UDPConnectorHandler</code>
>> + */
>> + private ConcurrentLinkedQueue<UDPConnectorHandler> pool;
>> + + + public UDPConnectorInstanceHandler(){
>> + pool = new ConcurrentLinkedQueue<UDPConnectorHandler>();
>> + }
>> + + /**
>> + * Acquire a <code>ConnectorHandler</code>
>> + */
>> + public UDPConnectorHandler acquire(){
>> + UDPConnectorHandler UDPConnectorHandler = pool.poll();
>> + if (UDPConnectorHandler == null){
>> + UDPConnectorHandler = new UDPConnectorHandler();
>> + }
>> + return UDPConnectorHandler;
>> + }
>> + + + /**
>> + * Release a <code>ConnectorHandler</code>
>> + */
>> + public void release(UDPConnectorHandler connectorHandler){
>> + pool.offer(connectorHandler);
>> + }
>> + +}
>>
>> Modified:
>> trunk/modules/grizzly/src/main/java/com/sun/grizzly/UDPSelectorHandler.java
>>
>> Url:
>> https://grizzly.dev.java.net/source/browse/grizzly/trunk/modules/grizzly/src/main/java/com/sun/grizzly/UDPSelectorHandler.java?view=diff&rev=181&p1=trunk/modules/grizzly/src/main/java/com/sun/grizzly/UDPSelectorHandler.java&p2=trunk/modules/grizzly/src/main/java/com/sun/grizzly/UDPSelectorHandler.java&r1=180&r2=181
>>
>> ==============================================================================
>>
>> ---
>> trunk/modules/grizzly/src/main/java/com/sun/grizzly/UDPSelectorHandler.java
>> (original)
>> +++
>> trunk/modules/grizzly/src/main/java/com/sun/grizzly/UDPSelectorHandler.java
>> 2007-05-16 21:47:36+0000
>> @@ -28,10 +28,15 @@
>> import java.net.BindException;
>> import java.net.DatagramSocket;
>> import java.net.InetSocketAddress;
>> +import java.net.SocketAddress;
>> import java.net.SocketException;
>> import java.nio.channels.DatagramChannel;
>> +import java.nio.channels.SelectableChannel;
>> import java.nio.channels.SelectionKey;
>> import java.nio.channels.Selector;
>> +import java.nio.channels.SocketChannel;
>> +import java.util.Iterator;
>> +import java.util.concurrent.ConcurrentHashMap;
>> import java.util.concurrent.ConcurrentLinkedQueue;
>> import java.util.logging.Level;
>>
>> @@ -66,6 +71,11 @@
>> public UDPSelectorHandler() {
>> }
>> + + + public UDPSelectorHandler(boolean isClient) {
>> + super(isClient);
>> + }
>>
>> @Override
>> @@ -84,6 +94,7 @@
>> public void preSelect(Context ctx) throws IOException {
>> if (selector == null){
>> try{
>> + connectorInstanceHandler = new
>> UDPConnectorInstanceHandler();
>> datagramChannel = DatagramChannel.open();
>> datagramChannel.configureBlocking(false);
>> selector = Selector.open(); @@
>> -103,7 +114,76 @@
>> datagramSocket.setSoTimeout(serverTimeout);
>> } else {
>> - super.preSelect(ctx);
>> + if (opReadToRegister == null){
>> + opReadToRegister = new
>> ConcurrentLinkedQueue<SelectionKey>();
>> + }
>> +
>> + SelectionKey key;
>> + int size = opReadToRegister.size();
>> + long currentTime = 0L;
>> + if (size > 0){
>> + currentTime = System.currentTimeMillis();
>> + }
>> +
>> + for (int i=0; i < size; i++) {
>> + key = opReadToRegister.poll();
>> +
>> + key.interestOps(key.interestOps() |
>> SelectionKey.OP_READ);
>> +
>> + if (key.attachment() == null)
>> + key.attach(currentTime);
>> + }
>> + + if (opWriteToRegister== null){
>> + opWriteToRegister = new
>> ConcurrentLinkedQueue<SelectionKey>();
>> + }
>> + + size = opWriteToRegister.size();
>> + currentTime = 0L;
>> + if (size > 0){
>> + currentTime = System.currentTimeMillis();
>> + }
>> +
>> + for (int i=0; i < size; i++) {
>> + key = opWriteToRegister.poll();
>> +
>> + key.interestOps(key.interestOps() |
>> SelectionKey.OP_WRITE);
>> +
>> + if (key.attachment() == null)
>> + key.attach(currentTime);
>> + } + + if (opConnectToRegister==
>> null){
>> + opConnectToRegister = new ConcurrentHashMap
>> + <SocketAddress[],CallbackHandler>();
>> + }
>> + + size = opConnectToRegister.size();
>> + currentTime = 0L;
>> + if (size > 0){
>> + currentTime = System.currentTimeMillis();
>> + } else {
>> + return;
>> + }
>> +
>> + CallbackHandler<SocketChannel> callbackHandler;
>> + Iterator<SocketAddress[]> iterator =
>> + opConnectToRegister.keySet().iterator();
>> + SocketAddress[] remoteLocal;
>> + while(iterator.hasNext()){
>> + remoteLocal = iterator.next();
>> + final DatagramChannel datagramChannel =
>> DatagramChannel.open();
>> + datagramChannel.socket().setReuseAddress(true);
>> + if (remoteLocal[1] != null){
>> + datagramChannel.socket().bind(remoteLocal[1]);
>> + }
>> + datagramChannel.configureBlocking(false);
>> + datagramChannel.connect(remoteLocal[0]);
>> + key = datagramChannel.register(selector,
>> + SelectionKey.OP_READ|SelectionKey.OP_WRITE,
>> + opConnectToRegister.remove(remoteLocal));
>> + onConnectInterest(key, ctx);
>> + } }
>> }
>>
>> @@ -153,28 +233,88 @@
>> /**
>> * Handle OP_READ.
>> */
>> - @Override
>> - public boolean onReadInterest(SelectionKey key,Context ctx) +
>> public boolean onReadInterest(final SelectionKey key,Context ctx)
>> throws IOException{
>> - // disable OP_READ on key before doing anything
>> else key.interestOps(key.interestOps() &
>> (~SelectionKey.OP_READ));
>> - return true;
>> + if (key.attachment() instanceof CallbackHandler){
>> + ((CallbackHandler)key.attachment())
>> + .onRead(new IOEvent<SelectionKey>(){
>> + + public SelectionKey
>> attach(SelectionKey sc){
>> + return key;
>> + }
>> + + public SelectionKey attachment(){
>> + return key;
>> + } + });
>> + return false;
>> + } else {
>> + return true;
>> + }
>> }
>>
>> /**
>> * Handle OP_WRITE.
>> + * */ - @Override
>> - public boolean onWriteInterest(SelectionKey key, Context ctx)
>> + public boolean onWriteInterest(final SelectionKey key, Context ctx)
>> throws IOException{
>> - + // disable OP_WRITE on key before doing anything
>> else key.interestOps(key.interestOps() &
>> (~SelectionKey.OP_WRITE));
>> - return true;
>> + if (key.attachment() instanceof CallbackHandler){
>> + ((CallbackHandler)key.attachment())
>> + .onWrite(new IOEvent<SelectionKey>(){
>> + + public SelectionKey
>> attach(SelectionKey sc){
>> + return key;
>> + }
>> + + public SelectionKey attachment(){
>> + return key;
>> + } + });
>> + return false;
>> + } else {
>> + return true;
>> + }
>> + }
>> +
>> + + /**
>> + * Handle OP_CONNECT.
>> + */ + public boolean onConnectInterest(final
>> SelectionKey key, Context ctx) + throws IOException{
>> + if (key.attachment() instanceof CallbackHandler){
>> + ((CallbackHandler)key.attachment())
>> + .onConnect(new IOEvent<SelectionKey>(){
>> + + public SelectionKey
>> attach(SelectionKey sc){
>> + return key;
>> + }
>> + + public SelectionKey attachment(){
>> + return key;
>> + } + });
>> + + } + return false;
>> }
>> + /**
>> + * A token describing the protocol supported by an implementation
>> of this
>> + * interface
>> + */
>> + public Controller.Protocol protocol(){
>> + return Controller.Protocol.UDP;
>> + } + + public int getSsBackLog() {
>> throw new IllegalStateException(NOT_SUPPORTED);
>> }
>>
>> Modified:
>> trunk/modules/grizzly/src/main/java/com/sun/grizzly/util/ByteBufferInputStream.java
>>
>> Url:
>> https://grizzly.dev.java.net/source/browse/grizzly/trunk/modules/grizzly/src/main/java/com/sun/grizzly/util/ByteBufferInputStream.java?view=diff&rev=181&p1=trunk/modules/grizzly/src/main/java/com/sun/grizzly/util/ByteBufferInputStream.java&p2=trunk/modules/grizzly/src/main/java/com/sun/grizzly/util/ByteBufferInputStream.java&r1=180&r2=181
>>
>> ==============================================================================
>>
>> ---
>> trunk/modules/grizzly/src/main/java/com/sun/grizzly/util/ByteBufferInputStream.java
>> (original)
>> +++
>> trunk/modules/grizzly/src/main/java/com/sun/grizzly/util/ByteBufferInputStream.java
>> 2007-05-16 21:47:36+0000
>> @@ -26,8 +26,10 @@
>> import java.io.InputStream;
>> import java.io.IOException;
>> import java.nio.ByteBuffer;
>> +import java.nio.channels.DatagramChannel;
>> import java.nio.channels.SelectionKey;
>> import java.nio.channels.Selector;
>> +import java.nio.channels.ReadableByteChannel;
>> import java.nio.channels.SocketChannel;
>>
>> /**
>> @@ -39,6 +41,12 @@
>> */
>> public class ByteBufferInputStream extends InputStream {
>>
>> + public enum ChannelType { SocketChannel, DatagramChannel }
>> + + + private ChannelType defaultChannelType =
>> ChannelType.SocketChannel;
>> + + private static int defaultReadTimeout = 15000;
>> /**
>> @@ -238,9 +246,9 @@
>> SelectionKey tmpKey = null;
>>
>> try{
>> - SocketChannel socketChannel = (SocketChannel)key.channel();
>> + ReadableByteChannel readableChannel =
>> (ReadableByteChannel)key.channel();
>> while (count > 0){
>> - count = socketChannel.read(byteBuffer);
>> + count = readableChannel.read(byteBuffer);
>> if ( count > -1 )
>> byteRead += count;
>> else
>> @@ -254,8 +262,15 @@
>> return 0;
>> }
>> count = 1;
>> - tmpKey = socketChannel
>> -
>> .register(readSelector,SelectionKey.OP_READ);
>> + + tmpKey = null;
>> + if (defaultChannelType == ChannelType.SocketChannel){
>> + tmpKey = ((SocketChannel)readableChannel)
>> +
>> .register(readSelector,SelectionKey.OP_READ); + } else {
>> + tmpKey = ((DatagramChannel)readableChannel)
>> +
>> .register(readSelector,SelectionKey.OP_READ);
>> + }
>> tmpKey.interestOps(tmpKey.interestOps() |
>> SelectionKey.OP_READ);
>> int code = readSelector.select(readTimeout);
>> tmpKey.interestOps(
>> @@ -266,7 +281,7 @@
>> }
>>
>> while (count > 0){
>> - count = socketChannel.read(byteBuffer);
>> + count = readableChannel.read(byteBuffer);
>> if ( count > -1 )
>> byteRead += count;
>> else
>> @@ -318,5 +333,15 @@
>> public static void setDefaultReadTimeout(int aDefaultReadTimeout) {
>> defaultReadTimeout = aDefaultReadTimeout;
>> }
>> +
>> + + public ChannelType getChannelType() {
>> + return defaultChannelType;
>> + }
>> +
>> + + public void setChannelType(ChannelType channelType) {
>> + this.defaultChannelType = channelType;
>> + }
>> }
>>
>>
>> Modified:
>> trunk/modules/grizzly/src/test/java/com/sun/grizzly/TCPConnectorHandlerTest.java
>>
>> Url:
>> https://grizzly.dev.java.net/source/browse/grizzly/trunk/modules/grizzly/src/test/java/com/sun/grizzly/TCPConnectorHandlerTest.java?view=diff&rev=181&p1=trunk/modules/grizzly/src/test/java/com/sun/grizzly/TCPConnectorHandlerTest.java&p2=trunk/modules/grizzly/src/test/java/com/sun/grizzly/TCPConnectorHandlerTest.java&r1=180&r2=181
>>
>> ==============================================================================
>>
>> ---
>> trunk/modules/grizzly/src/test/java/com/sun/grizzly/TCPConnectorHandlerTest.java
>> (original)
>> +++
>> trunk/modules/grizzly/src/test/java/com/sun/grizzly/TCPConnectorHandlerTest.java
>> 2007-05-16 21:47:36+0000
>> @@ -53,7 +53,8 @@
>> public void testSimplePacket() throws IOException {
>> final Controller controller = createController();
>> ControllerUtils.startController(controller);
>> - final ConnectorHandler tcpConnector =
>> controller.acquireConnectorHandler();
>> + final ConnectorHandler tcpConnector = +
>> controller.acquireConnectorHandler(Controller.Protocol.TCP);
>>
>
>> try {
>> final byte[] testData = "Hello".getBytes();
>> @@ -82,7 +83,7 @@
>> int nRead = socketChannel.read(readBB);
>> if (nRead == 0 && readTry++ < 2){
>> key.attach(callbackHandler);
>> - controller.getSelectorHandler()
>> +
>> controller.getSelectorHandler(Controller.Protocol.TCP)
>> .register(key,SelectionKey.OP_READ);
>> } else {
>> responseArrivedLatch.countDown();
>> @@ -102,7 +103,7 @@
>>
>> if (nWrite == 0){
>> key.attach(callbackHandler);
>> - controller.getSelectorHandler()
>> +
>> controller.getSelectorHandler(Controller.Protocol.TCP)
>>
>> .register(key,SelectionKey.OP_WRITE);
>> return;
>> } @@ -144,6 +145,7 @@
>> } finally { try{
>> tcpConnector.close();
>> + controller.releaseConnectorHandler(tcpConnector);
>> controller.stop();
>> } catch (Throwable t){
>> t.printStackTrace();
>>
>> Added:
>> trunk/modules/grizzly/src/test/java/com/sun/grizzly/utils/NonBlockingUDPIOClient.java
>>
>> Url:
>> https://grizzly.dev.java.net/source/browse/grizzly/trunk/modules/grizzly/src/test/java/com/sun/grizzly/utils/NonBlockingUDPIOClient.java?view=auto&rev=181
>>
>> ==============================================================================
>>
>> --- (empty file)
>> +++
>> trunk/modules/grizzly/src/test/java/com/sun/grizzly/utils/NonBlockingUDPIOClient.java
>> 2007-05-16 21:47:36+0000
>> @@ -0,0 +1,75 @@
>> +/*
>> + * The contents of this file are subject to the terms
>> + * of the Common Development and Distribution License
>> + * (the License). You may not use this file except in
>> + * compliance with the License.
>> + *
>> + * You can obtain a copy of the license at
>> + * https://glassfish.dev.java.net/public/CDDLv1.0.html or
>> + * glassfish/bootstrap/legal/CDDLv1.0.txt.
>> + * See the License for the specific language governing
>> + * permissions and limitations under the License.
>> + *
>> + * When distributing Covered Code, include this CDDL
>> + * Header Notice in each file and include the License file
>> + * at glassfish/bootstrap/legal/CDDLv1.0.txt.
>> + * If applicable, add the following below the CDDL Header,
>> + * with the fields enclosed by brackets [] replaced by
>> + * you own identifying information:
>> + * "Portions Copyrighted [year] [name of copyright owner]"
>> + *
>> + * Copyright 2006 Sun Microsystems, Inc. All rights reserved.
>> + */
>> +
>> +package com.sun.grizzly.utils;
>> +
>> +import com.sun.grizzly.UDPConnectorHandler;
>> +import java.io.IOException;
>> +import java.net.InetSocketAddress;
>> +import java.net.SocketAddress;
>> +import java.nio.ByteBuffer;
>> +
>> +/**
>> + * UDP client that exercises the non-blocking ConnectorHandler.
>> + *
>> + * @author Jeanfrancois Arcand
>> + */
>> +public class NonBlockingUDPIOClient {
>> + private String host;
>> + private int port;
>> + + private UDPConnectorHandler udpConnectorHandler;
>> + + public NonBlockingUDPIOClient(String host, int port) {
>> + this.host = host;
>> + this.port = port;
>> + }
>> + + public void connect() throws IOException {
>> + udpConnectorHandler = new UDPConnectorHandler();
>> + udpConnectorHandler.connect(new InetSocketAddress(host,port));
>> + }
>> + + public void send(byte[] msg) throws IOException {
>> + udpConnectorHandler.send(ByteBuffer.wrap(msg),
>> + new InetSocketAddress(host,port));
>> + }
>> + + public int receive(byte[] buf) throws IOException {
>> + return receive(buf, buf.length);
>> + }
>> +
>> + public int receive(byte[] buf, int length) throws IOException {
>> + ByteBuffer byteBuffer = ByteBuffer.wrap(buf,0,length);
>> + SocketAddress sa = udpConnectorHandler.receive(byteBuffer);
>> + System.out.println("sa: " + sa);
>> + System.out.println("nRead: " + byteBuffer.position());
>> + return byteBuffer.position();
>> + }
>> + + public void close() throws IOException {
>> + if (udpConnectorHandler != null) {
>> + udpConnectorHandler.close();
>> + }
>> + }
>> +}
>>
>> Modified:
>> trunk/modules/grizzly/src/test/java/com/sun/grizzly/utils/UDPIOClient.java
>>
>> Url:
>> https://grizzly.dev.java.net/source/browse/grizzly/trunk/modules/grizzly/src/test/java/com/sun/grizzly/utils/UDPIOClient.java?view=diff&rev=181&p1=trunk/modules/grizzly/src/test/java/com/sun/grizzly/utils/UDPIOClient.java&p2=trunk/modules/grizzly/src/test/java/com/sun/grizzly/utils/UDPIOClient.java&r1=180&r2=181
>>
>> ==============================================================================
>>
>> ---
>> trunk/modules/grizzly/src/test/java/com/sun/grizzly/utils/UDPIOClient.java
>> (original)
>> +++
>> trunk/modules/grizzly/src/test/java/com/sun/grizzly/utils/UDPIOClient.java
>> 2007-05-16 21:47:36+0000
>> @@ -45,6 +45,8 @@
>> public void connect() throws IOException {
>> socket = new DatagramSocket();
>> socket.connect(InetAddress.getByName(host), port);
>> + + }
>> public void send(byte[] msg) throws IOException {
>>
>> Modified:
>> trunk/modules/http/src/main/java/com/sun/grizzly/http/SelectorThread.java
>>
>> Url:
>> https://grizzly.dev.java.net/source/browse/grizzly/trunk/modules/http/src/main/java/com/sun/grizzly/http/SelectorThread.java?view=diff&rev=181&p1=trunk/modules/http/src/main/java/com/sun/grizzly/http/SelectorThread.java&p2=trunk/modules/http/src/main/java/com/sun/grizzly/http/SelectorThread.java&r1=180&r2=181
>>
>> ==============================================================================
>>
>> ---
>> trunk/modules/http/src/main/java/com/sun/grizzly/http/SelectorThread.java
>> (original)
>> +++
>> trunk/modules/http/src/main/java/com/sun/grizzly/http/SelectorThread.java
>> 2007-05-16 21:47:36+0000
>> @@ -597,7 +597,8 @@
>> * running of this thread.
>> */
>> public void registerKey(SelectionKey key){
>> -
>> controller.getSelectorHandler().register(key,SelectionKey.OP_READ);
>> + controller.getSelectorHandler(Controller.Protocol.TCP)
>> + .register(key,SelectionKey.OP_READ);
>> }
>> // --------------------------------------------------------------
>> Init //
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: commits-unsubscribe_at_grizzly.dev.java.net
>> For additional commands, e-mail: commits-help_at_grizzly.dev.java.net
>>
>>
>>
>>
>> ------------------------------------------------------------------------
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: dev-unsubscribe_at_grizzly.dev.java.net
>> For additional commands, e-mail: dev-help_at_grizzly.dev.java.net
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: dev-unsubscribe_at_grizzly.dev.java.net
> For additional commands, e-mail: dev-help_at_grizzly.dev.java.net
>