Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@
import org.apache.hc.client5.http.protocol.HttpClientContext;
import org.apache.hc.core5.concurrent.CallbackContribution;
import org.apache.hc.core5.concurrent.Cancellable;
import org.apache.hc.core5.concurrent.ComplexCancellable;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.HttpVersion;
import org.apache.hc.core5.http.ProtocolVersion;
import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
import org.apache.hc.core5.http.nio.AsyncPushConsumer;
import org.apache.hc.core5.http.nio.HandlerFactory;
Expand Down Expand Up @@ -327,6 +330,54 @@ private AsyncClientExchangeHandler guard(final AsyncClientExchangeHandler handle
return new ReleasingAsyncClientExchangeHandler(handler, this::releaseSlot);
}

private Cancellable doExecute(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally, preparatory refactorings like this method extraction would be done in separate commits for ease of review (they can still be submitted in a single PR and squashed before merging)

final String id,
final AsyncConnectionEndpoint endpoint,
final AsyncClientExchangeHandler exchangeHandler,
final HttpClientContext context) {
final RequestConfig requestConfig = context.getRequestConfigOrDefault();
final Timeout responseTimeout = requestConfig.getResponseTimeout();
final EndpointInfo endpointInfo = endpoint.getInfo();
final ProtocolVersion version = endpointInfo != null ? endpointInfo.getProtocol() : null;
final boolean supportsStreams = version != null && version.greaterEquals(HttpVersion.HTTP_2);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest supportsMultiplexing or just isH2. "Stream" is an overloaded term and to me mainly connotes InputStreams and the like, i.e. streaming in contrast to buffering.


if (log.isDebugEnabled()) {
log.debug("{} start execution {}", ConnPoolSupport.getId(endpoint), id);
}

if (supportsStreams) {
final ComplexCancellable complexCancellable = new ComplexCancellable();
endpoint.execute(
id,
exchangeHandler,
pushHandlerFactory,
context,
streamControl -> {
streamControl.setTimeout(responseTimeout);
complexCancellable.setDependency(streamControl);
});
return complexCancellable;
} else {
if (responseTimeout != null) {
endpoint.setSocketTimeout(responseTimeout);
}
endpoint.execute(
id,
exchangeHandler,
pushHandlerFactory,
context,
null);
if (requestConfig.isHardCancellationEnabled()) {
return () -> {
exchangeHandler.cancel();
return true;
};
} else {
return Operations.nonCancellable();
}
}
}

