users@glassfish.java.net

Re: JMS problem.

From: Daniel Cavalcanti <dhcavalcanti_at_gmail.com>
Date: Thu, 17 May 2007 12:19:02 -0400

Sure.
I tried both approaches: QueueBrowser and Thread.sleep.

The first one did not work when I just created the browser and immediately
closed it.
Then I tried creating the browser, calling getEnumeration(), and closing it.
That still did not work.
Even if it had worked, there is another drawback: a QueueBrowser only applies
to a Queue, so this is not a feasible solution when the destination is a Topic.

The second option did not work either. I tried sleep times of 500, 1000, and
even 3000 milliseconds.
Here is an excerpt:

            Connection connection = connectionFactory.createConnection(username, password);
            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            Destination destination = (Destination) ic.lookup(jndiDestination);
            // The delay sits before the consumer exists and before the connection is started.
            Thread.sleep(3000);
            MessageConsumer consumer = session.createConsumer(destination);
            connection.start();
            // receiveNoWait() returns null when no message is immediately available.
            BytesMessage jmsMessage = (BytesMessage) consumer.receiveNoWait();
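
For illustration only, here is a minimal sketch of the retry idea from Ramesh's
note quoted below: a null from receiveNoWait() does not prove the queue is
empty, so the call is repeated for a bounded time. PollingReceive and pollUntil
are hypothetical names, not part of JMS, and the Supplier stands in for
consumer.receiveNoWait() so the sketch runs without a broker. It also assumes
that any waiting has to happen after createConsumer() and connection.start(),
since JMS does not deliver messages on a connection that has not been started.

```java
import java.util.function.Supplier;

/** Hypothetical helper, not part of JMS: retries a non-blocking receive. */
public final class PollingReceive {

    private PollingReceive() {}

    /**
     * Calls attempt.get() until it returns non-null or maxWaitMillis elapses.
     *
     * @return the first non-null result, or null if none arrived in time
     */
    public static <T> T pollUntil(Supplier<T> attempt, long maxWaitMillis, long intervalMillis) {
        long deadline = System.nanoTime() + maxWaitMillis * 1_000_000L;
        while (true) {
            T result = attempt.get();
            if (result != null) {
                return result;
            }
            if (System.nanoTime() - deadline >= 0) {
                return null; // gave up: nothing arrived within maxWaitMillis
            }
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt(); // preserve interrupt status
                return null;
            }
        }
    }

    public static void main(String[] args) {
        // Stand-in for a consumer whose first two non-blocking receives find nothing.
        int[] calls = {0};
        String message = pollUntil(() -> ++calls[0] < 3 ? null : "hello", 1000, 10);
        System.out.println(message + " after " + calls[0] + " attempts");
    }
}
```

With a real consumer, the attempt would wrap consumer.receiveNoWait() and
convert the checked JMSException into an unchecked one. Where a bounded wait is
acceptable, consumer.receive(timeout) gives a similar effect in a single call.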

I attached the imq and server log files.


On 5/17/07, Ramesh <rameshp_at_sun.com> wrote:
>
> Let me know how it goes, please drop me a note once you are able to
> receive messages.
> I will update the issue tracker with more details, would also be great
> if you can cc the glassfish users alias in your next e-mail, with your
> solution so that other developers can benefit from this.
>
> Thanks
> -Ramesh
>
> Daniel Cavalcanti wrote:
> > Yeap...
> >
> > Regarding your first paragraph: that's how our web clients expect to
> > use it. We put the consumer behind a web service because we need to
> > support clients running on platforms other than Java.
> >
> > Second paragraph. That is clear as clean water now. Thanks.
> >
> > I will follow the QueueBrowser approach. I think the web service
> > client can deal with the extra minor overhead/delay.
> >
> > On 5/17/07, Ramesh <rameshp_at_sun.com> wrote:
> >
> > Hi Daniel,
> > receiveNoWait() should be used only when your app wants to return
> > quickly to other work and will call receiveNoWait() again after
> > finishing those tasks.
> > In other words, receiveNoWait() returning without a message does *not*
> > mean that the queue does not have any.
> >
> > The time between sending the message and creating the receiver
> > connection does not count. What matters is the interval between
> > creating the connection for the receiver and the call to
> > receiveNoWait().
> >
> > Yes, you are right: you would need a sleep() or some other delay between
> > the time you call createConnection() and receiveNoWait().
> > Instead of a sleep you can also open a QueueBrowser and close it; this
> > should also ensure that receiveNoWait() works as you would expect.
> >
> > Thanks
> > -Ramesh
> >
> > Daniel Cavalcanti wrote:
> > > I see. What throws me off a little is that even if I wait a long time
> > > after I sent the message, the consumer won't get the message on the
> > > receiveNoWait(). I would assume the queueing system would have enough
> > > time to have processed it already. But I see where the problem is.
> > >
> > > Here is an excerpt of the code attached, the consumer web service:
> > >
> > > // ******************************************************************
> > > // Establish JMS connectivity
> > > // ******************************************************************
> > >
> > > if (log.isTraceEnabled()) log.trace("Establishing connection to JMS broker.");
> > > connection = connectionFactory.createConnection(username, password);
> > >
> > > if (log.isTraceEnabled()) log.trace("Creating transacted session with JMS broker.");
> > > Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
> > >
> > > // **************************************************************
> > > // Locate bound destination and create consumer
> > > // **************************************************************
> > >
> > > if (log.isTraceEnabled()) log.trace("Searching for named destination: " + jndiDestination);
> > > Destination destination = (Destination) ic.lookup(jndiDestination);
> > >
> > > if (log.isTraceEnabled()) log.trace("Creating consumer for named destination.");
> > > MessageConsumer consumer = session.createConsumer(destination);
> > >
> > > if (log.isTraceEnabled()) log.trace("Starting JMS connection.");
> > > connection.start();
> > >
> > > // read a message -- can be receive(), receive(timeout),
> > > // or receiveNoWait() depending on some param on the WS call.
> > >
> > > So, if I were to place a Thread.sleep(some time) after I create the
> > > connection and before I create the session, that would resolve the
> > > problem?
> > > Hmm... that would defeat the purpose of this method in our
> > > application. Is that behavior true for other queueing systems as well?
> > >
> > > This is what I'll do. I'll discuss with my team the implications
> > > of this behavior on receiveNoWait() and go on from there. Nonetheless,
> > > I'll clear all the logs and make a clean, concise run of the app so
> > > that you get very specific logs and I'll post them in the issue thread.
> > >
> > > thanks,
> > > Daniel.
> > > On 5/17/07, Ramesh <rameshp_at_sun.com> wrote:
> > >
> > > Hi Daniel,
> > > If you feel the explanation is satisfactory we could close this issue;
> > > we wouldn't need the source code.
> > > However, if you need more clarification regarding this, we would be
> > > glad to help.
> > >
> > > Thanks
> > > -Ramesh
> > >
> > > Daniel Cavalcanti wrote:
> > > > Actually,
> > > > I opened issue 3011 and just got to read your e-mails. Let me take a
> > > > closer look at the postings there.
> > > > If you still want the source code for that, I'll gladly provide it in
> > > > the issue thread.
> > > > The Producer WS and Consumer WS (the web service beans that clients
> > > > use to send and receive messages to/from the Destinations) have a lot
> > > > of other business logic in them that I'd have to clean up before I can
> > > > post the code.
> > > >
> > > > It will probably be easier to have all these discussions in the issue
> > > > thread.
> > > > But thanks for the promptness.
> > > > Daniel.
> > > >
> > > > On 5/17/07, Ramesh Parthasarathy <Ramesh.Parthasarathy_at_sun.com> wrote:
> > > >
> > > > https://glassfish.dev.java.net/issues/show_bug.cgi?id=3011
> > > >
> > > > might have an explanation for your problem
> > > >
> > > > -Ramesh
> > > >
> > > > Sivakumar Thyagarajan wrote On 05/17/07 10:47,:
> > > > > > I also have a web app that browses the queue. From the
> > > > > > QueueBrowser, I am able to list all the messages in the queue.
> > > > >
> > > > > Is this the RequestQueue? If yes, your MDB was able to publish
> > > > > response messages to the RequestQueue. How does the receiver read
> > > > > the message from the RequestQueue? Could you share sources of the
> > > > > receiver, ejb-jar.xml and sun-ejb-jar.xml, and server/broker logs if
> > > > > you notice any exceptions there?
> > > > >
> > > > > Thanks
> > > > > --Siva.
> > > > >
> > > > > Daniel Cavalcanti wrote:
> > > > >
> > > > >>I have an enterprise project and a client project to test the
> > > > >>enterprise project.
> > > > >>
> > > > >>The enterprise project has two web services: one to send messages
> > > > >>and one to receive messages.
> > > > >>The send web service puts a message in an InitialDestination
> > > > >>javax.jms.Destination.
> > > > >>Then, an MDB receives that message and forwards/routes it to a
> > > > >>RequestQueue javax.jms.Queue.
> > > > >>The receiver web service reads messages from the RequestQueue.
> > > > >>
> > > > >>From the client, everything works fine, except that the web service
> > > > >>reading from the queue is not getting any message back.
> > > > >>I also have a web app that browses the queue. From the
> > > > >>QueueBrowser, I am able to list all the messages in the queue.
> > > > >>Can someone point out to me what I'm doing wrong?
> > > > >>
> > > > >>I can share the code upon request outside the mailing list since
> > > > >>the message won't get delivered if I attach the project...
> > > > >>
> > > > >>thanks,
> > > > >>Daniel.
> > > > >
> > > > > ---------------------------------------------------------------------
> > > > > To unsubscribe, e-mail: users-unsubscribe_at_glassfish.dev.java.net
> > > > > For additional commands, e-mail: users-help_at_glassfish.dev.java.net
> > > > >
> ------------------------------------------------------------------------
> > >
> > > /*
> > > * Local Matters, Inc. Confidential
> > > * OCO Source Materials
> > > * (C) Local Matters, Inc. 2007. All Rights Reserved.
> > > */
> > > package com.localmatters.flexiq.collector.consumer;
> > >
> > > import static
> > com.localmatters.flexiq.collector.CollectorConstants.*;
> > > import com.localmatters.flexiq.collector.jaxb.ResultMessage;
> > > import java.io.ByteArrayInputStream;
> > > import java.io.ByteArrayOutputStream;
> > > import java.io.IOException;
> > > import java.util.ArrayList ;
> > > import java.util.Collections;
> > > import java.util.List;
> > > import java.util.zip.DataFormatException;
> > > import java.util.zip.Inflater;
> > > import javax.annotation.PostConstruct;
> > > import javax.annotation.PreDestroy;
> > > import javax.annotation.Resource;
> > > import javax.ejb.Stateless;
> > > import javax.jms.BytesMessage;
> > > import javax.jms.Connection;
> > > import javax.jms.ConnectionFactory ;
> > > import javax.jms.Destination;
> > > import javax.jms.JMSException;
> > > import javax.jms.MessageConsumer;
> > > import javax.jms.Queue;
> > > import javax.jms.QueueBrowser;
> > > import javax.jms.Session ;
> > > import javax.jws.WebMethod;
> > > import javax.jws.WebParam;
> > > import javax.jws.WebService;
> > > import javax.naming.InitialContext;
> > > import javax.naming.NamingException;
> > > import javax.xml.bind.JAXBContext ;
> > > import javax.xml.bind.JAXBException;
> > > import javax.xml.bind.Unmarshaller;
> > > import org.apache.commons.logging.Log;
> > > import org.apache.commons.logging.LogFactory;
> > >
> > > /**
> > > * The collector message consumer implementation.
> > > *
> > > * @author Daniel Cavalcanti
> > > */
> > > @Stateless()
> > > @WebService()
> > > public class ConsumerWS {
> > >
> > > //
> >
> **********************************************************************
> >
> > > // Instance variables
> > > //
> >
> **********************************************************************
> > >
> > > // Logging facility
> > *****************************************************
> > >
> > > /**
> > > * The class logger.
> > > */
> > > private Log log = LogFactory.getLog(getClass());
> > >
> > > // Pre-configured JMS managed objects
> > ***********************************
> > >
> > > /**
> > > * The JMS connection factory.
> > > */
> > > @Resource(mappedName="jms/CollectorConnectionFactory")
> > > private ConnectionFactory connectionFactory;
> > >
> > > // JMS Connection parameters
> > ********************************************
> > >
> > > /**
> > > * The JMS connection username.
> > > */
> > > @Resource(name="username")
> > > private String username;
> > >
> > > /**
> > > * The JMS connection password.
> > > */
> > > @Resource(name="password")
> > > private String password;
> > >
> > > // Server context
> > *******************************************************
> > >
> > > /**
> > > * The server initial context.
> > > */
> > > private InitialContext ic;
> > >
> > > // Compression objects
> > **************************************************
> > >
> > > /**
> > > * The decompressor.
> > > */
> > > private Inflater decompressor;
> > >
> > > //
> >
> **********************************************************************
> > > // Constructors
> > > //
> >
> **********************************************************************
> > >
> > > /**
> > > * Creates a new instance of ConsumerWS.
> > > */
> > > public ConsumerWS() {
> > > if ( log.isTraceEnabled()) log.trace("Creating collector
> > consumer bean.");
> > > }
> > >
> > > //
> >
> **********************************************************************
> > > // Life-cycle methods
> > > //
> >
> **********************************************************************
> > >
> > > /**
> > > * Initialize resources.
> > > */
> > > @PostConstruct()
> > > private void init() {
> > >
> > > //
> > ******************************************************************
> > > // Log initial parameters
> > > //
> > ******************************************************************
> > >
> > > if (log.isTraceEnabled()) log.trace("Initializing
> > collector consumer bean.");
> > >
> > > try {
> > >
> > > //
> > **************************************************************
> > > // Establish JMS connectivity
> > > //
> > **************************************************************
> > >
> > > if (log.isTraceEnabled()) log.trace("Getting server
> > initial context.");
> > > ic = new InitialContext();
> > >
> > > //
> > **************************************************************
> > > // Initialize decompressor
> > > //
> > **************************************************************
> > >
> > > if (log.isTraceEnabled()) log.trace("Initializing
> > decompressor.");
> > > decompressor = new Inflater();
> > >
> > >
> > > if (log.isTraceEnabled()) log.trace ("Collector
> > consumer bean started successfully.");
> > >
> > > } catch (NamingException ex) {
> > > log.error("Failed to get server initial context.",
> ex);
> > > } catch (Throwable ex) {
> > > log.error("Unexpected error.", ex);
> > > }
> > >
> > > }
> > >
> > > /**
> > > * Releases resources.
> > > */
> > > @PreDestroy()
> > > private void release() {
> > > if (log.isTraceEnabled()) log.trace("Collector consumer
> > stopped.");
> > > }
> > >
> > > //
> >
> **********************************************************************
> > > // Consumer service methods
> > > //
> >
> **********************************************************************
> > >
> > > /**
> > > * Retrieves <code>maxMessages</code> messages from the
> > named queue.
> > > * This method blocks until <code>maxMessages</code> are
> > retrieved from the
> > > * queue.
> > > *
> > > * @param username The JMS connection username.
> > > * @param password The JMS connection password.
> > > * @param jndiDestination The destination JNDI.
> > > * @param maxMessages The max number of messages to return.
> > > * @param compressed Determine if messages should be left
> > compressed in case they are compressed.
> > > * @throws
> > com.localmatters.flexiq.collector.consumer.ConsumerException
> > Thrown if there is an exception/error.
> > > * @return An array of collector messages.
> > > */
> > > @WebMethod()
> > > public ResultMessage[] retrieve(@WebParam(name="username")
> > String username, @WebParam(name="password") String password,
> > @WebParam(name="jndiDestination") String jndiDestination,
> > @WebParam(name="maxMessages") int maxMessages,
> > @WebParam(name="compressed") boolean compressed)
> > > throws ConsumerException {
> > > if (log.isTraceEnabled()) log.trace("Blocking retrieve
> > request.");
> > > return doRetrieve(RetrieveType.BLOCKING, username,
> > password, jndiDestination, maxMessages, compressed, -1);
> > > }
> > >
> > > /**
> > > * Retrieves up to <code>maxMessages</code> messages from
> > the named queue.
> > > * This method does not block, so if the queue does not have
> > any available
> > > * messages, the returned array will contain only those
> > messages that were
> > > * available at the time of the call up to the established
> > limit.
> > > *
> > > * @param username The JMS connection username.
> > > * @param password The JMS connection password.
> > > * @param jndiDestination The destination JNDI.
> > > * @param maxMessages The max number of messages to return.
> > > * @param compressed Determine if messages should be left
> > compressed in case they are compressed.
> > > * @throws
> > com.localmatters.flexiq.collector.consumer.ConsumerException
> > Thrown if there is an exception/error.
> > > * @return An array of collector messages.
> > > */
> > > @WebMethod()
> > > public ResultMessage[]
> > retrieveImmediate(@WebParam(name="username") String username,
> > @WebParam(name="password") String password,
> > @WebParam(name="jndiDestination") String jndiDestination,
> > @WebParam(name="maxMessages") int maxMessages,
> > @WebParam(name="compressed") boolean compressed)
> > > throws ConsumerException {
> > > if (log.isTraceEnabled()) log.trace("Non-blocking
> > retrieve request.");
> > > return doRetrieve(RetrieveType.IMMEDIATE, username,
> > password, jndiDestination, maxMessages, compressed, -1);
> > > }
> > >
> > > /**
> > > * Retrieves up to <code>maxMessages</code> messages from
> > the named queue.
> > > * This method blocks up to <code>timeout</code>
> > milliseconds on each
> > > * attempt to retrieve a message. Hence, if no messages were
> > available and
> > > * none became available in the duration of the call, the
> > method will block
> > > * for <code>maxMessages</code> * <code>timeout</code>
> > milliseconds. The
> > > * returned array will contain the number of messages that
> > were available
> > > * at the time of the call, up to the established limit.
> > > *
> > > * @param username The JMS connection username.
> > > * @param password The JMS connection password.
> > > * @param jndiDestination The destination JNDI.
> > > * @param maxMessages The max number of messages to return.
> > > * @param compressed Determine if messages should be left
> > compressed in case they are compressed.
> > > * @param timeout The timeout in milliseconds for each
> > message retrieval attempt.
> > > * @throws
> > com.localmatters.flexiq.collector.consumer.ConsumerException
> > Thrown if there is an exception/error.
> > > * @return An array of collector messages.
> > > */
> > > @WebMethod()
> > > public ResultMessage[]
> > retrieveTimed(@WebParam(name="username") String username,
> > @WebParam(name="password") String password,
> > @WebParam(name="jndiDestination") String jndiDestination,
> > @WebParam(name="maxMessages") int maxMessages,
> > @WebParam(name="compressed") boolean compressed,
> > @WebParam(name="timeout") long timeout)
> > > throws ConsumerException {
> > > if (log.isTraceEnabled()) log.trace("Timed retrieve
> > request.");
> > > return doRetrieve(RetrieveType.TIMED, username,
> > password, jndiDestination, maxMessages, compressed, timeout);
> > > }
> > >
> > > //
> >
> **********************************************************************
> > > // Auxiliary methods
> > > //
> >
> **********************************************************************
> >
> > >
> > > /**
> > > * Performs the actual message retrieval.
> > > *
> > > * @param type The retrieval type.
> > > * @param username The JMS connection username.
> > > * @param password The JMS connection password.
> > > * @param jndiDestination The destination JNDI.
> > > * @param maxMessages The max number of messages to return.
> > > * @param compressed Determine if messages should be left
> > compressed in case they are compressed.
> > > * @param timeout The timeout in milliseconds for each
> > message retrieval attempt.
> > > * @throws
> > com.localmatters.flexiq.collector.consumer.ConsumerException
> > Thrown if there is an exception/error.
> > > * @return An array of collector messages.
> > > */
> > > private ResultMessage[] doRetrieve(RetrieveType type, String
> > username, String password, String jndiDestination, int
> > maxMessages, boolean compressed, long timeout)
> > > throws ConsumerException {
> > >
> > > //
> > ******************************************************************
> > > // Validate parameters
> > > //
> > ******************************************************************
> > >
> > > if (username == null)
> > > throw new ConsumerException(new
> > NullPointerException("JMS connection username cannot be null."));
> > > if (password == null)
> > > throw new ConsumerException(new
> > NullPointerException("JMS connection password cannot be null."));
> > > if (!this.username.equals(username))
> > > throw new ConsumerException(new
> > IllegalArgumentException("Invalid JMS connection username."));
> > > if (!this.password.equals(password))
> > > throw new ConsumerException(new
> > IllegalArgumentException("Invalid JMS connection password."));
> > >
> > > // The JMS connection
> > > Connection connection = null;
> > >
> > > try {
> > >
> > > //
> > ******************************************************************
> > > // Establish JMS connectivity
> > > //
> > ******************************************************************
> > >
> > > if (log.isTraceEnabled()) log.trace("Establishing
> > connection to JMS broker.");
> > > connection =
> > connectionFactory.createConnection(username, password);
> > >
> > > if ( log.isTraceEnabled()) log.trace("Creating
> > transacted session with JMS broker.");
> > > Session session = connection.createSession(true,
> > Session.SESSION_TRANSACTED);
> > >
> > > //
> > **************************************************************
> > > // Locate bound destination and create consumer
> > > //
> > **************************************************************
> > >
> > > if (log.isTraceEnabled()) log.trace("Searching for
> > named destination: " + jndiDestination);
> > > Destination destination = (Destination)
> > ic.lookup(jndiDestination);
> > >
> > > if (log.isTraceEnabled()) log.trace("Creating
> > consumer for named destination.");
> > > MessageConsumer consumer =
> > session.createConsumer(destination);
> > >
> > > if (log.isTraceEnabled()) log.trace("Starting JMS
> > connection.");
> > > connection.start();
> > >
> > > //
> > **************************************************************
> > > // Consume messages
> > > //
> > **************************************************************
> > >
> > > if (log.isTraceEnabled()) log.trace("Creating retrieval containers.");
> > > List<ResultMessage> processedMessages = new
> > ArrayList<ResultMessage>(maxMessages);
> > > BytesMessage jmsMessage = null;
> > >
> > > for (int i = 0 ; i < maxMessages ; i++) {
> > >
> > > //
> > **********************************************************
> > > // Attempt message retrieve
> > > //
> > **********************************************************
> > >
> > > if (log.isTraceEnabled()) log.trace("Attempting
> > retrieval: " + i);
> > > switch (type) {
> > > case BLOCKING :
> > > jmsMessage = (BytesMessage)
> > consumer.receive();
> > > break;
> > > case IMMEDIATE :
> > > jmsMessage = (BytesMessage)
> > consumer.receiveNoWait();
> > > break;
> > > case TIMED :
> > > jmsMessage = (BytesMessage)
> > consumer.receive(timeout);
> > > break;
> > > }
> > >
> > > //
> > **********************************************************
> > > // Process retrieved message
> > > //
> > **********************************************************
> > >
> > > if (jmsMessage != null) {
> > > if ( log.isTraceEnabled())
> > log.trace("Message retrieved\n" + jmsMessage);
> > >
> > > //
> > ******************************************************
> > > // Extract message
> > > //
> > ******************************************************
> > >
> > > if (log.isTraceEnabled())
> > log.trace("Extracting result message container from JMS message.");
> > > byte[] extracted = new byte[(int)
> > jmsMessage.getBodyLength()];
> > > jmsMessage.readBytes(extracted);
> > >
> > > //
> > ******************************************************
> > > // Deserialize message
> > > //
> > ******************************************************
> > >
> > > if (log.isTraceEnabled())
> > log.trace("Deserializing result message container.");
> > > ResultMessage resultMessage =
> > deserialize(extracted);
> > >
> > > //
> > ******************************************************
> > > // Decrypt message
> > > //
> > ******************************************************
> > >
> > > // TODO decrypt
> > > if
> > (jmsMessage.getBooleanProperty(ENCRYPTED_HEADER)) {
> > > if (log.isTraceEnabled())
> > log.trace("Decrypting message.");
> > >
> > > }
> > >
> > > //
> > ******************************************************
> > > // Decompress message
> > > //
> > ******************************************************
> > >
> > > if
> > (jmsMessage.getBooleanProperty(COMPRESSED_HEADER) && !compressed) {
> > > if (log.isTraceEnabled())
> > log.trace("Decompressing message.");
> > >
> > resultMessage.setMessage(decompress(resultMessage.getMessage()));
> > > }
> > >
> > > //
> > ******************************************************
> > > // Done processing message
> > > //
> > ******************************************************
> > >
> > > if (log.isTraceEnabled()) log.trace("Message
> > added to retrieval container.");
> > > processedMessages.add(resultMessage);
> > >
> > > } else
> > > if (log.isTraceEnabled()) log.trace("No
> > message was available.");
> > >
> > > }
> > >
> > > //
> > **************************************************************
> > > // Package return container
> > > //
> > **************************************************************
> > >
> > > if (log.isTraceEnabled()) log.trace("Packing
> > retrieved messages to return.");
> > > ResultMessage[] collectorMessages = new
> > ResultMessage[processedMessages.size()];
> > > for (int i = 0 ; i < collectorMessages.length ; i++)
> > > collectorMessages[i] = processedMessages.get(i);
> > >
> > > if (log.isTraceEnabled()) log.trace("Returning " +
> > collectorMessages.length + " messages.");
> > > return collectorMessages;
> > >
> > > } catch (NamingException ex) {
> > > log.error("Unable to locate named queue: " +
> > jndiDestination, ex);
> > > throw new ConsumerException("Unable to locate named
> > queue: " + jndiDestination, ex);
> > > } catch (JMSException ex) {
> > > log.error("Unable to consume messages.", ex);
> > > throw new ConsumerException("Unable to consume
> > messages.", ex);
> > > } catch (Throwable ex) {
> > > log.error("Unexpected error.", ex);
> > > throw new ConsumerException("Unexpected error.", ex);
> > > } finally {
> > > try {
> > > if (connection != null) {
> > > if (log.isTraceEnabled()) log.trace("Closing
> > JMS connection.");
> > > connection.close();
> > > }
> > > } catch (Throwable ex) {
> > > log.error("Unexpected error.", ex);
> > > }
> > > }
> > >
> > > }
> > >
> > > //
> >
> **********************************************************************
> >
> > > // Auxiliary methods
> > > //
> >
> **********************************************************************
> > >
> > > /**
> > > * Decompresses a collector message.
> > > *
> > > * @return The decompressed collector message.
> > > * @param collectorMessage The collector message.
> > > * @throws java.io.IOException If there is an exception/error.
> > > * @throws java.util.zip.DataFormatException If there is an
> > exception/error.
> > > */
> > > private byte[] decompress(byte[] collectorMessage)
> > > throws IOException, DataFormatException {
> > >
> > > decompressor.reset();
> > > decompressor.setInput(collectorMessage);
> > >
> > > ByteArrayOutputStream baos = new
> > ByteArrayOutputStream(collectorMessage.length);
> > > byte[] buf = new byte[1024];
> > > while (!decompressor.finished()) {
> > > int count = decompressor.inflate(buf);
> > > baos.write(buf, 0, count);
> > > }
> > > baos.close();
> > >
> > > if (log.isTraceEnabled()) log.trace("Message
> > decompressed.");
> > >
> > > return baos.toByteArray();
> > >
> > > }
> > >
> > > /**
> > > * Deserializes the result message.
> > > *
> > > * @return The deserialized result message.
> > > * @param resultMessage The resultMessage.
> > > * @throws javax.xml.bind.JAXBException If there is an
> > exception/error.
> > > * @throws IOException If there is an exception/error.
> > > */
> > > private ResultMessage deserialize(byte[] resultMessage)
> > > throws JAXBException, IOException {
> > >
> > > ByteArrayInputStream bais = new
> > ByteArrayInputStream(resultMessage);
> > >
> > > JAXBContext jaxbc = JAXBContext.newInstance("com.localmatters.flexiq.collector.jaxb");
> > > Unmarshaller unmarshaller = jaxbc.createUnmarshaller();
> > >
> > > ResultMessage unmarshalled = (ResultMessage)
> > unmarshaller.unmarshal(bais);
> > > bais.close();
> > >
> > > return unmarshalled;
> > >
> > > // ByteArrayInputStream bais = new
> > ByteArrayInputStream(resultMessage);
> > > // ObjectInputStream ois = new ObjectInputStream(bais);
> > > //
> > > // ResultMessage deserialized = (ResultMessage)
> > ois.readObject();
> > > //
> > > // ois.close();
> > > // bais.close();
> > > //
> > > // return deserialized;
> > >
> > > }
> > >
> > > //
> >
> **********************************************************************
> >
> > > // Auxiliary classes
> > > //
> >
> **********************************************************************
> > >
> > > /**
> > > * Enumeration for the retrieval types.
> > > */
> > > private enum RetrieveType {
> > > BLOCKING,
> > > IMMEDIATE,
> > > TIMED
> > > };
> > >
> > > }
> > >
> >
> >
>
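
A side note on the decompress() method in the quoted source: it loops on
Inflater.finished() alone, and inflate() returns 0 without ever finishing when
the input is truncated, so that loop may never end on a damaged message. The
following standalone sketch shows one possible guard. InflateSketch and its
method names are hypothetical, and it creates an Inflater per call instead of
sharing one instance as the bean does, which is an assumption made here to keep
the example self-contained.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

/** Hypothetical standalone sketch; class and method names are illustrative. */
public final class InflateSketch {

    private InflateSketch() {}

    /** Inflates zlib-compressed data, failing fast if the input is incomplete. */
    public static byte[] decompress(byte[] compressed) throws DataFormatException {
        Inflater inflater = new Inflater();
        try {
            inflater.setInput(compressed);
            ByteArrayOutputStream out = new ByteArrayOutputStream(compressed.length);
            byte[] buf = new byte[1024];
            while (!inflater.finished()) {
                int count = inflater.inflate(buf);
                if (count == 0 && (inflater.needsInput() || inflater.needsDictionary())) {
                    // No progress is possible: without this check the loop never ends.
                    throw new DataFormatException("Compressed data is truncated or needs a dictionary.");
                }
                out.write(buf, 0, count);
            }
            return out.toByteArray();
        } finally {
            inflater.end(); // release native zlib memory
        }
    }

    /** Compresses data with default Deflater settings, only to feed the demo. */
    public static byte[] compress(byte[] data) {
        Deflater deflater = new Deflater();
        try {
            deflater.setInput(data);
            deflater.finish();
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buf = new byte[1024];
            while (!deflater.finished()) {
                out.write(buf, 0, deflater.deflate(buf));
            }
            return out.toByteArray();
        } finally {
            deflater.end();
        }
    }

    public static void main(String[] args) throws DataFormatException {
        byte[] original = "collector message payload".getBytes(StandardCharsets.UTF_8);
        byte[] packed = compress(original);
        System.out.println(Arrays.equals(original, decompress(packed)));
        try {
            decompress(Arrays.copyOf(packed, packed.length / 2));
        } catch (DataFormatException ex) {
            System.out.println("Truncated input rejected: " + ex.getMessage());
        }
    }
}
```

Throwing DataFormatException matches the exception type the quoted method
already declares, so callers would not need to change.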