Hi all! I am trying to implement resend functionality with use of JMS and
selectors. Now in project there are 2 beans. TestBean posts 10 messages with
random delay into jms.resendQueue. ResendProcessorBean takes "expired"
messages from queue and sends them to final destination. Problem is that in
local GF 3.1 with Jms service configured as EMBEDDED it works like a charm.
But in production GF 3.1 with JMS configured as REMOTE (openMq 4.5 on same
machine) ResendProcessorBean doesnt get any messages from queue or gets them
randomly...There are no exceptions in server.log and logs of open mq also
indicate no problems. ResendProcessorBean: import org.slf4j.Logger; import
org.slf4j.LoggerFactory; import javax.annotation.*; import javax.ejb.*;
import javax.jms.*; import javax.transaction.UserTransaction; import
java.io.Serializable; import java.util.Date; @Local @Startup @Singleton
@TransactionManagement(TransactionManagementType.BEAN) public class
ResendProcessorBean { private static final Logger log =
LoggerFactory.getLogger(ResendProcessorBean.class); @Resource(name =
"jms.connectionFactory") private ConnectionFactory jmsConnectionFactory;
@Resource(name = "jms.resendQueue") private Queue resendQueue; @Resource
private TimerService timerService; @Resource private SessionContext context;
private QueueConnection connection; @PostConstruct public void init() {
log.info("Starting resend processor..."); prepareJmsConnection();
scheduleRun(); } private void prepareJmsConnection() { try { connection =
((QueueConnectionFactory) jmsConnectionFactory).createQueueConnection();
connection.start(); } catch (JMSException ex) { log.error("Exception
preparing JMS connection.", ex); } } private String createMessageSelector() {
return "deliveryTime<=" + System.currentTimeMillis(); } private void
scheduleRun() { log.debug("Scheduling next resend check...."); TimerConfig
timerConfig = new TimerConfig(); timerConfig.setPersistent(false);
timerService.createSingleActionTimer(2000, timerConfig); } @Timeout
@SuppressWarnings({"ConstantConditions", "UnusedDeclaration"}) public void
checkResendQueue() { log.debug("Checking resend queue..."); QueueSession
session = null; MessageConsumer consumer = null; try { log.debug("Opening
session and consumer..."); session = connection.createQueueSession(false,
Session.AUTO_ACKNOWLEDGE); consumer = session.createConsumer(resendQueue,
createMessageSelector()); int i = 0; Message message; log.debug("Starting to
receive messages..."); do { message = consumer.receiveNoWait(); if (message
!= null) { ObjectMessage objectMessage = (ObjectMessage) message;
log.debug("Got message for redelivery to {} at {}",
objectMessage.getStringProperty("destinationQueueName"), new Date((long)
objectMessage.getDoubleProperty(""))); UserTransaction tx =
context.getUserTransaction(); tx.begin(); resend(session,
objectMessage.getObject(),
objectMessage.getStringProperty("destinationQueueName")); i++; tx.commit(); }
} while (message != null && i <= 200); if (i > 0) { log.debug("Successfully
resent {} items.", i); } } catch (Exception e) { log.error("Exception
processing messages from resend queue", e); } finally { scheduleRun(); try {
if (consumer != null) { consumer.close(); } if (session != null) {
session.close(); } } catch (Exception ignore) {} } } private void
resend(QueueSession session, Serializable message, String queueName) throws
JMSException { QueueSender sender = null; try { sender =
session.createSender(session.createQueue(queueName)); javax.jms.Message
jmsMessage = session.createObjectMessage(message); log.debug("Redelivering
{}", message); sender.send(jmsMessage); } finally { if (sender != null)
sender.close(); } } } TestBean: import java.util.*; import
javax.annotation.Resource; import javax.ejb.*; import javax.jms.*; import
javax.jms.Queue; import org.slf4j.*; @Local @Startup @Singleton public class
TestBean { private static final Logger log =
LoggerFactory.getLogger(TestBean.class); @Resource(name = "jms.resendQueue")
private Queue resendQueue; @Resource(name = "jms.connectionFactory") private
ConnectionFactory jmsConnectionFactory; private Random generator = new
Random(); @SuppressWarnings({"FieldCanBeLocal"}) private String queueName =
"jms.testQueue"; private static int ii = 0;
@SuppressWarnings({"UnusedDeclaration"}) @Schedule(hour = "*", minute = "*",
second = "*/30") public void sendTest() { log.debug("Scheduling message for
resending..."); Connection connection = null; Session session = null;
TestObject object = null; try { connection =
jmsConnectionFactory.createConnection(); session =
connection.createSession(false, Session.AUTO_ACKNOWLEDGE); for (int i = 0; i
< 10; i++ ) { int delayMilliseconds = generator.nextInt(7) * 1000; long
deliveryTime = System.currentTimeMillis() + delayMilliseconds;
MessageProducer producer = session.createProducer(resendQueue); object = new
TestObject(delayMilliseconds, "TEST " + ii); Message jmsMessage =
session.createObjectMessage(object);
jmsMessage.setStringProperty("destinationQueueName", queueName);
jmsMessage.setDoubleProperty("deliveryTime", deliveryTime);
log.debug("Scheduled message {} for delivery at: {}", ii, new
Date(deliveryTime)); producer.send(jmsMessage); ii++; } } catch (JMSException
e) { log.error("Exception resending message {} to queue {}", new Object[]
{object, queueName}, e); } finally { try { if (session != null) {
session.close(); } if (connection != null) { connection.close(); } } catch
(JMSException ignore) {} } } }
--
[Message sent by forum member 'tyutchev_alex']
View Post: http://forums.java.net/node/781328