@Override
public Cancellable execute(
final String id, final AsyncClientExchangeHandler exchangeHandler, final HttpClientContext context) {
Expand All @@ -336,46 +387,29 @@ public Cancellable execute(
"Execution pipeline queue limit reached (max=" + maxQueued + ")"));
return Operations.nonCancellable();
}
final AsyncClientExchangeHandler actual = guard(exchangeHandler);
final AsyncClientExchangeHandler handler = guard(exchangeHandler);
if (endpoint.isConnected()) {
if (log.isDebugEnabled()) {
log.debug("{} start execution {}", ConnPoolSupport.getId(endpoint), id);
}
final RequestConfig requestConfig = context.getRequestConfigOrDefault();
final Timeout responseTimeout = requestConfig.getResponseTimeout();
if (responseTimeout != null) {
endpoint.setSocketTimeout(responseTimeout);
}
endpoint.execute(id, actual, pushHandlerFactory, context);
if (context.getRequestConfigOrDefault().isHardCancellationEnabled()) {
return () -> {
actual.cancel();
return true;
};
}
doExecute(id, endpoint, handler, context);
} else {
connectEndpoint(context, new FutureCallback<AsyncExecRuntime>() {

@Override
public void completed(final AsyncExecRuntime runtime) {
if (log.isDebugEnabled()) {
log.debug("{} start execution {}", ConnPoolSupport.getId(endpoint), id);
}
try {
endpoint.execute(id, actual, pushHandlerFactory, context);
doExecute(id, endpoint, handler, context);
} catch (final RuntimeException ex) {
failed(ex);
}
}

@Override
public void failed(final Exception ex) {
actual.failed(ex);
handler.failed(ex);
}

@Override
public void cancelled() {
actual.failed(new InterruptedIOException());
handler.failed(new InterruptedIOException());
}

});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hc.client5.http.EndpointInfo;
import org.apache.hc.client5.http.HttpRoute;
import org.apache.hc.client5.http.SchemePortResolver;
import org.apache.hc.client5.http.config.Configurable;
Expand All @@ -61,6 +62,8 @@
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.HttpStatus;
import org.apache.hc.core5.http.HttpVersion;
import org.apache.hc.core5.http.ProtocolVersion;
import org.apache.hc.core5.http.nio.AsyncClientEndpoint;
import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
import org.apache.hc.core5.http.nio.AsyncPushConsumer;
Expand Down Expand Up @@ -448,6 +451,40 @@ public boolean isConnected() {
return !isReleased() && connectionEndpoint.isConnected();
}

private void doExecute(
final String id,
final AsyncConnectionEndpoint endpoint,
final AsyncClientExchangeHandler exchangeHandler,
final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
final HttpClientContext context) {
final RequestConfig requestConfig = context.getRequestConfigOrDefault();
final Timeout responseTimeout = requestConfig.getResponseTimeout();
final EndpointInfo endpointInfo = endpoint.getInfo();
final ProtocolVersion version = endpointInfo != null ? endpointInfo.getProtocol() : null;
final boolean supportsStreams = version != null && version.greaterEquals(HttpVersion.HTTP_2);

if (supportsStreams) {
endpoint.execute(
id,
exchangeHandler,
pushHandlerFactory,
context,
streamControl -> {
streamControl.setTimeout(responseTimeout);
});
} else {
if (responseTimeout != null) {
endpoint.setSocketTimeout(responseTimeout);
}
endpoint.execute(
id,
exchangeHandler,
pushHandlerFactory,
context,
null);
}
}

@Override
public void execute(
final AsyncClientExchangeHandler exchangeHandler,
Expand All @@ -460,13 +497,18 @@ public void execute(
clientContext.setExchangeId(exchangeId);
if (LOG.isDebugEnabled()) {
LOG.debug("{} executing message exchange {}", exchangeId, ConnPoolSupport.getId(connectionEndpoint));
connectionEndpoint.execute(
doExecute(
exchangeId,
connectionEndpoint,
new LoggingAsyncClientExchangeHandler(LOG, exchangeId, exchangeHandler),
pushHandlerFactory,
clientContext);
} else {
connectionEndpoint.execute(exchangeId, exchangeHandler, clientContext);
doExecute(exchangeId,
connectionEndpoint,
exchangeHandler,
pushHandlerFactory,
clientContext);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import org.apache.hc.client5.http.nio.AsyncClientConnectionOperator;
import org.apache.hc.client5.http.nio.AsyncConnectionEndpoint;
import org.apache.hc.client5.http.nio.ManagedAsyncClientConnection;
import org.apache.hc.client5.http.protocol.HttpClientContext;
import org.apache.hc.client5.http.ssl.DefaultClientTlsStrategy;
import org.apache.hc.core5.annotation.Contract;
import org.apache.hc.core5.annotation.Internal;
Expand All @@ -59,11 +58,13 @@
import org.apache.hc.core5.concurrent.CallbackContribution;
import org.apache.hc.core5.concurrent.ComplexFuture;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.function.Callback;
import org.apache.hc.core5.function.Resolver;
import org.apache.hc.core5.http.HttpConnection;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.HttpVersion;
import org.apache.hc.core5.http.ProtocolVersion;
import org.apache.hc.core5.http.StreamControl;
import org.apache.hc.core5.http.URIScheme;
import org.apache.hc.core5.http.config.Lookup;
import org.apache.hc.core5.http.config.RegistryBuilder;
Expand Down Expand Up @@ -808,12 +809,16 @@ public void setSocketTimeout(final Timeout timeout) {
getValidatedPoolEntry().getConnection().setSocketTimeout(timeout);
}

/**
* @since 5.7
*/
@Override
public void execute(
final String exchangeId,
final AsyncClientExchangeHandler exchangeHandler,
final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
final HttpContext context) {
final HttpContext context,
final Callback<StreamControl> initiationCallback) {
final ManagedAsyncClientConnection connection = getValidatedPoolEntry().getConnection();
if (LOG.isDebugEnabled()) {
LOG.debug("{} executing exchange {} over {}", id, exchangeId, ConnPoolSupport.getId(connection));
Expand All @@ -824,14 +829,19 @@ public void execute(
exchangeHandler,
pushHandlerFactory,
context,
streamControl -> {
final HttpClientContext clientContext = HttpClientContext.cast(context);
final Timeout responseTimeout = clientContext.getRequestConfigOrDefault().getResponseTimeout();
streamControl.setTimeout(responseTimeout);
}),
initiationCallback),
Command.Priority.NORMAL);
}

@Override
public void execute(
final String exchangeId,
final AsyncClientExchangeHandler exchangeHandler,
final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
final HttpContext context) {
execute(exchangeId, exchangeHandler, pushHandlerFactory, context, null);
}

@Override
public EndpointInfo getInfo() {
final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = poolEntryRef.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import org.apache.hc.core5.annotation.ThreadingBehavior;
import org.apache.hc.core5.concurrent.BasicFuture;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.function.Callback;
import org.apache.hc.core5.http.StreamControl;
import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
import org.apache.hc.core5.http.nio.AsyncPushConsumer;
import org.apache.hc.core5.http.nio.AsyncRequestProducer;
Expand Down Expand Up @@ -69,6 +71,27 @@ public abstract void execute(
HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
HttpContext context);

/**
* Initiates a message exchange using the given handler.
*
* @param id unique operation ID or {@code null}.
* @param exchangeHandler the message exchange handler.
* @param pushHandlerFactory the push handler factory.
* @param context the execution context.
* @param initiationCallback the initialization callback optionally executed
* by connections that support cancellable message
* streams.
* @since 5.7
*/
public void execute(
final String id,
final AsyncClientExchangeHandler exchangeHandler,
final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
final HttpContext context,
final Callback<StreamControl> initiationCallback) {
execute(id, exchangeHandler, pushHandlerFactory, context);
}

/**
* Determines if the connection to the remote endpoint is still open and valid.
*/
Expand Down
Loading