| |
"Better Scaling with New I/O"
Vol. 7, Issue 5, p. 46
Listing 1: Simple web server that uses the old I/O API
package com.innoq.httpd;
import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.NoSuchElementException;
import java.util.StringTokenizer;
/**
 * Simple multithreaded HTTP daemon that exclusively uses
 * the 'old' (blocking, stream-based) I/O API.
 * <p>
 * Each instance is a thread that blocks in
 * {@link ServerSocket#accept()}, serves a single request on the
 * accepted connection and then closes the connection again.
 * Requested files are resolved relative to the current working
 * directory.
 *
 * @author hendrik.schreiber@innoq.com
 */
public class Httpd extends Thread {
    /** Protocol version assumed when the request line names none. */
    private static final String DEFAULT_PROTOCOL = "HTTP/0.9";
    /** Maximum number of bytes read for the request line. */
    private static final int MAX_REQUEST_LINE = 512;

    private static int _no; // instance counter
    private ServerSocket serverSocket;
    private byte[] buf = new byte[1024 * 8];
    private String protocol;
    private InputStream in;
    private OutputStream out;
    private String uri;

    /**
     * Creates and immediately starts a Httpd thread.
     *
     * @param serverSocket the (shared) socket this thread accepts
     *        connections from
     * @throws IOException declared for callers; not thrown by the
     *         current implementation
     */
    public Httpd(ServerSocket serverSocket)
            throws IOException {
        super("Httpd " + (_no++));
        this.serverSocket = serverSocket;
        protocol = DEFAULT_PROTOCOL;
        start();
    }

    /**
     * Waits for incoming connections and then calls
     * handleRequest. The loop ends once the server socket has
     * been closed; before, a closed socket made accept() fail
     * over and over in a busy loop.
     */
    public void run() {
        while (!serverSocket.isClosed()) {
            Socket socket = null;
            try {
                socket = serverSocket.accept();
                // disable Nagle's algorithm
                // for better performance
                socket.setTcpNoDelay(true);
                in = socket.getInputStream();
                out = socket.getOutputStream();
                handleRequest();
            } catch (Exception e) {
                // something went wrong...
                e.printStackTrace();
            } finally {
                // clean-up
                if (socket != null) {
                    try {
                        // this also closes
                        // the in- and outputstream.
                        socket.close();
                    } catch (IOException ioe) {
                        /* ignore - nothing left to do for this connection */
                    }
                }
            }
        }
    }

    /**
     * Reads the request and sends either the file
     * or an error message back to the client.
     * <p>
     * Only a single read of at most 512 bytes is performed, so the
     * request line has to arrive in one piece.
     *
     * @throws IOException if the "400 Bad request" answer cannot
     *         be written; other failures are answered with a
     *         best-effort "500" and swallowed
     */
    private void handleRequest() throws IOException {
        // This thread serves many connections one after the other.
        // Reset the per-request state, otherwise the protocol
        // version of an earlier request leaks into this one.
        protocol = DEFAULT_PROTOCOL;
        uri = null;
        try {
            // read only 512 bytes - the line should
            // not be longer anyway
            int length = in.read(buf, 0, MAX_REQUEST_LINE);
            if (length < 0) {
                // the client closed the connection without
                // sending anything - there is nobody to answer
                return;
            }
            if (length == MAX_REQUEST_LINE) {
                sendError(414, "Request URI too long.");
                return;
            }
            // ISO-8859-1 maps every byte to the char with the same
            // value, exactly like the deprecated hibyte-0 String
            // constructor that was used here before.
            String requestline
                    = new String(buf, 0, length, "ISO-8859-1");
            StringTokenizer st
                    = new StringTokenizer(requestline, " \r\n");
            String method = st.nextToken();
            uri = st.nextToken();
            if (st.hasMoreTokens()) {
                protocol = st.nextToken();
            }
            // strip the leading '/' to get a relative path
            String path = uri.substring(1);
            File file = new File(path);
            if (!method.equals("GET")) {
                sendError(405, "Method " + method
                        + " is not supported.");
            } else if (!isSafeRelativePath(path)) {
                // never serve anything outside the working directory
                sendError(403, "Forbidden: " + uri);
            } else if (!file.exists() || file.isDirectory()) {
                sendError(404, "Resource " + uri
                        + " was not found.");
            } else if (!file.canRead()) {
                sendError(403, "Forbidden: " + uri);
            } else {
                sendFile(file);
            }
        } catch (NoSuchElementException nsee) {
            // we didn't read enough tokens
            sendError(400, "Bad request.");
        } catch (Exception e) {
            try {
                sendError(500, "Internal Server Error.");
            } catch (IOException ioe) {
                /* ignore - the connection is most likely gone */
            }
        }
    }

