diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncExecRuntime.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncExecRuntime.java index 9a7820cb50..a36969831c 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncExecRuntime.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncExecRuntime.java @@ -44,7 +44,10 @@ import org.apache.hc.client5.http.protocol.HttpClientContext; import org.apache.hc.core5.concurrent.CallbackContribution; import org.apache.hc.core5.concurrent.Cancellable; +import org.apache.hc.core5.concurrent.ComplexCancellable; import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.http.HttpVersion; +import org.apache.hc.core5.http.ProtocolVersion; import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler; import org.apache.hc.core5.http.nio.AsyncPushConsumer; import org.apache.hc.core5.http.nio.HandlerFactory; @@ -327,6 +330,54 @@ private AsyncClientExchangeHandler guard(final AsyncClientExchangeHandler handle return new ReleasingAsyncClientExchangeHandler(handler, this::releaseSlot); } + private Cancellable doExecute( + final String id, + final AsyncConnectionEndpoint endpoint, + final AsyncClientExchangeHandler exchangeHandler, + final HttpClientContext context) { + final RequestConfig requestConfig = context.getRequestConfigOrDefault(); + final Timeout responseTimeout = requestConfig.getResponseTimeout(); + final EndpointInfo endpointInfo = endpoint.getInfo(); + final ProtocolVersion version = endpointInfo != null ? endpointInfo.getProtocol() : null; + final boolean supportsStreams = version != null && version.greaterEquals(HttpVersion.HTTP_2); + + if (log.isDebugEnabled()) { + log.debug("{} start execution {}", ConnPoolSupport.getId(endpoint), id); + } + + if (supportsStreams) { + final ComplexCancellable complexCancellable = new ComplexCancellable(); + endpoint.execute( + id, + exchangeHandler, + pushHandlerFactory, + context, + streamControl -> { + streamControl.setTimeout(responseTimeout); + complexCancellable.setDependency(streamControl); + }); + return complexCancellable; + } else { + if (responseTimeout != null) { + endpoint.setSocketTimeout(responseTimeout); + } + endpoint.execute( + id, + exchangeHandler, + pushHandlerFactory, + context, + null); + if (requestConfig.isHardCancellationEnabled()) { + return () -> { + exchangeHandler.cancel(); + return true; + }; + } else { + return Operations.nonCancellable(); + } + } + } + @Override public Cancellable execute( final String id, final AsyncClientExchangeHandler exchangeHandler, final HttpClientContext context) { @@ -336,33 +387,16 @@ public Cancellable execute( "Execution pipeline queue limit reached (max=" + maxQueued + ")")); return Operations.nonCancellable(); } - final AsyncClientExchangeHandler actual = guard(exchangeHandler); + final AsyncClientExchangeHandler handler = guard(exchangeHandler); if (endpoint.isConnected()) { - if (log.isDebugEnabled()) { - log.debug("{} start execution {}", ConnPoolSupport.getId(endpoint), id); - } - final RequestConfig requestConfig = context.getRequestConfigOrDefault(); - final Timeout responseTimeout = requestConfig.getResponseTimeout(); - if (responseTimeout != null) { - endpoint.setSocketTimeout(responseTimeout); - } - endpoint.execute(id, actual, pushHandlerFactory, context); - if (context.getRequestConfigOrDefault().isHardCancellationEnabled()) { - return () -> { - actual.cancel(); - return true; - }; - } + doExecute(id, endpoint, handler, context); } else { connectEndpoint(context, new FutureCallback() { @Override public void completed(final AsyncExecRuntime runtime) { - if (log.isDebugEnabled()) { - log.debug("{} start execution {}", ConnPoolSupport.getId(endpoint), id); - } try { - endpoint.execute(id, actual, pushHandlerFactory, context); + doExecute(id, endpoint, handler, context); } catch (final RuntimeException ex) { failed(ex); } @@ -370,12 +404,12 @@ public void completed(final AsyncExecRuntime runtime) { @Override public void failed(final Exception ex) { - actual.failed(ex); + handler.failed(ex); } @Override public void cancelled() { - actual.failed(new InterruptedIOException()); + handler.failed(new InterruptedIOException()); } }); diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/MinimalHttpAsyncClient.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/MinimalHttpAsyncClient.java index 0ae21c6666..70dc897789 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/MinimalHttpAsyncClient.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/MinimalHttpAsyncClient.java @@ -35,6 +35,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.hc.client5.http.EndpointInfo; import org.apache.hc.client5.http.HttpRoute; import org.apache.hc.client5.http.SchemePortResolver; import org.apache.hc.client5.http.config.Configurable; @@ -61,6 +62,8 @@ import org.apache.hc.core5.http.HttpHost; import org.apache.hc.core5.http.HttpResponse; import org.apache.hc.core5.http.HttpStatus; +import org.apache.hc.core5.http.HttpVersion; +import org.apache.hc.core5.http.ProtocolVersion; import org.apache.hc.core5.http.nio.AsyncClientEndpoint; import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler; import org.apache.hc.core5.http.nio.AsyncPushConsumer; @@ -448,6 +451,40 @@ public boolean isConnected() { return !isReleased() && connectionEndpoint.isConnected(); } + private void doExecute( + final String id, + final AsyncConnectionEndpoint endpoint, + final AsyncClientExchangeHandler exchangeHandler, + final HandlerFactory pushHandlerFactory, + final HttpClientContext context) { + final RequestConfig requestConfig = context.getRequestConfigOrDefault(); + final Timeout responseTimeout = requestConfig.getResponseTimeout(); + final EndpointInfo endpointInfo = endpoint.getInfo(); + final ProtocolVersion version = endpointInfo != null ? endpointInfo.getProtocol() : null; + final boolean supportsStreams = version != null && version.greaterEquals(HttpVersion.HTTP_2); + + if (supportsStreams) { + endpoint.execute( + id, + exchangeHandler, + pushHandlerFactory, + context, + streamControl -> { + streamControl.setTimeout(responseTimeout); + }); + } else { + if (responseTimeout != null) { + endpoint.setSocketTimeout(responseTimeout); + } + endpoint.execute( + id, + exchangeHandler, + pushHandlerFactory, + context, + null); + } + } + @Override public void execute( final AsyncClientExchangeHandler exchangeHandler, @@ -460,13 +497,18 @@ public void execute( clientContext.setExchangeId(exchangeId); if (LOG.isDebugEnabled()) { LOG.debug("{} executing message exchange {}", exchangeId, ConnPoolSupport.getId(connectionEndpoint)); - connectionEndpoint.execute( + doExecute( exchangeId, + connectionEndpoint, new LoggingAsyncClientExchangeHandler(LOG, exchangeId, exchangeHandler), pushHandlerFactory, clientContext); } else { - connectionEndpoint.execute(exchangeId, exchangeHandler, clientContext); + doExecute(exchangeId, + connectionEndpoint, + exchangeHandler, + pushHandlerFactory, + clientContext); } } diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/PoolingAsyncClientConnectionManager.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/PoolingAsyncClientConnectionManager.java index 1f07c75fe9..d6b6df2b6c 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/PoolingAsyncClientConnectionManager.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/PoolingAsyncClientConnectionManager.java @@ -50,7 +50,6 @@ import org.apache.hc.client5.http.nio.AsyncClientConnectionOperator; import org.apache.hc.client5.http.nio.AsyncConnectionEndpoint; import org.apache.hc.client5.http.nio.ManagedAsyncClientConnection; -import org.apache.hc.client5.http.protocol.HttpClientContext; import org.apache.hc.client5.http.ssl.DefaultClientTlsStrategy; import org.apache.hc.core5.annotation.Contract; import org.apache.hc.core5.annotation.Internal; @@ -59,11 +58,13 @@ import org.apache.hc.core5.concurrent.CallbackContribution; import org.apache.hc.core5.concurrent.ComplexFuture; import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.function.Callback; import org.apache.hc.core5.function.Resolver; import org.apache.hc.core5.http.HttpConnection; import org.apache.hc.core5.http.HttpHost; import org.apache.hc.core5.http.HttpVersion; import org.apache.hc.core5.http.ProtocolVersion; +import org.apache.hc.core5.http.StreamControl; import org.apache.hc.core5.http.URIScheme; import org.apache.hc.core5.http.config.Lookup; import org.apache.hc.core5.http.config.RegistryBuilder; @@ -808,12 +809,16 @@ public void setSocketTimeout(final Timeout timeout) { getValidatedPoolEntry().getConnection().setSocketTimeout(timeout); } + /** + * @since 5.7 + */ @Override public void execute( final String exchangeId, final AsyncClientExchangeHandler exchangeHandler, final HandlerFactory pushHandlerFactory, - final HttpContext context) { + final HttpContext context, + final Callback initiationCallback) { final ManagedAsyncClientConnection connection = getValidatedPoolEntry().getConnection(); if (LOG.isDebugEnabled()) { LOG.debug("{} executing exchange {} over {}", id, exchangeId, ConnPoolSupport.getId(connection)); @@ -824,14 +829,19 @@ public void execute( exchangeHandler, pushHandlerFactory, context, - streamControl -> { - final HttpClientContext clientContext = HttpClientContext.cast(context); - final Timeout responseTimeout = clientContext.getRequestConfigOrDefault().getResponseTimeout(); - streamControl.setTimeout(responseTimeout); - }), + initiationCallback), Command.Priority.NORMAL); } + @Override + public void execute( + final String exchangeId, + final AsyncClientExchangeHandler exchangeHandler, + final HandlerFactory pushHandlerFactory, + final HttpContext context) { + execute(exchangeId, exchangeHandler, pushHandlerFactory, context, null); + } + @Override public EndpointInfo getInfo() { final PoolEntry poolEntry = poolEntryRef.get(); diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/nio/AsyncConnectionEndpoint.java b/httpclient5/src/main/java/org/apache/hc/client5/http/nio/AsyncConnectionEndpoint.java index 8ae25aae2c..c90d2e2ebf 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/nio/AsyncConnectionEndpoint.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/nio/AsyncConnectionEndpoint.java @@ -35,6 +35,8 @@ import org.apache.hc.core5.annotation.ThreadingBehavior; import org.apache.hc.core5.concurrent.BasicFuture; import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.function.Callback; +import org.apache.hc.core5.http.StreamControl; import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler; import org.apache.hc.core5.http.nio.AsyncPushConsumer; import org.apache.hc.core5.http.nio.AsyncRequestProducer; @@ -69,6 +71,27 @@ public abstract void execute( HandlerFactory pushHandlerFactory, HttpContext context); + /** + * Initiates a message exchange using the given handler. + * + * @param id unique operation ID or {@code null}. + * @param exchangeHandler the message exchange handler. + * @param pushHandlerFactory the push handler factory. + * @param context the execution context. + * @param initiationCallback the initialization callback optionally executed + * by connections that support cancellable message + * streams. + * @since 5.7 + */ + public void execute( + final String id, + final AsyncClientExchangeHandler exchangeHandler, + final HandlerFactory pushHandlerFactory, + final HttpContext context, + final Callback initiationCallback) { + execute(id, exchangeHandler, pushHandlerFactory, context); + } + /** * Determines if the connection to the remote endpoint is still open and valid. */