
Motivation
One nice feature of the Java language is that many useful programming facilities (APIs) are built into its core packages. To write a client/server application, one can use the socket classes in the java.net package, starting from existing client/server code. For instance, David Flanagan's "Java in a Nutshell" (pp. 145 to 148) [1] provides sample code for implementing a client and a server with multi-threading. How much of that code can be reused, however, depends on your application. You may need to replace the readLine() and println() methods of the DataInputStream and PrintStream classes with other I/O methods (e.g. readInt(), writeFloat()) of the appropriate InputStream and OutputStream subclasses in the java.io package.

In this way, one can write client/server applications by manually modifying and extending sample Java client/server code to fit the specification of a new problem. After some experience with this "cut-and-paste" model, one realizes that it is not the best way to develop applications. The approach has at least these pitfalls:

  1. the resulting code tends to be monolithic and heavyweight,
  2. the modification process is tedious and error-prone,
  3. specific I/O classes and their associated methods have to be replaced to meet the needs of each new application, and so on.

For a more modular and reusable approach, we can build communication objects that encapsulate and generalize the necessary send and receive behaviors. In this article, we introduce a simple yet effective Java communication component called Channel. With this class, you can create new Channels dynamically in any distributed application to pass general objects among your application components. This is like adding a plug-and-play ear and mouth-like sensory module to a robot (or softbot) to allow it to communicate with other robots (or softbots).

The Channel component follows the spirit of a distributed object component model: build generic, reusable software components that can be assembled dynamically into new applications. The Channel component is compact and lightweight, so it can be created and used easily without any additional programming. While Java Beans [2] allow components to communicate dynamically by providing a container as the context for their arrangement and interaction, the Channel component simply provides an alternative encapsulation for dynamic interaction among application components.

What is a Channel?
When an application component needs to communicate with other components, it creates a Channel component (Figure 1). In its constructor, the Channel opens a server socket (from the java.net package) on a specified port and starts a thread that listens for incoming messages. Any incoming message intended for the application component is immediately enqueued onto a queue of generic message objects (a MessageQueue object) that the application component supplies when it creates the Channel. The application component can receive and process these messages in a separate thread, subject to the producer/consumer synchronization of the message queue.

Figure 1

If an application component wishes to send a message to another application component with an active Channel component, it invokes its own Channel component's send() method to open a socket connection with the specified host and port and transmits the message across the connection. The send() method ensures the delivery of the message by waiting for a simple acknowledgment message from the receiving Channel. The application component can send messages to many different Channels of different application components. Indeed, an application component can have more than one communication channel by creating multiple Channel objects with different port numbers. The acknowledgment required by the send() method also eliminates premature socket reset if a second message is sent to the same channel too soon.

Using Object Serialization [3], the Channel component handles a generic message format instead of hardcoding the anticipated primitive data types into the message-receiving code. In some applications (which we will point out later), the number of possible message types is too large to handle that way. An unconstrained message format also allows message interpretation (and invocation of the relevant message handler) to be deferred to a higher and more appropriate application level.
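To make the idea of a generic message format concrete, here is a minimal sketch, assuming a current JDK. The classes DemoMessage and SerializationDemo are hypothetical names used only for illustration; they are not part of the Channel listings. The sketch writes an arbitrary serializable object to a byte stream and reads it back, without the transport code knowing anything about the payload type.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Date;

/** Hypothetical message type; the field names are illustrative only. */
class DemoMessage implements Serializable {
    private static final long serialVersionUID = 1L;
    final String intention;
    final Object content;   // any serializable payload

    DemoMessage(String intention, Object content) {
        this.intention = intention;
        this.content = content;
    }
}

class SerializationDemo {
    /** Writes any serializable object to a byte array and reads it back. */
    static Object roundTrip(Object original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return in.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        DemoMessage copy =
            (DemoMessage) roundTrip(new DemoMessage("Hello", new Date()));
        System.out.println(copy.intention + ": " + copy.content);
    }
}
```

The receiving side only casts the result to the message type; interpreting the content field is left to the application, which is the deferral described above.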

Figure 2 shows the schematic diagram of a Channel component, its interactions with the application components, the internal view of a Channel, its interaction with socket and the use of object serialization.

Figure 2

Building a Communication Channel
The Channel class in Listing 1 defines a communication component that an application component can create and use to send messages to, and receive messages from, other application components. We now look at the Java implementation in detail. Note that only the essential functional code is shown in this article. Details such as exception handling and method variants are kept to a minimum to ease reading and understanding.

A Channel object has two instance variables: a ServerSocket, listen_socket, and a MessageQueue, mailbox. Upon construction, a new Channel object creates a ServerSocket (listen_socket) for a specified port and sets its MessageQueue (mailbox) to refer to the mailbox provided by the application component that creates it. As a subclass of Thread, the new Channel object then starts itself to listen on the specified port with the ServerSocket, ready to accept connections and receive messages from other application components.

Receiving Messages
In run(), a persistent while loop listens for a connection,

Socket client = listen_socket.accept();

and obtains a client Socket when a connection is requested. Since generic message objects are expected rather than specific data types, an ObjectInputStream (sin) from the Object Serialization package [3] is created from the client socket,

ObjectInputStream sin = new ObjectInputStream(client.getInputStream() );

and a DataOutputStream (sout) is also created for sending simple acknowledgment bytes on receiving messages.

Once engaged in an accepted connection, the Channel thread enters an inner while loop to receive each message as a serialized object from the "client",

Message msg = (Message)sin.readObject();

and casts it to the Message type. A single acknowledgment byte is sent back via the DataOutputStream. The received message, msg, is then immediately appended to the message queue provided by the application component,

mailbox.put( msg );

and the Channel object is ready to receive the next incoming message.

When the client has finished sending its messages and closed its connection, an EOFException will be caught by the server Channel object waiting to receive the next message. The server then cleans up by closing the relevant socket and streams. Figure 3 shows the handshaking between the sender and receiver.

Figure 3
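The end-of-connection behavior can be reproduced without a network. The following is a minimal sketch, assuming a current JDK; the class ReceiveLoopDemo and its methods are hypothetical names, not part of the Channel listings. It writes several objects to an in-memory stream and then reads until an EOFException signals that nothing more will arrive, much as the inner loop of run() does when the client closes its socket.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

class ReceiveLoopDemo {
    /** Serializes the given objects back to back into one byte array. */
    static byte[] serializeAll(Object... messages) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            for (Object message : messages) {
                out.writeObject(message);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Reads objects until the stream ends; EOFException is the normal exit. */
    static List<Object> readUntilEof(InputStream raw) {
        List<Object> received = new ArrayList<>();
        try (ObjectInputStream in = new ObjectInputStream(raw)) {
            while (true) {
                received.add(in.readObject());
            }
        } catch (EOFException endOfStream) {
            // The sender has finished; fall through and return what arrived.
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        byte[] data = serializeAll("first", "second", "third");
        System.out.println(readUntilEof(new ByteArrayInputStream(data)));
    }
}
```

Over a real socket an abrupt disconnect may surface as a different IOException, which is why Listing 1 prints a stack trace for anything other than EOFException before cleaning up.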

With a perpetual thread to listen for messages by a Channel object on a given port, an application component can process the messages (perhaps by a different thread) at its own convenience. This strategy decouples and thus modularizes the message receiving and message processing processes. Though it is possible to have a multi-threaded Channel server that spawns a new thread for each client connection request such as the one described in [1] (p.146), it is usually not necessary as the message receiving process is short and simple.

Sending Messages
To send a message to another application component, an application component simply invokes the send() method of its Channel object, specifying the message to be sent, and the host and port to which the message will be sent,

public void send( Object message, String hostname, int port ).

In this send() method, a client Socket (sender) is created,

Socket sender = new Socket( hostname, port );

As in message receiving, but in the opposite direction, an ObjectOutputStream, out, is created to serialize the message object onto the socket's output stream,

ObjectOutputStream out = new ObjectOutputStream(sender.getOutputStream() );
out.writeObject( message );
out.flush();

Before closing the connection and cleaning up, a DataInputStream (in) is created to receive a simple acknowledgment byte from the message receiver.

An application component can therefore use its communication Channel to send messages to different hosts and ports by invoking this send() method conveniently as long as the hosts and ports are known.
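The send-then-acknowledge exchange can be tried in isolation. The sketch below is a simplified stand-in for the Channel class, not the authors' implementation: AckHandshakeDemo and its method names are hypothetical, it handles exactly one message, and it assumes a current JDK and a usable loopback interface. The receiver runs on a background thread, and the sender blocks until the single ACK byte arrives.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.CompletableFuture;

class AckHandshakeDemo {
    static final int ACK = 1;

    /** Receiver: accept one connection, read one object, reply with ACK. */
    static Object receiveOne(ServerSocket server) {
        try (Socket client = server.accept();
             ObjectInputStream in =
                 new ObjectInputStream(client.getInputStream());
             DataOutputStream ackOut =
                 new DataOutputStream(client.getOutputStream())) {
            Object message = in.readObject();
            ackOut.writeByte(ACK);
            ackOut.flush();
            return message;
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Sender: write one object, then block until the ACK byte arrives. */
    static int sendAndAwaitAck(Object message, InetAddress host, int port) {
        try (Socket sender = new Socket(host, port);
             ObjectOutputStream out =
                 new ObjectOutputStream(sender.getOutputStream())) {
            out.writeObject(message);
            out.flush();
            DataInputStream ackIn =
                new DataInputStream(sender.getInputStream());
            return ackIn.readByte();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Runs both sides over the loopback interface on an OS-assigned port. */
    static Object loopback(Object message) {
        InetAddress local = InetAddress.getLoopbackAddress();
        try (ServerSocket server = new ServerSocket(0, 1, local)) {
            CompletableFuture<Object> received =
                CompletableFuture.supplyAsync(() -> receiveOne(server));
            int ack = sendAndAwaitAck(message, local, server.getLocalPort());
            if (ack != ACK) {
                throw new IllegalStateException("unexpected ack " + ack);
            }
            return received.join();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("received: " + loopback("Hello"));
    }
}
```

Passing port 0 to ServerSocket asks the operating system for any free port, which avoids the fixed-port clash when several copies run on one host.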

A variant for communication within the local host omits the hostname,

public void send( Object message, int port )

which finds the local host by calling the class method InetAddress.getLocalHost() (in the java.net package) followed by the instance method getHostName(),

String hostname = InetAddress.getLocalHost().getHostName();

and invoke the more general send() method as described above.

Another variant is to specify an Internet IP address instead of hostname. Yet another variant is to send an array of messages by a single invocation of the send() method by replacing the first argument with "Object[] messages" and writing the message objects in a loop. We shall omit their details here.

Message Queue
While the Message class referred to in the Channel component is deliberately left unspecified to emphasize its generality and the need for object serialization, we briefly touch on the implementation of the MessageQueue class.

The MessageQueue class (Listing 2) extends a circular Queue data structure with a synchronization mechanism for producer/consumer concurrent access using Semaphore objects. As queues and semaphores are, respectively, a fundamental data structure and a fundamental synchronization mechanism in computer science, and their Java implementations are easily available (for example, see http://www.digitalfocus.com/digitalfocus/faq/howdoi.html#T_1), we show only their class skeletons in Listings 3 and 4 instead of our full implementation to save space.

The constructor of the MessageQueue class invokes the constructor of its superclass (Queue) to allocate and initialize a queue of objects, and then creates two Semaphores.

The put() and get() methods follow the semantics of the append() and remove() methods of a first-in-first-out queue respectively. They are implemented by calling the corresponding superclass instance methods, surrounded by the appropriate synchronization primitives.
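Since Listings 3 and 4 elide the method bodies, the following is one possible way to fill them in, offered as a sketch rather than the authors' code. CountingSemaphore, BoundedBuffer and BoundedBufferDemo are hypothetical names; the semaphore is built on wait()/notify(), and the buffer adds a short synchronized block so that the circular indices are updated by one thread at a time. Unlike the skeleton in Listing 4, down() here declares InterruptedException.

```java
import java.util.ArrayList;
import java.util.List;

/** Counting semaphore built on wait/notify. */
class CountingSemaphore {
    private int count;

    CountingSemaphore(int initial) { count = initial; }

    /** V(): release one permit and wake one waiting thread. */
    synchronized void up() {
        count++;
        notify();
    }

    /** P(): block until a permit is available, then take it. */
    synchronized void down() throws InterruptedException {
        while (count == 0) {
            wait();
        }
        count--;
    }
}

/** Bounded FIFO buffer guarded by two semaphores, mirroring put()/get(). */
class BoundedBuffer {
    private final Object[] slots;
    private int head, tail;
    private final CountingSemaphore empty, full;

    BoundedBuffer(int size) {
        slots = new Object[size];
        empty = new CountingSemaphore(size);  // free slots
        full = new CountingSemaphore(0);      // filled slots
    }

    void put(Object item) throws InterruptedException {
        empty.down();                         // wait for a free slot
        synchronized (this) {
            slots[tail] = item;
            tail = (tail + 1) % slots.length;
        }
        full.up();                            // announce a filled slot
    }

    Object get() throws InterruptedException {
        full.down();                          // wait for a filled slot
        Object item;
        synchronized (this) {
            item = slots[head];
            slots[head] = null;
            head = (head + 1) % slots.length;
        }
        empty.up();                           // announce a free slot
        return item;
    }
}

class BoundedBufferDemo {
    /** A producer thread puts 1..n through a size-2 buffer; the caller consumes. */
    static List<Object> transfer(int n) {
        BoundedBuffer buffer = new BoundedBuffer(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++) {
                    buffer.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        List<Object> consumed = new ArrayList<>();
        try {
            for (int i = 0; i < n; i++) {
                consumed.add(buffer.get());
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println(transfer(5));
    }
}
```

Because the buffer holds only two items, the producer blocks in put() whenever it gets ahead of the consumer, which is the producer/consumer behavior the message queue relies on.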

An Example to Use Channel
In Listing 5, we show a simple example of creating and using the Channel class together with a sample definition of a Message.

This Test (class) program sends a "Hello" message with the local hostname and current time and date to any number of hosts as specified on the argument list and displays messages that it continues to receive. If no argument is specified, no message will be sent out.

The Test program first creates a MessageQueue, m, and a Channel, c. Then, it constructs a "Hello" message: "Hello" string in the intention field, local hostname as sender, and current time and date as content. A loop is used to send the constructed message using Channel's send() method,

for (int i = 0; i < args.length; i++)
    c.send( msg, args[i], PORT );

where PORT is a default port for simplicity. Because every copy listens on this same port, multiple Test programs cannot run on the same host. To allow that, the program could be modified to accept both remote hostnames and ports on the argument list and send the message to the specified hosts and ports accordingly.
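One way to accept hosts and ports on the argument list is sketched below. The "host:port" argument syntax and the TargetParser class are assumptions made for illustration; the article does not specify a format. A bare hostname falls back to the default port.

```java
import java.net.InetSocketAddress;

class TargetParser {
    /** Parses "host" or "host:port"; a bare host uses defaultPort. */
    static InetSocketAddress parse(String arg, int defaultPort) {
        int colon = arg.lastIndexOf(':');
        if (colon < 0) {
            return InetSocketAddress.createUnresolved(arg, defaultPort);
        }
        String host = arg.substring(0, colon);
        int port = Integer.parseInt(arg.substring(colon + 1));
        return InetSocketAddress.createUnresolved(host, port);
    }

    public static void main(String[] args) {
        for (String arg : args) {
            InetSocketAddress target = parse(arg, 5678);
            // With a Channel c, this is where one would call
            // c.send(msg, target.getHostString(), target.getPort());
            System.out.println(target.getHostString() + " " + target.getPort());
        }
    }
}
```

This simple split does not handle IPv6 literals, which contain colons themselves. Each Test instance would also need its own listening port to share a host with other instances.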

Finally, the Test program loops forever to listen for incoming messages. When its Channel thread has deposited messages in its message queue, the program removes them from the queue and displays them. When the message queue is empty, the call blocks,

msg = (Message)c.receive();

To test this program, you can run a copy of it without arguments on one host to act as the "server" and run other copies on other hosts to send messages to the server. Sample output is shown in Listing 6, where "grus", "marten" and "corona" are hostnames.

A simple modification to the Test program gives us a template for writing a server program (or service agent) that provides appropriate services based on messages received from clients (Listing 7). This example and the template illustrate how a Channel object can provide a high-level programming interface for application components to perform generic message communication.
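The template leaves the service code itself open. One possible shape for it, sketched here under the assumption that the message type is identified by the intention string from the sample Message class, is a table of handlers keyed by intention. IntentionDispatcher is a hypothetical helper, not part of the article's listings.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

class IntentionDispatcher {
    // Maps an intention such as "Hello" to the code that serves it.
    private final Map<String, Function<Object, String>> handlers =
        new HashMap<>();

    /** Registers the handler to run for messages with this intention. */
    void register(String intention, Function<Object, String> handler) {
        handlers.put(intention, handler);
    }

    /** Runs the matching handler on the content, or reports an unknown type. */
    String dispatch(String intention, Object content) {
        Function<Object, String> handler = handlers.get(intention);
        if (handler == null) {
            return "unsupported intention: " + intention;
        }
        return handler.apply(content);
    }

    public static void main(String[] args) {
        IntentionDispatcher dispatcher = new IntentionDispatcher();
        dispatcher.register("Hello", content -> "greeting sent at " + content);
        // In a service agent, the two arguments would come from the
        // intention and content fields of each received message.
        System.out.println(dispatcher.dispatch("Hello", new java.util.Date()));
    }
}
```

Keeping the handlers in a table means new message types can be supported by registering another handler rather than by editing the receive loop.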

Limitations
The solution proposed in this article is not as elaborate as a component model such as Java Beans [2], where containers are designed to support the relationships and interaction among components. It is closer to the agentification approaches [4] in software agent research, where non-agent software components are enhanced with agent communication capabilities (here, with Channel objects), among other agent attributes. Software agent research will provide alternative architectures and methodologies or add value to component models.

The Channel object described here presents a first step towards engineering communication software components for distributed computing applications. More sophisticated re-synchronization and recovery mechanisms should be in place to achieve reliability and robustness in mission critical applications.

Conclusion and Future Outlook
In this article, we have introduced a Java communication component called Channel that encapsulates the necessary Java socket programming for generic message passing, built upon the Java object serialization mechanism, together with an example of its use. This Channel component, when assembled with other non-communicative software components in an appropriate manner, enhances application components to a new level of dynamic interaction. This will be very useful for building a wide range of Internet and intranet component-based applications.

Indeed, the communication Channel could form a basic component in the implementation and application of the emerging software agent technologies [4] where agent communication is a key attribute in any multi-agent system. The Channel object thus allows one to agentify a non-agent software component in the communication aspect. The notion of generic message format and passing using object serialization is important for realizing any agent communication language whose content language varies with messages and application domains (e.g. [5]). In general, without object serialization, an implementation will look clumsy in order to cope with potentially enormous combinations of message types in an open computing environment. Last but not least, the proposed Channel class has been used in the prototyping of software agent research and applications [6, 7].

References

  1. Flanagan, D. (1996). Java in a Nutshell. O'Reilly & Associates, Inc.
  2. JavaSoft (1996). Java(tm) Beans: A Component Architecture for Java. White paper, July 1996. http://splash.javasoft.com/beans.
  3. JavaSoft (1996). Java(tm) Object Serialization Specification. Revision 0.9, Draft May 8, 1996. http://chatsubo.javasoft.com/current/serial/index.html.
  4. Genesereth, M.R. and Ketchpel, S.P. (1994). Software agents. Communications of the ACM, Vol. 37, No. 7, July 1994, pp. 48-53.
  5. Finin, T. et al. (1994). KQML as an Agent Communication Language. In Proceedings of CIKM'94, ACM Press, Nov. 1994.
  6. Lai, S.L. and Lim, J.H. (1996). AGATE: an Application GATEway for internet and intranet. In preparation.
  7. Lim, J.H., Wu, J.K. and Lai, S.L. (1996). A Multiagent Meeting Organizer that satisfies Soft Constraints. To appear in the Proceedings of the International Conference on Multi-Agent Systems, Kyoto, Japan, Dec. 10-13, 1996.

About the Author
Siet-Leng Lai has developed a concurrent object-based language for parallel machines and clustered workstation environments, as well as a multi-tasking operating system on DOS. She has more than four years of experience in the IT industry. She is now conducting research into Internet/intranet middleware using Java.

Joo-Hwee Lim proposed novel neural network techniques for timetabling and developed an execution monitoring engine for a complex planner. He is now a senior software engineer leading an Agent Engineering Team researching agent-based software engineering and multi-agent systems using Java. The project is funded by Japan's Real World Computing Partnership.

	

Listing 1: The Channel class

import java.lang.*;
import java.io.*;
import java.net.*;

/**
 * Channel: a communication component built upon
 *          sockets and object serialization.
 *
 * @author	Siet-Leng Lai, Joo-Hwee Lim
 * @version 1.0
 */
public class Channel extends Thread {

	public static final int ACK = 1;
	public static final int QUEUE_SIZE = 30;
	protected ServerSocket listen_socket;
	protected MessageQueue mailbox;

	/**
	 * Create a ServerSocket to listen for connection,
	 *  set the mailbox, and start the thread.
	 */
	public Channel( int port, MessageQueue msg_q ) {

		try {
			listen_socket = new ServerSocket( port );
		} catch (IOException e) {
			e.printStackTrace();
		}
		mailbox = msg_q;
		this.start();
	}

	public Channel( int port ) {

		try {
			listen_socket = new ServerSocket( port );
		} catch (IOException e) {
			e.printStackTrace();
		}
		mailbox = new MessageQueue( QUEUE_SIZE );
		this.start();
	}

	/**
	 * Send message to specified host and port.
	 */
	public void send( Object message, String hostname, int port ) {

		try {
			Socket sender = new Socket( hostname, port );

			ObjectOutputStream out = 
				new ObjectOutputStream( sender.
						getOutputStream() );
			out.writeObject( message );
			out.flush();

			DataInputStream in = new DataInputStream( 
				sender.getInputStream() );
			byte ack = in.readByte();

			in.close();
			out.close();
			sender.close();

		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	/**
	 * Send message within local host.
	 */
	public void send( Object message, int port ) {

		try {
			String hostname = InetAddress.getLocalHost().getHostName();
			send( message, hostname, port );
		} catch (UnknownHostException e) {
			e.printStackTrace();
		}
	}

	/**
	 * Receive message from the mailbox.
	 */
	public Message receive( ) {
		return( mailbox.get() );
	}

	/**
	 * Listen for connection request (outer while-loop)
	 *  followed by reading messages (inner while-loop).
	 */
	public void run() {

		try {
			while (true) {

				// accept connection request
				Socket client = listen_socket.accept();

				ObjectInputStream sin = new ObjectInputStream(
					client.getInputStream() );
				DataOutputStream sout = new DataOutputStream(
					client.getOutputStream() );

				try {
					while (true) {

						// read in a message object
						Message msg = (Message)sin.readObject();

						// send an acknowledgement
						sout.writeByte( ACK );
						sout.flush();

						try {
							// enqueue the message
							mailbox.put( msg );
						} catch (QueueException e) {
							System.out.println( e.getMessage() );
							return;
						}
					} // while (inner loop)
				}
				catch (Exception e) {
					// EOFException means the client closed its connection
					if (!(e instanceof EOFException))
						e.printStackTrace();
					try {
						// clean up
						client.close();
						sin.close();
						sout.close();
					} catch (IOException e1) {
						e1.printStackTrace();
					}
				}
			} // while (outer loop)
		}
		catch (IOException e) {
			e.printStackTrace();
		}
		finally {
			try {
				listen_socket.close();
			} catch (IOException e1) {
				e1.printStackTrace();
			}
		}
	}
}

Listing 2: The MessageQueue class

import java.lang.*;

/**
 * MessageQueue with synchronization.
 *
 * @author  Siet-Leng Lai, Joo-Hwee Lim
 * @version 1.0
 */
public class MessageQueue extends Queue {

	// synchronization primitives
	private Semaphore empty, full;

	public MessageQueue( int size ) {
		super( size );
		empty = new Semaphore( size );
		full  = new Semaphore( 0 );
	}

	/**
	 * Remove message from queue 
	 *  with synchronization.
	 */
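	public Message get() {

		full.down();             // wait until a message is available
		Message msg;
		synchronized (this) {    // one thread at a time updates the queue
			msg = (Message)super.remove();
		}
		empty.up();              // signal one more free slot
		return( msg );
	}

	/**
	 * Append message to queue
	 *  with synchronization.
	 */
	public void put( Message new_msg )
		throws QueueException {

		empty.down();            // wait until a slot is free
		synchronized (this) {    // one thread at a time updates the queue
			super.append( new_msg );
		}
		full.up();               // signal one more message available
	}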
}

Listing 3: Circular Queue

import java.lang.*;

/**
 * Circular Queue.
 *
 * @author	Siet-Leng Lai, Joo-Hwee Lim
 * @version 1.0
 */
public class Queue extends Object {

	public final static int DEFAULT_QSIZE = 10;
	protected Object buffer[];
	protected int count, qsize;
	protected int head, tail;

	/**
	 * Allocate and initialize queue.
	 */
	public Queue( int size ) { /* ... */ }

	public Queue() { /* ... */ }

	/**
	 * Return current queue size.
	 */
	public int n_item() { /* ... */ }

	/**
	 * Append item to queue.
	 */
	public void append( Object new_item ) 
		throws QueueException { /* ... */ }

	/**
	 * Remove item from queue.
	 */
	public Object remove() { /* ... */ }

	/**
	 * Return true if queue is empty, false otherwise
	 */
	public boolean isEmpty() { /* ... */ }

	/**
	 * Return true if queue is full, false otherwise
	 */
	public boolean isFull() { /* ... */ }
}

/**
 * QueueException object for queue overflow.
 */
class QueueException extends Exception {

	public QueueException() { super(); }
	public QueueException(String s) { super(s); }
}

Listing 4: Semaphore

import java.lang.*;

/**
 * Semaphore.
 *
 * @author	Siet-Leng Lai, Joo-Hwee Lim
 * @version 1.0
 */
public class Semaphore {

	private int count;

	/**
	 * Initialize semaphore value.
	 */
	public Semaphore( int value ) { /* ... */ }

	/**
	 * V() operation.
	 */
	public synchronized void up() { /* ... */ }

	/**
	 * P() operation.
	 */
	public synchronized void down() { /* ... */ }
} 

Listing 5: Test program for the Channel class

import java.lang.*;
import java.util.*;
import java.net.*;

/**
 * Test program for Channel component.
 *
 * @author  Siet-Leng Lai, Joo-Hwee Lim
 * @version 1.0
 */
public class Test {

	static final int PORT = 5678;

	public static void main(String args[]) {

		System.out.println("## Test started.");

		MessageQueue m = new MessageQueue( 10 );
		Channel c = new Channel( PORT, m );

		Message msg = new Message();
		msg.intention = "Hello";
		try {
			msg.sender = InetAddress.getLocalHost().
							getHostName();
		} catch (UnknownHostException e) {
			e.printStackTrace();
		}
		msg.content = new Date();

		// send the message to multiple hosts
		for (int i = 0; i < args.length; i++)
			c.send( msg, args[i], PORT );

		// receive and display messages
		while (true) {
			msg = (Message)c.receive();
			System.out.print(msg.intention);
			System.out.print(" from "+msg.sender);
			System.out.println(": "+msg.content);
		}
	}
}

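/**
 * Sample Message class. It implements Serializable so that
 * ObjectOutputStream can write it across the socket.
 */
class Message implements java.io.Serializable {

	protected String intention;
	protected String sender;
	protected Object content;

	public Message() { super(); }
}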

Listing 6: Example Output

grus% java Test
## Test started.
Hello from marten: Thu Aug 01 15:54:47 SGT 1996
Hello from corona: Thu Aug 01 15:54:50 SGT 1996

marten% java Test grus
## Test started.
Hello from corona: Thu Aug 01 15:54:50 SGT 1996

corona% java Test grus marten
## Test started.

Listing 7: Template for writing service agent

import java.lang.*;

/**
 * Template for writing service agent.
 *
 * @author  Siet-Leng Lai, Joo-Hwee Lim
 * @version 1.0
 */
public class ServiceAgent {

	static final int PORT = 5678;

	public static void main(String args[]) {

		MessageQueue m = new MessageQueue( 10 );
		Channel c = new Channel( PORT, m );

		while (true) {

			Message msg = c.receive();

			// codes to provide service
			// based on message types
		}
	}
}

 

All Rights Reserved
Copyright ©  2004 SYS-CON Media, Inc.

Java and Java-based marks are trademarks or registered trademarks of Sun Microsystems, Inc. in the United States and other countries. SYS-CON Publications, Inc. is independent of Sun Microsystems, Inc.