メッセージを同時に処理するには、接続コンシューマと呼ばれる高度なJMS機能を使用する必要があります。接続コンシューマは、名前が示すとおり、接続レベルで作成および管理されます。接続コンシューマは、pub/subとP2Pメッセージングの両方で利用できます。接続コンシューマは、通常のコンシューマと共通点があります。接続コンシューマではセレクタを利用でき、pub/subモデルの場合、永続的な接続コンシューマを使用できます。接続コンシューマは、接続で
createConnectionConsumer
メソッドを使用して作成されます。接続コンシューマのアーキテクチャは、メッセージ駆動型Beanのアプリケーションサーバコンテキストで使用されるように設計されています。メッセージ駆動型Beanは、両方のメッセージの同時送信に接続コンシューマを使用できます。また、トランザクションコードをBean開発者に対して非表示にすることもできます。
このアプリケーションには、次の3つのサンプルプログラムがあります。
1 接続コンシューマ
Concurrentクラスには、接続コンシューマを設定するメインコードが含まれています。この例では、トピックの非永続的接続コンシューマを作成する方法を示します。P2Pベースの接続コンシューマを作成するコードは、前に示したとおり非常に簡単です。TopicConnectionクラスの
createConnectionConsumer
メソッドは次の4つのパラメータを受け取ります。永続的トピック接続コンシューマを作成するには、
- この接続コンシューマが接続するトピック。
- メッセージセレクタ - 接続コンシューマがセレクタを必要としない場合
null
です。- サーバセッションプール - これは、着信メッセージを送信するサーバセッションを作成するファクトリです。 詳細については、以下の説明を参照してください。
- 各サーバセッションにより送信されるメッセージの最大数 - 最大数を大きくすると、サーバセッションで、メッセージを配信するスレッドコンテキストスイッチが少なくなります。
createDurableConnectionConsumer
メソッドを使用する必要があります。このメソッドには、永続的サブスクライバの名前を示すパラメータがあります。接続コンシューマが作成されると、Concurrentクラスは、10,000メッセージをトピックにパブリッシュする、通常のトピックパブリッシュを作成します。Concurrentクラスのソースコードは、次のとおりです。
常にお勧めしていることですが、接続には例外リスナが設定されています。この場合、サーバセッションプールも例外リスナです。この例では、メッセージの最大数が10に設定されていますが、各サーバセッションが送信する実際のメッセージ数は、トピックの負荷により異なります。package connConsumer; import javax.naming.InitialContext; import javax.jms.Topic; import javax.jms.Session; import javax.jms.Message; import javax.jms.TopicSession; import javax.jms.TopicPublisher; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; import javax.jms.ConnectionConsumer; import javax.jms.ServerSessionPool; public class Concurrent { public static void main(String[] args) throws Exception { | // get the initial context | InitialContext ctx = new InitialContext(); | | // lookup the topic object | Topic topic = (Topic) ctx.lookup("topic/topic0"); | | // lookup the topic connection factory | TopicConnectionFactory connFactory = | (TopicConnectionFactory) ctx.lookup("topic/connectionFactory"); | | // create a topic connection | TopicConnection topicConn = connFactory.createTopicConnection(); | | // create a server session pool | MyServerSessionPool ssPool = new MyServerSessionPool(topicConn); | topicConn.setExceptionListener(ssPool); | | // create a topic connection consumer | ConnectionConsumer connConsumer = | topicConn.createConnectionConsumer(topic, null, ssPool, 10); | | // start the connection | topicConn.start(); | | // send some messages to the newly created connection consumer | int ackMode = Session.AUTO_ACKNOWLEDGE; | TopicSession session = topicConn.createTopicSession(false, ackMode); | TopicPublisher publisher = session.createPublisher(topic); | | Message msg = session.createMessage(); | | for (int i = 0; i < 1000; i++) | { | | publisher.publish(msg); | } | | System.out.println("sent 1000 messages"); | | publisher.close(); | | // wait for connection consumer | while (true) | { | | Thread.sleep(10000); | } } }2 サーバセッションプール
サーバセッションプールは、着信メッセージをメッセージリスナに送信できるサーバセッションを作成します。実際のサーバセッションプールでは通常、オブジェクトが作成されないようにセッションのプールを保守しますが、ここでは、毎回新しいサーバセッションを作成する簡単なプールを作成します。上の例で示すように、ServerSessionPoolインタフェースには、package connConsumer; import javax.jms.TopicConnection; import javax.jms.ServerSession; import javax.jms.ServerSessionPool; import javax.jms.JMSException; import javax.jms.ExceptionListener; /** This is a simple implementation of a server session pool which does not manage a pool but merely created a new server session each time it is called. */ public class MyServerSessionPool implements ServerSessionPool, ExceptionListener { private final TopicConnection _conn; MyServerSessionPool(TopicConnection conn) { | _conn = conn; } public ServerSession getServerSession() { | return new MyServerSession(_conn); } public void onException(JMSException ex) { | ex.printStackTrace(); } }getServerSession
というメソッドが1つあり、このメソッドはServerSessionオブジェクトを返す必要があります。 サーバセッションの詳細については、こちらを参照してください。 Concurrentクラスで説明したように、プールも例外リスナです。サーバセッションプールは、接続コンシューマの作成時に1つのパラメータとして渡されるので、接続コンシューマに関連付けられています。プログラマは、
getServerSession
を呼び出す必要はありません。これは、必要に応じて、JMSにより自動的に呼び出されます。3 サーバセッションおよびメッセージリスナ
接続コンシューマの実装時に呼び出される最後のタスクは、セーバセッションです。サーバセッションは、着信メッセージをメッセージリスナに送信する実際の作業を実行します。通常通り、アプリケーション開発者は、メッセージリスナを実装する必要があります。ServerSessionインタフェースは、次の2つのメソッドをサポートしています。
つまり、アプリケーションプログラマは、サーバセッションでメソッドを呼び出しません。これらは、メッセージがトピックに送信され、クライアントに送信する準備ができると、JMSにより自動的に呼び出されます。サーバセッションAPIが提供する機能により、アプリケーションは、メッセージを送信するスレッドを制御できます。
getSession
- 接続タイプ(この場合トピックセッション)に対応するセッションオブジェクトを返します。start
- セッションにメッセージがロードされ、メッセージがメッセージリスナに送信できるようになると、JMSにより自動的に呼び出されます。start
は、セッションのrun
メソッドを呼び出して、メッセージを送信するときに必要です。MyServerSessionクラスのソースコードは、次のとおりです。このクラスは、メッセージを送信する新しいスレッドを作成するだけの非常に簡単な
start
メソッドの実装を含みます。セッションは、Runnableインタフェースを拡張します。つまり、引数としてセッションを使用しスレッドを作成できます。上の例では、package connConsumer; import javax.jms.Session; import javax.jms.Message; import javax.jms.TopicSession; import javax.jms.TopicConnection; import javax.jms.ServerSession; import javax.jms.MessageListener; import javax.jms.JMSException; /** This is a very simple implementation of a server session, which creates a new thread for performing asynchronous message processing each time it is called. */ public class MyServerSession implements ServerSession { private final TopicConnection _conn; private TopicSession _topicSession; MyServerSession(TopicConnection conn) { | _conn = conn; } // get or create the session for this server session // when creating a session a message listener is set public synchronized Session getSession() throws JMSException { | if (_topicSession == null) { | | _topicSession = _conn.createTopicSession(false, | | Session.AUTO_ACKNOWLEDGE); | | MessageListener listener; | | listener = new MyMessageListener(_topicSession); | | _topicSession.setMessageListener(listener); | } | | return _topicSession; } public void start() throws JMSException { | Thread t = new Thread(_topicSession); | t.start(); } // a simple message listener that counts 100 messages static class MyMessageListener implements MessageListener { | private final TopicSession _topicSession; | | MyMessageListener(TopicSession topicSession) | { | | _topicSession = topicSession; | } | | // must be thread-safe | public void onMessage(Message msg) | { | | if ( (_msgCount%100) == 0) | | { | | | System.out.print("."); | | } | | | | if (++_msgCount == 1000) | | { | | | System.out.println("done"); | | | System.exit(0); | | } | } } static int _msgCount = 0; }run
メソッドはスレッドの開始により暗黙的に呼び出されます。実際のサーバセッションでは、スレッドプールが使用されます。サーバセッションは、MessageListenerを作成します。これは、セッションの
setMessageListener
メソッド使用して設定されます。このメソッドは、通常のコンシューマがないセッションにのみ呼び出されます。つまり、接続コンシューマで使用されるセッションに予約されています。接続コンシューマを使用したメッセージ送信について、次にまとめてあります。
複数のスレッドは同時に実行されるので、セッションのメッセージリスナの
- 接続コンシューマは、サーバセッションプール、および一度に送信するメッセージの最大数を示す数値に関連します。
- JMSプロバイダは、接続コンシューマからいくつかのメッセージ受信すると、プールで
getServerSession
メソッドが呼び出され、メッセージ送信を実行するサーバセッションが取得されます。- 次に、JMSは、サーバセッションで
getSession
メソッドを呼び出し、セッションを取得します。アプリケーションは、メッセージが送信されるこのセッションのメッセージリスナを設定する必要があります。- 内部的に、JMSは送信の準備ができたメッセージとともに返されたセッションをロードします。メッセージの数は、接続コンシューマに設定されている最大数により制限されます。
- 最後に、JMSはサーバで
start
を呼び出し、送信の準備ができたことを示します。次に、サーバセッションは、そのセッションでrun
メソッドを呼び出す必要があります。- セッションの
run
メソッドを実行すると(通常サーバセッションで作成されるスレッドを使用します)、メッセージがセッションのメッセージリスナのonMessage
メソッドに同時に配信されます。onMessage
メソッドは、スレッドセーフであることが必要です。接続コンシューマアーキテクチャを使用した場合、メッセージ順序が、メッセージ同時配信の性質の影響を受けます。
Copyright © 2000-2003, Novell, Inc.All rights reserved. |