メッセージの同時処理

メッセージを同時に処理するには、接続コンシューマと呼ばれる高度なJMS機能を使用する必要があります。接続コンシューマは、名前が示すとおり、接続レベルで作成および管理されます。接続コンシューマは、pub/subとP2Pメッセージングの両方で利用できます。

接続コンシューマは、通常のコンシューマと共通点があります。接続コンシューマではセレクタを利用でき、pub/subモデルの場合、永続的な接続コンシューマを使用できます。接続コンシューマは、接続でcreateConnectionConsumerメソッドを使用して作成されます。

接続コンシューマのアーキテクチャは、メッセージ駆動型Beanのアプリケーションサーバコンテキストで使用されるように設計されています。メッセージ駆動型Beanは、両方のメッセージの同時送信に接続コンシューマを使用できます。また、トランザクションコードをBean開発者に対して非表示にすることもできます。

このアプリケーションには、次の3つのサンプルプログラムがあります。

接続コンシューマ

サーバセッションプール

サーバセッションおよびメッセージリスナ

1 接続コンシューマ

Concurrentクラスには、接続コンシューマを設定するメインコードが含まれています。この例では、トピックの非永続的接続コンシューマを作成する方法を示します。P2Pベースの接続コンシューマを作成するコードは、前に示したとおり非常に簡単です。

TopicConnectionクラスのcreateConnectionConsumerメソッドは次の4つのパラメータを受け取ります。

  1. この接続コンシューマが接続するトピック。
  2. メッセージセレクタ - 接続コンシューマがセレクタを必要としない場合nullです。
  3. サーバセッションプール - これは、着信メッセージを送信するサーバセッションを作成するファクトリです。 詳細については、以下の説明を参照してください。
  4. 各サーバセッションにより送信されるメッセージの最大数 - 最大数を大きくすると、サーバセッションで、メッセージを配信するスレッドコンテキストスイッチが少なくなります。
永続的トピック接続コンシューマを作成するには、createDurableConnectionConsumerメソッドを使用する必要があります。このメソッドには、永続的サブスクライバの名前を示すパラメータがあります。

接続コンシューマが作成されると、Concurrentクラスは、10,000メッセージをトピックにパブリッシュする、通常のトピックパブリッシュを作成します。Concurrentクラスのソースコードは、次のとおりです。

package connConsumer;
                                                                           
import javax.naming.InitialContext;
                                                                           
import javax.jms.Topic;
import javax.jms.Session;
import javax.jms.Message;
import javax.jms.TopicSession;
import javax.jms.TopicPublisher;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.ConnectionConsumer;
import javax.jms.ServerSessionPool;
                                                                           
public class Concurrent
{
    public static void main(String[] args) throws Exception
    {
    |   // get the initial context
    |   InitialContext ctx = new InitialContext();
    |                                                                      
    |   // lookup the topic object
    |   Topic topic = (Topic) ctx.lookup("topic/topic0");
    |                                                                      
    |   // lookup the topic connection factory
    |   TopicConnectionFactory connFactory =
    |       (TopicConnectionFactory) ctx.lookup("topic/connectionFactory");
    |                                                                      
    |   // create a topic connection
    |   TopicConnection topicConn = connFactory.createTopicConnection();
    |                                                                      
    |   // create a server session pool
    |   MyServerSessionPool ssPool = new MyServerSessionPool(topicConn);
    |   topicConn.setExceptionListener(ssPool);
    |                                                                      
    |   // create a topic connection consumer
    |   ConnectionConsumer connConsumer =
    |       topicConn.createConnectionConsumer(topic, null, ssPool, 10);
    |                                                                      
    |   // start the connection
    |   topicConn.start();
    |                                                                      
    |   // send some messages to the newly created connection consumer
    |   int ackMode = Session.AUTO_ACKNOWLEDGE;
    |   TopicSession session = topicConn.createTopicSession(false, ackMode);
    |   TopicPublisher publisher = session.createPublisher(topic);
    |                                                                      
    |   Message msg = session.createMessage();
    |                                                                      
    |   for (int i = 0; i < 1000; i++)
    |   {
    |   |   publisher.publish(msg);
    |   }
    |                                                                      
    |   System.out.println("sent 1000 messages");
    |                                                                      
    |   publisher.close();
    |                                                                      
    |   // wait for connection consumer
    |   while (true)
    |   {
    |   |   Thread.sleep(10000);
    |   }
    }
}
常にお勧めしていることですが、接続には例外リスナが設定されています。この場合、サーバセッションプールも例外リスナです。この例では、メッセージの最大数が10に設定されていますが、各サーバセッションが送信する実際のメッセージ数は、トピックの負荷により異なります。

2 サーバセッションプール

サーバセッションプールは、着信メッセージをメッセージリスナに送信できるサーバセッションを作成します。実際のサーバセッションプールでは通常、オブジェクトが作成されないようにセッションのプールを保守しますが、ここでは、毎回新しいサーバセッションを作成する簡単なプールを作成します。
package connConsumer;
                                                                           
import javax.jms.TopicConnection;
import javax.jms.ServerSession;
import javax.jms.ServerSessionPool;
import javax.jms.JMSException;
import javax.jms.ExceptionListener;
                                                                           
/**
   This is a simple implementation of a server session pool
   which does not manage a pool but merely created a new server
   session each time it is called.
 */
                                                                           
