例外リスナ

Novell exteNd Messaging PlatformのJMSでは、非同期的に発生する重大なエラーを例外リスナに通知する場合に使用される、いくつかの特殊な例外がサポートされています。 このセクションでは、JMSサーバが停止またはクラッシュしたかどうかを検出する方法、およびメッセージリスナonMessageメソッドを利用して配信できないメッセージを回復する方法を示す3つの例を挙げます。

切断された接続

onMessageの例外

サーバクラッシュ後の再接続

1 切断された接続

Clientクライアントアプリケーションは、例外リスナのonExceptionメソッドでJMQConnectionLostExceptionをリッスンして、サーバが停止またはクラッシュしているかどうか検出できます。この例外は、サーバのソケットが壊れていて再接続できない場合に発生します。

次の例では、トピックのパブリッシャおよびサブスクライバを作成する一般的な発行/購読アプリケーションを示します。この例の場合、コードの重要な部分はJMQConnectionLostExceptionをチェックするonExceptionの実装です。

package exceptionListener;
                                                                           
import javax.naming.InitialContext;
                                                                           
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.Message;
import javax.jms.TextMessage;
import javax.jms.JMSException;
import javax.jms.QueueSession;
import javax.jms.QueueSender;
import javax.jms.QueueReceiver;
import javax.jms.QueueConnection;
import javax.jms.MessageListener;
import javax.jms.ExceptionListener;
import javax.jms.QueueConnectionFactory;
                                                                           
import com.sssw.jms.api.JMQConnectionLostException;
                                                                           
public class LostConnection implements ExceptionListener
{
    public static void main(String[] args) throws Exception
    {
    |   // get the initial context
    |   InitialContext ctx = new InitialContext();
    |                                                                      
    |   // lookup the queue object
    |   Queue queue = (Queue) ctx.lookup("queue/queue0");
    |                                                                      
    |   // lookup the queue connection factory
    |   QueueConnectionFactory connFactory = (QueueConnectionFactory) ctx.
    |       lookup("queue/connectionFactory");
    |                                                                      
    |   // create a queue connection
    |   QueueConnection queueConn = connFactory.createQueueConnection();
    |                                                                      
    |   // set an exception listener
    |   LostConnection lost = new LostConnection();
    |   queueConn.setExceptionListener(lost);
    |                                                                      
    |   // create a queue session
    |   QueueSession queueSession = queueConn.createQueueSession(false,
    |       Session.AUTO_ACKNOWLEDGE);
    |                                                                      
    |   // create a queue receiver
    |   QueueReceiver queueReceiver = queueSession.createReceiver(queue);
    |                                                                      
    |   // set message listener that prints messages
    |   queueReceiver.setMessageListener(new MessageListener() {
    |   |   public void onMessage(Message message) {
    |   |   |   System.out.println(message);
    |   |   }
    |   });
    |                                                                      
    |   // start the connection
    |   queueConn.start();
    |                                                                      
    |   // create a queue sender
    |   QueueSender queueSender = queueSession.createSender(queue);
    |                                                                      
    |   // create and send a message
    |   TextMessage message = queueSession.createTextMessage("hello");
    |   queueSender.send(message);
    |                                                                      
    |   synchronized (lost) {
    |   |   while (lost.active) lost.wait();
    |   }
    }
                                                                           
    public synchronized void onException(JMSException ex)
    {
    |   if (ex instanceof JMQConnectionLostException)
    |   {
    |   |   System.out.println("server down");
    |   |   active = false;
    |   |   notify();
    |   }
    |   else
    |   {
    |   |   System.out.println("unexpected exception: " + ex);
    |   }
    }
                                                                           
    private boolean active = true;
}
例外リスナによりJMQConnectionLostExceptionが検出された場合、エラーメッセージが出力され、サーバがアクティブでなくなったことが他のスレッドに通知されます。 より洗練されたクライアントアプリケーションは、サーバを再接続するループを開始できます。

2 onMessageの例外

非同期的に発生する別のエラー状況は、メッセージリスナのonMessageメソッドにメッセージを配信するときの予期せぬ例外です。 JMSサーバは、各メッセージを複数回配信しようとしますが、onMessageメソッドにより例外がスローされ続けると、メッセージは配信されなくなります。

このような未配信メッセージを回復するには、コンシューマアプリケーションにより例外リスナを登録して、JMQMessageNotDeliveredException'sを参照する必要があります。この例外には該当するメッセージを取得するメソッドが含まれており、アプリケーションはこれを処理できます。例は、次のとおりです。

package exceptionListener;
                                                                           
import javax.naming.InitialContext;
                                                                           