    /**
     * Sends an error message to the client. For protocol versions
     * other than HTTP/0.9 a status line precedes the HTML body.
     *
     * @param httpStatus the numeric status code
     * @param httpMessage the human readable message; may contain
     *        text taken from the request
     * @throws IOException if writing to the client fails
     */
    private void sendError(int httpStatus,
            String httpMessage) throws IOException {
        StringBuffer errorMessage = new StringBuffer(128);
        if (!protocol.equals(DEFAULT_PROTOCOL)) {
            errorMessage.append("HTTP/1.0 " + httpStatus
                    + " " + httpMessage + "\r\n\r\n");
        }
        // the message may echo client-supplied text (method, URI),
        // so it must not be copied into the markup unescaped
        errorMessage.append("<HTML><BODY><H1>"
                + escapeHtml(httpMessage) + "</H1></BODY></HTML>");
        out.write(errorMessage.toString().getBytes("ASCII"));
        out.flush();
    }

    /**
     * Sends the requested file to the client.
     *
     * @param file the file to send
     * @throws IOException if reading the file or writing to the
     *         client fails
     */
    private void sendFile(File file) throws IOException {
        InputStream filein = null;
        try {
            filein = new FileInputStream(file);
            if (!protocol.equals(DEFAULT_PROTOCOL)) {
                // write status code and header
                out.write(("HTTP/1.0 200 OK\r\nContent-Type: "
                        + Httpd.guessContentType(uri)
                        + "\r\n\r\n").getBytes("ASCII"));
            }
            int length = 0;
            while ((length = filein.read(buf)) != -1) {
                out.write(buf, 0, length);
            }
            out.flush();
        } finally {
            if (filein != null) {
                try {
                    filein.close();
                } catch (IOException ioe) {
                    /* ignore - the file was only read */
                }
            }
        }
    }

    /**
     * Tells whether a request path stays below the working
     * directory: it must be relative and must not contain a
     * ".." segment.
     */
    private static boolean isSafeRelativePath(String path) {
        if (path.startsWith("/") || path.startsWith("\\")
                || new File(path).isAbsolute()) {
            return false;
        }
        StringTokenizer segments = new StringTokenizer(path, "/\\");
        while (segments.hasMoreTokens()) {
            if (segments.nextToken().equals("..")) {
                return false;
            }
        }
        return true;
    }

    /**
     * Replaces the characters that are special in HTML by their
     * entities.
     */
    private static String escapeHtml(String text) {
        StringBuffer escaped = new StringBuffer(text.length() + 16);
        for (int i = 0; i < text.length(); i++) {
            char c = text.charAt(i);
            switch (c) {
                case '<':
                    escaped.append("&lt;");
                    break;
                case '>':
                    escaped.append("&gt;");
                    break;
                case '&':
                    escaped.append("&amp;");
                    break;
                case '"':
                    escaped.append("&quot;");
                    break;
                default:
                    escaped.append(c);
            }
        }
        return escaped.toString();
    }

