The Novell exteNd Messaging Platform's JMS supports several special exceptions, which are used to notify exception listeners of severe errors that occur asynchronously. This section provides three examples that illustrate how to detect whether the JMS server stops or crashes, and how to recover messages that cannot be delivered via a message listener's onMessage method.

1 Lost Connection
Client applications may detect whether the server stops or crashes by listening for a JMQConnectionLostException in the onException method of an exception listener. This exception is raised if the socket to the server breaks and cannot be re-established.

The following example shows a familiar point-to-point application that creates a sender and a receiver for a queue. In terms of this example, the interesting part of the code is the onException implementation, where we check for a JMQConnectionLostException:

    package exceptionListener;

    import javax.naming.InitialContext;

    import javax.jms.Queue;
    import javax.jms.Session;
    import javax.jms.Message;
    import javax.jms.TextMessage;
    import javax.jms.JMSException;
    import javax.jms.QueueSession;
    import javax.jms.QueueSender;
    import javax.jms.QueueReceiver;
    import javax.jms.QueueConnection;
    import javax.jms.MessageListener;
    import javax.jms.ExceptionListener;
    import javax.jms.QueueConnectionFactory;

    import com.sssw.jms.api.JMQConnectionLostException;

    public class LostConnection implements ExceptionListener {

        public static void main(String[] args) throws Exception {
            // get the initial context
            InitialContext ctx = new InitialContext();

            // lookup the queue object
            Queue queue = (Queue) ctx.lookup("queue/queue0");

            // lookup the queue connection factory
            QueueConnectionFactory connFactory = (QueueConnectionFactory)
                ctx.lookup("queue/connectionFactory");

            // create a queue connection
            QueueConnection queueConn = connFactory.createQueueConnection();

            // set an exception listener
            LostConnection lost = new LostConnection();
            queueConn.setExceptionListener(lost);

            // create a queue session
            QueueSession queueSession = queueConn.createQueueSession(false,
                Session.AUTO_ACKNOWLEDGE);

            // create a queue receiver
            QueueReceiver queueReceiver = queueSession.createReceiver(queue);

            // set message listener that prints messages
            queueReceiver.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    System.out.println(message);
                }
            });

            // start the connection
            queueConn.start();

            // create a queue sender
            QueueSender queueSender = queueSession.createSender(queue);

            // create and send a message
            TextMessage message = queueSession.createTextMessage("hello");
            queueSender.send(message);

            // block until the exception listener reports a lost connection
            synchronized (lost) {
                while (lost.active) lost.wait();
            }
        }

        public synchronized void onException(JMSException ex) {
            if (ex instanceof JMQConnectionLostException) {
                System.out.println("server down");
                active = false;
                notify();
            } else {
                System.out.println("unexpected exception: " + ex);
            }
        }

        private boolean active = true;
    }

If the exception listener encounters a JMQConnectionLostException, it prints an error message and notifies other threads that the server is no longer active. A more sophisticated client application could start a loop where it tries to re-connect to the server.

2 Exceptions in onMessage
Another error situation that can occur asynchronously is an unexpected exception during message delivery to a message listener's onMessage method. The JMS server will attempt to deliver each message a number of times, but if the onMessage method keeps throwing exceptions, it eventually gives up.

To recover such undelivered messages, a consumer application must register an exception listener and look for a JMQMessageNotDeliveredException. This exception provides methods to get the message in question, so that the application can deal with it. Below is an example:

    package exceptionListener;

    import javax.naming.InitialContext;

    import javax.jms.Topic;
    import javax.jms.Session;
    import javax.jms.Message;
    import javax.jms.TextMessage;
    import javax.jms.JMSException;
    import javax.jms.TopicSession;
    import javax.jms.TopicPublisher;
    import javax.jms.TopicSubscriber;
    import javax.jms.TopicConnection;
    import javax.jms.MessageListener;
    import javax.jms.ExceptionListener;
    import javax.jms.TopicConnectionFactory;

    import com.sssw.jms.api.JMQMessageNotDeliveredException;

    public class LostMessage implements ExceptionListener {

        public static void main(String[] args) throws Exception {
            // get the initial context
            InitialContext ctx = new InitialContext();

            // lookup the topic object
            Topic topic = (Topic) ctx.lookup("topic/topic0");

            // lookup the topic connection factory
            TopicConnectionFactory connFactory = (TopicConnectionFactory)
                ctx.lookup("topic/connectionFactory");

            // create a topic connection
            TopicConnection topicConn = connFactory.createTopicConnection();

            // set an exception listener
            LostMessage lost = new LostMessage();
            topicConn.setExceptionListener(lost);

            // create a topic session
            TopicSession topicSession = topicConn.createTopicSession(false,
                Session.AUTO_ACKNOWLEDGE);

            // create a topic subscriber
            TopicSubscriber topicSubscriber = topicSession.createSubscriber(topic);

            // set message listener that always raises exception
            topicSubscriber.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    throw new RuntimeException("unexpected");
                }
            });

            // start the connection
            topicConn.start();

            // create a topic publisher
            TopicPublisher topicPublisher = topicSession.createPublisher(topic);

            // create and send a message
            TextMessage message = topicSession.createTextMessage("hello");
            topicPublisher.publish(message);

            // wait until the undelivered message has been reported
            lost.waitForMessage();

            // close the topic connection
            topicConn.close();
        }

        public synchronized void waitForMessage() {
            while (!delivered) {
                try {
                    wait();
                } catch (InterruptedException ex) { }
            }
        }

        public synchronized void onException(JMSException ex) {
            if (ex instanceof JMQMessageNotDeliveredException) {
                JMQMessageNotDeliveredException mnde =
                    (JMQMessageNotDeliveredException) ex;

                System.out.println("message = " + mnde.getJMSMessage());
                System.out.println("exception = " + mnde.getLinkedException());

                delivered = true;
                notify();
            } else {
                System.out.println("unexpected exception: " + ex);
            }
        }

        private boolean delivered = false;
    }

As can be seen from the LostMessage class above, we register a message listener with the consumer, which always throws a RuntimeException to trigger the error. The JMQMessageNotDeliveredException class supports the getJMSMessage method, which can be used to retrieve the message that could not be delivered by JMS.

Since JMQMessageNotDeliveredException extends a regular JMSException, you may also use the getLinkedException method, which returns the root cause of the problem. Note that depending on the error, the linked exception may sometimes be null.

3 Re-connecting After Server Crash
The final example in this section expands the lost connection example above by including logic for re-connecting to the server if a crash is detected. The Reconnect class below is a topic subscriber which continuously attempts to re-connect when a JMQConnectionLostException is encountered:

    package exceptionListener;

    import javax.jms.Topic;
    import javax.jms.Message;
    import javax.jms.Session;
    import javax.jms.JMSException;
    import javax.jms.TopicSession;
    import javax.jms.TopicConnection;
    import javax.jms.TopicSubscriber;
    import javax.jms.MessageListener;
    import javax.jms.ExceptionListener;
    import javax.jms.TopicConnectionFactory;

    import javax.naming.InitialContext;

    import com.sssw.jms.api.JMQConnectionLostException;

    /**
       This program shows how to write a fault-tolerant topic subscriber.
       If the jBroker MQ server fails, the program will enter a continuous
       loop and attempt to reconnect to the server. When the server is
       restarted, the subscriber will continue to retrieve messages.
     */
    public class Reconnect implements MessageListener, ExceptionListener {

        TopicSubscriber _subscriber; // topic subscriber

        /**
           Runs the program.

           @param args Program arguments.
         */
        public static void main(String[] args) {
            Reconnect ftp = new Reconnect();
        }

        /**
           The constructor will start a reconnect thread and subscribe.
         */
        public Reconnect() {
            // create a thread that handles reconnection
            Thread t = new Thread() {
                public void run() {
                    while (true) {
                        reconnect(); // return value not used here
                    }
                }
            };

            t.start(); // thread blocks in doWait() until notified

            // attempt to create a subscriber
            try {
                _subscriber = createSubscriber();
            } catch (JMSException ex) {
                System.out.println("caught: " + ex);
                synchronized (this) {
                    notify(); // trigger reconnect thread
                }
            }
        }

        /**
           Receive a message asynchronously.

           @param message The JMS message.
         */
        public void onMessage(Message message) {
            System.out.print(",");
        }

        /**
           This method tries to reconnect to the jBroker MQ server.

           @return True if reconnect succeeded, false otherwise.
         */
        public synchronized boolean reconnect() {
            doWait();
            System.out.print("trying to reconnect ...");
            for (int i = 0; i < 10; i++) {
                try {
                    _subscriber = createSubscriber();
                    System.out.println("ok");
                    return true; // succeeded
                } catch (Exception ex) {
                    System.out.print(".");
                    try {
                        Thread.sleep(500); // sleep a little before trying again
                    } catch (InterruptedException exc) { }
                }
            }
            System.err.println("failed after 10 attempts");
            return false; // failed
        }

        /**
           Create a subscriber.

           @return Newly created topic subscriber.
           @exception JMSException If creation failed. This might for
           instance fail if the jBroker MQ server is not running.
         */
        public TopicSubscriber createSubscriber() throws JMSException {
            TopicSubscriber subscriber = null;

            try {
                InitialContext ctx = new InitialContext();
                Topic topic = (Topic) ctx.lookup("topic/topic0");
                TopicConnectionFactory fac = (TopicConnectionFactory)
                    ctx.lookup("topic/connectionFactory");
                TopicConnection con = fac.createTopicConnection();
                con.start();
                TopicSession session = con.createTopicSession(false,
                    Session.AUTO_ACKNOWLEDGE);
                con.setExceptionListener(this);
                subscriber = session.createSubscriber(topic);
            } catch (Exception ex) {
                throw new JMSException("can't initialize: " + ex);
            }

            subscriber.setMessageListener(this);

            return subscriber;
        }

        /**
           The JMS exception listener.

           @param ex JMS exception raised on client.
         */
        public synchronized void onException(JMSException ex) {
            if (ex instanceof JMQConnectionLostException) {
                try {
                    _subscriber.close();
                } catch (JMSException exc) { }
                _subscriber = null;
                System.out.println("caught: " + ex);
                notify(); // trigger reconnect thread
            } else {
                ex.printStackTrace(); // other error handling ...
            }
        }

        /**
           Utility to wait for notify.
         */
        public synchronized void doWait() {
            try {
                wait();
            } catch (InterruptedException exc) {
                exc.printStackTrace();
            }
        }
    }

The Reconnect class uses a separate thread to perform re-connections. This thread is normally waiting for another thread to instruct it to start the re-connection process. If the exception listener detects a JMQConnectionLostException in the onException method, it closes the stale subscriber and notifies the re-connect thread, which then starts its re-connection attempts.

The reconnect method calls the createSubscriber method up to ten times, pausing 500 milliseconds after each failed attempt. The createSubscriber method in turn attempts to re-establish the initial context and re-subscribe the topic subscriber. Note that the Reconnect client will enter the re-connect loop immediately if the server is unavailable when the client first starts.
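
The bounded retry at the heart of the reconnect method can also be tried in isolation, without a JMS server. What follows is only a minimal sketch and not part of the Novell example: the class name RetrySketch, its retry method, and the Callable that stands in for createSubscriber are hypothetical names chosen for illustration, and the attempt count and delay are arbitrary.

```java
import java.util.concurrent.Callable;

/**
 * Sketch of a bounded retry loop, similar in spirit to Reconnect.reconnect().
 * RetrySketch and retry are hypothetical names; they are not part of the
 * JMS API or the Novell exteNd API.
 */
public class RetrySketch {

    /**
     * Calls the action up to maxAttempts times, sleeping delayMillis after
     * each failed attempt except the last one.
     *
     * @return the action's result, or null if every attempt failed.
     */
    public static <T> T retry(Callable<T> action, int maxAttempts, long delayMillis)
            throws InterruptedException {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call(); // succeeded
            } catch (InterruptedException ex) {
                throw ex; // do not swallow interruption
            } catch (Exception ex) {
                if (attempt < maxAttempts) {
                    Thread.sleep(delayMillis); // pause before trying again
                }
            }
        }
        return null; // all attempts failed
    }

    public static void main(String[] args) throws Exception {
        final int[] calls = {0};

        // Stand-in for createSubscriber(): fails twice, then succeeds.
        String result = retry(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("server down");
            }
            return "subscriber";
        }, 10, 50);

        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

Run as written, the program prints "subscriber after 3 attempts". Passing the connect step in as a Callable keeps the retry policy separate from the JMS setup code, so the attempt count and delay can be changed without touching createSubscriber.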
Copyright © 2000-2003, Novell, Inc. All rights reserved.