import javax.jms.Topic;
import javax.jms.Session;
import javax.jms.Message;
import javax.jms.TextMessage;
import javax.jms.JMSException;
import javax.jms.TopicSession;
import javax.jms.TopicPublisher;
import javax.jms.TopicSubscriber;
import javax.jms.TopicConnection;
import javax.jms.MessageListener;
import javax.jms.ExceptionListener;
import javax.jms.TopicConnectionFactory;
                                                                           
import com.sssw.jms.api.JMQMessageNotDeliveredException;
                                                                           
public class LostMessage implements ExceptionListener
{
    public static void main(String[] args) throws Exception
    {
    |   // get the initial context
    |   InitialContext ctx = new InitialContext();
    |                                                                      
    |   // lookup the topic object
    |   Topic topic = (Topic) ctx.lookup("topic/topic0");
    |                                                                      
    |   // lookup the topic connection factory
    |   TopicConnectionFactory connFactory = (TopicConnectionFactory) ctx.
    |       lookup("topic/connectionFactory");
    |                                                                      
    |   // create a topic connection
    |   TopicConnection topicConn = connFactory.createTopicConnection();
    |                                                                      
    |   // set an exception listener
    |   LostMessage lost = new LostMessage();
    |   topicConn.setExceptionListener(lost);
    |                                                                      
    |   // create a topic session
    |   TopicSession topicSession = topicConn.createTopicSession(false,
    |       Session.AUTO_ACKNOWLEDGE);
    |                                                                      
    |   // create a topic subscriber
    |   TopicSubscriber topicSubscriber = topicSession.createSubscriber(topic);
    |                                                                      
    |   // set message listener that always raises exception
    |   topicSubscriber.setMessageListener(new MessageListener() {
    |   |   public void onMessage(Message message) {
    |   |   |   throw new RuntimeException("unexpected");
    |   |   }
    |   });
    |                                                                      
    |   // start the connection
    |   topicConn.start();
    |                                                                      
    |   // create a topic publisher
    |   TopicPublisher topicPublisher = topicSession.createPublisher(topic);
    |                                                                      
    |   // create and send a message
    |   TextMessage message = topicSession.createTextMessage("hello");
    |   topicPublisher.publish(message);
    |                                                                      
    |   // wait for message delivery
    |   lost.waitForMessage();
    |                                                                      
    |   // close the topic connection
    |   topicConn.close();
    }
                                                                           
    public synchronized void waitForMessage()
    {
    |   while (!delivered) {
    |   |   try {
    |   |   |   wait();
    |   |   } catch (InterruptedException ex) { }
    |   }
    }
                                                                           
    public synchronized void onException(JMSException ex)
    {
    |   if (ex instanceof JMQMessageNotDeliveredException)
    |   {
    |   |   JMQMessageNotDeliveredException mnde =
    |   |       (JMQMessageNotDeliveredException) ex;
    |   |                                                                  
    |   |   System.out.println("message = " + mnde.getJMSMessage());
    |   |   System.out.println("exception = " + mnde.getLinkedException());
    |   |                                                                  
    |   |   delivered = true;
    |   |   notify();
    |   }
    |   else
    |   {
    |   |   System.out.println("unexpected exception: " + ex);
    |   }
    }
                                                                           
    private boolean delivered = false;
}
上のLostMessageクラスに示されているように、この例では、RuntimeExceptionを常にスローして例外をトリガするメッセージリスナをコンシューマとともに登録しています。JMQMessageNotDeliveredExceptionクラスでは、getJMSMessageメソッドがサポートされています。このメソッドは、JMSにより配信できなかったメッセージを取得する場合に使用できます。

JMQMessageNotDeliveredExceptionは、通常の/JMSExceptionが拡張されたものなので、問題の原因を返すgetLinkedExceptionメソッドを使用することもできます。エラーによっては、リンクされた例外はNULLになることもあるので注意してください。

3 サーバクラッシュ後の再接続

このセクションの最後の例では、クラッシュが検出された場合にサーバに再接続するロジックを含めることにより、上の 切断された接続 の例を拡張します。次のReconnectクラスは、JMQConnectionLostExceptionが発生したときに繰り返し再接続を試みるトピックサブスクライバです。
package exceptionListener;
                                                                           
import javax.jms.Topic;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.JMSException;
import javax.jms.TopicSession;
import javax.jms.TopicConnection;
import javax.jms.TopicSubscriber;
import javax.jms.MessageListener;
import javax.jms.ExceptionListener;
import javax.jms.TopicConnectionFactory;
                                                                           
import javax.naming.InitialContext;
                                                                           
import com.sssw.jms.api.JMQConnectionLostException;
                                                                           
/**
   This program shows how to write a fault-tolerant topic subscriber.
   If the jBroker MQ server fails, the program will enter a continous
   loop and attempt to reconnect to the server.When the server is
   restarted, the subscriber will continue to retrieve messages.
 */
                                                                           
