HomeDigital EditionSys-Con RadioSearch Java Cd
Advanced Java AWT Book Reviews/Excerpts Client Server Corba Editorials Embedded Java Enterprise Java IDE's Industry Watch Integration Interviews Java Applet Java & Databases Java & Web Services Java Fundamentals Java Native Interface Java Servlets Java Beans J2ME Libraries .NET Object Orientation Observations/IMHO Product Reviews Scalability & Performance Security Server Side Source Code Straight Talking Swing Threads Using Java with others Wireless XML
 

"CHANNEL: A COMMUNICATION COMPONENT"
Vol 1, Issue 3 P.16

	

Listing 1: The Channel class

import java.lang.*;
import java.io.*;
import java.net.*;

/**
 * Channel: a communication component built upon
 *          sockets and object serialization.
 *
 * @author	Siet-Leng Lai, Joo-Hwee Lim
 * @version 1.0
 */
public class Channel extends Thread {

	public static final int ACK = 1;
	public static final int QUEUE_SIZE = 30;
	private protected ServerSocket listen_socket;
	private protected MessageQueue mailbox;

	/**
	 * Create a ServerSocket to listen for connection,
	 *  set the mailbox, and start the thread.
	 */
	public Channel( int port, MessageQueue msg_q ) {

		try {
			listen_socket = new ServerSocket( port );
		} catch (IOException e) {
			e.printStackTrace();
		}
		mailbox = msg_q;
		this.start();
	}

	public Channel( int port ) {

		try {
			listen_socket = new ServerSocket( port );
		} catch (IOException e) {
			e.printStackTrace();
		}
		mailbox = new MessageQueue( QUEUE_SIZE );
		this.start();
	}

