Concurrent Processing of Messages

To enable concurrent processing of messages, you need to use an advanced JMS facility called a connection consumer. As the name indicates, these consumers are created and managed at the connection level. There is a connection consumer for both pub/sub and P2P messaging.

A connection consumer is in some respects similar to a regular consumer: it supports message selectors, and in the pub/sub model a connection consumer can be durable. A connection consumer is created using the createConnectionConsumer method on the connection.

The connection consumer architecture was designed to be used in an application server context for message-driven beans. A message-driven bean can use a connection consumer both for concurrent dispatch of messages and to hide transaction code from the bean developer.

There are three sample programs for this application:

Connection Consumer

Server Session Pool

Server Session and Message Listener

Connection Consumer

The Concurrent class contains the main code for setting up a connection consumer. This example shows how to create a non-durable connection consumer for a topic. The code for creating a P2P-based connection consumer on a queue is very similar.

The createConnectionConsumer method on the TopicConnection class takes four parameters:

  1. The topic to which this connection consumer should connect.

  2. The message selector, which should be null if a connection consumer does not require a selector.

  3. A server session pool. This is a factory that creates server sessions for dispatching incoming messages. See below for more details.

  4. The maximum number of messages that will be dispatched by each server session. A higher number will cause the server session to make fewer thread context switches to dispatch messages.

To create a durable topic connection consumer, you need to use the createDurableConnectionConsumer method, which has an additional parameter to indicate the name of the durable subscriber.

Once the connection consumer has been created, the Concurrent class creates a regular topic publisher, which publishes 1,000 messages to the topic. The source code for the Concurrent class is shown below:

package connConsumer;

import javax.naming.InitialContext;

import javax.jms.Topic;
import javax.jms.Session;
import javax.jms.Message;
import javax.jms.TopicSession;
import javax.jms.TopicPublisher;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.ConnectionConsumer;
import javax.jms.ServerSessionPool;

public class Concurrent
{
    public static void main(String[] args) throws Exception
    {
        // get the initial context
        InitialContext ctx = new InitialContext();

        // lookup the topic object
        Topic topic = (Topic) ctx.lookup("topic/topic0");

        // lookup the topic connection factory
        TopicConnectionFactory connFactory =
            (TopicConnectionFactory) ctx.lookup("topic/connectionFactory");

        // create a topic connection
        TopicConnection topicConn = connFactory.createTopicConnection();

        // create a server session pool
        MyServerSessionPool ssPool = new MyServerSessionPool(topicConn);
        topicConn.setExceptionListener(ssPool);

        // create a topic connection consumer
        ConnectionConsumer connConsumer =
            topicConn.createConnectionConsumer(topic, null, ssPool, 10);

        // start the connection
        topicConn.start();

        // send some messages to the newly created connection consumer
        int ackMode = Session.AUTO_ACKNOWLEDGE;
        TopicSession session = topicConn.createTopicSession(false, ackMode);
        TopicPublisher publisher = session.createPublisher(topic);

        Message msg = session.createMessage();

        for (int i = 0; i < 1000; i++)
        {
            publisher.publish(msg);
        }

        System.out.println("sent 1000 messages");

        publisher.close();

        // wait for connection consumer
        while (true)
        {
            Thread.sleep(10000);
        }
    }
}

As is always recommended, we set an exception listener on the connection. In this case, the server session pool is also an ExceptionListener. Although the maximum number of messages is set to 10 in this example, the actual number of messages that each server session gets to dispatch will depend on the load on the topic.

Server Session Pool

A server session pool is responsible for creating server sessions that can dispatch incoming messages to a message listener. A real-world server session pool would typically maintain a pool of sessions to avoid repeatedly constructing objects, but here we just have a simple pool that creates a new server session each time:

package connConsumer;

import javax.jms.TopicConnection;
import javax.jms.ServerSession;
import javax.jms.ServerSessionPool;
import javax.jms.JMSException;
import javax.jms.ExceptionListener;

/**
   This is a simple implementation of a server session pool
   which does not manage a pool but merely creates a new server
   session each time it is called.
 */

public class MyServerSessionPool
    implements ServerSessionPool, ExceptionListener
{
    private final TopicConnection _conn;

    MyServerSessionPool(TopicConnection conn)
    {
        _conn = conn;
    }

    public ServerSession getServerSession()
    {
        return new MyServerSession(_conn);
    }

    public void onException(JMSException ex)
    {
        ex.printStackTrace();
    }
}

As can be seen above, the ServerSessionPool interface has a single method, getServerSession, which must return a ServerSession object. The server session is described in more detail below. As noted for the Concurrent class, the pool is also an exception listener.

The server session pool is associated with the connection consumer since it is passed in as one of the parameters when creating the connection consumer. A programmer does not need to call getServerSession, as this is done automatically by JMS as needed.
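The sample pool creates a new server session on every call. A pooled variant could instead keep a fixed set of reusable objects and block the caller while all of them are in use. The following is a minimal sketch using only the JDK; SimplePool, borrow, giveBack, and available are hypothetical names, and the type parameter T merely stands in for a server session. It is not part of the sample programs.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Supplier;

// Hypothetical helper, not part of the JMS API: a fixed-size pool of
// reusable objects. T stands in for a server session.
class SimplePool<T> {
    private final BlockingQueue<T> idle;

    SimplePool(int size, Supplier<T> factory) {
        idle = new ArrayBlockingQueue<>(size);
        for (int i = 0; i < size; i++) {
            idle.add(factory.get());
        }
    }

    // Take an idle object, blocking until one has been returned.
    T borrow() {
        try {
            return idle.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    // Hand an object back once its work is finished.
    // Throws IllegalStateException if the pool is already full.
    void giveBack(T item) {
        idle.add(item);
    }

    // Number of objects currently idle in the pool.
    int available() {
        return idle.size();
    }

    public static void main(String[] args) {
        SimplePool<StringBuilder> pool = new SimplePool<>(2, StringBuilder::new);
        StringBuilder first = pool.borrow();
        System.out.println("idle after borrow: " + pool.available());
        pool.giveBack(first);
        System.out.println("idle after return: " + pool.available());
    }
}
```

In a pool built this way, getServerSession would call borrow, and each server session would call giveBack once its session's run method has returned.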

Server Session and Message Listener

The last task involved when implementing a connection consumer is the server session. The server session performs the actual work of dispatching incoming messages to a message listener. As always, the application developer is responsible for implementing the message listener.

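The ServerSession interface supports two methods:

  1. getSession, which returns the session associated with this server session.

  2. start, which tells the server session to begin dispatching; the server session must in turn arrange for the session's run method to be called.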

In summary, an application programmer would typically never call the methods on a server session. These are automatically called by JMS when messages have arrived at a topic and are ready for dispatch on the client. What the server session API provides is the ability for an application to control the thread which gets to dispatch messages.

Below is the source code for the MyServerSession class, which has a very simple implementation of the start method, which just creates a new thread to dispatch the messages:

package connConsumer;

import java.util.concurrent.atomic.AtomicInteger;

import javax.jms.Session;
import javax.jms.Message;
import javax.jms.TopicSession;
import javax.jms.TopicConnection;
import javax.jms.ServerSession;
import javax.jms.MessageListener;
import javax.jms.JMSException;

/**
   This is a very simple implementation of a server session,
   which creates a new thread for performing asynchronous message
   processing each time it is called.
 */

public class MyServerSession implements ServerSession
{
    private final TopicConnection _conn;
    private       TopicSession    _topicSession;

    MyServerSession(TopicConnection conn)
    {
        _conn = conn;
    }

    // get or create the session for this server session
    // when creating a session a message listener is set
    public synchronized Session getSession() throws JMSException
    {
        if (_topicSession == null) {
            _topicSession = _conn.createTopicSession(false,
                Session.AUTO_ACKNOWLEDGE);
            MessageListener listener;
            listener = new MyMessageListener(_topicSession);
            _topicSession.setMessageListener(listener);
        }

        return _topicSession;
    }

    // run the session (a Runnable) on a new thread; going through
    // getSession() guarantees that the session has been created
    public void start() throws JMSException
    {
        Thread t = new Thread(getSession());
        t.start();
    }

    // a simple message listener that counts 1000 messages
    static class MyMessageListener implements MessageListener
    {
        private final TopicSession _topicSession;

        MyMessageListener(TopicSession topicSession)
        {
            _topicSession = topicSession;
        }

        // must be thread-safe: the counter is shared by all listeners
        public void onMessage(Message msg)
        {
            int count = _msgCount.incrementAndGet();

            if ( (count%100) == 0)
            {
                System.out.print(".");
            }

            if (count == 1000)
            {
                System.out.println("done");
                System.exit(0);
            }
        }
    }

    // shared by all listener instances, so it is updated atomically
    static final AtomicInteger _msgCount = new AtomicInteger();
}

A session extends the Runnable interface, which means that a thread can be created with the session as its argument. In the above case, the run method is called implicitly by starting the thread. A real-world server session would typically use a thread pool.
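Because a session is a Runnable, it can be handed to a thread pool just as easily as to a new thread. The sketch below shows one way this might look using a JDK ExecutorService; PooledStarter and its methods are hypothetical names, and a plain Runnable stands in for the JMS session so that the example runs without a JMS provider.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: run session-like Runnables on a shared thread pool
// instead of creating a new Thread for every start() call.
class PooledStarter {
    private final ExecutorService workers;

    PooledStarter(int threads) {
        workers = Executors.newFixedThreadPool(threads);
    }

    // Similar in spirit to "new Thread(session).start()", but the worker
    // thread is reused once the Runnable returns.
    void start(Runnable session) {
        workers.execute(session);
    }

    // Stop accepting work and wait for queued Runnables to finish.
    boolean shutdownAndWait(long seconds) {
        workers.shutdown();
        try {
            return workers.awaitTermination(seconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        AtomicInteger runs = new AtomicInteger();
        PooledStarter starter = new PooledStarter(4);
        for (int i = 0; i < 20; i++) {
            starter.start(runs::incrementAndGet);
        }
        starter.shutdownAndWait(5);
        System.out.println("runs completed: " + runs.get());
    }
}
```

A server session built on this idea would pass its session to start from within its own start method, so the number of dispatch threads is bounded by the pool size rather than by the number of batches.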

The server session creates a MessageListener, which is set using the session's setMessageListener method. This method can only be called for sessions which do not have any regular consumers, i.e. it is reserved for sessions used by connection consumers.

Below is a summary of what happens for message dispatch using connection consumers:

  1. A connection consumer is associated with a server session pool and a number which indicates the maximum number of messages to dispatch at a time.

  2. When the JMS provider receives some messages for a connection consumer, it invokes the getServerSession method on the pool to get a server session (thread) to execute the message dispatch.

  3. Then JMS invokes the getSession method on the server session to get a session. The application must set a message listener on this session to which messages are delivered.

  4. Internally, JMS loads the returned session up with the messages that are ready for dispatch. The number of messages is limited by the maximum set for the connection consumer.

  5. Finally, JMS calls start on the server session to indicate that it is ready to dispatch. The server session must in turn call the run method on the session.

  6. When the session's run method executes (possibly using a thread created by the server session) messages are delivered concurrently to the onMessage method of the session's message listener.
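The sequence above can be mimicked with a small toy model that runs without a JMS provider. Everything in it is an assumption made for illustration: DispatchModel, Listener, ToySession, and ToyServerSession are hypothetical stand-ins, not the javax.jms interfaces, and the dispatch method plays the role of the JMS provider. The comments refer to the step numbers in the list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Toy model of the dispatch sequence. All types are hypothetical stand-ins
// written for this sketch; they are not the javax.jms interfaces.
class DispatchModel {
    interface Listener { void onMessage(String msg); }

    // Stand-in for a session: holds a batch and delivers it when run.
    static class ToySession implements Runnable {
        private final List<String> batch = new ArrayList<>();
        private final Listener listener;
        ToySession(Listener listener) { this.listener = listener; }
        void load(List<String> msgs) { batch.addAll(msgs); }
        public void run() {
            for (String m : batch) { listener.onMessage(m); }   // step 6
        }
    }

    // Stand-in for a server session: owns a session and a thread.
    static class ToyServerSession {
        private final ToySession session;
        private Thread thread;
        ToyServerSession(Listener l) { session = new ToySession(l); }
        ToySession getSession() { return session; }
        void start() { thread = new Thread(session); thread.start(); }
        void join() {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted", e);
            }
        }
    }

    // Plays the provider's role; returns the number of batches started.
    static int dispatch(List<String> msgs, int maxMessages, Listener l) {
        List<ToyServerSession> started = new ArrayList<>();
        for (int from = 0; from < msgs.size(); from += maxMessages) {
            int to = Math.min(from + maxMessages, msgs.size());
            ToyServerSession ss = new ToyServerSession(l);   // step 2
            ToySession session = ss.getSession();            // step 3
            session.load(msgs.subList(from, to));            // step 4
            ss.start();                                      // step 5
            started.add(ss);
        }
        for (ToyServerSession ss : started) { ss.join(); }   // wait for step 6
        return started.size();
    }

    public static void main(String[] args) {
        List<String> msgs = new ArrayList<>();
        for (int i = 0; i < 25; i++) { msgs.add("msg-" + i); }
        List<String> received = new CopyOnWriteArrayList<>();
        int batches = dispatch(msgs, 10, received::add);
        System.out.println(batches + " batches, " + received.size() + " messages");
    }
}
```

With 25 messages and a maximum of 10 per batch, the model starts three batches. Every message is delivered, but because the batches run on separate threads, the order in which the listener sees them can differ from the order in which they were sent.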

Since multiple threads can execute concurrently, the onMessage method on the session's message listener must be thread-safe. Another consequence of the connection consumer architecture is that message order may be impacted due to the concurrent nature of message dispatching.
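One common way to make a counting listener thread-safe is to keep its state in an atomic variable. The sketch below is an illustration using only the JDK; SafeCounter and countFrom are hypothetical names, and plain threads stand in for the sessions that would call onMessage concurrently.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a listener-style counter that stays correct when
// several dispatch threads call it at the same time.
class SafeCounter {
    private final AtomicInteger count = new AtomicInteger();

    // Atomic increment: no update is lost under concurrent calls.
    int onMessage() {
        return count.incrementAndGet();
    }

    int total() {
        return count.get();
    }

    // Simulate several concurrent dispatch threads and return the total.
    static int countFrom(int threads, int perThread) {
        SafeCounter counter = new SafeCounter();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    counter.onMessage();
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted", e);
            }
        }
        return counter.total();
    }

    public static void main(String[] args) {
        System.out.println("total: " + countFrom(10, 100));
    }
}
```

A plain int field incremented with ++ from several threads can lose updates, which in the sample listener would mean the count never reaches its target.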



Copyright © 2003, 2004 Novell, Inc. All rights reserved. Copyright © 2001, 2002, 2003 SilverStream Software, LLC. All rights reserved.