Index: src/test/java/org/mortbay/jetty/client/HttpExchangeTest.java
===================================================================
--- src/test/java/org/mortbay/jetty/client/HttpExchangeTest.java (revision 81)
+++ src/test/java/org/mortbay/jetty/client/HttpExchangeTest.java (working copy)
@@ -1,13 +1,25 @@
package org.mortbay.jetty.client;
import java.io.IOException;
-import java.net.InetSocketAddress;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.concurrent.CountDownLatch;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
import junit.framework.TestCase;
import org.mortbay.io.Buffer;
import org.mortbay.io.ByteArrayBuffer;
-import org.mortbay.jetty.MimeTypes;
+import org.mortbay.jetty.Connector;
+import org.mortbay.jetty.HttpConnection;
+import org.mortbay.jetty.HttpMethods;
+import org.mortbay.jetty.Request;
+import org.mortbay.jetty.Server;
+import org.mortbay.jetty.handler.AbstractHandler;
+import org.mortbay.jetty.nio.SelectChannelConnector;
/**
* Functional testing for HttpExchange.
@@ -16,47 +28,152 @@
*/
public class HttpExchangeTest extends TestCase
{
- private HttpExchange httpExchange;
+ private Server _server;
+ private int _port;
+ private HttpConnectionPool _connectionPool;
+
+ protected void setUp() throws Exception
+ {
+ startServer();
+ _connectionPool=new HttpConnectionPool();
+ _connectionPool.start();
+ }
+
+ protected void tearDown() throws Exception
+ {
+ stopServer();
+ }
+
+ public void testPerf() throws Exception {
+ testSend(100);
+
+ for (int i = 100; i <= 10000; i *= 10) {
+ testSend(i);
+ }
+ }
- public void setUp()
+ /**
+ * Test sending data through the exchange.
+ *
+ * @throws IOException
+ */
+ public void testSend(final int nb) throws Exception
{
- httpExchange=new HttpExchange()
- {
- protected void onProgress(Buffer content, boolean last)
+ final CountDownLatch latch = new CountDownLatch(nb);
+ long l0 = System.currentTimeMillis();
+ for (int i = 0; i < nb; i++) {
+ HttpExchange httpExchange=new HttpExchange()
{
- System.err.println("Response content="+content);
- super.onProgress(content,last);
+ protected void onRequestSent()
+ {
+ //System.err.println("Request sent (" + this._connection._endp + ")");
+ //InputStream is = getResponseStream();
+ //OutputStream baos = new ByteArrayOutputStream();
+ //copyStream(is, baos);
+ //System.err.println(baos.toString());
+
+ }
+ protected void onResponseReceived(Buffer version, int status, Buffer reason)
+ {
+ //System.err.println("Received response: " + version+" "+status+" "+reason);
+ }
+ protected void onParsedHeader(Buffer name, Buffer value)
+ {
+ //System.err.println("Received header: " + name + " = " + value);
+ }
+ protected void onContent(Buffer content)
+ {
+ //System.err.println("Received content:" + content);
+ }
+ protected void onComplete()
+ {
+ //System.err.println("Response completed");
+ latch.countDown();
+ }
+ };
+
+ httpExchange.setURL("http://localhost:" + _port + "/");
+ httpExchange.setMethod(HttpMethods.GET); // Optional. GET by default
+ //httpExchange.setAddress(new InetSocketAddress("localhost",_port)); // May use setURL instead
+ //httpExchange.setAddress(new InetSocketAddress("localhost",_port)); // May use setURL instead
+ //httpExchange.setURI("/ep1/?wsdl");
+ //httpExchange.setRequestContentType(MimeTypes.FORM_ENCODED);
+ //httpExchange.addRequestHeader("arbitrary","value");
+ //httpExchange.setRequestContent(new ByteArrayBuffer(""));
+ //httpExchange.setVersion(HttpVersions.HTTP_1_1_ORDINAL);
+ _connectionPool.send(httpExchange);
+ }
+ //System.err.println("Messages sent");
+ latch.await();
+ long l1 = System.currentTimeMillis();
+ System.err.println("Delay (jetty, "+nb+"): " + (l1 - l0) + " ms");
+ }
+
+ public void testPostWithStreamedResponse() throws Exception {
+ HttpExchange.StreamedExchange httpExchange = new HttpExchange.StreamedExchange();
+ httpExchange.setURL("http://localhost:" + _port + "/");
+ httpExchange.setMethod(HttpMethods.POST);
+ httpExchange.setRequestContent(new ByteArrayBuffer(""));
+ _connectionPool.send(httpExchange);
+ httpExchange.waitForStatus(HttpExchange.STATUS_PARSING_CONTENT);
+ InputStream is = httpExchange.getResponseStream();
+ copyStream(is, System.err);
+ }
+
+ public void testGetWithStreamedResponse() throws Exception {
+ HttpExchange.StreamedExchange httpExchange = new HttpExchange.StreamedExchange();
+ httpExchange.setURL("http://localhost:" + _port + "/");
+ httpExchange.setMethod(HttpMethods.GET);
+ _connectionPool.send(httpExchange);
+ httpExchange.waitForStatus(HttpExchange.STATUS_PARSING_CONTENT);
+ InputStream is = httpExchange.getResponseStream();
+ copyStream(is, System.err);
+ is.close();
+ }
+
+ public static void copyStream(InputStream in, OutputStream out) {
+ try {
+ byte[] buffer = new byte[1024];
+ int len;
+ while ((len = in.read(buffer)) >= 0) {
+ out.write(buffer, 0, len);
}
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
- protected void onSuccess(int status)
+ private void startServer() throws Exception
+ {
+ _server = new Server();
+ Connector connector = new SelectChannelConnector();
+ connector.setPort(0);
+ _server.setConnectors(new Connector[] { connector });
+ _server.setHandler(new AbstractHandler() {
+ public void handle(String target, HttpServletRequest request, HttpServletResponse response, int dispatch) throws IOException, ServletException
{
- System.err.println("Response status="+status);
- super.onSuccess(status);
+ Request base_request=(request instanceof Request)?(Request)request:HttpConnection.getCurrentConnection().getRequest();
+ base_request.setHandled(true);
+ response.setStatus(200);
+ if (request.getMethod().equalsIgnoreCase("GET"))
+ {
+ response.getOutputStream().println("");
+ for (int i = 0; i < 10000; i++)
+ response.getOutputStream().println(" ");
+ response.getOutputStream().println("");
+ }
+ else
+ {
+ copyStream(request.getInputStream(), response.getOutputStream());
+ }
}
- };
-
- httpExchange.setURL("http://localhost:8080/test/dump/info");
-
- HttpConnectionPool connectionPool=new HttpConnectionPool();
-
- httpExchange.setConnectionPool(connectionPool);
-
- httpExchange.setMethod("GET"); // Optional. GET by default
- httpExchange.setAddress(new InetSocketAddress("localhost",8080)); // May use setURL instead
- httpExchange.setURI("/test/dump/info");
- httpExchange.setRequestContentType(MimeTypes.FORM_ENCODED);
- httpExchange.addRequestHeader("arbitrary","value");
- httpExchange.setRequestContent(new ByteArrayBuffer("param=value\n"));
+ });
+ _server.start();
+ _port = connector.getLocalPort();
}
- /**
- * Test sending data through the exchange.
- *
- * @throws IOException
- */
- public void testSend() throws IOException
+ private void stopServer() throws Exception
{
- httpExchange.send();
- httpExchange.send(true); // pipelined on last used connection
+ _server.stop();
}
}
Index: src/main/java/org/mortbay/jetty/client/HttpConnection.java
===================================================================
--- src/main/java/org/mortbay/jetty/client/HttpConnection.java (revision 81)
+++ src/main/java/org/mortbay/jetty/client/HttpConnection.java (working copy)
@@ -1,28 +1,39 @@
package org.mortbay.jetty.client;
import java.io.IOException;
-import java.util.LinkedList;
import org.mortbay.io.Buffer;
import org.mortbay.io.Buffers;
+import org.mortbay.io.Connection;
import org.mortbay.io.EndPoint;
-import org.mortbay.jetty.Generator;
+import org.mortbay.jetty.AbstractGenerator;
import org.mortbay.jetty.HttpGenerator;
+import org.mortbay.jetty.HttpHeaderValues;
+import org.mortbay.jetty.HttpHeaders;
import org.mortbay.jetty.HttpParser;
+import org.mortbay.jetty.HttpTokens;
+import org.mortbay.jetty.HttpVersions;
-public class HttpConnection
+
+/**
+ *
+ * @author Greg Wilkins
+ * @author Guillaume Nodet
+ */
+class HttpConnection implements Connection
{
- private HttpDestination _destination;
- private EndPoint _endp;
- private HttpExchange _exchange;
- private LinkedList _exchanges=new LinkedList();
- private Generator _generator;
- private HttpParser _parser;
+ HttpDestination _destination;
+ EndPoint _endp;
+ HttpExchange _exchange;
+ HttpGenerator _generator;
+ HttpParser _parser;
+ boolean _http11;
+ Buffer _connectionHeader;
+
/* ------------------------------------------------------------ */
- HttpConnection(HttpDestination destination, Buffers buffers,EndPoint endp,int hbs,int cbs)
+ HttpConnection(Buffers buffers,EndPoint endp,int hbs,int cbs)
{
- _destination=destination;
_endp=endp;
_generator=new HttpGenerator(buffers,endp,hbs,cbs);
_parser=new HttpParser(buffers,endp,new Handler(),hbs,cbs);
@@ -33,106 +44,132 @@
return _destination;
}
+ public void setDestination(HttpDestination destination)
+ {
+ _destination = destination;
+ }
+
/* ------------------------------------------------------------ */
public void handle() throws IOException
{
- if (_exchange==null)
- nextExchange();
-
int no_progress=0;
- while (_endp.isOpen())
+ while (_endp.isOpen() && _exchange != null)
{
+ if (_exchange instanceof HttpExchange.StreamedExchange &&
+ _parser.inContentState())
+ {
+ break;
+ }
try
{
long io=0;
- // Do we have more writting to do?
- if (_generator.isCommitted() && !_generator.isComplete())
- io+=_generator.flush();
-
- if (_endp.isBufferingOutput())
- {
- _endp.flush();
- if (_endp.isBufferingOutput())
- no_progress=0;
- }
-
// If we are not ended then parse available
if (!_parser.isComplete())
- io=_parser.parseAvailable();
+ io=_parser.parseNext();
if (io>0)
no_progress=0;
else if (no_progress++>=2)
- return;
+ {
+ if (!_endp.isBlocking())
+ {
+ return;
+ }
+ }
}
- catch (IOException e)
- {
- // TODO Handle this!
- return;
- }
finally
{
- if (_parser.isComplete() && _generator.isComplete() && !_endp.isBufferingOutput())
+ if (_parser.isComplete())
{
- _exchange=null;
- nextExchange();
- if (_exchange==null)
- reset(true);
+ reset(true);
}
}
}
}
/* ------------------------------------------------------------ */
- public void send(HttpExchange ex)
- throws IOException
+ public boolean isIdle()
{
- _exchanges.add(ex);
-
- // TODO schedule stuff
-
+ return _exchange == null;
}
-
/* ------------------------------------------------------------ */
- private void nextExchange() throws IOException
+ public void send(HttpExchange ex) throws IOException
{
- if (_exchange!=null || _exchanges.size()==0)
- return;
-
- _exchange=_exchanges.remove(0);
- System.err.println("EX:"+_exchange.getURI());
-
- _generator.setVersion(11); // TODO
- _generator.setRequest(_exchange.getMethod(),_exchange.getURI());
-
- // TODO this is not right
- if (_exchange._requestContent!=null)
+ ex.setStatus(HttpExchange.STATUS_SENDING_REQUEST);
+ ex._requestFields.put(HttpHeaders.HOST_BUFFER, _destination._hostHeaderBuffer);
+ _generator.setVersion(ex._version);
+ _generator.setRequest(ex._method, ex._uri);
+ if (ex._requestEntity != null)
{
- _generator.addContent(_exchange._requestContent,Generator.LAST);
- _generator.completeHeader(_exchange.getRequestFields(),Generator.LAST);
- _generator.complete();
+ ex._requestFields.remove(HttpHeaders.CONTENT_LENGTH); // TODO: should not be needed
+ _generator.setContentLength(HttpTokens.CHUNKED_CONTENT);
+ _generator.completeHeader(ex._requestFields, false);
+ AbstractGenerator.Output os = new AbstractGenerator.Output(_generator, 30000);
+ AbstractGenerator.OutputWriter writer = new AbstractGenerator.OutputWriter(os);
+ writer.setCharacterEncoding(null);
+ ex._requestEntity.write(os, writer);
+ writer.close();
+ os.close();
}
-
+ else if (ex._requestContent != null)
+ {
+ ex._requestFields.putLongField(HttpHeaders.CONTENT_LENGTH, ex._requestContent.length());
+ _generator.completeHeader(ex._requestFields, false);
+ _generator.addContent(ex._requestContent, true);
+ }
+ else
+ {
+ ex._requestFields.remove(HttpHeaders.CONTENT_LENGTH); // TODO: should not be needed
+ _generator.completeHeader(ex._requestFields, true);
+ }
+ _generator.flush();
+ ex.onRequestSent();
+ if (ex instanceof HttpExchange.StreamedExchange)
+ {
+ ((HttpExchange.StreamedExchange) ex)._parser = _parser;
+ }
+ ex.setStatus(HttpExchange.STATUS_WAITING_FOR_RESPONSE);
+ _exchange = ex;
+ if (_endp.isBlocking())
+ {
+ handle();
+ }
}
+
/* ------------------------------------------------------------ */
- protected void reset(boolean returnBuffers)
+ protected void reset(boolean returnBuffers) throws IOException
{
+ _connectionHeader = null;
_parser.reset(returnBuffers);
_generator.reset(returnBuffers);
+ boolean close = shouldClose();
+ _exchange = null;
+ _destination.returnConnection(this, close);
}
-
+
/* ------------------------------------------------------------ */
- private class Handler extends HttpParser.EventHandler
+ private boolean shouldClose()
{
- @Override
- public void content(Buffer ref) throws IOException
+ if (_connectionHeader == HttpHeaderValues.CLOSE_BUFFER)
{
- System.err.println(ref);
+ return true;
}
+ else if (_connectionHeader == HttpHeaderValues.KEEP_ALIVE_BUFFER)
+ {
+ return false;
+ }
+ else
+ {
+ return !_http11;
+ }
+ }
+ /* ------------------------------------------------------------ */
+ private class Handler extends HttpParser.EventHandler
+ {
@Override
public void startRequest(Buffer method, Buffer url, Buffer version) throws IOException
{
@@ -142,9 +179,41 @@
@Override
public void startResponse(Buffer version, int status, Buffer reason) throws IOException
{
- System.err.println(version+" "+status+" "+reason);
+ _http11 = (version == HttpVersions.HTTP_1_1_BUFFER);
+ _exchange.onResponseReceived(version, status, reason);
+ _exchange.setStatus(HttpExchange.STATUS_PARSING_HEADERS);
}
+ @Override
+ public void parsedHeader(Buffer name, Buffer value) throws IOException
+ {
+ if (name == HttpHeaders.CONNECTION_BUFFER)
+ {
+ _connectionHeader = HttpHeaderValues.CACHE.lookup(value);
+ }
+ //_exchange._responseFields.add(name, value);
+ _exchange.onParsedHeader(name, value);
+ }
+
+ @Override
+ public void headerComplete() throws IOException
+ {
+ _exchange.setStatus(HttpExchange.STATUS_PARSING_CONTENT);
+ _exchange.onHeaderComplete();
+ }
+
+ @Override
+ public void content(Buffer ref) throws IOException
+ {
+ _exchange.onContent(ref);
+ }
+
+ @Override
+ public void messageComplete(long contextLength) throws IOException
+ {
+ _exchange.onComplete();
+ _exchange.setStatus(HttpExchange.STATUS_COMPLETED);
+ }
}
Index: src/main/java/org/mortbay/jetty/client/HttpDestination.java
===================================================================
--- src/main/java/org/mortbay/jetty/client/HttpDestination.java (revision 81)
+++ src/main/java/org/mortbay/jetty/client/HttpDestination.java (working copy)
@@ -3,24 +3,43 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
-import java.nio.channels.SocketChannel;
-import java.util.ArrayList;
import java.util.LinkedList;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
-import org.mortbay.io.nio.ChannelEndPoint;
+import org.mortbay.io.Buffer;
+import org.mortbay.io.ByteArrayBuffer;
-public class HttpDestination
+/**
+*
+* @author Greg Wilkins
+* @author Guillaume Nodet
+*/
+class HttpDestination implements Runnable
{
private InetSocketAddress _address;
- private ArrayList _connections=new ArrayList();
+ private LinkedList _connections=new LinkedList();
+ private LinkedList _idle=new LinkedList();
private LinkedList _exchanges=new LinkedList();
private HttpConnectionPool _pool;
+ private boolean _ssl;
+ private int _maxConnections;
+ private AtomicBoolean _dispatched = new AtomicBoolean(false);
+ private AtomicInteger _pendingConnections = new AtomicInteger(0);
+ Buffer _hostHeaderBuffer;
/* ------------------------------------------------------------ */
- HttpDestination(HttpConnectionPool pool, InetSocketAddress address)
+ HttpDestination(HttpConnectionPool pool, InetSocketAddress address, boolean ssl, int maxConnections)
{
_pool=pool;
_address=address;
+ _ssl=ssl;
+ _maxConnections=maxConnections;
+ String host = address.getHostName();
+ if (address.getPort() != (_ssl ? 443 : 80)) {
+ host += ":" + address.getPort();
+ }
+ _hostHeaderBuffer = new ByteArrayBuffer(host);
}
/* ------------------------------------------------------------ */
@@ -30,45 +49,149 @@
}
/* ------------------------------------------------------------------------------- */
- public HttpConnection getConnection(boolean ssl, boolean blockForIdle) throws UnknownHostException, IOException
+ public HttpConnection getConnection() throws IOException
{
- HttpConnection connection=null; // TODO search for existing idle connection
+ synchronized (_connections)
+ {
+ // Find an idle connection
+ if (_idle.size() > 0)
+ {
+ HttpConnection connection = _idle.removeFirst();
+ return connection;
+ }
+ else {
+ startNewConnectionIfNeeded();
+ return null;
+ }
+ }
+ }
- // If a connection doesn't exist, create a new one.
- if (connection==null)
+ /* ------------------------------------------------------------------------------- */
+ protected void startNewConnectionIfNeeded() throws UnknownHostException, IOException
+ {
+ if (_connections.size() + _pendingConnections.get() < _maxConnections)
{
- return newConnection(ssl);
+ startNewConnection();
}
- else
+ }
+
+ protected void startNewConnection() throws UnknownHostException, IOException
+ {
+ _pendingConnections.incrementAndGet();
+ _pool._connector.startConnection(this);
+ //System.err.println("Starting new connection to: " + _address);
+ }
+
+ /* ------------------------------------------------------------------------------- */
+ public void onNewConnection(HttpConnection connection) throws IOException
+ {
+ _pendingConnections.decrementAndGet();
+ synchronized (_connections)
{
- return connection;
+ _connections.add(connection);
+ _idle.add(connection);
}
+ dispatch();
}
/* ------------------------------------------------------------------------------- */
- public HttpConnection newConnection(boolean ssl) throws UnknownHostException, IOException
+ public void returnConnection(HttpConnection connection, boolean close) throws IOException
{
- SocketChannel channel=SocketChannel.open();
- channel.configureBlocking(false);
- channel.connect(_address);
-
- // TODO return REAL http connection
- return new HttpConnection(this,_pool,new ChannelEndPoint(channel),_pool.getHeaderBufferSize(),_pool.getRequestBufferSize());
-
+ if (close)
+ {
+ synchronized (_connections)
+ {
+ _connections.remove(connection);
+ }
+ connection._endp.close();
+ if (_exchanges.size() > 0)
+ {
+ startNewConnectionIfNeeded();
+ }
+ }
+ else
+ {
+ synchronized (_connections)
+ {
+ _idle.add(connection);
+ }
+ dispatch();
+ }
}
-
+
/* ------------------------------------------------------------ */
public void send(HttpExchange ex) throws IOException
{
- _exchanges.add(ex);
-
- // TODO schedule stuff
-
+ synchronized (_exchanges)
+ {
+ _exchanges.add(ex);
+ }
+ dispatch();
+ startNewConnectionIfNeeded();
}
-
- /* ------------------------------------------------------------ */
- public void setAddress(InetSocketAddress address)
+
+ protected void dispatch()
{
- _address=address;
+ if (_exchanges.size() > 0 && _dispatched.compareAndSet(false, true)) {
+ _pool._threadPool.dispatch(this);
+ }
}
+
+ public void run()
+ {
+ try {
+ for (;;)
+ {
+ try
+ {
+ final HttpConnection connection = getConnection();
+ if (connection != null)
+ {
+ final HttpExchange ex;
+ synchronized (_exchanges)
+ {
+ if (_exchanges.size() > 0)
+ {
+ ex = _exchanges.removeFirst();
+ }
+ else
+ {
+ returnConnection(connection, false);
+ return;
+ }
+ }
+ _pool._threadPool.dispatch(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ connection.send(ex);
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ // TODO: report expcetion on exchange
+ }
+ }
+ });
+ }
+ else
+ {
+ return;
+ }
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ // TODO: report expcetion on exchange
+ }
+ }
+ }
+ finally
+ {
+ _dispatched.set(false);
+ }
+ }
+
}
\ No newline at end of file
Index: src/main/java/org/mortbay/jetty/client/HttpExchange.java
===================================================================
--- src/main/java/org/mortbay/jetty/client/HttpExchange.java (revision 81)
+++ src/main/java/org/mortbay/jetty/client/HttpExchange.java (working copy)
@@ -1,33 +1,52 @@
package org.mortbay.jetty.client;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Writer;
import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicInteger;
+
import org.mortbay.io.Buffer;
import org.mortbay.io.ByteArrayBuffer;
import org.mortbay.jetty.HttpFields;
import org.mortbay.jetty.HttpHeaders;
import org.mortbay.jetty.HttpMethods;
+import org.mortbay.jetty.HttpParser;
import org.mortbay.jetty.HttpSchemes;
import org.mortbay.jetty.HttpURI;
-import org.mortbay.jetty.MimeTypes;
+import org.mortbay.jetty.HttpVersions;
/**
* VERY rough start to a client API - inpired by javascript XmlHttpRequest.
*
* @author gregw
+ * @author Guillaume Nodet
*/
public class HttpExchange
{
+ public static final int STATUS_UNKOWN = 0;
+ public static final int STATUS_WAITING_FOR_CONNECTION = 1;
+ public static final int STATUS_SENDING_REQUEST = 2;
+ public static final int STATUS_WAITING_FOR_RESPONSE = 3;
+ public static final int STATUS_PARSING_HEADERS = 4;
+ public static final int STATUS_PARSING_CONTENT = 5;
+ public static final int STATUS_COMPLETED = 6;
+
+ public interface Entity {
+ public void write(OutputStream os, Writer w) throws IOException;
+ }
+
InetSocketAddress _address;
- HttpConnectionPool _connectionPool=new HttpConnectionPool();
- String _method;
- Buffer _requestContent;
-
- HttpFields _requestFields=new HttpFields();
- Buffer _responseContent;
- HttpFields _responseFields=new HttpFields();
+ String _method = HttpMethods.GET;
Buffer _scheme;
+ int _version = HttpVersions.HTTP_1_1_ORDINAL;
String _uri;
+ AtomicInteger _status = new AtomicInteger(STATUS_UNKOWN);
+ HttpFields _requestFields = new HttpFields();
+ Buffer _requestContent;
+ Entity _requestEntity;
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
@@ -36,25 +55,34 @@
/* ------------------------------------------------------------ */
- /**
- * @param connectionPool
- */
- public void setConnectionPool(HttpConnectionPool connectionPool)
+ public int getStatus()
{
- _connectionPool=connectionPool;
+ return _status.get();
}
/* ------------------------------------------------------------ */
- /**
- * @return
- */
- public HttpConnectionPool getConnectionPool()
+ public void waitForStatus(int status) throws InterruptedException
{
- return _connectionPool;
+ synchronized (_status)
+ {
+ while (_status.get() < status)
+ {
+ _status.wait();
+ }
+ }
}
-
/* ------------------------------------------------------------ */
+ void setStatus(int status)
+ {
+ synchronized (_status)
+ {
+ _status.set(status);
+ _status.notifyAll();
+ }
+ }
+
+ /* ------------------------------------------------------------ */
/**
* @param url
*/
@@ -121,6 +149,24 @@
/* ------------------------------------------------------------ */
/**
+ * @param scheme
+ */
+ public void setVersion(int version)
+ {
+ _version=version;
+ }
+
+ /* ------------------------------------------------------------ */
+ /**
+ * @return
+ */
+ public int getVersion()
+ {
+ return _version;
+ }
+
+ /* ------------------------------------------------------------ */
+ /**
* @param method
*/
public void setMethod(String method)
@@ -225,63 +271,149 @@
/* ------------------------------------------------------------ */
/**
- * @throws IOException
+ * @param requestContent
*/
- public void send() throws IOException
+ public void setRequestContent(Buffer requestContent)
{
- // TODO add to destination queue
- HttpConnectionPool pool = getConnectionPool();
- HttpConnection connection = pool.getConnection(getAddress(),HttpSchemes.HTTPS_BUFFER.equalsIgnoreCase(getScheme()),true);
- connection.send(this);
+ _requestContent=requestContent;
}
/* ------------------------------------------------------------ */
- /**
- * @param pipeline
- * @throws IOException
- */
- public void send(boolean pipeline) throws IOException
+ public Buffer getRequestContent()
{
- // TODO add to destination queue
- getConnectionPool().getConnection(getAddress(),HttpSchemes.HTTPS_BUFFER.equalsIgnoreCase(getScheme()),!pipeline).send(this);
+ return _requestContent;
}
-
/* ------------------------------------------------------------ */
/**
- * @param requestContent
+ * @param entity
*/
- public void setRequestContent(Buffer requestContent)
+ public void setRequestEntity(Entity requestEntity)
{
- _requestContent=requestContent;
+ _requestEntity=requestEntity;
}
+
+ /* ------------------------------------------------------------ */
+ public Entity getRequestEntity()
+ {
+ return _requestEntity;
+ }
/* ------------------------------------------------------------ */
- /**
- * @param requestContent
- */
- public void addRequestContent(Buffer requestContent)
+ public InputStream getResponseStream()
{
- _requestContent=requestContent;
- // TODO implement!
+ // TODO
+ return null;
}
-
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
// methods to handle response
+ protected void onRequestSent() throws IOException
+ {
+ }
+
+ protected void onResponseReceived(Buffer version, int status, Buffer reason) throws IOException
+ {
+ }
+ protected void onParsedHeader(Buffer name, Buffer value) throws IOException
+ {
+ }
+
+ protected void onHeaderComplete() throws IOException
+ {
+ }
- protected void onProgress(Buffer content, boolean last)
+ protected void onContent(Buffer content) throws IOException
{
- // TODO Auto-generated method stub
}
- protected void onSuccess(int status)
+ protected void onComplete() throws IOException
{
- // TODO Auto-generated method stub
}
+ /* ------------------------------------------------------------ */
+ /* ------------------------------------------------------------ */
+ /* ------------------------------------------------------------ */
+ public static class CachedExchange extends HttpExchange
+ {
+ int _responseStatus;
+ HttpFields _responseFields = new HttpFields();
+
+ /* ------------------------------------------------------------ */
+ public int getResponseStatus()
+ {
+ if (_status.get() < STATUS_PARSING_HEADERS)
+ throw new IllegalStateException("Response not received");
+ return _responseStatus;
+ }
+
+ /* ------------------------------------------------------------ */
+ public HttpFields getResponseFields()
+ {
+ if (_status.get() < STATUS_PARSING_CONTENT)
+ throw new IllegalStateException("Headers not complete");
+ return _responseFields;
+ }
+
+ /* ------------------------------------------------------------ */
+ protected void onResponseReceived(Buffer version, int status, Buffer reason) throws IOException
+ {
+ _responseStatus = status;
+ }
+
+ /* ------------------------------------------------------------ */
+ protected void onParsedHeader(Buffer name, Buffer value) throws IOException
+ {
+ _responseFields.add(name, value);
+ }
+
+ }
+
+ /* ------------------------------------------------------------ */
+ /* ------------------------------------------------------------ */
+ /* ------------------------------------------------------------ */
+ public static class ContentExchange extends CachedExchange
+ {
+ ByteArrayOutputStream _responseContent = new ByteArrayOutputStream();
+
+ /* ------------------------------------------------------------ */
+ public byte[] getResponseContent()
+ {
+ if (_status.get() < STATUS_COMPLETED)
+ throw new IllegalStateException("Content not complete");
+ return _responseContent.toByteArray();
+ }
+
+ protected void onContent(Buffer content) throws IOException
+ {
+ content.writeTo(_responseContent);
+ }
+ }
+
+ /* ------------------------------------------------------------ */
+ /* ------------------------------------------------------------ */
+ /* ------------------------------------------------------------ */
+ public static class StreamedExchange extends CachedExchange
+ {
+ HttpParser _parser;
+ InputStream _responseStream;
+
+ /* ------------------------------------------------------------ */
+ public InputStream getResponseStream()
+ {
+ if (_status.get() < STATUS_PARSING_CONTENT)
+ throw new IllegalStateException("Content not started");
+ return _responseStream;
+ }
+
+ protected void onHeaderComplete() throws IOException
+ {
+ _responseStream = new HttpParser.Input(_parser, 30000);
+ }
+ }
+
}
Index: src/main/java/org/mortbay/jetty/client/HttpConnectionPool.java
===================================================================
--- src/main/java/org/mortbay/jetty/client/HttpConnectionPool.java (revision 81)
+++ src/main/java/org/mortbay/jetty/client/HttpConnectionPool.java (working copy)
@@ -2,14 +2,28 @@
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.net.SocketAddress;
+import java.net.Socket;
import java.net.UnknownHostException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
import java.util.HashMap;
import java.util.Map;
+import org.mortbay.component.AbstractLifeCycle;
+import org.mortbay.component.LifeCycle;
import org.mortbay.io.Buffer;
+import org.mortbay.io.ByteArrayBuffer;
+import org.mortbay.io.Connection;
+import org.mortbay.io.EndPoint;
+import org.mortbay.io.bio.SocketEndPoint;
+import org.mortbay.io.nio.ChannelEndPoint;
import org.mortbay.io.nio.NIOBuffer;
+import org.mortbay.io.nio.SelectChannelEndPoint;
+import org.mortbay.io.nio.SelectorManager;
import org.mortbay.jetty.AbstractBuffers;
+import org.mortbay.jetty.HttpSchemes;
+import org.mortbay.thread.BoundedThreadPool;
+import org.mortbay.thread.ThreadPool;
/**
* Http connection pool for pooling connections and managing sets of mapped
@@ -17,38 +31,48 @@
*
* @author Greg Wilkins
* @author Matthew Purland
+ * @author Guillaume Nodet
*/
public class HttpConnectionPool extends AbstractBuffers
{
+ public static final int CONNECTOR_SOCKET = 0;
+ public static final int CONNECTOR_BLOCKING_CHANNEL = 1;
+ public static final int CONNECTOR_SELECT_CHANNEL = 2;
+
+ private int _connectorType = CONNECTOR_SOCKET;
private boolean _useDirectBuffers=true;
+ private int _maxConnectionsPerAddress = 32;
private Map _destinations=new HashMap();
+ ThreadPool _threadPool=new BoundedThreadPool();
+ Connector _connector;
/* ------------------------------------------------------------------------------- */
- public HttpConnection getConnection(InetSocketAddress remote, boolean ssl, boolean blockForIdle) throws UnknownHostException, IOException
+ public void send(HttpExchange exchange) throws IOException
{
+ boolean ssl = HttpSchemes.HTTPS_BUFFER.equalsIgnoreCase(exchange.getScheme());
+ HttpDestination destination = getDestination(exchange.getAddress(), ssl);
+ destination.send(exchange);
+ exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_CONNECTION);
+ }
+
+ /* ------------------------------------------------------------------------------- */
+ HttpDestination getDestination(InetSocketAddress remote, boolean ssl) throws UnknownHostException, IOException
+ {
if (remote==null)
throw new UnknownHostException("Remote socket address cannot be null.");
synchronized (_destinations)
{
- // TODO separate map for ssl?
HttpDestination destination=_destinations.get(remote);
-
- if (destination == null )
+ if (destination == null)
{
- destination = new HttpDestination(this,remote);
+ destination = new HttpDestination(this, remote, ssl, _maxConnectionsPerAddress);
_destinations.put(remote,destination);
}
- return destination.getConnection(ssl,blockForIdle);
+ return destination;
}
}
-
- protected HttpConnection newConnection(SocketAddress remote, boolean ssl, boolean blockForIdle)
- {
- return null;
- // HttpConnection connection = new HttpConnection();
- }
-
+
/**
* Get whether the connector can use direct NIO buffers.
*/
@@ -69,6 +93,19 @@
{
_useDirectBuffers=direct;
}
+
+ /**
+ * Get the type of connector (socket, blocking or select) in use.
+ */
+ public int getConnectorType()
+ {
+ return _connectorType;
+ }
+
+ public void setConnectorType(int connectorType)
+ {
+ this._connectorType = connectorType;
+ }
/**
* Create a new NIO buffer. If using direct buffers, it will create a direct
@@ -77,12 +114,186 @@
@Override
protected Buffer newBuffer(int size)
{
- Buffer buf=null;
- if (size==getHeaderBufferSize())
- buf=new NIOBuffer(size,NIOBuffer.INDIRECT);
+ if (_connectorType != CONNECTOR_SOCKET)
+ {
+ Buffer buf=null;
+ if (size==getHeaderBufferSize())
+ buf=new NIOBuffer(size,NIOBuffer.INDIRECT);
+ else
+ buf=new NIOBuffer(size,_useDirectBuffers?NIOBuffer.DIRECT:NIOBuffer.INDIRECT);
+ return buf;
+ }
else
- buf=new NIOBuffer(size,_useDirectBuffers?NIOBuffer.DIRECT:NIOBuffer.INDIRECT);
- return buf;
+ {
+ return new ByteArrayBuffer(size);
+ }
}
+ public int getMaxConnectionsPerAddress() {
+ return _maxConnectionsPerAddress;
+ }
+
+ public void setMaxConnectionsPerAddress(int maxConnectionsPerAddress) {
+ _maxConnectionsPerAddress = maxConnectionsPerAddress;
+ }
+
+ /* ------------------------------------------------------------ */
+ protected void doStart() throws Exception
+ {
+ super.doStart();
+ if (_threadPool instanceof LifeCycle)
+ {
+ ((LifeCycle) _threadPool).start();
+ }
+ if (_connectorType == CONNECTOR_BLOCKING_CHANNEL)
+ {
+ _connector = new BlockingConnector();
+ }
+ else if (_connectorType == CONNECTOR_SELECT_CHANNEL)
+ {
+ _connector = new SelectConnector();
+ }
+ else
+ {
+ _connector = new SocketConnector();
+ }
+ _connector.start();
+ }
+
+ /* ------------------------------------------------------------ */
+ protected void doStop() throws Exception
+ {
+ _connector.stop();
+ _connector = null;
+ if (_threadPool instanceof LifeCycle)
+ {
+ ((LifeCycle) _threadPool).stop();
+ }
+ super.doStop();
+ }
+
+ /* ------------------------------------------------------------ */
+ interface Connector extends LifeCycle {
+
+ public void startConnection(HttpDestination destination) throws IOException;
+
+ }
+
+ /* ------------------------------------------------------------ */
+ class SocketConnector extends AbstractLifeCycle implements Connector
+ {
+ public void startConnection(final HttpDestination destination) throws IOException
+ {
+ _threadPool.dispatch(new Runnable(){
+ public void run()
+ {
+ try {
+ Socket socket = new Socket();
+ socket.connect(destination.getAddress());
+ EndPoint endpoint = new SocketEndPoint(socket);
+ HttpConnection connection = new HttpConnection(HttpConnectionPool.this, endpoint, getHeaderBufferSize(), getRequestBufferSize());
+ connection.setDestination(destination);
+ destination.onNewConnection(connection);
+ }
+ catch (IOException e)
+ {
+ e.printStackTrace();
+ }
+ }
+ });
+ }
+ }
+
+ /* ------------------------------------------------------------ */
+ class BlockingConnector extends AbstractLifeCycle implements Connector {
+ public void startConnection(final HttpDestination destination) throws IOException
+ {
+ _threadPool.dispatch(new Runnable(){
+ public void run()
+ {
+ try {
+ SocketChannel channel = SocketChannel.open();
+ channel.configureBlocking(true);
+ channel.connect(destination.getAddress());
+ EndPoint endpoint = new ChannelEndPoint(channel);
+ HttpConnection connection = new HttpConnection(HttpConnectionPool.this, endpoint, getHeaderBufferSize(), getRequestBufferSize());
+ connection.setDestination(destination);
+ destination.onNewConnection(connection);
+ }
+ catch (IOException e)
+ {
+ e.printStackTrace();
+ }
+ }
+ });
+ }
+ }
+
+ /* ------------------------------------------------------------ */
+ class SelectConnector extends AbstractLifeCycle implements Connector, Runnable {
+ SelectorManager _selectorManager = new Manager();
+ protected void doStart() throws Exception
+ {
+ _selectorManager.start();
+ _threadPool.dispatch(this);
+ }
+ protected void doStop() throws Exception
+ {
+ _selectorManager.stop();
+ }
+ public void startConnection(HttpDestination destination) throws IOException
+ {
+ SocketChannel channel = SocketChannel.open();
+ channel.configureBlocking(false);
+ channel.connect(destination.getAddress());
+ _selectorManager.register(channel, destination);
+ }
+ public void run()
+ {
+ while (isRunning())
+ {
+ try
+ {
+ _selectorManager.doSelect(0);
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ class Manager extends SelectorManager
+ {
+ protected SocketChannel acceptChannel(SelectionKey key) throws IOException
+ {
+ throw new IllegalStateException();
+ }
+ protected boolean dispatch(Runnable task) throws IOException
+ {
+ return _threadPool.dispatch(task);
+ }
+ protected void endPointOpened(SelectChannelEndPoint endpoint)
+ {
+ }
+ protected void endPointClosed(SelectChannelEndPoint endpoint)
+ {
+ }
+ protected Connection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint)
+ {
+ return new HttpConnection(HttpConnectionPool.this, endpoint, getHeaderBufferSize(), getRequestBufferSize());
+ }
+ protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey sKey) throws IOException
+ {
+ HttpDestination dest = (HttpDestination) sKey.attachment();
+ SelectChannelEndPoint ep = new SelectChannelEndPoint(channel, selectSet, sKey);
+ HttpConnection connection = (HttpConnection) ep.getConnection();
+ connection.setDestination(dest);
+ dest.onNewConnection(connection);
+ return ep;
+ }
+ }
+
+ }
+
}