	/**
	 * Send message to specified host and port.
	 */
	public void send( Object message, String hostname, int port ) {

		try {
			Socket sender = new Socket( hostname, port );

			ObjectOutputStream out = 
				new ObjectOutputStream( sender.
						getOutputStream() );
			out.writeObject( message );
			out.flush();

			DataInputStream in = new DataInputStream( 
				sender.getInputStream() );
			byte ack = in.readByte();

			in.close();
			out.close();
			sender.close();

		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	/**
	 * Send message within local host.
	 */
	public void send( Object message, int port ) {

		try {
			String hostname = InetAddress.getLocalHost().getHostName();
			send( message, hostname, port );
		} catch (UnknownHostException e) {
			e.printStackTrace();
		}
	}

	/**
	 * Receive message from the mailbox.
	 */
	public Message receive( ) {
		return( mailbox.get() );
	}

	/**
	 * Listen for connection request (outer while-loop)
	 *  followed by reading messages (inner while-loop).
	 */
	public void run() {

		try {
			while (true) {

				// accept connnection request
				Socket client = listen_socket.accept();

				ObjectInputStream sin = new 
					ObjectInputStream( client.
							getInputStream() );
				DataOutputStream sout = new DataOutputStream( 
					client.getOutputStream() );

				try {
					while (true) {

						// read in a message object
						Message msg = (Message)sin.readObject();

						// send an acknowledgement
						sout.writeByte( ACK );
						sout.flush();

						try {
							// enqueue the message
							mailbox.put( msg );
						} catch (QueueException e) {
							System.out.println( 
								e.getMessage() );
							return;
						}
					} // while (inner loop)
				}
				catch (Exception e) {
					if (!(e instanceof EOFException))
						e.printStackTrace();
					try {
						// clean up
						client.close();
						sin.close();
						sout.close();
					} catch (IOException e1) {
						e1.printStackTrace();
					}
				}
			} // while (outer loop)
		}
		catch (IOException e) {
			e.printStackTrace();
		}
		finally {
			try {
				listen_socket.close();
			} catch (IOException e1) {
				e1.printStackTrace();
			}
		}
	}
}

Listing 2: The MessageQueue class

import java.lang.*;

/**
 * MessageQueue with synchronization.
 *
 * @author  Siet-Leng Lai, Joo-Hwee Lim
 * @version 1.0
 */
public class MessageQueue extends Queue {

	// synchronization primitives
	private Semaphore empty, full;

	public MessageQueue( int size ) {
		super( size );
		empty = new Semaphore( size );
		full  = new Semaphore( 0 );
	}

	/**
	 * Remove message from queue 
	 *  with synchronization.
	 */
	public Message get() {

		full.down();
		Message msg = (Message)super.remove();
		empty.up();
		return( msg );
	}

	/**
	 * Append message to queue
	 *  with synchronization.
	 */
	public void put( Message new_msg )
		throws QueueException {

		empty.down();
		super.append( new_msg );
		full.up();
	}
}

Listing 3: Circular Queue

import java.lang.*;

/**
 * Circular Queue.
 *
 * @author	Siet-Leng Lai, Joo-Hwee Lim
 * @version 1.0
 */
public class Queue extends Object {

	public final static int DEFAULT_QSIZE = 10;
	private protected Object buffer[];
	private protected int count, qsize;
	private protected int head, tail;

	/**
	 * Allocate and initialize queue.
	 */
	public Queue( int size ) { /* ... */ }

	public Queue() { /* ... */ }

	/**
	 * Return current queue size.
	 */
	public int n_item() { /* ... */ }

	/**
	 * Append item to queue.
	 */
	public void append( Object new_item ) 
		throws QueueException { /* ... */ }

	/**
	 * Remove item from queue.
	 */
	public Object remove() { /* ... */ }

	/**
	 * Return true if queue is empty, false otherwise
	 */
	public boolean isEmpty() { /* ... */ }

	/**
	 * Return true if queue is full, false otherwise
	 */
	public boolean isFull() { /* ... */ }
}

/**
 * QueueException object for queue overflow.
 */
class QueueException extends Exception {

	public QueueException() { super(); }
	public QueueException(String s) { super(s); }
}

Listing 4: Semaphore

import java.lang.*;

/**
 * Semaphore.
 *
 * @author	Siet-Leng Lai, Joo-Hwee Lim
 * @version 1.0
 */
public class Semaphore {

	private int count;

	/**
	 * Initialize semaphore value.
	 */
	public Semaphore( int value ) { /* ... */ }

	/**
	 * V() operation.
	 */
	public synchronized void up() { /* ... */ }

	/**
	 * P() operation.
	 */
	public synchronized void down() { /* ... */ }
} 

Listing 5: Sample Channel class

import java.lang.*;
import java.util.*;
import java.net.*;

/**
 * Test program for Channel component.
 *
 * @author  Siet-Leng Lai, Joo-Hwee Lim
 * @version 1.0
 */
public class Test {

	static final int PORT = 5678;

	public static void main(String args[]) {

		System.out.println("## Test started.");

		MessageQueue m = new MessageQueue( 10 );
		Channel c = new Channel( PORT, m );

		Message msg = new Message();
		msg.intention = "Hello";
		try {
			msg.sender = InetAddress.getLocalHost().
							getHostName();
		} catch (UnknownHostException e) {
			e.printStackTrace();
		}
		msg.content = new Date();

		// send the message to multiple hosts
		for (int i = 0; i < args.length; i++)
			c.send( msg, args[i], PORT );

		// receive and display messages
		while (true) {
			msg = (Message)c.receive();
			System.out.print(msg.intention);
			System.out.print(" from "+msg.sender);
			System.out.println(": "+msg.content);
		}
	}
}

/**
 * Sample Message class.
 */
class Message {

	protected String intention;
	protected String sender;
	protected Object content;

	public Message() { super(); }
}

Listing 6: Example Output

grus% java Test
## Test started.
Hello from marten: Thu Aug 01 15:54:47 SGT 1996
Hello from corona: Thu Aug 01 15:54:50 SGT 1996

marten% java Test grus
## Test started.
Hello from corona: Thu Aug 01 15:54:50 SGT 1996

corona% java Test grus marten
## Test started.

Listing 7: Template for writing service agent

import java.lang.*;

/**
 * Template for writing service agent.
 *
 * @author  Siet-Leng Lai, Joo-Hwee Lim
 * @version 1.0
 */
public class ServiceAgent {

	static final int PORT = 5678;

	public static void main(String args[]) {

		MessageQueue m = new MessageQueue( 10 );
		Channel c = new Channel( PORT, m );

		while (true) {

			msg = (Message)c.receive();

			// codes to provide service
			// based on message types
		}
	}
}

 

All Rights Reserved
Copyright ©  2004 SYS-CON Media, Inc.
  E-mail: [email protected]

Java and Java-based marks are trademarks or registered trademarks of Sun Microsystems, Inc. in the United States and other countries. SYS-CON Publications, Inc. is independent of Sun Microsystems, Inc.