public class MyServerSessionPool
    implements ServerSessionPool, ExceptionListener
{
    private final TopicConnection _conn;
                                                                           
    MyServerSessionPool(TopicConnection conn)
    {
    |   _conn = conn;
    }
                                                                           
    public ServerSession getServerSession()
    {
    |   return new MyServerSession(_conn);
    }
                                                                           
    public void onException(JMSException ex)
    {
    |   ex.printStackTrace();
    }
}
上の例で示すように、
ServerSessionPoolインタフェースには、getServerSession というメソッドが1つあり、このメソッドはServerSessionオブジェクトを返す必要があります。 サーバセッションの詳細については、こちらを参照してください。 Concurrentクラスで説明したように、プールも例外リスナです。

サーバセッションプールは、接続コンシューマの作成時に1つのパラメータとして渡されるので、接続コンシューマに関連付けられています。プログラマは、getServerSessionを呼び出す必要はありません。これは、必要に応じて、JMSにより自動的に呼び出されます。

3 サーバセッションおよびメッセージリスナ

接続コンシューマの実装時に呼び出される最後のタスクは、セーバセッションです。サーバセッションは、着信メッセージをメッセージリスナに送信する実際の作業を実行します。通常通り、アプリケーション開発者は、メッセージリスナを実装する必要があります。

ServerSessionインタフェースは、次の2つのメソッドをサポートしています。

つまり、アプリケーションプログラマは、サーバセッションでメソッドを呼び出しません。これらは、メッセージがトピックに送信され、クライアントに送信する準備ができると、JMSにより自動的に呼び出されます。サーバセッションAPIが提供する機能により、アプリケーションは、メッセージを送信するスレッドを制御できます。

MyServerSessionクラスのソースコードは、次のとおりです。このクラスは、メッセージを送信する新しいスレッドを作成するだけの非常に簡単なstartメソッドの実装を含みます。

package connConsumer;
                                                                           
import javax.jms.Session;
import javax.jms.Message;
import javax.jms.TopicSession;
import javax.jms.TopicConnection;
import javax.jms.ServerSession;
import javax.jms.MessageListener;
import javax.jms.JMSException;
                                                                           
/**
   This is a very simple implementation of a server session,
   which creates a new thread for performing asynchronous message
   processing each time it is called.
 */
                                                                           
public class MyServerSession implements ServerSession
{
    private final TopicConnection _conn;
    private       TopicSession    _topicSession;
                                                                           
    MyServerSession(TopicConnection conn)
    {
    |   _conn = conn;
    }
                                                                           
    // get or create the session for this server session
    // when creating a session a message listener is set
    public synchronized Session getSession() throws JMSException
    {
    |   if (_topicSession == null) {
    |   |   _topicSession = _conn.createTopicSession(false,
    |   |       Session.AUTO_ACKNOWLEDGE);
    |   |   MessageListener listener;
    |   |   listener = new MyMessageListener(_topicSession);
    |   |   _topicSession.setMessageListener(listener);
    |   }
    |                                                                      
    |   return _topicSession;
    }
                                                                           
    public void start() throws JMSException
    {
    |   Thread t = new Thread(_topicSession);
    |   t.start();
    }
                                                                           
    // a simple message listener that counts 100 messages
    static class MyMessageListener implements MessageListener
    {
    |   private final TopicSession _topicSession;
    |                                                                      
    |   MyMessageListener(TopicSession topicSession)
    |   {
    |   |   _topicSession = topicSession;
    |   }
    |                                                                      
    |   // must be thread-safe
    |   public void onMessage(Message msg)
    |   {
    |   |   if ( (_msgCount%100) == 0)
    |   |   {
    |   |   |   System.out.print(".");
    |   |   }
    |   |                                                                  
    |   |   if (++_msgCount == 1000)
    |   |   {
    |   |   |   System.out.println("done");
    |   |   |   System.exit(0);
    |   |   }
    |   }
    }
                                                                           
    static int _msgCount = 0;
}
セッションは、Runnableインタフェースを拡張します。つまり、引数としてセッションを使用しスレッドを作成できます。上の例では、runメソッドはスレッドの開始により暗黙的に呼び出されます。実際のサーバセッションでは、スレッドプールが使用されます。

サーバセッションは、MessageListenerを作成します。これは、セッションのsetMessageListenerメソッド使用して設定されます。このメソッドは、通常のコンシューマがないセッションにのみ呼び出されます。つまり、接続コンシューマで使用されるセッションに予約されています。

接続コンシューマを使用したメッセージ送信について、次にまとめてあります。

  1. 接続コンシューマは、サーバセッションプール、および一度に送信するメッセージの最大数を示す数値に関連します。
  2. JMSプロバイダは、接続コンシューマからいくつかのメッセージ受信すると、プールでgetServerSessionメソッドが呼び出され、メッセージ送信を実行するサーバセッションが取得されます。
  3. 次に、JMSは、サーバセッションでgetSessionメソッドを呼び出し、セッションを取得します。アプリケーションは、メッセージが送信されるこのセッションのメッセージリスナを設定する必要があります。
  4. 内部的に、JMSは送信の準備ができたメッセージとともに返されたセッションをロードします。メッセージの数は、接続コンシューマに設定されている最大数により制限されます。
  5. 最後に、JMSはサーバでstartを呼び出し、送信の準備ができたことを示します。次に、サーバセッションは、そのセッションでrunメソッドを呼び出す必要があります。
  6. セッションのrunメソッドを実行すると(通常サーバセッションで作成されるスレッドを使用します)、メッセージがセッションのメッセージリスナのonMessageメソッドに同時に配信されます。
複数のスレッドは同時に実行されるので、セッションのメッセージリスナのonMessageメソッドは、スレッドセーフであることが必要です。接続コンシューマアーキテクチャを使用した場合、メッセージ順序が、メッセージ同時配信の性質の影響を受けます。

トップに戻る


Copyright © 2000-2003, Novell, Inc.All rights reserved.