public class Reconnect implements MessageListener, ExceptionListener
{
    TopicSubscriber _subscriber; // topic subscriber
                                                                           
    /**
       Runs the program.
       @param args Program arguments.
     */
    public static void main(String[] args)
    {
    |   Reconnect ftp = new Reconnect();
    }
                                                                           
    /**
       The constructor will start a reconnect thread and subscribe.
     */
    public Reconnect()
    {
    |   // create a thread that handles reconnection
    |   Thread t = new Thread() {
    |   |   public void run() {
    |   |   |   while (true) {
    |   |   |   |   reconnect(); // return value not used here
    |   |   |   }
    |   |   }
    |   };
    |                                                                      
    |   t.start(); // won't start because I have lock ...
    |                                                                      
    |   // attempt to create a subscriber
    |   try {
    |   |   _subscriber = createSubscriber();
    |   } catch (JMSException ex) {
    |   |   System.out.println("caught: " + ex);
    |   |   synchronized (this) {
    |   |   |   notify(); // tigger reconnect thread
    |   |   }
    |   }
    }
                                                                           
    /**
       Receive a message asynchronously.
       @param message The JMS message.
     */
    public void onMessage(Message message)
    {
    |   System.out.print(",");
    }
                                                                           
    /**
       This method tries to reconnect to the jBroker MQ server.
       @return True if reconnect succeeded, false otherwise.
     */
    public synchronized boolean reconnect() {
    |   doWait();
    |   System.out.print("trying to reconnect ...");
    |   for (int i = 0; i < 10; i++) {
    |   |   try {
    |   |   |   _subscriber = createSubscriber();
    |   |   |   System.out.println("ok");
    |   |   |   return true; // succeeded
    |   |   } catch (Exception ex) {
    |   |   |   System.out.print(".");
    |   |   |   try {
    |   |   |   |   Thread.sleep(500); // sleep a little before trying again
    |   |   |   } catch (InterruptedException exc) { }
    |   |   }
    |   }
    |   System.err.println("failed after 10 attempts");
    |   return false; // failed
    }
                                                                           
    /**
       Create a subscriber.
       @return Newly created topic subscriber.
       @exception JMSException If creation failed.This might for instance
       fail if the jBroker MQ server is not running.
     */
    public TopicSubscriber createSubscriber() throws JMSException
    {
    |   TopicSubscriber subscriber = null;
    |                                                                      
    |   try {
    |   |   InitialContext ctx = new InitialContext();
    |   |   Topic topic = (Topic) ctx.lookup("topic/topic0");
    |   |   TopicConnectionFactory fac = (TopicConnectionFactory) ctx.
    |   |       lookup("topic/connectionFactory");
    |   |   TopicConnection con = fac.createTopicConnection();
    |   |   con.start();
    |   |   TopicSession session = con.createTopicSession(false,
    |   |       Session.AUTO_ACKNOWLEDGE);
    |   |   con.setExceptionListener(this);
    |   |   subscriber = session.createSubscriber(topic);
    |   } catch (Exception ex) {
    |   |   throw new JMSException("can't initialize: " + ex);
    |   }
    |                                                                      
    |   subscriber.setMessageListener(this);
    |                                                                      
    |   return subscriber;
    }
                                                                           
    /**
       The JMS exception listener.
       @param ex JMS exception raised on client.
     */
    public synchronized void onException(JMSException ex)
    {
    |   if (ex instanceof JMQConnectionLostException)
    |   {
    |   |   try {
    |   |   |   _subscriber.close();
    |   |   } catch (JMSException exc) { }
    |   |   _subscriber = null;
    |   |   System.out.println("caught: " + ex);
    |   |   notify(); // trigger reconnect thread
    |   }
    |   else
    |   {
    |   |   ex.printStackTrace(); // other error handling ...
    |   }
    }
                                                                           
    /**
       Utility to wait for notify.
     */
    public synchronized void doWait() {
    |   try {
    |   |   wait();
    |   } catch (InterruptedException exc) {
    |   |   exc.printStackTrace();
    |   }
    }
}
Reconnectクラスは、異なるスレッドを使用して、再接続を実行します。このスレッドは通常、別のスレッドにより再接続プロセスの開始が指示されるのを待機します。例外リスナがonExceptionメソッドでJMQConnectionLostExceptionを検出すると、再接続スレッドに通知し、正常に実行されるまで待機します。

reconnectメソッドは、createSubscriberメソッドを複数回呼び出します。createSubscriberメソッドは、初期コンテキストを再確立して、トピックサブスクライバを再びサブスクライブしようとします。Reconnectクライアントは、クライアントの最初起動時にサーバが使用できない場合、すぐに再接続ループに入ります。

トップに戻る


Copyright © 2000-2003, Novell, Inc.All rights reserved.