HomeDigital EditionSys-Con RadioSearch Java Cd
Advanced Java AWT Book Reviews/Excerpts Client Server Corba Editorials Embedded Java Enterprise Java IDE's Industry Watch Integration Interviews Java Applet Java & Databases Java & Web Services Java Fundamentals Java Native Interface Java Servlets Java Beans J2ME Libraries .NET Object Orientation Observations/IMHO Product Reviews Scalability & Performance Security Server Side Source Code Straight Talking Swing Threads Using Java with others Wireless XML
 

In the last couple of years Sun has introduced a number of APIs targeted toward enterprise application development. One of the most exciting of these is the Java Message Service, or JMS. The JMS API is designed to do for messaging in the enterprise what JNDI does for naming and directory services and JDBC does for database access. JMS is an API that's designed to provide a common facility for enterprise messaging, leaving the underlying implementation of the messaging to whatever application server or other enterprise messaging software technology you wish to use. This is an exciting advance for those involved with the creation or use of message-oriented middleware (MOM) and especially for Java developers who need to utilize such facilities within their own products. With JMS you should be able to write one set of code for messaging against the JMS API and then use it across any messaging system provider that offers JMS support.

In this article I'll show you how to create a series of message producers and consumers that utilize most of the functions of the JMS API using persistent topics and queues, creating temporary destinations, filtering via message selectors, and so on. I'll implement these examples against one of the first vendor implementations of JMS, the one provided with the WebLogic application server from BEA (details on this server and configuring JMS are available online in the file "install.txt" at www.JavaDevelopersJournal.com). In this process I'll take you through the necessary steps to implement these applications, including the modifications to the application server to support the example code you'll create.

Before you continue with the example code here, I recommend that you first obtain and install the latest release of the BEA/WebLogic application server. You can download a free 30-day trial copy from BEA at www.weblogic.com.

The Code
Included online are four source files that demonstrate the use of the two types of messaging in JMS publish/subscribe and point-to-point. Sender.java and Receiver.java demonstrate the use of Queues within JMS for point-to-point communication, and Publisher.java and Subscriber.java demonstrate the use of Topics for publish/subscribe communication. A "readme.txt" file, also online, contains the instructions for running the various applications and any last-minute information about the code. Before you attempt to run these applications, make sure you've read the "install.txt" file and made the necessary modifications to the WebLogic server as indicated. As the sample code also contains comments and further JMS details that can't be completely covered in this article, the reader is encouraged to examine the source code and run the applications.

The Java Message Service
The Java Message Service was designed as a set of interfaces that would be implemented by MOM providers and various other vendors that wish to support messaging (application server providers, database server providers, etc.). These interfaces provide a common API for client applications that wish to implement code that uses messaging against potentially any given number of underlying messaging systems.

JMS was also designed to be flexible enough to provide this wide support for existing messaging systems while remaining portable. As a result, it doesn't support every possible messaging option from every messaging system. While you may be familiar with any given MOM product, it's not automatic that JMS will support every aspect of that product.

The primary concept in JMS is that of Destinations. A Destination is nothing more than an association for message producers and message consumers. Destinations break down into two types, Topics and Queues. Separate interfaces defined for each Destination type allow a provider to support one or both, depending on their messaging facility. A Topic is designed for publish/subscribe messaging, and a Queue is designed for client-to-client (point-to-point) messaging. Both Destination types support persistence. JMS also provides support for transactions. Transactions enable you to support commits or rollbacks on your messaging operations. Thus, if you have a failure within the boundaries of your transaction, you can roll back all of the messaging operations that took place within the boundaries of that transaction. Likewise, if everything completes successfully, you can perform a commit on your messaging operations to make them durable. As JMS Transactions are beyond the scope of this article, I leave you to investigate that aspect of JMS on your own.

Initializing JMS
Connection Factories
To initialize JMS you use a ConnectionFactory a tagging interface, one with no methods, that is extended via either TopicConnectionFactory or QueueConnectionFactory. A provider implements one or both of these factory interfaces to provide access to their specific messaging service implementation. WebLogic provides generic implementations of each of the Factory types. The Factories, as well as Topics and Queues, are considered "administered" objects and as such are created by the WebLogic server upon start-up (in our case we're using WebLogic's JMS implementation). These administered objects are then retrievable via JNDI. You use a JNDI naming context to retrieve the administered objects.

Don't worry if this seems a little confusing the code examples will make it easy to understand. You may also create your own Factory objects in WebLogic instead of using the default Factories; this is done within the weblogic.properties file. User-defined Factories are discussed later, in the section on Topics, while the details of defining your own Factories are contained in the "install.txt" file included online with the article source code.

