users@grizzly.java.net

Re: Issue with TunnelFilter and StringFilter example

From: Donald Walters <donald.walters_at_gmail.com>
Date: Mon, 12 Sep 2011 06:27:30 -0500

Hello Oleksiy,

Please see the essential classes below. This is after many desperate attempts to solve the issue, but it still does not work.
Let me know if I need to provide something else.

Basically, what should happen is that I receive some information from one server and pass it on to the next, but in between I must do some checks. The data is basically XML, but the first 4 bytes of the data are the length of the incoming data in little-endian (reversed) byte order.

(little endian size of incoming data)+XML data.

...<upms><msg type="bind"/></upms>….<upms><msg type="bind"/></upms>...<upms><msg type="bind"/></upms>

I need the StringFilter to delimit the stream; then I process each message and return the result. My result must be framed the same way, with the length of the bytes placed before the XML, and sent to whichever party. I use this delimiter because, for some reason, when I try to do the little-endian calculation it fails while creating the buffers. So I delimit the stream and process it, and if it is OK I pass along the original XML and the size that I got; but if the stream is continuous I need to split it up, process it, and then pass along the separated messages, each with its calculated little-endian length.

If I need to provide something else, let me know. I really need this.

Thanks, Donald

public class ProxyServer {

    private static final Logger log = Logger.getLogger("PROXY");

    public static void main(String[] args) throws IOException {
     

        // Parse command line options.
        int localPort = Integer.parseInt(config.getProperty(Config.RUNTIME_LOCAL_PORT));
        String remoteHost = config.getProperty(Config.RUNTIME_REMOTE_HOST);
        int remotePort = Integer.parseInt(config.getProperty(Config.RUNTIME_REMOTE_PORT));
        
        
        log.info(
                "Proxying *:" + localPort + " to "
                + remoteHost + ':' + remotePort + " ...");

        // Create TCP transport
        final TCPNIOTransport transport =
                TCPNIOTransportBuilder.newInstance().build();

        // Create a FilterChain using FilterChainBuilder
        FilterChainBuilder filterChainBuilder = FilterChainBuilder.stateless();

        // Add TransportFilter, which is responsible
        // for reading and writing data to the connection
        filterChainBuilder.add(new TransportFilter());
        
        /delimit requests coming in
        filterChainBuilder.add(new StringFilter( Charset.defaultCharset(),"</upms>"));
        //to convert string to Buffer
       filterChainBuilder.add(new InterceptingFilter());

        filterChainBuilder.add(new CoreFilter());
        
        //proxy filter for communication between end points
        filterChainBuilder.add(new ProxyFilter(TCPNIOConnectorHandler.builder(transport).build(), remoteHost, remotePort));

        transport.setProcessor(filterChainBuilder.build());

        try {
            // binding transport to start listen on certain host and port
            transport.bind("localhost", localPort);

            // start the transport
            transport.start();
        } catch(Exception e){
            log.error("error in startup ", e);
        }
    }
}
===========================

public class InterceptingFilter extends BaseFilter {
    private static final Logger log = Logger.getLogger(InterceptingFilter.class);

    @Override
    public NextAction handleRead(FilterChainContext ctx)
            throws IOException {
        Object peerAddress = ctx.getAddress();

        String message = ctx.getMessage();
        
        log.debug("incoming "+message.getBytes().length + "\n" +message);
        
        
        ByteArrayOutputStream writer = new ByteArrayOutputStream();
//
            writer.write(message.getBytes());
            
            String out = writer.toString();
        log.debug(out.toString());
        
        HeapBuffer buf = new HeapMemoryManager().allocate(out.getBytes().length);
        buf.put(out.getBytes());
        buf.position(0);
        buf.get();
        
        log.debug(buf.toStringContent());
        
        ctx.setMessage(buf);

// ctx.write(peerAddress, message, null);

        return ctx.getInvokeAction();
    
    }
}

====================================

public class CoreFilter extends BaseFilter {

    private static final String UPMS = "<upms>";
    private static final Logger log = Logger.getLogger(CoreFilter.class.getName());
    private static final Logger eventslog = Logger.getLogger("events");
    private Executor scheduler = Executors.newCachedThreadPool();


