Requestors

A requestor is a special object that converts a message exchange into an RPC style call. Under the covers, a requestor sends a message to a queue and waits for a reply from a temporary queue. The ReplyTo field in the message is set to the temporary queue prior to sending the message.

JMS supports two kinds of requestors:

These requestors support a request method, which sends or publishes a message and performs a blocking receive on the temporary topic set in the ReplyTo header field of the message. There is also a close method which closes the requestor.

A requestor is most commonly used with the P2P model since only one receiver will get each message. This makes it easier to understand who is replying. If more than one subscriber replies to a requestor message, all replies except the first one are lost.

The default requestors provided with the JMS API are quite simple and application developers may want to extend them. As an example, it would be quite natural to write a queue requestor which performs a receive with a timeout to avoid indefinite blocking if no reply occurs.

The sample application is this section consists of a calculator server, which can add any number of integers. The server receives a MapMessage and adds together all the properties in the message. It then returns a formatted TextMessage with the result.

There are two sample programs for this applications:

Client

Server

Client

The client program creates MapMessage's with random numbers, which are sent to the server using a queue requestor. When a reply is returned by the request method, the result of the calculation is printed out:

package queueRequestor;
                                                                           
import javax.naming.InitialContext;
                                                                           
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.MapMessage;
import javax.jms.TextMessage;
import javax.jms.QueueSession;
import javax.jms.QueueRequestor;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
                                                                           
public class Client
{
    public static void main(String[] args) throws Exception
    {
    |   // get the initial context
    |   InitialContext ctx = new InitialContext();
    |                                                                      
    |   // lookup the queue object
    |   Queue queue = (Queue) ctx.lookup("queue/queue0");
    |                                                                      
    |   // lookup the queue connection factory
    |   QueueConnectionFactory connFactory = (QueueConnectionFactory) ctx.
    |       lookup("queue/connectionFactory");
    |                                                                      
    |   // create a queue connection
    |   QueueConnection queueConn = connFactory.createQueueConnection();
    |                                                                      
    |   queueConn.start();
    |                                                                      
    |   // create a queue session
    |   QueueSession queueSession = queueConn.createQueueSession(false,
    |       Session.DUPS_OK_ACKNOWLEDGE);
    |                                                                      
    |   // create a queue sender
    |   QueueRequestor requestor = new QueueRequestor(queueSession, queue);
    |                                                                      
    |   // create a simple message
    |   MapMessage message = queueSession.createMapMessage();
    |                                                                      
    |   // send the messages
    |   int numMsgs = 1000;
    |   for (int i = 0; i < numMsgs; i++) {
    |   |   int n = (int) (1 + 10 * Math.random());
    |   |   for (int j = 0; j < n; j++)
    |   |       message.setInt("item"+j, (int) (50 * Math.random()));
    |   |   TextMessage result = (TextMessage) requestor.request(message);
    |   |   message.clearBody();
    |   |   System.out.println(result.getText());
    |   }
    |                                                                      
    |   // close the queue connection
    |   queueConn.close();
    }
}

It is important to note that the client is not aware of the server even if an RPC-like mechanism is used. As always in queue based messaging, there can be any number of listeners on the queue. The client could be sending a request to a single server or a large number of servers load-balacing the incoming requests for calculation.

Server

The server de-composes the MapMessage to perform the calculation. It uses an asynchronous message listener and the result is calculated and returned to the ReplyTo destination in the onMessage method. The integers are retrived by enumerating over the names of the map message.

package queueRequestor;
                                                                           
import java.util.Enumeration;
                                                                           
import javax.naming.InitialContext;
                                                                           
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.Message;
import javax.jms.MapMessage;
import javax.jms.TextMessage;
import javax.jms.MessageListener;
import javax.jms.JMSException;
import javax.jms.ExceptionListener;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.QueueReceiver;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
                                                                           
public class Server implements MessageListener, ExceptionListener
{
    private final QueueSession _session;
                                                                           
    public static void main(String[] args) throws Exception
    {
    |   Server server = new Server();
    |   server.waitForever();
    }
                                                                           
    public Server() throws Exception
    {
    |   // get the initial context
    |   InitialContext ctx = new InitialContext();
    |                                                                      
    |   // lookup the queue object
    |   Queue queue = (Queue) ctx.lookup("queue/queue0");
    |                                                                      
    |   // lookup the queue connection factory
    |   QueueConnectionFactory connFactory = (QueueConnectionFactory) ctx.
    |       lookup("queue/connectionFactory");
    |                                                                      
    |   // create a queue connection
    |   QueueConnection queueConn = connFactory.createQueueConnection();
    |                                                                      
    |   // create a queue session
    |   _session = queueConn.createQueueSession(false,
    |       Session.AUTO_ACKNOWLEDGE);
    |                                                                      
    |   // create a queue receiver
    |   QueueReceiver queueReceiver = _session.createReceiver(queue);
    |                                                                      
    |   // set an asynchronous message listener
    |   queueReceiver.setMessageListener(this);
    |                                                                      
    |   // set an asynchronous exception listener on the connection
    |   queueConn.setExceptionListener(this);
    |                                                                      
    |   // start the connection
    |   queueConn.start();
    |                                                                      
    |   System.out.println("server listening on " + queue);
    }
                                                                           
    /**
       This method is called asynchronously by JMS when a message arrives
       at the queue. Client applications must not throw any exceptions in
       the onMessage method.
       @param message A JMS message.
     */
    public void onMessage(Message message)
    {
    |   try
    |   {
    |   |   int sum = 0;
    |   |   String result = "";
    |   |   MapMessage calc = (MapMessage) message;
    |   |   Enumeration e = calc.getMapNames();
    |   |   while (e.hasMoreElements())
    |   |   {
    |   |   |   String item = (String) e.nextElement();
    |   |   |   int i = calc.getInt(item);
    |   |   |   sum += i;
    |   |   |   result += Integer.toString(i);
    |   |   |   if (e.hasMoreElements())
    |   |   |       result += " + ";
    |   |   }
    |   |   result += " = " + sum;
    |   |   TextMessage tm = _session.createTextMessage(result);
    |   |   Queue reply = (Queue) message.getJMSReplyTo();
    |   |   QueueSender sender = _session.createSender(reply);
    |   |   sender.send(tm);
    |   }
    |   catch (JMSException ex)
    |   {
    |   |   onException(ex);
    |   }
    }
                                                                           
    /**
       This method is called asynchronously by JMS when some error occurs.
       When using an asynchronous message listener it is recommended to use
       an exception listener also since JMS have no way to report errors
       otherwise.
       @param exception A JMS exception.
     */
    public void onException(JMSException exception)
    {
    |   System.err.println("something bad happended: " + exception);
    }
                                                                           
    // Handy utility to signal that I'm done
    synchronized void waitForever()
    {
    |   while (true) {
    |   |   try {
    |   |   |   wait();
    |   |   } catch (InterruptedException ex) { }
    |   }
    }
}

As mentioned above, you can start any number of servers to listen for calculation requests. With a queue between client and server applications you can scale an application very easily without affecting existing client application (no code changes or re-compilation necessary).



Copyright © 2003, 2004 Novell, Inc. All rights reserved. Copyright © 2001, 2002, 2003 SilverStream Software, LLC. All rights reserved.