    /**
     * Returns the content-type of a given resource, derived from
     * the file extension (case-insensitive).
     *
     * @param uri the requested URI
     * @return the content type, or "unknown" for any extension
     *         that is not recognized
     */
    public static String guessContentType(String uri) {
        // Hack. This should be done with a
        // configuration file.
        // Lower-case with a fixed locale: with the default locale
        // e.g. ".GIF" becomes ".g\u0131f" on a Turkish system and
        // is no longer recognized.
        uri = uri.toLowerCase(java.util.Locale.ENGLISH);
        if (uri.endsWith(".html") || uri.endsWith(".htm")) {
            return "text/html";
        } else if (uri.endsWith(".txt")) {
            return "text/plain";
        } else if (uri.endsWith(".jpg")
                || uri.endsWith(".jpeg")) {
            return "image/jpeg";
        } else if (uri.endsWith(".gif")) {
            return "image/gif";
        } else {
            return "unknown";
        }
    }

    /**
     * Starts the Http daemon with n threads on port 8080. The
     * first argument is n.
     *
     * @throws IllegalArgumentException if n is missing, not a
     *         number or smaller than 1
     * @throws IOException if the server socket cannot be opened
     */
    public static void main(String[] args)
            throws IOException {
        if (args.length < 1) {
            throw new IllegalArgumentException(
                    "Usage: Httpd <number of threads>");
        }
        // parse once instead of on every loop iteration
        int threads = Integer.parseInt(args[0]);
        if (threads < 1) {
            throw new IllegalArgumentException(
                    "Number of threads must be at least 1: "
                    + threads);
        }
        ServerSocket serverSocket = new ServerSocket(8080);
        for (int i = 0; i < threads; i++) {
            new Httpd(serverSocket);
        }
    }
}
Listing 2: The class NIOHttpd serves as a launcher for the actual workers, Acceptor (Listing 3) and ConnectionSelector (Listing 5)
package com.innoq.httpd;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
/**
 * Http daemon that uses the new I/O API. Connections are
 * accepted by {@link Acceptor} threads. They are then
 * registered as {@link Connection}s with a {@link
 * java.nio.channels.Selector}. {@link ConnectionSelector}
 * threads call the connection's work-methods
 * until the connections de-register themselves.
 *
 * @author hendrik.schreiber@innoq.com
 * @see Acceptor
 * @see ConnectionSelector
 * @see Connection
 */
public class NIOHttpd {
    /**
     * Returns the content-type of a given resource, derived from
     * the file extension (case-insensitive).
     *
     * @param uri the requested URI
     * @return the content type, or "unknown" for any extension
     *         that is not recognized
     */
    public static String guessContentType(String uri) {
        // Hack. This should be done with a
        // configuration file.
        // Lower-case with a fixed locale: with the default locale
        // e.g. ".GIF" becomes ".g\u0131f" on a Turkish system and
        // is no longer recognized.
        uri = uri.toLowerCase(java.util.Locale.ENGLISH);
        if (uri.endsWith(".html") || uri.endsWith(".htm")) {
            return "text/html";
        } else if (uri.endsWith(".txt")) {
            return "text/plain";
        } else if (uri.endsWith(".jpg")
                || uri.endsWith(".jpeg")) {
            return "image/jpeg";
        } else if (uri.endsWith(".gif")) {
            return "image/gif";
        } else {
            return "unknown";
        }
    }