    @Override
    public NextAction handleRead(final FilterChainContext ctx)
            throws IOException {

        if (ctx.state() == FilterChainContext.State.RUNNING) {
            return ctx.getInvokeAction();
        }

        if (ctx.getMessage() == null) {
            return ctx.getStopAction();
        }
        
// String raw = ctx.getMessage();

        final Buffer raw = ctx.getMessage();
        eventslog.debug("raw message \n" + raw.toStringContent());

        final NextAction suspendAction = ctx.getSuspendAction();

        // suspend the current execution
        ctx.suspend();

        // schedule async work
        scheduler.execute(new ProxyHandler(ctx, raw));

        // return suspend status

        return suspendAction;
    }

    private boolean isBegin(String message) {
        if(GenericValidator.isBlankOrNull(message))
            return false;
        
        boolean begin = false;

        try {
            XPathFactory xpathf = XPathFactory.newInstance();
            XPath xpath = xpathf.newXPath();
            String expr = "/upms/msg[@type='begin']/_at_type";
            String result = xpath.evaluate(expr, new InputSource(new StringReader(message)));

            if (!GenericValidator.isBlankOrNull(result)) {
                begin = true;
            }
        } catch (Exception e) {
            log.error("\n"+message);
            log.error("error checking incoming message ", e);
            
        }

        return begin;
    }

    private void sendEndMessage(FilterChainContext context, String incomingXml, String msgId)
            throws Exception {

        try {
            
            String endXml = Util.getEndMessage(msgId, incomingXml);
            
            int msgLen = endXml.length();

            StreamWriter writer = StandaloneProcessor.INSTANCE.getStreamWriter(context.getConnection());

            writer.writeByte((byte) msgLen);

            for (int i = 1; i <= 3; i++) {
                writer.writeByte((byte) (msgLen >> i * 8));
            }

            writer.writeByteArray(endXml.getBytes(), 0, msgLen);
            Future writeFuture = writer.flush();
            writeFuture.get();
            eventslog.debug("write proxy end message " + endXml);
        } catch (Exception e) {
            log.error("error writing end message", e);
            throw new Exception(e);
        }
    }

    private class ProxyHandler implements Runnable {

        private FilterChainContext ctx;
        private Buffer raw;

        public ProxyHandler(FilterChainContext context, Buffer incomingXml) {
            this.ctx = context;
            this.raw = incomingXml;
        }

        @Override
        public void run() {

            try {
                String msg = raw.toStringContent();

                msg = msg.substring(msg.indexOf(UPMS));
                if(!msg.contains("</upms>"))
                    msg += "</upms>";
                
                log.debug("parsed message \n" + msg);

                if (isBegin(msg.trim())) {
                    int status = CoreHandler.charge(msg);
                    log.debug("PROXY Charging status=" + status);


                    switch (status) {
                        case CoreHandler.ERROR:
                            sendEndMessage(ctx, msg, "error");
                            ctx.setMessage(null);
                            break;
                        case CoreHandler.INSUFFICEINT_FUNDS:
                            sendEndMessage(ctx, msg, "no.funds");
                            ctx.setMessage(null);
                            break;
                        default:
                            log.debug("successful--passing on " + msg);
                            ctx.setMessage(raw);
                            break;
                    }

                } else {
                    ctx.setMessage(raw);
                }
            } catch (Exception e) {
                log.error("error processing", e);
                ctx.setMessage(null);
            }


            // Resume the FilterChain IOEvent processing
            ctx.resume();
        }
    }
}

=====================

public class ProxyFilter extends BaseFilter {

    private static final Logger logger = Logger.getLogger(ProxyFilter.class);
    private Attribute<Connection> peerConnectionAttribute = Grizzly.DEFAULT_ATTRIBUTE_BUILDER.createAttribute("TunnelFilter.peerConnection");
    private final SocketConnectorHandler transport;
    private final SocketAddress redirectAddress;


    public ProxyFilter(SocketConnectorHandler transport, String host, int port) {
        this(transport, new InetSocketAddress(host, port));
    }

    public ProxyFilter(SocketConnectorHandler transport, SocketAddress redirectAddress) {
        this.transport = transport;
        this.redirectAddress = redirectAddress;
    }