The following code from within the Sender.java example shows how you initialize JMS and obtain the default ConnectionFactory for Queues from JNDI:

public final String JMS_FACTORY = "javax.jms.QueueConnectionFactory";

...

queueFx = (QueueConnectionFactory) initCtx.lookup(JMS_FACTORY);

The name of the default ConnectionFactory for Queues is passed in to the lookup() method of the initial JNDI naming context. A reference to an implementation of QueueConnectionFactory is then returned to us from the WebLogic application server (the getInitialContext() method in the Sender.java sample code that shows the details of initializing JNDI and obtaining the initial naming context from WebLogic you can obtain details on the API from Sun's Web site at: http://java.sun.com/products/jndi/1.2/javadoc/index.html).

Because ConnectionFactories are administered objects, the WebLogic application server takes care of creating default ConnectionFactories for Queues and Topics, as well as any user-defined Factories, and binding them to names in the WebLogic JNDI implementation for lookup by client code. Thus the name in JNDI for the default ConnectionFactory for Queues is "javax.jms.QueueConnectionFactory" and a lookup of this name through JNDI will return a reference to a default implementation of this interface, provided by WebLogic. There is a default ConnectionFactory for Topics as well, and it follows the same format; "javax.jms.TopicConnectionFactory". A lookup through JNDI will return you a reference to a default implementation of TopicConnectionFactory, also provided by WebLogic.

Connections
After successful creation of the appropriate ConnectionFactory, the next step is to create a Connection, an interface defined by JMS that represents a client connection to the underlying messaging provider. The ConnectionFactory is responsible for returning a Connection implementation that can communicate with the underlying messaging system. Clients will typically use a single connection. According to the JMS documentation, the purpose of a Connection is to "encapsulate an open connection with a JMS provider" and to represent "an open TCP/IP socket between a client and a provider service daemon." The documentation also states that the Connection is where client authentication should take place, and that clients can specify unique identifiers, among other things. Like ConnectionFactories, Connections come in two flavors, depending on the Destination type you're working with: QueueConnection and TopicConnection both extend the base Connection interface and you'll typically work with only one or the other, depending on the kind of messaging your client is using. The creation of a Connection is done via the ConnectionFactory. Connection contains two important methods, start and stop, that start and stop the sending and receiving of messages on the connection. See the full code block in Listing 1.

Sessions
Once you've obtained a Connection from the ConnectionFactory, you must create one or more sessions from the Connection. Session is the base interface, and again, there are two Destination-specific interfaces that extend it: QueueSession and TopicSession, which provide Queue- or Topic-specific Session methods. A Session is a type of factory that produces message consumers or producers for its Destination type (if it's a QueueSession, it creates Queue-oriented consumers and producers; if it's a TopicSession, it creates Topic-oriented producers and consumers).

A Session may be transacted or not, and you typically set this by passing a boolean parameter to the appropriate creation method on the Connection. You also typically pass a parameter to the Connection's Session creation method that sets the message acknowledgment mode of the Session being created this mode specifies whether the client or the consumer will acknowledge any messages it receives (this parameter will be ignored if transactioning is being used). Other methods provided by Session are various message creation methods that let you create JMS messages of specific types containing text, bytes, properties or even serialized Java objects (more on the Message interface below). See Figure 1 for a diagram of the actual JMS interface inheritance and creation relationships see also the code in Listing 1.

Figure 1
Figure 1:

Destinations
Before you can create any message producers or consumers, you must have a specific Destination with which to associate any producers or consumers. Remember, Destinations are administered objects, like ConnectionFactories. This means that a Destination is maintained by WebLogic and you must retrieve it through a JNDI lookup. This also means that in this case the Destination must be defined in advance. This is not to say that Destinations must always be created in advance. The JMS API provides the ability to create temporary Destinations that are available only for the life of the Session that created it as well as permanent Destinations through the JMS API at runtime. However, in the current WebLogic implementation of JMS, it should be noted that if you create Destinations via a Session, Destination will exist only as long as the WebLogic server is running. If the server crashes or is brought down, the Destination will be gone. The only way to create a truly persistent Destination is to create it within the weblogic.properties file (see the "install.txt" file for details on how to do this).

For the purposes of this article, our Destinations are created in advance, through the weblogic.properties file. Destinations defined in this file will be created by the WebLogic application server when it starts up, and will be made available to your client code through JNDI. Listing 1 shows the code from the Sender.java file, which demonstrates how to create a QueueConnection and a QueueSession, and retrieve our own Destination a Queue we defined in weblogic.properties named "jdj.article.queue.sender" the same as the package hierarchy for the Sender application.

Message Consumers and Message Producers
The final stage of the JMS initialization process is the creation of MessageConsumers and MessageProducers. Like ConnectionFactory, Connection and Session, these are base interfaces that have Destination-specific interfaces that extend them for use with either Topics or Queues (I use the terms Consumer and Producer interchangeably with MessageProducer and MessageConsumer). MessageConsumers are used to receive messages sent to a Destination, and MessageProducers are used to send messages to a Destination. Both are created by a Session instance. MessageProducer is extended by Destination-specific interfaces called QueueSender and TopicPublisher. MessageConsumer is extended by the interfaces QueueReceiver and TopicSubscriber.

Once you've created your MessageConsumer and/or MessageProducer, you have everything in place to begin receiving and/or sending messages. Since the creation of consumers and producers is specific to either Queues or Topics, I'll discuss the process for both types in the relevant sections below, specific to the Destination type.

Messages
Before I take the final step and begin to delve into the specifics of sending and receiving from Topics and Queues, we need to discuss one other interface, Message, which represents the JMS Message itself. This is the object that contains the information to be sent to a Destination and received by consumers that are listening on that Destination. Messages are created by a Session instance, and are composed of three sections: a header, properties and a body. The header is used to identify and route the message and the client developer typically doesn't see or deal with header information. Properties support application-specific values passed with a message. These property fields are predefined; a full description of them is available in the JMS Documentation. I use a few of these properties within the sample code. The body section of the message is the actual "payload" of the message and can be of five types represented by five specific interfaces: StreamMessage, MapMessage, ObjectMessage, TextMessage and BytesMessage (see Figure 1).

Persistence
JMS has an additional important aspect worth discussing: it supports persistent delivery of messages. Quite simply, this means that if a Consumer isn't running or is unavailable when a message is sent to a Destination, the message will be held until the next time that Consumer connects to the Destination. If you have five applications listening on a single Topic, for instance, and one of them should crash, the next time that application starts and connects to that specific Topic, any messages sent to that Topic while the application was down will be delivered to the application when it begins listening again. If this seems difficult to follow, it'll make more sense when you run the sample code.

Queues
A Queue is designed for "point-to-point" or "one-to-one" message delivery. This means that a Queue should have only one client sending messages to it, and only one application consuming messages from that Queue.

In reality, you can have multiple clients sending to a Queue, but you should never have more than one application consuming messages from that Queue. If you have more than one Consumer on a Queue, there's no guarantee about which Consumer will receive the message, but it'll only be one. If you need multiple Consumers on a Destination and would like all of them to receive the same message, you should use a Topic (see the following page).

The Sender.java and Receiver.java files contain the code that shows how to use a Queue. The code demonstrates the initialization process for JMS and retrieval of our predefined Queue, and how to create both a MessageProducer and a MessageConsumer for sending and receiving messages on the Queue.

There are two specific interfaces for consuming and producing messages on a Queue. QueueSender, which is returned from a QueueSession via a call to one of the createQueueSender() methods of the QueueSession, is used to send messages to the Queue. QueueReceiver, which is returned from a QueueSession via a call to one of the createQueueReceiver() methods of the QueueSession, is used to receive messages from the Queue.

Listing 2 is the code from the sendMsg method of Sender.java that shows how you create a MessageProducer from your Session, then construct a TextMessage and send it. In this code we're creating a QueueSender, then creating a TextMessage with text retrieved from a TextField in the UI of the application. We then use the QueueSender method "send" to deliver the message.

There are two ways to handle the receipt of incoming messages on a MessageConsumer synchronous and asynchronous. The first step is to create the MessageConsumer itself; the next is to decide whether you want synchronous or asynchronous message delivery on the Consumer.

After you create a MessageConsumer from your Session, if you want to synchronously receive the next message produced for the Destination, you can call the receive method on your MessageConsumer this method takes no parameters and will block until the next Message is received, which it will return to the caller. For asynchronous message receipt, you register an implementation of the MessageListener interface with your MessageConsumer. This goes for both Topics and Queues. MessageListener has one method that you must implement, onMessage, which receives one parameter, a Message. This method will be called on your implementation of onMessage for every message delivered to the Destination. This is demonstrated in the implementation of onMessage in both Sender.java and Receiver.java. In Receiver.java we have the following code in the initializeJMS method, which creates the MessageConsumer (a QueueReceiver) and sets the implementation of MessageListener:

// Create a Receiver for the Queue...
receiver = session.createReceiver(queue);

// Set the listener (this class)
receiver.setMessageListener(this);

Once the Connection's "start" method is called, messages will begin to come in to the Consumer as they arrive at the Destination.

ReplyTo Using Temporary Queues
You'll also notice that both Sender.java and Receiver.java implement MessageConsumers and MessageProducers. Each implements MessageListener. This demonstrates one of the interesting features of JMS, that is, the use of temporary Destinations. An application that wishes to receive responses to the messages it produces can create a temporary Queue or Topic and pass this Destination in the Message it sends.

One of the properties of Message is the JMSReplyTo property. This property is intended to be used for this purpose. You can create a temporary Queue or Topic and place it into the JMSReplyTo property of the Message. Consumers who receive this message have a private, temporary Destination that they can use to respond to the sender. This is demonstrated by two methods, one in each of the files. In Sender.java we have the following sections of code, which create the temporary Queue and place it in the JMSReplyTo property of the TextMessage:

// Create a temporary queue for replies...
tempQueue = (Queue) session.createTemporaryQueue();

The line of code above is found in the initializeJMS method of Sender.java. This code creates a temporary Queue on application start-up that will exist for the lifetime of the application. The next line of code, found in the sendMsg method of Sender.java, shows how to set the JMSReplyTo property to contain the temporary Queue.

// Set ReplyTo to temporary queue...
msg.setJMSReplyTo(tempQueue);

When this message is received by the QueueReceiver of Receiver.java, the temporary Queue is extracted from the JMSReplyTo field, and a QueueSender is constructed by the application to send a response message back to Sender.java. This demonstrates how to use the properties of a JMS Message and shows the usefulness of a private, temporary Destination. It also demonstrates how clients can be both Producers and Consumers of messages. The following code from Receiver.java demonstrates how to handle the extraction of the temporary Queue from the JMS Message; this code is found in the onMessage method:

// Get the temporary queue from the JMSReplyTo
// property of the message...
tempQueue = (Queue) msg.getJMSReplyTo();

The following block from the sendReplyToMsg method shows how to create a QueueSender and send the reply:

// create a Sender for the temporary queue
if (sender == null)
sender = session.createSender(tempQueue);

TextMessage msg = session.createTextMessage();
msg.setText(REPLYTO_TEXT);

...

// Send the message to the temporary queue...
sender.send(msg);

Topics
A Topic is designed to implement a "publish/subscribe" message delivery mechanism. Whereas Queues are designed to have one Producer and one Consumer, a Topic is designed to allow multiple Producers to send messages to it, and to have multiple Consumers receiving delivery of the same Message from the Topic. A Topic is a "many-to-many" model.

The creation process for MessageProducers and MessageConsumers of a Topic is similar to that of a Queue. You use your Session to create the TopicPublishers and TopicSubscribers. These mirror the QueueSender and QueueReceiver in that each provides Topic-specific capabilities and implements the base MessageProducer and MessageConsumer interfaces.

The creation process for a TopicPublisher is nearly identical to that of the QueueSender. Following is the code from the sendMsg method of Publisher.java that shows the creation of a TopicPublisher and how to publish the message to the Topic:

// create a Publisher if there isn't one...
if (publisher == null)
publisher = session.createPublisher(topic);

TextMessage msg = session.createTextMessage();
msg.setText(text);

...

// Publish it to the topic...
publisher.publish(msg);

Durable Subscribers
One of the interesting aspects of TopicSubscribers is that it is a durable subscriber a unique name provided by a client that identifies it within a Session. A Session, in turn, has a Client ID associated with it, defined either at runtime through a method call on the Connection or as part of the administered ConnectionFactory (in our case it's defined in this way within the weblogic.properties file). Thus a Connection provides a Session with a Client ID, and durable subscriber names are unique identifiers within Sessions, associated with a specific Client ID. The purpose of durable subscribers is to create unique, persistent Consumers for a given Topic.

An application that creates a TopicSubscriber via a call to the createDurableSubscriber method of a TopicSession must pass in a durable subscriber name (a string) as one of its parameters; for instance, you could set the durable subscriber name to the name of the user currently logged into the application, and so on. This name will uniquely identify a particular subscriber to a Topic (in conjunction with a unique Client ID for the Connection/Session). Once this durable subscriber has registered with the Topic, the Topic will persistently ensure delivery of messages to that subscriber. This means that if a particular durable subscriber is unavailable, messages will be held until the next time that durable subscriber (with the same unique, durable subscriber name and Client ID) registers as a Consumer. The Subscriber.java file demonstrates the creation of durable subscribers and allows you to use a default subscriber name or to set this identifier from the command line (see the "readme.txt" file for further details on running the application and demonstrating the message persistence with durable subscribers). The following code fragment from the initializeJMS method of Subscriber.java shows how to create a durable subscriber from your TopicSession:

subscriber = session.createDurableSubscriber(

topic,
subscriberID,
SELECTOR,
false);

// Set the listener (this class)
subscriber.setMessageListener(this);

TopicSubscribers that aren't created as durable subscribers won't receive messages persistently, and will receive them only while they're running.

For further details on durable subscribers, and Topics in general, please see Sun's JMS documentation. One other item about the preceding code: note that the third parameter passed to the method is SELECTOR. This is where message selectors are associated with a Consumer (see below for details on message selectors).

Filtering Using Message Selectors
The final area of JMS I'll touch on is the message selector a filter applied to MessageConsumers that can act on the properties and header sections of incoming messages (but not on the body of the message) and determine whether the message is actually to be consumed. Message selectors are strings, and are based on a syntax that is a subset of SQL-92, according to the JMS documentation. You can apply message selectors as part of the creation of a MessageConsumer. The way these selectors are applied to incoming messages differs slightly based on whether the MessageConsumer is a QueueReceiver or a TopicSubscriber. I encourage you to examine the syntax of message collectors, and how they can be applied, by examining the Subscriber.java file and running the Publisher and Subscriber applications. The following section of code defines our message selector in Subscriber.java, and the application itself lets you change the selector from a text field:

public final String SELECTOR = "JMSType = 'TOPIC_PUBLISHER'";

This selector examines the JMSType property of the incoming message from our Topic and determines whether the value is equal to TOPIC_PUBLISHER. If it is, the message is passed to our MessageListener implementation; if it isn't, the message is ignored. See the "readme.txt" file for details on running the applications and demonstrating this behavior. I also encourage you to check out Sun's JMS documentation.

Summary
JMS is a fascinating and powerful technology for creating portable messaging code within your applications. It allows for both "point-to-point" and "publish/subscribe" message delivery, and also supports transactions and persistence. The WebLogic application server from BEA offers a robust and complete implementation of JMS that works in conjunction with the other technology provided by the application server, such as EJBs and servlets. This creates interesting potential for persistent, asynchronous messaging with transactioning support between various enterprise objects and services. It is my hope that this article will encourage you to explore the online sample code in conjunction with the article text, and to examine the possibilities that WebLogic, and JMS in particular, offers.

Author Bio
Scott Grant, a director of development at Rapid Logic, Inc., and a Sun-certified Java developer, has 14 years of diversified engineering experience. He was the chief architect and lead developer of Rapid Logic's RapidControl for Applets product and currently leads the development of their enterprise technology products.
He can be reached at: [email protected].

	

Listing 1:

public final String JMS_FACTORY = "javax.jms.QueueConnectionFactory";
public final String QUEUE = "jdj.article.queue.sender";
...
private void initializeJMS(boolean transacted)
{
try
   {
    if (initCtx != null)
       {
        // Look up the default QueueCon-
     // nectionFactory...
     queueFx = (QueueConnectionFactory)
           initCtx.lookup(JMS_FACTORY);


     // Create a QueueConnection from
     // the Connection Factory...
     conn = queueFx.createQueueConnec-
     tion();


     // Create a QueueSession from the
     // QueueConnection. The first
     // parameter is a boolean that
     // specifies transacted or not
     // transacted. The second param
     // specifies that our Session will
     // automatically Acknowledge a
     // client's receipt of a message.
     session = conn.createQueueSes-
     sion(transacted,
             Session.AUTO_ACKNOWLEDGE);


     // Look up the Destination we want
     // to use for our Consumers and
     // Producers for this session. In
     // this case a Queue called
     // "jdj.article.queue.sender".
     queue = (Queue) initCtx.lookup(QUEUE);


             ...
      }
   }
   ...
}


Listing 2:

// create a Sender
if (sender == null)
    sender = session.createSender(queue);



TextMessage msg = session.createTextMessage();
msg.setText(text);


// Override default, to insure it's
// using persistent delivery...
msg.setJMSDeliveryMode(DeliveryMode.PERSISTENT);


// Set ReplyTo to temporary queue...
msg.setJMSReplyTo(tempQueue);


// Send the message...
sender.send(msg);




Download Assoicated Source Files (Zip format - 21.6 KB)
 

All Rights Reserved
Copyright ©  2004 SYS-CON Media, Inc.
  E-mail: [email protected]

Java and Java-based marks are trademarks or registered trademarks of Sun Microsystems, Inc. in the United States and other countries. SYS-CON Publications, Inc. is independent of Sun Microsystems, Inc.