    /**
     * Starts the Http daemon with 2n threads on port 8080: n
     * {@link ConnectionSelector}s, each fed by its own
     * {@link Acceptor}. The first argument is n.
     *
     * @throws IllegalArgumentException if n is missing, not a
     *         number or smaller than 1
     * @throws IOException if the server channel or a selector
     *         cannot be opened
     */
    public static void main(String[] args)
            throws IOException {
        if (args.length < 1) {
            throw new IllegalArgumentException(
                    "Usage: NIOHttpd <number of thread pairs>");
        }
        // parse once instead of on every loop iteration
        int pairs = Integer.parseInt(args[0]);
        if (pairs < 1) {
            throw new IllegalArgumentException(
                    "Number of thread pairs must be at least 1: "
                    + pairs);
        }
        ServerSocketChannel serverSocketChannel
                = ServerSocketChannel.open();
        serverSocketChannel.socket().bind(
                new InetSocketAddress(8080)
        );
        for (int i = 0; i < pairs; i++) {
            ConnectionSelector connectionSelector
                    = new ConnectionSelector();
            // both constructors start their thread themselves,
            // so no reference has to be kept
            new Acceptor(serverSocketChannel, connectionSelector);
        }
    }
}
Listing 3: Acceptor accepts incoming connections and registers them with the ConnectionSelector (Listing 5)
package com.innoq.httpd;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
/**
 * Acceptors call the accept() method of a {@link
 * ServerSocketChannel}, instantiate a {@link Connection}
 * object with the accepted connection, and register this
 * connection with the {@link ConnectionSelector}.
 *
 * @author hendrik.schreiber@innoq.com
 */
class Acceptor extends Thread {
    private static int _no; // Instance counter
    private ServerSocketChannel serverSocketChannel;
    private ConnectionSelector connectionSelector;

    /**
     * Creates and immediately starts this Acceptor thread.
     *
     * @param serverSocketChannel the channel connections are
     *        accepted from
     * @param connectionSelector the selector thread that takes
     *        over every accepted connection
     */
    public Acceptor(ServerSocketChannel serverSocketChannel,
            ConnectionSelector connectionSelector) {
        super("Acceptor " + (_no++));
        this.serverSocketChannel = serverSocketChannel;
        this.connectionSelector = connectionSelector;
        start();
    }

    /**
     * Accepts connections and registers them with
     * {@link ConnectionSelector#queue(Connection)}. The loop ends
     * once the server channel has been closed.
     */
    public void run() {
        while (serverSocketChannel.isOpen()) {
            SocketChannel socketChannel = null;
            try {
                socketChannel = serverSocketChannel.accept();
                connectionSelector.queue(
                        new Connection(socketChannel)
                );
            } catch (Exception e) {
                e.printStackTrace();
                // clean-up, if necessary: the connection was
                // accepted but could not be handed over
                if (socketChannel != null) {
                    try {
                        socketChannel.close();
                    } catch (Exception ee) {
                        /* ignore - we are already cleaning up */
                    }
                }
            }
        }
    }
}
Listing 4: Connection represents the connection and deals with the protocol's specifics
package com.innoq.httpd;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.StringTokenizer;
import java.util.NoSuchElementException;
/**
 * Represents the connection, during which the request is
 * read and a response is written.
 * <p>
 * The socket channel is non-blocking, so {@link #readRequest()}
 * and {@link #writeResponse()} may have to be called several
 * times until the request line is complete or the response is
 * written. Requested files are resolved relative to the current
 * working directory.
 *
 * @author hendrik.schreiber@innoq.com
 * @see ConnectionSelector
 * @see Acceptor
 */
class Connection {
    /** Protocol version assumed when the request line names none. */
    private static final String DEFAULT_PROTOCOL = "HTTP/0.9";
    /** Capacity of the buffer that receives the request line. */
    private static final int MAX_REQUEST_LINE = 512;
    /** Maximum number of file bytes handed over per write call. */
    private static final int TRANSFER_CHUNK = 64 * 1024;

    private SocketChannel socketChannel;
    private ByteBuffer requestLineBuffer;
    private ByteBuffer responseLineBuffer;
    // index up to which the request buffer was already searched
    // for the end of the request line
    private int endOfLineIndex;
    private SelectionKey key;
    private FileChannel fileChannel;
    private long filePos;
    private long fileLength;
    private int httpStatus;
    private String httpMessage;
    private String uri;
    private String protocol;