    @Override
    public NextAction handleRead(final FilterChainContext ctx)
            throws IOException {

        Connection connection = ctx.getConnection();
        final Connection peerConnection = (Connection) this.peerConnectionAttribute.get(connection);

        if (!connection.isOpen()) {
            return ctx.getStopAction();
        }

        if (peerConnection == null) {
            this.transport.connect(this.redirectAddress, new ConnectCompletionHandler(ctx));

            return ctx.getSuspendAction();
        }


        redirectToPeer(ctx, peerConnection);


        return ctx.getStopAction();
    }

    @Override
    public NextAction handleClose(FilterChainContext ctx)
            throws IOException {
        Connection connection = ctx.getConnection();
        Connection peerConnection = (Connection) this.peerConnectionAttribute.get(connection);

        if ((peerConnection != null) && (peerConnection.isOpen())) {
            peerConnection.close();
        }

        return ctx.getInvokeAction();
    }

    private static void redirectToPeer(FilterChainContext context, Connection peerConnection)
            throws IOException {
        Connection srcConnection = context.getConnection();
        Buffer message = context.getMessage();
        logger.debug("Proxying messaging from " + srcConnection.getPeerAddress() + " to " + peerConnection.getPeerAddress());
        
        
        
// String message = context.getMessage();
        logger.debug(message.toStringContent());
   
        peerConnection.write(message);
// sendMessage(peerConnection, message.toStringContent());
    
    }
    
    private static void sendMessage(Connection connection, String outgoingXml){

        try {
            String endXml = outgoingXml.substring(outgoingXml.indexOf("<upms>")).trim()+"</upms>";
        ByteArrayOutputStream writer = new ByteArrayOutputStream();
            
            int msgLen = (endXml).trim().getBytes().length;


            writer.write((byte) msgLen);

            for (int i = 1; i <= 3; i++) {
                writer.write((byte) (msgLen >> i * 8));
            }

            writer.write(endXml.getBytes());
            
            String out = writer.toString();
            
            logger.debug(out.getBytes().length+" final \n"+out);
            
            
            connection.write(out);
        } catch (Exception e) {
            logger.error("error writing end message", e);
        }
    }

    private class ConnectCompletionHandler implements CompletionHandler<Connection> {

        private final FilterChainContext context;

        private ConnectCompletionHandler(FilterChainContext context) {
            this.context = context;
        }

        @Override
        public void cancelled() {
            close(this.context.getConnection());
            resumeContext();
        }

        @Override
        public void failed(Throwable throwable) {
            close(this.context.getConnection());
            resumeContext();
        }

        @Override
        public void completed(Connection peerConnection) {
            Connection connection = this.context.getConnection();

            ProxyFilter.this.peerConnectionAttribute.set(connection, peerConnection);
            ProxyFilter.this.peerConnectionAttribute.set(peerConnection, connection);

            resumeContext();
        }

        @Override
        public void updated(Connection peerConnection) {
        }

        private void resumeContext() {
            this.context.resume();
        }

        private void close(Connection connection) {
            try {
                connection.close();
            } catch (IOException e) {
                logger.error("error closing connection", e);
            }
        }
    }
}








On Sep 12, 2011, at 4:22 AM, Oleksiy Stashok wrote:

> Hi,
>
> can you pls. share the entire project sources, or at least part of it so we can reproduce the problem?
> What are you using as peer?
>
> Thanks.
>
> WBR,
> Alexey.
>
> On 09/11/2011 07:12 PM, donald.walters_at_gmail.com wrote:
>> I created a filter chain which includes a StringFilter and the
>> TunnelFilter example but for some reason the redirecttoPeer method in
>> the TunnelFilter is not working.
>>
>> filterChainBuilder.add(new StringFilter(Charset.defaultCharset(),
>> "</jps>"));
>> filterChainBuilder.add(new
>> TunnelFilter(TCPNIOConnectorHandler.builder(transport).build(),
>> remoteHost, remotePort));
>>
>> ....
>>
>> in the TunnelFilter
>>
>> private static void redirectToPeer(FilterChainContext context,
>> Connection peerConnection)
>> throws IOException {
>> Connection srcConnection = context.getConnection();
>> logger.debug("Proxying messaging from " +
>> srcConnection.getPeerAddress() + " to " +
>> peerConnection.getPeerAddress());
>>
>> String message = context.getMessage();
>> logger.debug(message);
>>
>> peerConnection.write(message);
>>
>> }
>>
>> when that method is called nothing happens. I see the output and the
>> messages are ok but nothing is passed to the peer. Please let me know
>> what I am doing wrong here.
>