Connecting Asynchronous Services and Service Clients

Novell Cool Solutions: Feature
By J. Jeffrey Hanson

Posted: 14 Jul 2004
 

J. Jeffrey Hanson
Chief Architect
eReinsure.com, Inc.
jeff@jeffhanson.com

This article is the second in a series that outlines techniques and technologies for building Web services and Web service clients that interact asynchronously using JMS in a service-oriented environment. (First article: Building Asynchronous Web Services with JMS)

In this article, we extend our framework to demonstrate JMS clients and loosely coupled, JMS-aware business-logic components that can realize the benefits of a service-oriented architecture.

A Review of Integrating SOAP-Over-HTTP with JMS

JMS/MOM systems do not always provide the infrastructure needed to communicate across HTTP. Because most security-aware networks are limited to HTTP traffic, we must build the components that integrate the two environments.

For Web services applications, a message sender publishes a message to a local topic, where a small proxy component is waiting to receive it. The proxy wraps the message in a SOAP envelope, transports it across an HTTP connection, and waits for the HTTP response. Once the response arrives, the proxy publishes it to the local topic, and the callback method supplied by the original sender is called to receive the response. This lets the message sender operate asynchronously.
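The sender-side flow can be sketched without any JMS or HTTP machinery. The following is a minimal illustration, not the framework's proxy: AsyncProxySketch, its transport function, and the send method are hypothetical names, and a plain function stands in for the SOAP-over-HTTP exchange. It shows only the shape of the interaction: the sender hands over a message and a callback, then carries on while the exchange runs in the background.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical stand-in for the sender-side proxy; illustrative only.
final class AsyncProxySketch
{
   // Assumption: a function plays the role of the SOAP-over-HTTP exchange.
   private final Function<String, String> transport;

   AsyncProxySketch(Function<String, String> transport)
   {
      this.transport = transport;
   }

   // Returns immediately; the callback runs once the response arrives.
   CompletableFuture<Void> send(String message, Consumer<String> callback)
   {
      return CompletableFuture
         .supplyAsync(() -> transport.apply(message)) // background exchange
         .thenAccept(callback);                       // deliver the response
   }
}
```

A caller can ignore the returned future and keep working, or call join() on it to block until the callback has run.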

On the receiver's side of the interaction, a proxy is waiting for HTTP requests. When an HTTP request is received, the proxy extracts the SOAP envelope, extracts the message from the envelope, and publishes the message to a local topic. Business-logic components registered as subscribers to the topic receive the message, execute business logic, and publish the response, if any, to the topic. The proxy then receives the response from the topic, wraps it in a SOAP envelope, and passes it back to the sender as an HTTP response.
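The wrap and extract steps performed by the two proxies can be sketched with the JDK's XML parser alone. This is an assumption-laden illustration rather than the framework's code: SoapEnvelopeSketch and the payload element name are hypothetical, the envelope uses the SOAP 1.1 namespace, and a production proxy would more likely rely on a SOAP toolkit.

```java
import java.io.StringReader;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

// Hypothetical helper; not part of the article's framework.
final class SoapEnvelopeSketch
{
   static final String SOAP_NS = "http://schemas.xmlsoap.org/soap/envelope/";

   // Sender side: place the text message inside a SOAP body.
   // Assumption: the payload element is simply named "payload".
   static String wrap(String message)
   {
      return "<soap:Envelope xmlns:soap=\"" + SOAP_NS + "\">"
         + "<soap:Body><payload>" + escape(message) + "</payload></soap:Body>"
         + "</soap:Envelope>";
   }

   // Receiver side: pull the text message back out of the SOAP body.
   static String unwrap(String envelope)
   {
      Document document;
      try
      {
         DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
         factory.setNamespaceAware(true);
         document = factory.newDocumentBuilder()
            .parse(new InputSource(new StringReader(envelope)));
      }
      catch (Exception e)
      {
         throw new IllegalArgumentException("Malformed envelope", e);
      }

      NodeList bodies = document.getElementsByTagNameNS(SOAP_NS, "Body");
      if (bodies.getLength() == 0)
      {
         throw new IllegalArgumentException("No SOAP Body element found");
      }
      return bodies.item(0).getTextContent();
   }

   // Escape the characters that would otherwise break the XML markup.
   private static String escape(String text)
   {
      return text.replace("&", "&amp;")
                 .replace("<", "&lt;")
                 .replace(">", "&gt;");
   }
}
```

Calling unwrap(wrap(text)) returns the original text, which is the round trip each request and response makes between the two proxies.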

The following diagram illustrates the relationships and interactions between a sender, a receiver, topics and proxies:


Figure 1: The use of proxies for HTTP messaging

JMS Publishers

In a publish-and-subscribe messaging model, a producer sends messages to consumers by delivering each message to a single intermediary topic. A message producer is also called a publisher. In our framework, the publisher acts as a generic message transmitter that passes messages from a client or server object to the topic. The topic passes the messages along to one of our proxy servlets, which then sends them across the wire.
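The decoupling that a topic provides can be seen in a few lines of plain Java. The class below is a hypothetical in-memory stand-in, not a JMS topic: InMemoryTopicSketch and its methods are invented for illustration, and delivery happens synchronously on the publisher's thread, whereas a JMS provider delivers asynchronously and can persist messages.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in for a topic; illustrative only.
final class InMemoryTopicSketch
{
   // Subscribers are known to the topic, never to the publisher.
   private final List<Consumer<String>> subscribers =
      new CopyOnWriteArrayList<>();

   void subscribe(Consumer<String> subscriber)
   {
      subscribers.add(subscriber);
   }

   // Every registered subscriber receives its own copy of the message.
   void publish(String message)
   {
      for (Consumer<String> subscriber : subscribers)
      {
         subscriber.accept(message);
      }
   }
}
```

The publisher only ever sees the topic, so subscribers can be added or removed without touching the publishing code.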

Our generic message publisher exposes a method called publish which is used by callers to send text messages, as the following listing illustrates:

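import java.util.Hashtable;

import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

// MessageException is the framework's own exception type; its
// definition is not shown in this listing.
public class MessagePublisher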
{
   private static InitialContext initialContext = null;
   private static TopicConnection connection = null;

   // ======================================================
   // private methods
   // ======================================================

   private static Context getJNDIContext()
      throws NamingException
   {
      if (initialContext == null)
      {
         Hashtable properties = new Hashtable();
         properties.put(Context.INITIAL_CONTEXT_FACTORY,
                        "org.exolab.jms.jndi.InitialContextFactory");
         properties.put(Context.PROVIDER_URL, "rmi://localhost:1099/");

         // connect to the JNDI server and get a reference to
         // root initialContext
         initialContext = new InitialContext(properties);
      }
      return initialContext;
   }

   private static Topic getTopic(TopicSession session,
                                 String topicName)
      throws JMSException
   {
      // ask the session for a Topic object that represents topicName
      return session.createTopic(topicName);
   }

   private static TopicConnection createConnection()
      throws NamingException, JMSException
   {
      Context context = getJNDIContext();
      // lookup the connection factory from the initialContext
      TopicConnectionFactory factory =
          (TopicConnectionFactory)context.lookup("JmsTopicConnectionFactory");

      TopicConnection connection = factory.createTopicConnection();
      connection.start();
      return connection;
   }

   // ======================================================
   // public methods
   // ======================================================

   public static void publish(String topicName,
                              TopicSession session,
                              String message,
                              int priority)
      throws MessageException
   {
      try
      {
         Topic topic = getTopic(session, topicName);

         TopicPublisher publisher = session.createPublisher(topic);

         // set the message's persistence
         int deliveryMode = DeliveryMode.PERSISTENT;

         // set the message's lifetime (in milliseconds). The default
         // is unlimited
         long timeToLive =
            org.exolab.jms.message.MessageImpl.DEFAULT_TIME_TO_LIVE;

         TextMessage textMessage = session.createTextMessage(message);

         publisher.publish(topic,
                           textMessage,
                           deliveryMode,
                           priority,
                           timeToLive);
      }
      catch (Exception e)
      {
         throw new MessageException(e.getMessage());
      }
   }

   public static TopicSession createSession()
      throws JMSException, NamingException
   {
      TopicSession session =
         getConnection().createTopicSession(false,
                                            Session.CLIENT_ACKNOWLEDGE);
      return session;
   }

   public static TopicConnection getConnection()
      throws NamingException, JMSException
   {
      if (connection == null)
      {
         connection = createConnection();
      }
      return connection;
   }

   public static void closeConnection()
      throws JMSException
   {
      // nothing to close if no connection was ever opened
      if (connection != null)
      {
         connection.close();
         connection = null;
      }
   }
}
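The listing creates its shared connection lazily through a static field, which is simple but leaves concurrent callers free to race each other. The sketch below shows one way to guard that pattern. It is generic rather than JMS-specific, and LazyResourceSketch with its factory and closer parameters is a hypothetical helper, not part of the framework.

```java
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical holder for a lazily created, shared resource.
final class LazyResourceSketch<T>
{
   private final Supplier<T> factory; // creates the resource on demand
   private final Consumer<T> closer;  // releases the resource
   private T resource;

   LazyResourceSketch(Supplier<T> factory, Consumer<T> closer)
   {
      this.factory = factory;
      this.closer = closer;
   }

   // Creates the resource on first use; later calls reuse it.
   synchronized T get()
   {
      if (resource == null)
      {
         resource = factory.get();
      }
      return resource;
   }

   // Safe to call even when nothing has been opened yet.
   synchronized void close()
   {
      if (resource != null)
      {
         closer.accept(resource);
         resource = null;
      }
   }
}
```

Because get() and close() share one lock, two threads can never create the resource twice or close it while another thread is creating it.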

Using the MessagePublisher Component

The following example illustrates how the MessagePublisher is used to transmit a simple text message:

public TestPublisher()
{
   try
   {
      TopicConnection conn = MessagePublisher.getConnection();
      TopicSession session = MessagePublisher.createSession();
      MessagePublisher.publish("mytopic",
                               session,
                               "A test message",
                       org.exolab.jms.message.MessageImpl.DEFAULT_PRIORITY);
      MessagePublisher.closeConnection();
   }
   catch (JMSException e)
   {
      System.out.println(e.toString());
   }
   catch (NamingException e)
   {
      System.out.println(e.toString());
   }
   catch (MessageException e)
   {
      System.out.println(e.toString());
   }
}

JMS Subscribers

In a publish-and-subscribe messaging model, a message consumer is called a subscriber. In our framework, the subscriber acts as a generic message receiver that receives messages from the topic, which in turn receives them from one of our proxy servlets.

Our generic message subscriber exposes a method called subscribe, which takes a MessageListener parameter. Callers use it to register a listener for text messages, as the following listing illustrates:

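import java.util.EventListener;
import java.util.Hashtable;

import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class MessageSubscriber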
   implements MessageListener,
   TimerListener,
   ExceptionListener
{
   // ====================================================================
   // static fields
   // ====================================================================

   private static TopicSubscriber subscriber;
   private static TopicConnection connection;
   private static int ackMode = Session.AUTO_ACKNOWLEDGE;
   private static Timer timer;
   private static int timeout = 20; // seconds
   private static int received;
   private static Context initialContext = null;
   private static TopicConnectionFactory connFactory = null;
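   // starts as false so that onMessage() keeps the subscription alive
   // until stopSubscription() or exit() asks it to stop
   private static boolean stopped = false;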

   // ====================================================================
   // instance fields
   // ====================================================================

   private Topic topic = null;
   private MessageListener externalListener = null;

   // ====================================================================
   // private static methods
   // ====================================================================

   private static TopicConnection createConnection()
      throws MessageException
   {
      try
      {
         // look up the factory only once; it is cached after that
         if (connFactory == null)
         {
            String factoryName = "JmsTopicConnectionFactory";
            connFactory =
               (TopicConnectionFactory)getJNDIContext().lookup(factoryName);
         }

         connection = connFactory.createTopicConnection();
         connection.start();
      }
      catch (NamingException e)
      {
         throw new MessageException(e.getMessage());
      }
      catch (JMSException e)
      {
         throw new MessageException(e.getMessage());
      }

      return connection;
   }

   private static Context getJNDIContext()
      throws NamingException
   {
      if (initialContext == null)
      {
         Hashtable properties = new Hashtable();
         properties.put(Context.INITIAL_CONTEXT_FACTORY,
                        "org.exolab.jms.jndi.InitialContextFactory");
         properties.put(Context.PROVIDER_URL, "rmi://localhost:1099/");

         // connect to the JNDI server and get a reference to
         // root initialContext
         initialContext = new InitialContext(properties);
      }
      return initialContext;
   }

   // ====================================================================
   // public static methods
   // ====================================================================

   public static TopicConnection getConnection()
      throws MessageException
   {
      if (connection == null)
      {
         connection = createConnection();
      }
      return connection;
   }

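   public static void closeConnection()
   {
      try
      {
         // the timer only exists when a timeout was configured
         if (timer != null)
         {
            timer.stop();
         }
         if (connection != null)
         {
            connection.close();
            // clear the reference so getConnection() reconnects next time
            connection = null;
         }
      }
      catch (Exception error)
      {
         error.printStackTrace();
      }
   }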

   public static TopicSession createSession()
      throws JMSException, MessageException
   {
      TopicSession session =
         getConnection().createTopicSession(false,
                                            Session.AUTO_ACKNOWLEDGE);
      return session;
   }

   public static void subscribe(TopicSession session,
                                String topicName,
                                MessageListener externalListener)
      throws JMSException
   {
      MessageSubscriber msgSubscriber =
         new MessageSubscriber(externalListener);

      if (timeout > 0)
      {
         timer = new Timer(msgSubscriber, timeout * 1000);
         timer.start();
      }

      msgSubscriber.setTopic(session.createTopic(topicName));
      subscriber = session.createSubscriber(msgSubscriber.getTopic());
      subscriber.setMessageListener(msgSubscriber);
      connection.setExceptionListener(msgSubscriber);
   }

   public void stopSubscription()
   {
      stopped = true;
      stopSubscriber();
   }

   // ====================================================================
   // MessageSubscriber methods
   // ====================================================================

   private MessageSubscriber(MessageListener externalListener)
   {
      this.externalListener = externalListener;
   }

   public Topic getTopic()
   {
      return topic;
   }

   private void setTopic(Topic topic)
   {
      this.topic = topic;
   }

   // ====================================================================
   // MessageListener methods
   // ====================================================================

   /**
    * All messages for this consumer are received by this method
    *
    * @param message the message
    */
   public void onMessage(Message message)
   {
      if (timer != null)
      {
         timer.reset();
      }

      externalListener.onMessage(message);

      received++;

      acknowledgeMessage(message);

      if (stopped)
      {
         stopSubscriber();
      }
   }

   private void stopSubscriber()
   {
      try
      {
         subscriber.setMessageListener(null);
      }
      catch (JMSException exception)
      {
         System.err.println("Could not unset the listener: " + exception);
      }

      final String msg = getSummaryMessage();

      Thread shutdown = new Thread()
      {
         public void run()
         {
            MessageSubscriber.this.exit(msg);
         }
      };

      shutdown.start();
   }

   private void acknowledgeMessage(Message message)
   {
      try
      {
         if (ackMode == Session.CLIENT_ACKNOWLEDGE)
         {
            message.acknowledge();
         }
      }
      catch (JMSException exception)
      {
         System.err.println("Failed to acknowledge message: " + exception);
      }
   }

   private String getSummaryMessage()
   {
      String summary = "The consumer received " + received + " messages";

      try
      {
         summary += " from topic " + subscriber.getTopic();
      }
      catch (JMSException exception)
      {
         exception.printStackTrace();
      }
      return summary;
   }

   public void onTimeout()
   {
      exit("No message received in the last " + timeout +
           " seconds, exiting");
   }

   // implementation of ExceptionListener.onException

   public void onException(JMSException exception)
   {
      exit("Received onException notification");
   }

   private void exit(String message)
   {
      System.err.println(message);
      stopped = true;
   }
}

// =======================================================
// interface TimerListener
// =======================================================

interface TimerListener extends EventListener
{
   public void onTimeout();
}

// =======================================================
// class Timer
// =======================================================

class Timer
{

   private TimerListener listener;
   private long millisecs;
   private Thread thread;

   public Timer(TimerListener listener, long millisecs)
   {
      this.listener = listener;
      this.millisecs = millisecs;
   }

   public void start()
   {
      this.thread = new Thread(new Runner());
      this.thread.start();
   }

   public void stop()
   {
      if ((this.thread != null) && (Thread.currentThread() != this.thread))
      {
         while (this.thread.isAlive())
         {
            this.thread.interrupt();
         }
      }
   }

   public void reset()
   {
      stop();
      start();
   }

   public void reset(long milliseconds)
   {
      this.millisecs = milliseconds;
      reset();
   }

   private class Runner implements Runnable
   {

      public Runner()
      {
      }

      public void run()
      {
         try
         {
            // sleep is a static method, so call it on the class
            Thread.sleep(Timer.this.millisecs);
            Timer.this.listener.onTimeout();
         }
         catch (InterruptedException ignore)
         {
         }
      }
   }
}
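The Timer class above manages its own thread and restarts it on every reset. On Java 5 and later, the same inactivity watchdog can be sketched with a ScheduledExecutorService. This is an alternative under stated assumptions, not the article's implementation: InactivityTimerSketch is a hypothetical name, and a Runnable replaces the TimerListener callback.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical alternative to the Timer class; illustrative only.
final class InactivityTimerSketch
{
   // One daemon thread runs the timeout callback.
   private final ScheduledExecutorService scheduler =
      Executors.newSingleThreadScheduledExecutor(runnable ->
      {
         Thread thread = new Thread(runnable, "inactivity-timer");
         thread.setDaemon(true);
         return thread;
      });

   private final Runnable onTimeout;
   private final long millisecs;
   private ScheduledFuture<?> pending;

   InactivityTimerSketch(Runnable onTimeout, long millisecs)
   {
      this.onTimeout = onTimeout;
      this.millisecs = millisecs;
   }

   // Schedule the callback to run after the configured delay.
   synchronized void start()
   {
      pending = scheduler.schedule(onTimeout, millisecs,
                                   TimeUnit.MILLISECONDS);
   }

   // Call whenever a message arrives: the countdown begins again.
   synchronized void reset()
   {
      cancelPending();
      start();
   }

   // Cancel the countdown for good; the timer cannot be restarted.
   synchronized void stop()
   {
      cancelPending();
      scheduler.shutdownNow();
   }

   private void cancelPending()
   {
      if (pending != null)
      {
         pending.cancel(false);
      }
   }
}
```

Cancelling a scheduled task avoids the interrupt loop in Timer.stop(), at the cost of a timer that cannot be reused once stop() has been called.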

Using the MessageSubscriber Component

The following example illustrates how the MessageSubscriber is used to listen for and accept text messages:

public TestSubscriber()
{
   try
   {
      TopicConnection conn = MessageSubscriber.getConnection();
      TopicSession session = MessageSubscriber.createSession();
      MessageSubscriber.subscribe(session, "mytopic", this);
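      // NOTE: closing here ends the subscription immediately; a real
      // client would keep the connection open until it has finished
      // receiving messages.
      MessageSubscriber.closeConnection();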
   }
   catch (MessageException e)
   {
      System.err.println("Error: " + e + "\nExiting.....");
   }
   catch (JMSException e)
   {
      System.err.println("Error: " + e + "\nExiting.....");
   }
}
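Because messages arrive on a provider thread, a client that subscribes and then closes its connection straight away is unlikely to receive anything. One way to hold the connection open is to wait on a latch that the listener counts down. The sketch below is a hypothetical helper (AwaitMessagesSketch and its method names are assumptions), shown here without any JMS types so that it stands alone.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper for waiting on asynchronous deliveries.
final class AwaitMessagesSketch
{
   private final CountDownLatch remaining;

   AwaitMessagesSketch(int expectedMessages)
   {
      remaining = new CountDownLatch(expectedMessages);
   }

   // Call this from the listener's onMessage method.
   void messageReceived()
   {
      remaining.countDown();
   }

   // Blocks until every expected message has arrived or the timeout
   // passes. Returns true only when all messages arrived in time.
   boolean awaitAll(long timeoutMillis)
   {
      try
      {
         return remaining.await(timeoutMillis, TimeUnit.MILLISECONDS);
      }
      catch (InterruptedException e)
      {
         Thread.currentThread().interrupt(); // preserve interrupt status
         return false;
      }
   }
}
```

In a client such as TestSubscriber, awaitAll would sit between the subscribe call and the closeConnection call, with onMessage invoking messageReceived for each delivery.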

Now that we have our components in place, let's look at the interactions and relationships involved in our asynchronous messaging framework:


Figure 2: Proxies, topics, callers, and receivers

Conclusion

JMS systems do not always provide the infrastructure needed to communicate messages across HTTP. However, with the power of servlets, it is not very complicated to build the components that integrate the two environments.

In this article, we extended our framework to demonstrate JMS clients and loosely coupled, JMS-aware business-logic components that realize the benefits of a service-oriented architecture.

In our next article, we will discuss the details of formatting messages to be transported across HTTP using SOAP envelope structures. We will also see how to design service-oriented, business-logic components which can be used in a J2EE environment or in a .NET environment.