    /**
     * Initializes this connection with a SocketChannel.
     * Here also the channel is set to non-blocking mode.
     *
     * @param socketChannel the accepted client channel
     * @throws IOException if the socket options cannot be set
     */
    public Connection(SocketChannel socketChannel)
            throws IOException {
        this.socketChannel = socketChannel;
        // disable Nagle's algorithm for better performance
        socketChannel.socket().setTcpNoDelay(true);
        // the channel shall _not_ block
        socketChannel.configureBlocking(false);
        requestLineBuffer = ByteBuffer.allocate(MAX_REQUEST_LINE);
        // default http status code: OK
        httpStatus = 200;
        // default http status message
        httpMessage = "OK";
        // default protocol version
        protocol = DEFAULT_PROTOCOL;
    }

    /**
     * Register this connection with the provided Selector.
     * Initially we are only interested in read operations
     * ({@link SelectionKey#OP_READ}).
     * This method is called by {@link ConnectionSelector}.
     *
     * @param selector the selector to register with
     * @throws IOException if the channel is closed
     */
    public void register(Selector selector)
            throws IOException {
        key = socketChannel.register(selector,
                SelectionKey.OP_READ);
        // deposit this connection in its selection key
        key.attach(this);
    }

    /**
     * Reads the request. If something goes wrong, an error
     * code is set.
     * Once the whole request line has been read (or an error
     * has been detected), {@link #prepareForResponse()} is called.
     * If the client closes the connection before the request
     * line is complete, the connection is closed.
     *
     * @throws IOException if the response cannot be prepared
     */
    public void readRequest() throws IOException {
        try {
            if (!requestLineBuffer.hasRemaining()) {
                setError(414, "Request URI too long.");
                prepareForResponse();
                return;
            }
            int bytesRead = socketChannel.read(requestLineBuffer);
            if (!isRequestLineRead()) {
                if (bytesRead < 0) {
                    // end of stream before the line was complete:
                    // nothing more will ever arrive
                    close();
                }
                return;
            }
            requestLineBuffer.flip();
            byte[] b = new byte[endOfLineIndex];
            requestLineBuffer.get(b);
            // ISO-8859-1 maps every byte to the char with the same
            // value, exactly like the deprecated hibyte-0 String
            // constructor that was used here before.
            String requestline = new String(b, "ISO-8859-1");
            StringTokenizer st
                    = new StringTokenizer(requestline, " \r\n");
            String method = st.nextToken();
            uri = st.nextToken();
            // strip the leading '/' to get a relative path
            String path = uri.substring(1);
            File file = new File(path);
            if (st.hasMoreTokens()) {
                protocol = st.nextToken();
            }
            if (!method.equals("GET")) {
                setError(405, "Method " + method
                        + " is not supported.");
            } else if (!isSafeRelativePath(path)) {
                // never serve anything outside the working directory
                setError(403, "Forbidden: " + uri);
            } else if (!file.exists() || file.isDirectory()) {
                setError(404, "Resource " + uri
                        + " was not found.");
            } else if (!file.canRead()) {
                setError(403, "Forbidden: " + uri);
            } else {
                fileLength = file.length();
                fileChannel
                        = new FileInputStream(file).getChannel();
            }
            prepareForResponse();
        } catch (NoSuchElementException nsee) {
            // we didn't read enough tokens
            setError(400, "Bad request.");
            // without this the error would never be sent and the
            // connection would stay registered for reading forever
            prepareForResponse();
        } catch (Exception e) {
            // something else went wrong
            setError(500, "Internal Server Error.");
            prepareForResponse();
            e.printStackTrace();
        }
    }

    /**
     * Creates a buffer that contains the response line,
     * headers and in case of an error an HTML message. After
     * that the selection key is switched to write interest.
     *
     * @throws IOException if the response cannot be encoded
     */
    private void prepareForResponse() throws IOException {
        StringBuffer responseLine = new StringBuffer(128);
        // write response line if Http >= 1.0
        if (!protocol.equals(DEFAULT_PROTOCOL)) {
            responseLine.append("HTTP/1.0 " + httpStatus
                    + " " + httpMessage + "\r\n");
            // In case of an error, we don't need headers
            if (httpStatus != 200) {
                responseLine.append("\r\n");
            } else {
                // Header for the file
                responseLine.append("Content-Type: "
                        + NIOHttpd.guessContentType(uri)
                        + "\r\n\r\n");
            }
        }
        if (httpStatus != 200) {
            // Error message for the user. The message may echo
            // client-supplied text (method, URI), so it must not
            // be copied into the markup unescaped.
            responseLine.append("<HTML><BODY><H1>"
                    + escapeHtml(httpMessage)
                    + "</H1></BODY></HTML>");
        }
        responseLineBuffer = ByteBuffer.wrap(
                responseLine.toString().getBytes("ASCII")
        );
        key.interestOps(SelectionKey.OP_WRITE);
        key.selector().wakeup();
    }

