To enable concurrent processing of messages, you need to use an advanced JMS facility called a connection consumer. As the name indicates, these consumers are created and managed at the connection level. There is a connection consumer for both pub/sub and P2P messaging.
A connection consumer is is some respects similar to a regular consumer.
It supports selectors and in the pub/sub model, you can have connection
consumers, which are durable. A connection consumer is created using
the createConnectionConsumer
method on the connection.
The connection consumer architecture was designed to be used in an application server context for message driven beans. A message driven bean can use a connection consumer for both concurrent dispatch of messages, but also to hide transaction code from the bean developer.
There are three sample programs for this applications:
The Concurrent class contains the main code for setting up a connection consumer. This example shows how to create a non-durable connection consumer for a topic. The code for creating a P2P based connection consumer is very similar as seen earlier.
The createConnectionConsumer
method on the
TopicConnection
class takes four parameters:
The topic to which this connection consumer should connect.
The message selector, which should be null
if a
connection consumer does not require a selector.
A server session pool. This is a factory that creates server sessions for dispatching incoming messages. See below for more details.
The maximum number of messages that will be dispatched by each server session. A higher number will cause the server session to make fewer thread context switches to dispatch messages.
To create a durable topic connection consumer, you need to use the
createDurableConnectionConsumer
method, which has an additional
parameter to indicate the name of the durable subscriber.
Once the connection consumer has been created, the Concurrent class creates a regular topic publisher, which publishes 10,000 messages to the topic. The source code for the Concurrent class is shown below:
package connConsumer; import javax.naming.InitialContext; import javax.jms.Topic; import javax.jms.Session; import javax.jms.Message; import javax.jms.TopicSession; import javax.jms.TopicPublisher; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; import javax.jms.ConnectionConsumer; import javax.jms.ServerSessionPool; public class Concurrent { public static void main(String[] args) throws Exception { | // get the initial context | InitialContext ctx = new InitialContext(); | | // lookup the topic object | Topic topic = (Topic) ctx.lookup("topic/topic0"); | | // lookup the topic connection factory | TopicConnectionFactory connFactory = | (TopicConnectionFactory) ctx.lookup("topic/connectionFactory"); | | // create a topic connection | TopicConnection topicConn = connFactory.createTopicConnection(); | | // create a server session pool | MyServerSessionPool ssPool = new MyServerSessionPool(topicConn); | topicConn.setExceptionListener(ssPool); | | // create a topic connection consumer | ConnectionConsumer connConsumer = | topicConn.createConnectionConsumer(topic, null, ssPool, 10); | | // start the connection | topicConn.start(); | | // send some messages to the newly created connection consumer | int ackMode = Session.AUTO_ACKNOWLEDGE; | TopicSession session = topicConn.createTopicSession(false, ackMode); | TopicPublisher publisher = session.createPublisher(topic); | | Message msg = session.createMessage(); | | for (int i = 0; i < 1000; i++) | { | | publisher.publish(msg); | } | | System.out.println("sent 1000 messages"); | | publisher.close(); | | // wait for connection consumer | while (true) | { | | Thread.sleep(10000); | } } }
As is always recommended, we set an exception listener on the connection. In this case, the server session pool is also an ExceptionListener. Although the maximum number of messages is set to 10 in this example, the actual number of messages that each server session gets to dispatch will depend on the load on the topic.
A server session pool is responsible for creating server sessions that can dispatch incoming messages to a message listener. A real-world server session pool would typically maintain a pool of sessions to avoid constructing objects but here we just have a simple pool that create a new server session each time:
package connConsumer; import javax.jms.TopicConnection; import javax.jms.ServerSession; import javax.jms.ServerSessionPool; import javax.jms.JMSException; import javax.jms.ExceptionListener; /** This is a simple implementation of a server session pool which does not manage a pool but merely created a new server session each time it is called. */ public class MyServerSessionPool implements ServerSessionPool, ExceptionListener { private final TopicConnection _conn; MyServerSessionPool(TopicConnection conn) { | _conn = conn; } public ServerSession getServerSession() { | return new MyServerSession(_conn); } public void onException(JMSException ex) { | ex.printStackTrace(); } }
As can be seen above, the
ServerSessionPool
interface has a single method getServerSession
which must return a
ServerSession
object. The server session is desribed in more detail below.
As described in the Concurrent class,
the pool is also an exception listener.
The server session pool is associated with the connection consumer since
it is passed in as one of the parameters when creating the connection consumer.
A programmer will not need to call the getServerSession
as this
is done automatically by JMS as needed.
The last task involved when implementing a connection consumer is the server session. The server session performs the actual work of dispatching incoming messages to a message listener. As always, the application developer is responsible for implementing the message listener.
The ServerSession interface supports two methods:
getSession
- this method must return a session object
which corresponds to the connection type, e.g. in this case a
topic session.
start
- this method is automatically called by JMS when
the session has been loaded with messages and is ready to dispatch
messages to the message listener. The start
method is
required to call the run
method of the session to
make the message dispatch happen.
In summary, an application programmer would typically never call the methods on a server session. These are automatically called by JMS when messages have arrived at a topic and are ready for dispatch on the client. What the server session API provides is the ability for an application to control the thread which gets to dispatch messages.
Below is the source code for the MyServerSession class, which has a very
simple implementation of the start
method, which just creates
a new thread to dispatch the messages:
package connConsumer; import javax.jms.Session; import javax.jms.Message; import javax.jms.TopicSession; import javax.jms.TopicConnection; import javax.jms.ServerSession; import javax.jms.MessageListener; import javax.jms.JMSException; /** This is a very simple implementation of a server session, which creates a new thread for performing asynchronous message processing each time it is called. */ public class MyServerSession implements ServerSession { private final TopicConnection _conn; private TopicSession _topicSession; MyServerSession(TopicConnection conn) { | _conn = conn; } // get or create the session for this server session // when creating a session a message listener is set public synchronized Session getSession() throws JMSException { | if (_topicSession == null) { | | _topicSession = _conn.createTopicSession(false, | | Session.AUTO_ACKNOWLEDGE); | | MessageListener listener; | | listener = new MyMessageListener(_topicSession); | | _topicSession.setMessageListener(listener); | } | | return _topicSession; } public void start() throws JMSException { | Thread t = new Thread(_topicSession); | t.start(); } // a simple message listener that counts 100 messages static class MyMessageListener implements MessageListener { | private final TopicSession _topicSession; | | MyMessageListener(TopicSession topicSession) | { | | _topicSession = topicSession; | } | | // must be thread-safe | public void onMessage(Message msg) | { | | if ( (_msgCount%100) == 0) | | { | | | System.out.print("."); | | } | | | | if (++_msgCount == 1000) | | { | | | System.out.println("done"); | | | System.exit(0); | | } | } } static int _msgCount = 0; }
A session extends the Runnable interface, which means that a thread can
be created with the session as it argument. It the above case, the
run
method is called implicitly by starting the thread. A
real world server session would typically use a thread pool.
The server session creates a MessageListener, which is set using the session's
setMessageListener
method. This method can only be called for
sessions which do not have any regular consumers, i.e. it is reserved for
session used by connection consumers.
Below is a summary of what happens for message dispatch using connection consumers:
A connection consumer is associated with a server session pool and a number which indicates the maximum number of messages to dispatch at a time.
When the JMS provider receives some messages for a connection consumer
is invoked the getServerSession
method on the pool to get
a server session (thread) to execute the message dispatch.
Then JMS invokes the getSession
method on the server
session to get a session. The application must set a message
listener on this session to which messages are delivered.
Internally, JMS loads the returned session up with the messages that are ready for dispatch. The number of messages is limited by the maximum set for the connection consumer.
Finally, JMS calls start
on the server server to indicate
that it is ready to dispatch. The server session must in turn call
the run
method on the session.
When the session's run
method executes (possibly using a
thread created by the server session) messages are delivered
concurrently to the onMessage
method of the session's
message listener.
Since multiple threads can execute concurrently, the onMessage
method on the session's message listener must be thread-safe. Another
consequence of the connection consumer architecture is that message order
may be impacted due to the concurrent nature of message dispatching.
Copyright © 2003, 2004 Novell, Inc. All rights reserved. Copyright © 2001, 2002, 2003 SilverStream Software, LLC. All rights reserved.