The Novell exteNd Messaging Platform's JMS supports several special exceptions,
which are used to notify an exception listener of severe errors that occur
asynchronously. This section provides three examples that illustrate how to
detect that the JMS server has stopped or crashed, how to recover messages that
cannot be delivered via a message listener's onMessage method, and how to
re-connect to the server after a failure.
Client applications can detect that the server has stopped or crashed by
listening for a JMQConnectionLostException in the onException method of an
exception listener. This exception is raised if the socket to the server
breaks and cannot be re-established.
The following example shows a familiar point-to-point application that creates
a sender and a receiver for a queue. For this example, the interesting part of
the code is the onException implementation, where we check for a
JMQConnectionLostException:

    package exceptionListener;

    import javax.naming.InitialContext;

    import javax.jms.Queue;
    import javax.jms.Session;
    import javax.jms.Message;
    import javax.jms.TextMessage;
    import javax.jms.JMSException;
    import javax.jms.QueueSession;
    import javax.jms.QueueSender;
    import javax.jms.QueueReceiver;
    import javax.jms.QueueConnection;
    import javax.jms.MessageListener;
    import javax.jms.ExceptionListener;
    import javax.jms.QueueConnectionFactory;

    import com.sssw.jms.api.JMQConnectionLostException;

    public class LostConnection implements ExceptionListener
    {
        public static void main(String[] args) throws Exception
        {
            // get the initial context
            InitialContext ctx = new InitialContext();

            // lookup the queue object
            Queue queue = (Queue) ctx.lookup("queue/queue0");

            // lookup the queue connection factory
            QueueConnectionFactory connFactory = (QueueConnectionFactory) ctx.
                lookup("queue/connectionFactory");

            // create a queue connection
            QueueConnection queueConn = connFactory.createQueueConnection();

            // set an exception listener
            LostConnection lost = new LostConnection();
            queueConn.setExceptionListener(lost);

            // create a queue session
            QueueSession queueSession = queueConn.createQueueSession(false,
                Session.AUTO_ACKNOWLEDGE);

            // create a queue receiver
            QueueReceiver queueReceiver = queueSession.createReceiver(queue);

            // set message listener that prints messages
            queueReceiver.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    System.out.println(message);
                }
            });

            // start the connection
            queueConn.start();

            // create a queue sender
            QueueSender queueSender = queueSession.createSender(queue);

            // create and send a message
            TextMessage message = queueSession.createTextMessage("hello");
            queueSender.send(message);

            // wait until the exception listener reports a lost connection
            synchronized (lost) {
                while (lost.active) lost.wait();
            }
        }

        public synchronized void onException(JMSException ex)
        {
            if (ex instanceof JMQConnectionLostException)
            {
                System.out.println("server down");
                active = false;
                notify();
            }
            else
            {
                System.out.println("unexpected exception: " + ex);
            }
        }

        private boolean active = true;
    }

If the exception listener encounters a JMQConnectionLostException, it prints
an error message and notifies the main thread, which is waiting on the
listener object, that the server is no longer active. A more sophisticated
client application could start a loop in which it tries to re-connect to the
server.
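
The hand-off between the exception listener and the waiting thread can also be expressed with a latch from java.util.concurrent instead of wait and notify. The following is only a sketch under that assumption: ConnectionLostSignal and its methods are hypothetical names, not part of the product API, and the class uses no JMS types so that it can be tried in isolation. An exception listener would call signalLost() from onException when it sees a JMQConnectionLostException.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: one thread waits until another reports a lost connection. */
final class ConnectionLostSignal {
    // A latch with count 1 opens exactly once and stays open afterwards,
    // so a signal sent before the waiter arrives is not lost.
    private final CountDownLatch lost = new CountDownLatch(1);

    /** Call this from the exception listener when the connection is reported lost. */
    void signalLost() {
        lost.countDown();
    }

    /** Returns true once signalLost() has been called. */
    boolean isLost() {
        return lost.getCount() == 0;
    }

    /**
     * Waits up to the given timeout for signalLost().
     * Returns true if the connection was reported lost, false on timeout
     * or if the waiting thread was interrupted.
     */
    boolean awaitLost(long timeout, TimeUnit unit) {
        try {
            return lost.await(timeout, unit);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }
}
```

Compared with a boolean field guarded by synchronized blocks, the latch removes the need for the waiting thread to hold the listener's monitor, at the cost of being single-use.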
onMessage
Another error situation that can occur asynchronously is an unexpected
exception during message delivery to a message listener's onMessage method.
The JMS server will attempt to deliver each message a number of times,
but if the onMessage method keeps throwing exceptions, it eventually gives up.
To recover such undelivered messages, a consumer application must register an exception listener and look for a JMQMessageNotDeliveredException. This exception provides methods to get the message in question, so that the application can deal with it. Below is an example:

    package exceptionListener;

    import javax.naming.InitialContext;

    import javax.jms.Topic;
    import javax.jms.Session;
    import javax.jms.Message;
    import javax.jms.TextMessage;
    import javax.jms.JMSException;
    import javax.jms.TopicSession;
    import javax.jms.TopicPublisher;
    import javax.jms.TopicSubscriber;
    import javax.jms.TopicConnection;
    import javax.jms.MessageListener;
    import javax.jms.ExceptionListener;
    import javax.jms.TopicConnectionFactory;

    import com.sssw.jms.api.JMQMessageNotDeliveredException;

    public class LostMessage implements ExceptionListener
    {
        public static void main(String[] args) throws Exception
        {
            // get the initial context
            InitialContext ctx = new InitialContext();

            // lookup the topic object
            Topic topic = (Topic) ctx.lookup("topic/topic0");

            // lookup the topic connection factory
            TopicConnectionFactory connFactory = (TopicConnectionFactory) ctx.
                lookup("topic/connectionFactory");

            // create a topic connection
            TopicConnection topicConn = connFactory.createTopicConnection();

            // set an exception listener
            LostMessage lost = new LostMessage();
            topicConn.setExceptionListener(lost);

            // create a topic session
            TopicSession topicSession = topicConn.createTopicSession(false,
                Session.AUTO_ACKNOWLEDGE);

            // create a topic subscriber
            TopicSubscriber topicSubscriber = topicSession.createSubscriber(topic);

            // set message listener that always raises exception
            topicSubscriber.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    throw new RuntimeException("unexpected");
                }
            });

            // start the connection
            topicConn.start();

            // create a topic publisher
            TopicPublisher topicPublisher = topicSession.createPublisher(topic);

            // create and send a message
            TextMessage message = topicSession.createTextMessage("hello");
            topicPublisher.publish(message);

            // wait for message delivery
            lost.waitForMessage();

            // close the topic connection
            topicConn.close();
        }

        public synchronized void waitForMessage()
        {
            while (!delivered) {
                try {
                    wait();
                } catch (InterruptedException ex) { }
            }
        }

        public synchronized void onException(JMSException ex)
        {
            if (ex instanceof JMQMessageNotDeliveredException)
            {
                JMQMessageNotDeliveredException mnde =
                    (JMQMessageNotDeliveredException) ex;

                System.out.println("message = " + mnde.getJMSMessage());
                System.out.println("exception = " + mnde.getLinkedException());

                delivered = true;
                notify();
            }
            else
            {
                System.out.println("unexpected exception: " + ex);
            }
        }

        private boolean delivered = false;
    }

As can be seen from the LostMessage class above, we register a message
listener with the consumer that always throws a RuntimeException to trigger
the error. The JMQMessageNotDeliveredException class supports the
getJMSMessage method, which can be used to retrieve the message that could
not be delivered by JMS.
Since JMQMessageNotDeliveredException extends the regular JMSException, you
may also use the getLinkedException method, which returns the root cause of
the problem. Note that depending on the error, the linked exception may
sometimes be null.
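
To make the give-up behavior concrete, the following toy model mimics bounded redelivery in plain Java. It is not the server's implementation: RedeliveryModel, MAX_ATTEMPTS and the value 3 are assumptions chosen for illustration (this section does not state the real retry count), and messages are plain strings rather than JMS messages. The model retries a failing listener a fixed number of times, then sets the message aside together with the last failure, which corresponds to the information exposed by getJMSMessage and getLinkedException.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy model of bounded redelivery; NOT the JMS server's actual implementation. */
final class RedeliveryModel {
    /** Assumed limit for illustration; the real retry count is not documented here. */
    static final int MAX_ATTEMPTS = 3;

    /** Messages that were given up on, in the order they failed. */
    final List<String> undelivered = new ArrayList<>();

    /** The last failure seen for each entry in {@link #undelivered}. */
    final List<RuntimeException> causes = new ArrayList<>();

    /**
     * Hands the message to the listener, retrying while it throws.
     * Returns the number of attempts that were made.
     */
    int deliver(String message, Consumer<String> listener) {
        RuntimeException last = null;
        for (int attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
            try {
                listener.accept(message);
                return attempt; // delivered successfully
            } catch (RuntimeException ex) {
                last = ex; // remember the failure and try again
            }
        }
        // Every attempt failed: record the message and its root cause.
        undelivered.add(message);
        causes.add(last);
        return MAX_ATTEMPTS;
    }
}
```

A listener that always throws, like the one in LostMessage, ends up with its message in the undelivered list after MAX_ATTEMPTS calls, whereas a well-behaved listener is called exactly once.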
The final example in this section expands the lost connection example above
by including logic for re-connecting to the server if a crash is detected.
The Reconnect class below is a topic subscriber that continuously attempts to
re-connect when a JMQConnectionLostException is encountered:

    package exceptionListener;

    import javax.jms.Topic;
    import javax.jms.Message;
    import javax.jms.Session;
    import javax.jms.JMSException;
    import javax.jms.TopicSession;
    import javax.jms.TopicConnection;
    import javax.jms.TopicSubscriber;
    import javax.jms.MessageListener;
    import javax.jms.ExceptionListener;
    import javax.jms.TopicConnectionFactory;

    import javax.naming.InitialContext;

    import com.sssw.jms.api.JMQConnectionLostException;

    /**
       This program shows how to write a fault-tolerant topic subscriber.
       If the jBroker MQ server fails, the program will enter a continuous
       loop and attempt to reconnect to the server. When the server is
       restarted, the subscriber will continue to retrieve messages.
     */
    public class Reconnect implements MessageListener, ExceptionListener
    {
        TopicSubscriber _subscriber;          // topic subscriber
        boolean _reconnectRequested = false;  // guards against a lost notify

        /**
           Runs the program.

           @param args Program arguments.
         */
        public static void main(String[] args)
        {
            Reconnect ftp = new Reconnect();
        }

        /**
           The constructor will start a reconnect thread and subscribe.
         */
        public Reconnect()
        {
            // create a thread that handles reconnection
            Thread t = new Thread() {
                public void run() {
                    while (true) {
                        reconnect();  // return value not used here
                    }
                }
            };

            t.start();  // the thread blocks in doWait() until a reconnect is requested

            // attempt to create a subscriber
            try {
                _subscriber = createSubscriber();
            } catch (JMSException ex) {
                System.out.println("caught: " + ex);
                synchronized (this) {
                    _reconnectRequested = true;
                    notify();  // trigger reconnect thread
                }
            }
        }

        /**
           Receive a message asynchronously.

           @param message The JMS message.
         */
        public void onMessage(Message message)
        {
            System.out.print(",");
        }

        /**
           This method tries to reconnect to the jBroker MQ server.

           @return True if reconnect succeeded, false otherwise.
         */
        public synchronized boolean reconnect()
        {
            doWait();
            System.out.print("trying to reconnect ...");
            for (int i = 0; i < 10; i++) {
                try {
                    _subscriber = createSubscriber();
                    System.out.println("ok");
                    return true;  // succeeded
                } catch (Exception ex) {
                    System.out.print(".");
                    try {
                        Thread.sleep(500);  // sleep a little before trying again
                    } catch (InterruptedException exc) { }
                }
            }
            System.err.println("failed after 10 attempts");
            return false;  // failed
        }

        /**
           Create a subscriber.

           @return Newly created topic subscriber.
           @exception JMSException If creation failed. This might for
           instance fail if the jBroker MQ server is not running.
         */
        public TopicSubscriber createSubscriber() throws JMSException
        {
            TopicSubscriber subscriber = null;

            try {
                InitialContext ctx = new InitialContext();
                Topic topic = (Topic) ctx.lookup("topic/topic0");
                TopicConnectionFactory fac = (TopicConnectionFactory) ctx.
                    lookup("topic/connectionFactory");
                TopicConnection con = fac.createTopicConnection();
                con.start();
                TopicSession session = con.createTopicSession(false,
                    Session.AUTO_ACKNOWLEDGE);
                con.setExceptionListener(this);
                subscriber = session.createSubscriber(topic);
            } catch (Exception ex) {
                throw new JMSException("can't initialize: " + ex);
            }

            subscriber.setMessageListener(this);

            return subscriber;
        }

        /**
           The JMS exception listener.

           @param ex JMS exception raised on client.
         */
        public synchronized void onException(JMSException ex)
        {
            if (ex instanceof JMQConnectionLostException)
            {
                try {
                    if (_subscriber != null) {
                        _subscriber.close();
                    }
                } catch (JMSException exc) { }
                _subscriber = null;
                System.out.println("caught: " + ex);
                _reconnectRequested = true;
                notify();  // trigger reconnect thread
            }
            else
            {
                ex.printStackTrace();  // other error handling ...
            }
        }

        /**
           Utility to wait until a reconnect has been requested.
         */
        public synchronized void doWait()
        {
            while (!_reconnectRequested) {
                try {
                    wait();
                } catch (InterruptedException exc) {
                    exc.printStackTrace();
                }
            }
            _reconnectRequested = false;
        }
    }

The Reconnect class uses a separate thread to perform re-connections. This
thread is normally waiting for another thread to instruct it to start the
re-connection process. If the exception listener detects a
JMQConnectionLostException in the onException method, it closes the old
subscriber and notifies the re-connect thread.
The reconnect method then calls the createSubscriber method up to 10 times,
pausing 500 milliseconds after each failed attempt. The createSubscriber
method in turn attempts to re-establish the initial context and re-subscribe
the topic subscriber. Note that the Reconnect client will enter the
re-connect loop immediately if the server is unavailable when the client
first starts.
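
The bounded retry loop inside reconnect can be factored into a small reusable helper. The sketch below is one possible shape, not part of the product: Retry and attempt are hypothetical names, and the helper takes any java.util.concurrent.Callable so that it has no dependency on JMS types. It runs the action up to a fixed number of times, sleeps between failures, and returns null if every attempt failed.

```java
import java.util.concurrent.Callable;

/** Hypothetical helper that retries an action a bounded number of times. */
final class Retry {
    private Retry() { }

    /**
     * Calls the action until it succeeds or maxAttempts calls have failed.
     * Sleeps delayMillis between failed attempts (not after the last one).
     * Returns the action's result, or null if all attempts failed or the
     * calling thread was interrupted while sleeping.
     */
    static <T> T attempt(int maxAttempts, long delayMillis, Callable<T> action) {
        for (int i = 0; i < maxAttempts; i++) {
            try {
                return action.call(); // success: stop retrying
            } catch (Exception ex) {
                if (i == maxAttempts - 1) {
                    break; // out of attempts, no point in sleeping again
                }
                try {
                    Thread.sleep(delayMillis);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // preserve the interrupt status
                    return null;
                }
            }
        }
        return null;
    }
}
```

With such a helper, the loop in reconnect would reduce to a call along the lines of Retry.attempt(10, 500, () -> createSubscriber()), followed by a null check. Returning null on failure keeps the sketch short; a production version would more likely rethrow the last exception so that the cause is not lost.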
Copyright © 2003, 2004 Novell, Inc. All rights reserved. Copyright © 2001, 2002, 2003 SilverStream Software, LLC. All rights reserved.