    /**
     * Indicates whether the request line was read completely,
     * i.e. whether a line terminator (CR or LF) has been
     * received. Must be called before the buffer is flipped.
     * Only the bytes received so far are searched; searching up
     * to the buffer's limit would skip over positions that are
     * filled by later reads.
     */
    private boolean isRequestLineRead() {
        int received = requestLineBuffer.position();
        for (; endOfLineIndex < received; endOfLineIndex++) {
            byte current = requestLineBuffer.get(endOfLineIndex);
            if (current == '\r' || current == '\n') {
                return true;
            }
        }
        return false;
    }

    /**
     * Writes the responseLineBuffer and - if necessary -
     * the requested file to the client. After all data
     * has been written, the selection key is cancelled and
     * the connection is closed.
     *
     * @throws IOException if writing to the client fails
     */
    public void writeResponse() throws IOException {
        // write the response buffer
        if (responseLineBuffer.hasRemaining()) {
            socketChannel.write(responseLineBuffer);
        }
        // if the complete response buffer has been written,
        // we are either done (in case of an error) or
        // we need to send the file.
        if (!responseLineBuffer.hasRemaining()) {
            if (httpStatus != 200) {
                close();
            } else {
                // hand over at most one chunk per call, so that
                // other connections get their turn
                filePos += fileChannel.transferTo(
                        filePos,
                        Math.min(TRANSFER_CHUNK,
                                fileLength - filePos),
                        socketChannel
                );
                if (filePos >= fileLength) {
                    close();
                }
            }
        }
    }

    /**
     * Sets an error that is reported by the response.
     *
     * @param httpStatus the numeric status code
     * @param httpMessage the human readable message
     */
    private void setError(int httpStatus,
            String httpMessage) {
        this.httpStatus = httpStatus;
        this.httpMessage = httpMessage;
    }

    /**
     * Tells whether a request path stays below the working
     * directory: it must be relative and must not contain a
     * ".." segment.
     */
    private static boolean isSafeRelativePath(String path) {
        if (path.startsWith("/") || path.startsWith("\\")
                || new File(path).isAbsolute()) {
            return false;
        }
        StringTokenizer segments = new StringTokenizer(path, "/\\");
        while (segments.hasMoreTokens()) {
            if (segments.nextToken().equals("..")) {
                return false;
            }
        }
        return true;
    }

    /**
     * Replaces the characters that are special in HTML by their
     * entities.
     */
    private static String escapeHtml(String text) {
        StringBuffer escaped = new StringBuffer(text.length() + 16);
        for (int i = 0; i < text.length(); i++) {
            char c = text.charAt(i);
            switch (c) {
                case '<':
                    escaped.append("&lt;");
                    break;
                case '>':
                    escaped.append("&gt;");
                    break;
                case '&':
                    escaped.append("&amp;");
                    break;
                case '"':
                    escaped.append("&quot;");
                    break;
                default:
                    escaped.append(c);
            }
        }
        return escaped.toString();
    }

