Index: src/test/java/org/mortbay/jetty/client/HttpExchangeTest.java =================================================================== --- src/test/java/org/mortbay/jetty/client/HttpExchangeTest.java (revision 81) +++ src/test/java/org/mortbay/jetty/client/HttpExchangeTest.java (working copy) @@ -1,13 +1,25 @@ package org.mortbay.jetty.client; import java.io.IOException; -import java.net.InetSocketAddress; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.concurrent.CountDownLatch; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + import junit.framework.TestCase; import org.mortbay.io.Buffer; import org.mortbay.io.ByteArrayBuffer; -import org.mortbay.jetty.MimeTypes; +import org.mortbay.jetty.Connector; +import org.mortbay.jetty.HttpConnection; +import org.mortbay.jetty.HttpMethods; +import org.mortbay.jetty.Request; +import org.mortbay.jetty.Server; +import org.mortbay.jetty.handler.AbstractHandler; +import org.mortbay.jetty.nio.SelectChannelConnector; /** * Functional testing for HttpExchange. @@ -16,47 +28,152 @@ */ public class HttpExchangeTest extends TestCase { - private HttpExchange httpExchange; + private Server _server; + private int _port; + private HttpConnectionPool _connectionPool; + + protected void setUp() throws Exception + { + startServer(); + _connectionPool=new HttpConnectionPool(); + _connectionPool.start(); + } + + protected void tearDown() throws Exception + { + stopServer(); + } + + public void testPerf() throws Exception { + testSend(100); + + for (int i = 100; i <= 10000; i *= 10) { + testSend(i); + } + } - public void setUp() + /** + * Test sending data through the exchange. + * + * @throws IOException + */ + public void testSend(final int nb) throws Exception { - httpExchange=new HttpExchange() - { - protected void onProgress(Buffer content, boolean last) + final CountDownLatch latch = new CountDownLatch(nb); + long l0 = System.currentTimeMillis(); + for (int i = 0; i < nb; i++) { + HttpExchange httpExchange=new HttpExchange() { - System.err.println("Response content="+content); - super.onProgress(content,last); + protected void onRequestSent() + { + //System.err.println("Request sent (" + this._connection._endp + ")"); + //InputStream is = getResponseStream(); + //OutputStream baos = new ByteArrayOutputStream(); + //copyStream(is, baos); + //System.err.println(baos.toString()); + + } + protected void onResponseReceived(Buffer version, int status, Buffer reason) + { + //System.err.println("Received response: " + version+" "+status+" "+reason); + } + protected void onParsedHeader(Buffer name, Buffer value) + { + //System.err.println("Received header: " + name + " = " + value); + } + protected void onContent(Buffer content) + { + //System.err.println("Received content:" + content); + } + protected void onComplete() + { + //System.err.println("Response completed"); + latch.countDown(); + } + }; + + httpExchange.setURL("http://localhost:" + _port + "/"); + httpExchange.setMethod(HttpMethods.GET); // Optional. GET by default + //httpExchange.setAddress(new InetSocketAddress("localhost",_port)); // May use setURL instead + //httpExchange.setAddress(new InetSocketAddress("localhost",_port)); // May use setURL instead + //httpExchange.setURI("/ep1/?wsdl"); + //httpExchange.setRequestContentType(MimeTypes.FORM_ENCODED); + //httpExchange.addRequestHeader("arbitrary","value"); + //httpExchange.setRequestContent(new ByteArrayBuffer("")); + //httpExchange.setVersion(HttpVersions.HTTP_1_1_ORDINAL); + _connectionPool.send(httpExchange); + } + //System.err.println("Messages sent"); + latch.await(); + long l1 = System.currentTimeMillis(); + System.err.println("Delay (jetty, "+nb+"): " + (l1 - l0) + " ms"); + } + + public void testPostWithStreamedResponse() throws Exception { + HttpExchange.StreamedExchange httpExchange = new HttpExchange.StreamedExchange(); + httpExchange.setURL("http://localhost:" + _port + "/"); + httpExchange.setMethod(HttpMethods.POST); + httpExchange.setRequestContent(new ByteArrayBuffer("")); + _connectionPool.send(httpExchange); + httpExchange.waitForStatus(HttpExchange.STATUS_PARSING_CONTENT); + InputStream is = httpExchange.getResponseStream(); + copyStream(is, System.err); + } + + public void testGetWithStreamedResponse() throws Exception { + HttpExchange.StreamedExchange httpExchange = new HttpExchange.StreamedExchange(); + httpExchange.setURL("http://localhost:" + _port + "/"); + httpExchange.setMethod(HttpMethods.GET); + _connectionPool.send(httpExchange); + httpExchange.waitForStatus(HttpExchange.STATUS_PARSING_CONTENT); + InputStream is = httpExchange.getResponseStream(); + copyStream(is, System.err); + is.close(); + } + + public static void copyStream(InputStream in, OutputStream out) { + try { + byte[] buffer = new byte[1024]; + int len; + while ((len = in.read(buffer)) >= 0) { + out.write(buffer, 0, len); } + } catch (IOException e) { + e.printStackTrace(); + } + } - protected void onSuccess(int status) + private void startServer() throws Exception + { + _server = new Server(); + Connector connector = new SelectChannelConnector(); + connector.setPort(0); + _server.setConnectors(new Connector[] { connector }); + _server.setHandler(new AbstractHandler() { + public void handle(String target, HttpServletRequest request, HttpServletResponse response, int dispatch) throws IOException, ServletException { - System.err.println("Response status="+status); - super.onSuccess(status); + Request base_request=(request instanceof Request)?(Request)request:HttpConnection.getCurrentConnection().getRequest(); + base_request.setHandled(true); + response.setStatus(200); + if (request.getMethod().equalsIgnoreCase("GET")) + { + response.getOutputStream().println(""); + for (int i = 0; i < 10000; i++) + response.getOutputStream().println(" "); + response.getOutputStream().println(""); + } + else + { + copyStream(request.getInputStream(), response.getOutputStream()); + } } - }; - - httpExchange.setURL("http://localhost:8080/test/dump/info"); - - HttpConnectionPool connectionPool=new HttpConnectionPool(); - - httpExchange.setConnectionPool(connectionPool); - - httpExchange.setMethod("GET"); // Optional. GET by default - httpExchange.setAddress(new InetSocketAddress("localhost",8080)); // May use setURL instead - httpExchange.setURI("/test/dump/info"); - httpExchange.setRequestContentType(MimeTypes.FORM_ENCODED); - httpExchange.addRequestHeader("arbitrary","value"); - httpExchange.setRequestContent(new ByteArrayBuffer("param=value\n")); + }); + _server.start(); + _port = connector.getLocalPort(); } - /** - * Test sending data through the exchange. - * - * @throws IOException - */ - public void testSend() throws IOException + private void stopServer() throws Exception { - httpExchange.send(); - httpExchange.send(true); // pipelined on last used connection + _server.stop(); } } Index: src/main/java/org/mortbay/jetty/client/HttpConnection.java =================================================================== --- src/main/java/org/mortbay/jetty/client/HttpConnection.java (revision 81) +++ src/main/java/org/mortbay/jetty/client/HttpConnection.java (working copy) @@ -1,28 +1,39 @@ package org.mortbay.jetty.client; import java.io.IOException; -import java.util.LinkedList; import org.mortbay.io.Buffer; import org.mortbay.io.Buffers; +import org.mortbay.io.Connection; import org.mortbay.io.EndPoint; -import org.mortbay.jetty.Generator; +import org.mortbay.jetty.AbstractGenerator; import org.mortbay.jetty.HttpGenerator; +import org.mortbay.jetty.HttpHeaderValues; +import org.mortbay.jetty.HttpHeaders; import org.mortbay.jetty.HttpParser; +import org.mortbay.jetty.HttpTokens; +import org.mortbay.jetty.HttpVersions; -public class HttpConnection + +/** + * + * @author Greg Wilkins + * @author Guillaume Nodet + */ +class HttpConnection implements Connection { - private HttpDestination _destination; - private EndPoint _endp; - private HttpExchange _exchange; - private LinkedList _exchanges=new LinkedList(); - private Generator _generator; - private HttpParser _parser; + HttpDestination _destination; + EndPoint _endp; + HttpExchange _exchange; + HttpGenerator _generator; + HttpParser _parser; + boolean _http11; + Buffer _connectionHeader; + /* ------------------------------------------------------------ */ - HttpConnection(HttpDestination destination, Buffers buffers,EndPoint endp,int hbs,int cbs) + HttpConnection(Buffers buffers,EndPoint endp,int hbs,int cbs) { - _destination=destination; _endp=endp; _generator=new HttpGenerator(buffers,endp,hbs,cbs); _parser=new HttpParser(buffers,endp,new Handler(),hbs,cbs); @@ -33,106 +44,132 @@ return _destination; } + public void setDestination(HttpDestination destination) + { + _destination = destination; + } + /* ------------------------------------------------------------ */ public void handle() throws IOException { - if (_exchange==null) - nextExchange(); - int no_progress=0; - while (_endp.isOpen()) + while (_endp.isOpen() && _exchange != null) { + if (_exchange instanceof HttpExchange.StreamedExchange && + _parser.inContentState()) + { + break; + } try { long io=0; - // Do we have more writting to do? - if (_generator.isCommitted() && !_generator.isComplete()) - io+=_generator.flush(); - - if (_endp.isBufferingOutput()) - { - _endp.flush(); - if (_endp.isBufferingOutput()) - no_progress=0; - } - // If we are not ended then parse available if (!_parser.isComplete()) - io=_parser.parseAvailable(); + io=_parser.parseNext(); if (io>0) no_progress=0; else if (no_progress++>=2) - return; + { + if (!_endp.isBlocking()) + { + return; + } + } } - catch (IOException e) - { - // TODO Handle this! - return; - } finally { - if (_parser.isComplete() && _generator.isComplete() && !_endp.isBufferingOutput()) + if (_parser.isComplete()) { - _exchange=null; - nextExchange(); - if (_exchange==null) - reset(true); + reset(true); } } } } /* ------------------------------------------------------------ */ - public void send(HttpExchange ex) - throws IOException + public boolean isIdle() { - _exchanges.add(ex); - - // TODO schedule stuff - + return _exchange == null; } - /* ------------------------------------------------------------ */ - private void nextExchange() throws IOException + public void send(HttpExchange ex) throws IOException { - if (_exchange!=null || _exchanges.size()==0) - return; - - _exchange=_exchanges.remove(0); - System.err.println("EX:"+_exchange.getURI()); - - _generator.setVersion(11); // TODO - _generator.setRequest(_exchange.getMethod(),_exchange.getURI()); - - // TODO this is not right - if (_exchange._requestContent!=null) + ex.setStatus(HttpExchange.STATUS_SENDING_REQUEST); + ex._requestFields.put(HttpHeaders.HOST_BUFFER, _destination._hostHeaderBuffer); + _generator.setVersion(ex._version); + _generator.setRequest(ex._method, ex._uri); + if (ex._requestEntity != null) { - _generator.addContent(_exchange._requestContent,Generator.LAST); - _generator.completeHeader(_exchange.getRequestFields(),Generator.LAST); - _generator.complete(); + ex._requestFields.remove(HttpHeaders.CONTENT_LENGTH); // TODO: should not be needed + _generator.setContentLength(HttpTokens.CHUNKED_CONTENT); + _generator.completeHeader(ex._requestFields, false); + AbstractGenerator.Output os = new AbstractGenerator.Output(_generator, 30000); + AbstractGenerator.OutputWriter writer = new AbstractGenerator.OutputWriter(os); + writer.setCharacterEncoding(null); + ex._requestEntity.write(os, writer); + writer.close(); + os.close(); } - + else if (ex._requestContent != null) + { + ex._requestFields.putLongField(HttpHeaders.CONTENT_LENGTH, ex._requestContent.length()); + _generator.completeHeader(ex._requestFields, false); + _generator.addContent(ex._requestContent, true); + } + else + { + ex._requestFields.remove(HttpHeaders.CONTENT_LENGTH); // TODO: should not be needed + _generator.completeHeader(ex._requestFields, true); + } + _generator.flush(); + ex.onRequestSent(); + if (ex instanceof HttpExchange.StreamedExchange) + { + ((HttpExchange.StreamedExchange) ex)._parser = _parser; + } + ex.setStatus(HttpExchange.STATUS_WAITING_FOR_RESPONSE); + _exchange = ex; + if (_endp.isBlocking()) + { + handle(); + } } + /* ------------------------------------------------------------ */ - protected void reset(boolean returnBuffers) + protected void reset(boolean returnBuffers) throws IOException { + _connectionHeader = null; _parser.reset(returnBuffers); _generator.reset(returnBuffers); + boolean close = shouldClose(); + _exchange = null; + _destination.returnConnection(this, close); } - + /* ------------------------------------------------------------ */ - private class Handler extends HttpParser.EventHandler + private boolean shouldClose() { - @Override - public void content(Buffer ref) throws IOException + if (_connectionHeader == HttpHeaderValues.CLOSE_BUFFER) { - System.err.println(ref); + return true; } + else if (_connectionHeader == HttpHeaderValues.KEEP_ALIVE_BUFFER) + { + return false; + } + else + { + return !_http11; + } + } + /* ------------------------------------------------------------ */ + private class Handler extends HttpParser.EventHandler + { @Override public void startRequest(Buffer method, Buffer url, Buffer version) throws IOException { @@ -142,9 +179,41 @@ @Override public void startResponse(Buffer version, int status, Buffer reason) throws IOException { - System.err.println(version+" "+status+" "+reason); + _http11 = (version == HttpVersions.HTTP_1_1_BUFFER); + _exchange.onResponseReceived(version, status, reason); + _exchange.setStatus(HttpExchange.STATUS_PARSING_HEADERS); } + @Override + public void parsedHeader(Buffer name, Buffer value) throws IOException + { + if (name == HttpHeaders.CONNECTION_BUFFER) + { + _connectionHeader = HttpHeaderValues.CACHE.lookup(value); + } + //_exchange._responseFields.add(name, value); + _exchange.onParsedHeader(name, value); + } + + @Override + public void headerComplete() throws IOException + { + _exchange.setStatus(HttpExchange.STATUS_PARSING_CONTENT); + _exchange.onHeaderComplete(); + } + + @Override + public void content(Buffer ref) throws IOException + { + _exchange.onContent(ref); + } + + @Override + public void messageComplete(long contextLength) throws IOException + { + _exchange.onComplete(); + _exchange.setStatus(HttpExchange.STATUS_COMPLETED); + } } Index: src/main/java/org/mortbay/jetty/client/HttpDestination.java =================================================================== --- src/main/java/org/mortbay/jetty/client/HttpDestination.java (revision 81) +++ src/main/java/org/mortbay/jetty/client/HttpDestination.java (working copy) @@ -3,24 +3,43 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.net.UnknownHostException; -import java.nio.channels.SocketChannel; -import java.util.ArrayList; import java.util.LinkedList; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; -import org.mortbay.io.nio.ChannelEndPoint; +import org.mortbay.io.Buffer; +import org.mortbay.io.ByteArrayBuffer; -public class HttpDestination +/** +* +* @author Greg Wilkins +* @author Guillaume Nodet +*/ +class HttpDestination implements Runnable { private InetSocketAddress _address; - private ArrayList _connections=new ArrayList(); + private LinkedList _connections=new LinkedList(); + private LinkedList _idle=new LinkedList(); private LinkedList _exchanges=new LinkedList(); private HttpConnectionPool _pool; + private boolean _ssl; + private int _maxConnections; + private AtomicBoolean _dispatched = new AtomicBoolean(false); + private AtomicInteger _pendingConnections = new AtomicInteger(0); + Buffer _hostHeaderBuffer; /* ------------------------------------------------------------ */ - HttpDestination(HttpConnectionPool pool, InetSocketAddress address) + HttpDestination(HttpConnectionPool pool, InetSocketAddress address, boolean ssl, int maxConnections) { _pool=pool; _address=address; + _ssl=ssl; + _maxConnections=maxConnections; + String host = address.getHostName(); + if (address.getPort() != (_ssl ? 443 : 80)) { + host += ":" + address.getPort(); + } + _hostHeaderBuffer = new ByteArrayBuffer(host); } /* ------------------------------------------------------------ */ @@ -30,45 +49,149 @@ } /* ------------------------------------------------------------------------------- */ - public HttpConnection getConnection(boolean ssl, boolean blockForIdle) throws UnknownHostException, IOException + public HttpConnection getConnection() throws IOException { - HttpConnection connection=null; // TODO search for existing idle connection + synchronized (_connections) + { + // Find an idle connection + if (_idle.size() > 0) + { + HttpConnection connection = _idle.removeFirst(); + return connection; + } + else { + startNewConnectionIfNeeded(); + return null; + } + } + } - // If a connection doesn't exist, create a new one. - if (connection==null) + /* ------------------------------------------------------------------------------- */ + protected void startNewConnectionIfNeeded() throws UnknownHostException, IOException + { + if (_connections.size() + _pendingConnections.get() < _maxConnections) { - return newConnection(ssl); + startNewConnection(); } - else + } + + protected void startNewConnection() throws UnknownHostException, IOException + { + _pendingConnections.incrementAndGet(); + _pool._connector.startConnection(this); + //System.err.println("Starting new connection to: " + _address); + } + + /* ------------------------------------------------------------------------------- */ + public void onNewConnection(HttpConnection connection) throws IOException + { + _pendingConnections.decrementAndGet(); + synchronized (_connections) { - return connection; + _connections.add(connection); + _idle.add(connection); } + dispatch(); } /* ------------------------------------------------------------------------------- */ - public HttpConnection newConnection(boolean ssl) throws UnknownHostException, IOException + public void returnConnection(HttpConnection connection, boolean close) throws IOException { - SocketChannel channel=SocketChannel.open(); - channel.configureBlocking(false); - channel.connect(_address); - - // TODO return REAL http connection - return new HttpConnection(this,_pool,new ChannelEndPoint(channel),_pool.getHeaderBufferSize(),_pool.getRequestBufferSize()); - + if (close) + { + synchronized (_connections) + { + _connections.remove(connection); + } + connection._endp.close(); + if (_exchanges.size() > 0) + { + startNewConnectionIfNeeded(); + } + } + else + { + synchronized (_connections) + { + _idle.add(connection); + } + dispatch(); + } } - + /* ------------------------------------------------------------ */ public void send(HttpExchange ex) throws IOException { - _exchanges.add(ex); - - // TODO schedule stuff - + synchronized (_exchanges) + { + _exchanges.add(ex); + } + dispatch(); + startNewConnectionIfNeeded(); } - - /* ------------------------------------------------------------ */ - public void setAddress(InetSocketAddress address) + + protected void dispatch() { - _address=address; + if (_exchanges.size() > 0 && _dispatched.compareAndSet(false, true)) { + _pool._threadPool.dispatch(this); + } } + + public void run() + { + try { + for (;;) + { + try + { + final HttpConnection connection = getConnection(); + if (connection != null) + { + final HttpExchange ex; + synchronized (_exchanges) + { + if (_exchanges.size() > 0) + { + ex = _exchanges.removeFirst(); + } + else + { + returnConnection(connection, false); + return; + } + } + _pool._threadPool.dispatch(new Runnable() + { + public void run() + { + try + { + connection.send(ex); + } + catch (Exception e) + { + e.printStackTrace(); + // TODO: report expcetion on exchange + } + } + }); + } + else + { + return; + } + } + catch (Exception e) + { + e.printStackTrace(); + // TODO: report expcetion on exchange + } + } + } + finally + { + _dispatched.set(false); + } + } + } \ No newline at end of file Index: src/main/java/org/mortbay/jetty/client/HttpExchange.java =================================================================== --- src/main/java/org/mortbay/jetty/client/HttpExchange.java (revision 81) +++ src/main/java/org/mortbay/jetty/client/HttpExchange.java (working copy) @@ -1,33 +1,52 @@ package org.mortbay.jetty.client; +import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.Writer; import java.net.InetSocketAddress; +import java.util.concurrent.atomic.AtomicInteger; + import org.mortbay.io.Buffer; import org.mortbay.io.ByteArrayBuffer; import org.mortbay.jetty.HttpFields; import org.mortbay.jetty.HttpHeaders; import org.mortbay.jetty.HttpMethods; +import org.mortbay.jetty.HttpParser; import org.mortbay.jetty.HttpSchemes; import org.mortbay.jetty.HttpURI; -import org.mortbay.jetty.MimeTypes; +import org.mortbay.jetty.HttpVersions; /** * VERY rough start to a client API - inpired by javascript XmlHttpRequest. * * @author gregw + * @author Guillaume Nodet */ public class HttpExchange { + public static final int STATUS_UNKOWN = 0; + public static final int STATUS_WAITING_FOR_CONNECTION = 1; + public static final int STATUS_SENDING_REQUEST = 2; + public static final int STATUS_WAITING_FOR_RESPONSE = 3; + public static final int STATUS_PARSING_HEADERS = 4; + public static final int STATUS_PARSING_CONTENT = 5; + public static final int STATUS_COMPLETED = 6; + + public interface Entity { + public void write(OutputStream os, Writer w) throws IOException; + } + InetSocketAddress _address; - HttpConnectionPool _connectionPool=new HttpConnectionPool(); - String _method; - Buffer _requestContent; - - HttpFields _requestFields=new HttpFields(); - Buffer _responseContent; - HttpFields _responseFields=new HttpFields(); + String _method = HttpMethods.GET; Buffer _scheme; + int _version = HttpVersions.HTTP_1_1_ORDINAL; String _uri; + AtomicInteger _status = new AtomicInteger(STATUS_UNKOWN); + HttpFields _requestFields = new HttpFields(); + Buffer _requestContent; + Entity _requestEntity; /* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */ @@ -36,25 +55,34 @@ /* ------------------------------------------------------------ */ - /** - * @param connectionPool - */ - public void setConnectionPool(HttpConnectionPool connectionPool) + public int getStatus() { - _connectionPool=connectionPool; + return _status.get(); } /* ------------------------------------------------------------ */ - /** - * @return - */ - public HttpConnectionPool getConnectionPool() + public void waitForStatus(int status) throws InterruptedException { - return _connectionPool; + synchronized (_status) + { + while (_status.get() < status) + { + _status.wait(); + } + } } - /* ------------------------------------------------------------ */ + void setStatus(int status) + { + synchronized (_status) + { + _status.set(status); + _status.notifyAll(); + } + } + + /* ------------------------------------------------------------ */ /** * @param url */ @@ -121,6 +149,24 @@ /* ------------------------------------------------------------ */ /** + * @param scheme + */ + public void setVersion(int version) + { + _version=version; + } + + /* ------------------------------------------------------------ */ + /** + * @return + */ + public int getVersion() + { + return _version; + } + + /* ------------------------------------------------------------ */ + /** * @param method */ public void setMethod(String method) @@ -225,63 +271,149 @@ /* ------------------------------------------------------------ */ /** - * @throws IOException + * @param requestContent */ - public void send() throws IOException + public void setRequestContent(Buffer requestContent) { - // TODO add to destination queue - HttpConnectionPool pool = getConnectionPool(); - HttpConnection connection = pool.getConnection(getAddress(),HttpSchemes.HTTPS_BUFFER.equalsIgnoreCase(getScheme()),true); - connection.send(this); + _requestContent=requestContent; } /* ------------------------------------------------------------ */ - /** - * @param pipeline - * @throws IOException - */ - public void send(boolean pipeline) throws IOException + public Buffer getRequestContent() { - // TODO add to destination queue - getConnectionPool().getConnection(getAddress(),HttpSchemes.HTTPS_BUFFER.equalsIgnoreCase(getScheme()),!pipeline).send(this); + return _requestContent; } - /* ------------------------------------------------------------ */ /** - * @param requestContent + * @param entity */ - public void setRequestContent(Buffer requestContent) + public void setRequestEntity(Entity requestEntity) { - _requestContent=requestContent; + _requestEntity=requestEntity; } + + /* ------------------------------------------------------------ */ + public Entity getRequestEntity() + { + return _requestEntity; + } /* ------------------------------------------------------------ */ - /** - * @param requestContent - */ - public void addRequestContent(Buffer requestContent) + public InputStream getResponseStream() { - _requestContent=requestContent; - // TODO implement! + // TODO + return null; } - /* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */ // methods to handle response + protected void onRequestSent() throws IOException + { + } + + protected void onResponseReceived(Buffer version, int status, Buffer reason) throws IOException + { + } + protected void onParsedHeader(Buffer name, Buffer value) throws IOException + { + } + + protected void onHeaderComplete() throws IOException + { + } - protected void onProgress(Buffer content, boolean last) + protected void onContent(Buffer content) throws IOException { - // TODO Auto-generated method stub } - protected void onSuccess(int status) + protected void onComplete() throws IOException { - // TODO Auto-generated method stub } + /* ------------------------------------------------------------ */ + /* ------------------------------------------------------------ */ + /* ------------------------------------------------------------ */ + public static class CachedExchange extends HttpExchange + { + int _responseStatus; + HttpFields _responseFields = new HttpFields(); + + /* ------------------------------------------------------------ */ + public int getResponseStatus() + { + if (_status.get() < STATUS_PARSING_HEADERS) + throw new IllegalStateException("Response not received"); + return _responseStatus; + } + + /* ------------------------------------------------------------ */ + public HttpFields getResponseFields() + { + if (_status.get() < STATUS_PARSING_CONTENT) + throw new IllegalStateException("Headers not complete"); + return _responseFields; + } + + /* ------------------------------------------------------------ */ + protected void onResponseReceived(Buffer version, int status, Buffer reason) throws IOException + { + _responseStatus = status; + } + + /* ------------------------------------------------------------ */ + protected void onParsedHeader(Buffer name, Buffer value) throws IOException + { + _responseFields.add(name, value); + } + + } + + /* ------------------------------------------------------------ */ + /* ------------------------------------------------------------ */ + /* ------------------------------------------------------------ */ + public static class ContentExchange extends CachedExchange + { + ByteArrayOutputStream _responseContent = new ByteArrayOutputStream(); + + /* ------------------------------------------------------------ */ + public byte[] getResponseContent() + { + if (_status.get() < STATUS_COMPLETED) + throw new IllegalStateException("Content not complete"); + return _responseContent.toByteArray(); + } + + protected void onContent(Buffer content) throws IOException + { + content.writeTo(_responseContent); + } + } + + /* ------------------------------------------------------------ */ + /* ------------------------------------------------------------ */ + /* ------------------------------------------------------------ */ + public static class StreamedExchange extends CachedExchange + { + HttpParser _parser; + InputStream _responseStream; + + /* ------------------------------------------------------------ */ + public InputStream getResponseStream() + { + if (_status.get() < STATUS_PARSING_CONTENT) + throw new IllegalStateException("Content not started"); + return _responseStream; + } + + protected void onHeaderComplete() throws IOException + { + _responseStream = new HttpParser.Input(_parser, 30000); + } + } + } Index: src/main/java/org/mortbay/jetty/client/HttpConnectionPool.java =================================================================== --- src/main/java/org/mortbay/jetty/client/HttpConnectionPool.java (revision 81) +++ src/main/java/org/mortbay/jetty/client/HttpConnectionPool.java (working copy) @@ -2,14 +2,28 @@ import java.io.IOException; import java.net.InetSocketAddress; -import java.net.SocketAddress; +import java.net.Socket; import java.net.UnknownHostException; +import java.nio.channels.SelectionKey; +import java.nio.channels.SocketChannel; import java.util.HashMap; import java.util.Map; +import org.mortbay.component.AbstractLifeCycle; +import org.mortbay.component.LifeCycle; import org.mortbay.io.Buffer; +import org.mortbay.io.ByteArrayBuffer; +import org.mortbay.io.Connection; +import org.mortbay.io.EndPoint; +import org.mortbay.io.bio.SocketEndPoint; +import org.mortbay.io.nio.ChannelEndPoint; import org.mortbay.io.nio.NIOBuffer; +import org.mortbay.io.nio.SelectChannelEndPoint; +import org.mortbay.io.nio.SelectorManager; import org.mortbay.jetty.AbstractBuffers; +import org.mortbay.jetty.HttpSchemes; +import org.mortbay.thread.BoundedThreadPool; +import org.mortbay.thread.ThreadPool; /** * Http connection pool for pooling connections and managing sets of mapped @@ -17,38 +31,48 @@ * * @author Greg Wilkins * @author Matthew Purland + * @author Guillaume Nodet */ public class HttpConnectionPool extends AbstractBuffers { + public static final int CONNECTOR_SOCKET = 0; + public static final int CONNECTOR_BLOCKING_CHANNEL = 1; + public static final int CONNECTOR_SELECT_CHANNEL = 2; + + private int _connectorType = CONNECTOR_SOCKET; private boolean _useDirectBuffers=true; + private int _maxConnectionsPerAddress = 32; private Map _destinations=new HashMap(); + ThreadPool _threadPool=new BoundedThreadPool(); + Connector _connector; /* ------------------------------------------------------------------------------- */ - public HttpConnection getConnection(InetSocketAddress remote, boolean ssl, boolean blockForIdle) throws UnknownHostException, IOException + public void send(HttpExchange exchange) throws IOException { + boolean ssl = HttpSchemes.HTTPS_BUFFER.equalsIgnoreCase(exchange.getScheme()); + HttpDestination destination = getDestination(exchange.getAddress(), ssl); + destination.send(exchange); + exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_CONNECTION); + } + + /* ------------------------------------------------------------------------------- */ + HttpDestination getDestination(InetSocketAddress remote, boolean ssl) throws UnknownHostException, IOException + { if (remote==null) throw new UnknownHostException("Remote socket address cannot be null."); synchronized (_destinations) { - // TODO separate map for ssl? HttpDestination destination=_destinations.get(remote); - - if (destination == null ) + if (destination == null) { - destination = new HttpDestination(this,remote); + destination = new HttpDestination(this, remote, ssl, _maxConnectionsPerAddress); _destinations.put(remote,destination); } - return destination.getConnection(ssl,blockForIdle); + return destination; } } - - protected HttpConnection newConnection(SocketAddress remote, boolean ssl, boolean blockForIdle) - { - return null; - // HttpConnection connection = new HttpConnection(); - } - + /** * Get whether the connector can use direct NIO buffers. */ @@ -69,6 +93,19 @@ { _useDirectBuffers=direct; } + + /** + * Get the type of connector (socket, blocking or select) in use. + */ + public int getConnectorType() + { + return _connectorType; + } + + public void setConnectorType(int connectorType) + { + this._connectorType = connectorType; + } /** * Create a new NIO buffer. If using direct buffers, it will create a direct @@ -77,12 +114,186 @@ @Override protected Buffer newBuffer(int size) { - Buffer buf=null; - if (size==getHeaderBufferSize()) - buf=new NIOBuffer(size,NIOBuffer.INDIRECT); + if (_connectorType != CONNECTOR_SOCKET) + { + Buffer buf=null; + if (size==getHeaderBufferSize()) + buf=new NIOBuffer(size,NIOBuffer.INDIRECT); + else + buf=new NIOBuffer(size,_useDirectBuffers?NIOBuffer.DIRECT:NIOBuffer.INDIRECT); + return buf; + } else - buf=new NIOBuffer(size,_useDirectBuffers?NIOBuffer.DIRECT:NIOBuffer.INDIRECT); - return buf; + { + return new ByteArrayBuffer(size); + } } + public int getMaxConnectionsPerAddress() { + return _maxConnectionsPerAddress; + } + + public void setMaxConnectionsPerAddress(int maxConnectionsPerAddress) { + _maxConnectionsPerAddress = maxConnectionsPerAddress; + } + + /* ------------------------------------------------------------ */ + protected void doStart() throws Exception + { + super.doStart(); + if (_threadPool instanceof LifeCycle) + { + ((LifeCycle) _threadPool).start(); + } + if (_connectorType == CONNECTOR_BLOCKING_CHANNEL) + { + _connector = new BlockingConnector(); + } + else if (_connectorType == CONNECTOR_SELECT_CHANNEL) + { + _connector = new SelectConnector(); + } + else + { + _connector = new SocketConnector(); + } + _connector.start(); + } + + /* ------------------------------------------------------------ */ + protected void doStop() throws Exception + { + _connector.stop(); + _connector = null; + if (_threadPool instanceof LifeCycle) + { + ((LifeCycle) _threadPool).stop(); + } + super.doStop(); + } + + /* ------------------------------------------------------------ */ + interface Connector extends LifeCycle { + + public void startConnection(HttpDestination destination) throws IOException; + + } + + /* ------------------------------------------------------------ */ + class SocketConnector extends AbstractLifeCycle implements Connector + { + public void startConnection(final HttpDestination destination) throws IOException + { + _threadPool.dispatch(new Runnable(){ + public void run() + { + try { + Socket socket = new Socket(); + socket.connect(destination.getAddress()); + EndPoint endpoint = new SocketEndPoint(socket); + HttpConnection connection = new HttpConnection(HttpConnectionPool.this, endpoint, getHeaderBufferSize(), getRequestBufferSize()); + connection.setDestination(destination); + destination.onNewConnection(connection); + } + catch (IOException e) + { + e.printStackTrace(); + } + } + }); + } + } + + /* ------------------------------------------------------------ */ + class BlockingConnector extends AbstractLifeCycle implements Connector { + public void startConnection(final HttpDestination destination) throws IOException + { + _threadPool.dispatch(new Runnable(){ + public void run() + { + try { + SocketChannel channel = SocketChannel.open(); + channel.configureBlocking(true); + channel.connect(destination.getAddress()); + EndPoint endpoint = new ChannelEndPoint(channel); + HttpConnection connection = new HttpConnection(HttpConnectionPool.this, endpoint, getHeaderBufferSize(), getRequestBufferSize()); + connection.setDestination(destination); + destination.onNewConnection(connection); + } + catch (IOException e) + { + e.printStackTrace(); + } + } + }); + } + } + + /* ------------------------------------------------------------ */ + class SelectConnector extends AbstractLifeCycle implements Connector, Runnable { + SelectorManager _selectorManager = new Manager(); + protected void doStart() throws Exception + { + _selectorManager.start(); + _threadPool.dispatch(this); + } + protected void doStop() throws Exception + { + _selectorManager.stop(); + } + public void startConnection(HttpDestination destination) throws IOException + { + SocketChannel channel = SocketChannel.open(); + channel.configureBlocking(false); + channel.connect(destination.getAddress()); + _selectorManager.register(channel, destination); + } + public void run() + { + while (isRunning()) + { + try + { + _selectorManager.doSelect(0); + } + catch (Exception e) + { + e.printStackTrace(); + } + } + } + + class Manager extends SelectorManager + { + protected SocketChannel acceptChannel(SelectionKey key) throws IOException + { + throw new IllegalStateException(); + } + protected boolean dispatch(Runnable task) throws IOException + { + return _threadPool.dispatch(task); + } + protected void endPointOpened(SelectChannelEndPoint endpoint) + { + } + protected void endPointClosed(SelectChannelEndPoint endpoint) + { + } + protected Connection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint) + { + return new HttpConnection(HttpConnectionPool.this, endpoint, getHeaderBufferSize(), getRequestBufferSize()); + } + protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey sKey) throws IOException + { + HttpDestination dest = (HttpDestination) sKey.attachment(); + SelectChannelEndPoint ep = new SelectChannelEndPoint(channel, selectSet, sKey); + HttpConnection connection = (HttpConnection) ep.getConnection(); + connection.setDestination(dest); + dest.onNewConnection(connection); + return ep; + } + } + + } + }