    /**
     * Cancels the selection key and closes all open
     * channels. Every step is attempted even if an earlier one
     * fails; failures are ignored because nothing more can be
     * done for this connection.
     */
    public void close() {
        try {
            if (key != null) {
                key.cancel();
            }
        } catch (Exception e) {
            /* ignore */
        }
        try {
            if (socketChannel != null) {
                socketChannel.close();
            }
        } catch (Exception e) {
            /* ignore */
        }
        try {
            if (fileChannel != null) {
                fileChannel.close();
            }
        } catch (Exception e) {
            /* ignore */
        }
    }
}
Listing 5: ConnectionSelector selects the connections that are ready for I/O operations
package com.innoq.httpd;
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
/**
 * Calls the work-methods of registered {@link Connection}s,
 * if the corresponding channels are ready for the specified
 * operations.
 * <p>
 * Connections are handed over from other threads through
 * {@link #queue(Connection)}; the registration with the
 * {@link Selector} itself is done by this thread.
 *
 * @author hendrik.schreiber@innoq.com
 */
class ConnectionSelector extends Thread {
    private static int _no; // instance counter
    private Selector selector;
    // connections waiting for registration; guarded by
    // synchronized (queue)
    private List queue;

    /**
     * Instantiates and starts this ConnectionSelector. The
     * {@link Selector} is instantiated, too.
     *
     * @throws IOException if the selector cannot be opened
     */
    public ConnectionSelector() throws IOException {
        super("ConnectionSelector " + (_no++));
        selector = Selector.open();
        queue = new ArrayList();
        start();
    }

    /**
     * Queues a connection and calls {@link
     * Selector#wakeup()}, so that a {@link SelectionKey} can
     * be created and the connection can be registered.
     *
     * @param connection the connection to hand over
     * @see #registerQueuedConnections()
     */
    public void queue(Connection connection) {
        synchronized (queue) {
            queue.add(connection);
        }
        // make sure that select() wakes up and the queued
        // connection is taken care of.
        selector.wakeup();
    }

    /**
     * Registers all queued connections with the Selector. A
     * connection that cannot be registered is closed; the
     * remaining connections are still registered.
     *
     * @see Connection#register(Selector)
     */
    private void registerQueuedConnections() {
        Object[] pending;
        // The queue is shared with the acceptor threads, so even
        // the emptiness check has to happen under the lock. The
        // lock is only held for taking the snapshot, not while
        // registering.
        synchronized (queue) {
            if (queue.isEmpty()) {
                return;
            }
            pending = queue.toArray();
            queue.clear();
        }
        for (int i = 0; i < pending.length; i++) {
            Connection connection = (Connection) pending[i];
            try {
                connection.register(selector);
            } catch (Exception e) {
                // e.g. the client is already gone: drop only
                // this connection
                connection.close();
                e.printStackTrace();
            }
        }
    }

    /**
     * Calls {@link Selector#select()} in a loop until the
     * selector is closed.
     * When the call to select() returns, all queued
     * connections are registered with the Selector. Then
     * {@link Connection#readRequest()} or {@link
     * Connection#writeResponse()} respectively
     * are called for connections that are ready for read or
     * write operations.
     */
    public void run() {
        while (selector.isOpen()) {
            try {
                selector.select();
                registerQueuedConnections();
                Iterator selectedKeys
                        = selector.selectedKeys().iterator();
                while (selectedKeys.hasNext()) {
                    SelectionKey key
                            = (SelectionKey) selectedKeys.next();
                    // The selector never removes keys from the
                    // selected-key set itself. Without this, keys
                    // pile up and connections get called although
                    // they are not ready.
                    selectedKeys.remove();
                    Connection connection
                            = (Connection) key.attachment();
                    if (connection == null || !key.isValid()) {
                        continue;
                    }
                    try {
                        // a connection is interested in exactly one
                        // operation at a time
                        if (key.interestOps()
                                == SelectionKey.OP_READ) {
                            connection.readRequest();
                        } else {
                            connection.writeResponse();
                        }
                    } catch (IOException ioe) {
                        // typically the client went away
                        connection.close();
                    } catch (Throwable t) {
                        connection.close();
                        t.printStackTrace();
                    }
                }
            } catch (Throwable t) {
                // keep the selector thread alive
                t.printStackTrace();
            }
        }
    }
}
|
|