Skip to content
Open
25 changes: 25 additions & 0 deletions http-clients/aws-crt-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,31 @@
<artifactId>mockito-junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-http2</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-http</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import software.amazon.awssdk.annotations.SdkPublicApi;
import software.amazon.awssdk.crt.http.HttpClientConnectionManager;
import software.amazon.awssdk.crt.http.HttpStreamManager;
import software.amazon.awssdk.http.Protocol;
import software.amazon.awssdk.http.SdkHttpConfigurationOption;
import software.amazon.awssdk.http.async.AsyncExecuteRequest;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
Expand Down Expand Up @@ -91,15 +92,15 @@ public CompletableFuture<Void> execute(AsyncExecuteRequest asyncRequest) {
* we have a pool and no one can destroy it underneath us until we've finished submitting the
* request)
*/
try (HttpClientConnectionManager crtConnPool = getOrCreateConnectionPool(poolKey(asyncRequest.request()))) {
CrtAsyncRequestContext context = CrtAsyncRequestContext.builder()
.crtConnPool(crtConnPool)
.readBufferSize(this.readBufferSize)
.request(asyncRequest)
.build();

return new CrtAsyncRequestExecutor().execute(context);
}
HttpStreamManager streamManager = getOrCreateConnectionPool(poolKey(asyncRequest.request()));
CrtAsyncRequestContext context = CrtAsyncRequestContext.builder()
.streamManager(streamManager)
.readBufferSize(this.readBufferSize)
.request(asyncRequest)
.protocol(this.protocol)
.build();

return new CrtAsyncRequestExecutor().execute(context);
}

/**
Expand Down Expand Up @@ -222,6 +223,14 @@ AwsCrtAsyncHttpClient.Builder connectionHealthConfiguration(Consumer<ConnectionH
AwsCrtAsyncHttpClient.Builder tcpKeepAliveConfiguration(Consumer<TcpKeepAliveConfiguration.Builder>
tcpKeepAliveConfigurationBuilder);

/**
* Configure the HTTP protocol version to use for connections.
*
* @param protocol the HTTP protocol version
* @return The builder for method chaining.
*/
AwsCrtAsyncHttpClient.Builder protocol(Protocol protocol);

/**
* Configure whether to enable a hybrid post-quantum key exchange option for the Transport Layer Security (TLS) network
* encryption protocol when communicating with services that support Post Quantum TLS. If Post Quantum cipher suites are
Expand All @@ -246,6 +255,13 @@ AwsCrtAsyncHttpClient.Builder tcpKeepAliveConfiguration(Consumer<TcpKeepAliveCon
private static final class DefaultAsyncBuilder
extends AwsCrtClientBuilderBase<AwsCrtAsyncHttpClient.Builder> implements Builder {


@Override
public Builder protocol(Protocol protocol) {
getAttributeMap().put(SdkHttpConfigurationOption.PROTOCOL, protocol);
return this;
}

@Override
public SdkAsyncHttpClient build() {
return new AwsCrtAsyncHttpClient(this, getAttributeMap().build()
Expand All @@ -258,5 +274,6 @@ public SdkAsyncHttpClient buildWithDefaults(AttributeMap serviceDefaults) {
.merge(serviceDefaults)
.merge(SdkHttpConfigurationOption.GLOBAL_HTTP_DEFAULTS));
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@
import java.util.concurrent.CompletionException;
import java.util.function.Consumer;
import software.amazon.awssdk.annotations.SdkPublicApi;
import software.amazon.awssdk.crt.http.HttpClientConnectionManager;
import software.amazon.awssdk.crt.http.HttpException;
import software.amazon.awssdk.crt.http.HttpStreamManager;
import software.amazon.awssdk.http.ExecutableHttpRequest;
import software.amazon.awssdk.http.HttpExecuteRequest;
import software.amazon.awssdk.http.HttpExecuteResponse;
import software.amazon.awssdk.http.Protocol;
import software.amazon.awssdk.http.SdkHttpClient;
import software.amazon.awssdk.http.SdkHttpConfigurationOption;
import software.amazon.awssdk.http.SdkHttpFullResponse;
Expand Down Expand Up @@ -56,6 +57,11 @@ public final class AwsCrtHttpClient extends AwsCrtHttpClientBase implements SdkH

private AwsCrtHttpClient(DefaultBuilder builder, AttributeMap config) {
super(builder, config);
if (this.protocol == Protocol.HTTP2) {
throw new UnsupportedOperationException(
"HTTP/2 is not supported for sync HTTP clients. Either use HTTP/1.1 (the default) or use an async "
+ "HTTP client (e.g., AwsCrtAsyncHttpClient).");
}
}

public static AwsCrtHttpClient.Builder builder() {
Expand Down Expand Up @@ -91,14 +97,13 @@ public ExecutableHttpRequest prepareRequest(HttpExecuteRequest request) {
* we have a pool and no one can destroy it underneath us until we've finished submitting the
* request)
*/
try (HttpClientConnectionManager crtConnPool = getOrCreateConnectionPool(poolKey(request.httpRequest()))) {
CrtRequestContext context = CrtRequestContext.builder()
.crtConnPool(crtConnPool)
.readBufferSize(this.readBufferSize)
.request(request)
.build();
return new CrtHttpRequest(context);
}
HttpStreamManager streamManager = getOrCreateConnectionPool(poolKey(request.httpRequest()));
CrtRequestContext context = CrtRequestContext.builder()
.streamManager(streamManager)
.readBufferSize(this.readBufferSize)
.request(request)
.build();
return new CrtHttpRequest(context);
}

private static final class CrtHttpRequest implements ExecutableHttpRequest {
Expand Down Expand Up @@ -140,7 +145,7 @@ public HttpExecuteResponse call() throws IOException {
@Override
public void abort() {
if (responseFuture != null) {
responseFuture.completeExceptionally(new IOException("Request ws cancelled"));
responseFuture.completeExceptionally(new IOException("Request was cancelled"));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,13 @@
import java.util.concurrent.ConcurrentHashMap;
import software.amazon.awssdk.annotations.SdkProtectedApi;
import software.amazon.awssdk.crt.CrtResource;
import software.amazon.awssdk.crt.http.HttpClientConnectionManager;
import software.amazon.awssdk.crt.http.Http2StreamManagerOptions;
import software.amazon.awssdk.crt.http.HttpClientConnectionManagerOptions;
import software.amazon.awssdk.crt.http.HttpMonitoringOptions;
import software.amazon.awssdk.crt.http.HttpProxyOptions;
import software.amazon.awssdk.crt.http.HttpStreamManager;
import software.amazon.awssdk.crt.http.HttpStreamManagerOptions;
import software.amazon.awssdk.crt.http.HttpVersion;
import software.amazon.awssdk.crt.io.ClientBootstrap;
import software.amazon.awssdk.crt.io.SocketOptions;
import software.amazon.awssdk.crt.io.TlsContext;
Expand All @@ -57,44 +60,46 @@ abstract class AwsCrtHttpClientBase implements SdkAutoCloseable {
private static final long DEFAULT_STREAM_WINDOW_SIZE = 16L * 1024L * 1024L; // 16 MB

protected final long readBufferSize;
private final Map<URI, HttpClientConnectionManager> connectionPools = new ConcurrentHashMap<>();
protected final Protocol protocol;
private final Map<URI, HttpStreamManager> connectionPools = new ConcurrentHashMap<>();
private final LinkedList<CrtResource> ownedSubResources = new LinkedList<>();
private final ClientBootstrap bootstrap;
private final SocketOptions socketOptions;
private final TlsContext tlsContext;
private final HttpProxyOptions proxyOptions;
private final HttpMonitoringOptions monitoringOptions;
private final long maxConnectionIdleInMilliseconds;
private final int maxConnectionsPerEndpoint;
private final int maxStreamsPerEndpoint;
private final long connectionAcquisitionTimeout;
private final TlsContextOptions tlsContextOptions;
private boolean isClosed = false;

AwsCrtHttpClientBase(AwsCrtClientBuilderBase builder, AttributeMap config) {
if (config.get(PROTOCOL) == Protocol.HTTP2) {
throw new UnsupportedOperationException("HTTP/2 is not supported in AwsCrtHttpClient yet. Use "
+ "NettyNioAsyncHttpClient instead.");
ClientBootstrap clientBootstrap = new ClientBootstrap(null, null);
SocketOptions clientSocketOptions = buildSocketOptions(builder.getTcpKeepAliveConfiguration(),
config.get(SdkHttpConfigurationOption.CONNECTION_TIMEOUT));
TlsContextOptions clientTlsContextOptions =
TlsContextOptions.createDefaultClient()
.withCipherPreference(resolveCipherPreference(builder.getPostQuantumTlsEnabled()))
.withVerifyPeer(!config.get(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES));
this.protocol = config.get(PROTOCOL);
if (protocol == Protocol.HTTP2) {
clientTlsContextOptions = clientTlsContextOptions.withAlpnList("h2");
}

try (ClientBootstrap clientBootstrap = new ClientBootstrap(null, null);
SocketOptions clientSocketOptions = buildSocketOptions(builder.getTcpKeepAliveConfiguration(),
config.get(SdkHttpConfigurationOption.CONNECTION_TIMEOUT));
TlsContextOptions clientTlsContextOptions =
TlsContextOptions.createDefaultClient()
.withCipherPreference(resolveCipherPreference(builder.getPostQuantumTlsEnabled()))
.withVerifyPeer(!config.get(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES));
TlsContext clientTlsContext = new TlsContext(clientTlsContextOptions)) {

this.bootstrap = registerOwnedResource(clientBootstrap);
this.socketOptions = registerOwnedResource(clientSocketOptions);
this.tlsContext = registerOwnedResource(clientTlsContext);
this.readBufferSize = builder.getReadBufferSizeInBytes() == null ?
DEFAULT_STREAM_WINDOW_SIZE : builder.getReadBufferSizeInBytes();
this.maxConnectionsPerEndpoint = config.get(SdkHttpConfigurationOption.MAX_CONNECTIONS);
this.monitoringOptions = resolveHttpMonitoringOptions(builder.getConnectionHealthConfiguration()).orElse(null);
this.maxConnectionIdleInMilliseconds = config.get(SdkHttpConfigurationOption.CONNECTION_MAX_IDLE_TIMEOUT).toMillis();
this.connectionAcquisitionTimeout = config.get(SdkHttpConfigurationOption.CONNECTION_ACQUIRE_TIMEOUT).toMillis();
this.proxyOptions = resolveProxy(builder.getProxyConfiguration(), tlsContext).orElse(null);
}
this.tlsContextOptions = registerOwnedResource(clientTlsContextOptions);
TlsContext clientTlsContext = new TlsContext(clientTlsContextOptions);

this.bootstrap = registerOwnedResource(clientBootstrap);
this.socketOptions = registerOwnedResource(clientSocketOptions);
this.tlsContext = registerOwnedResource(clientTlsContext);
this.readBufferSize = builder.getReadBufferSizeInBytes() == null ?
DEFAULT_STREAM_WINDOW_SIZE : builder.getReadBufferSizeInBytes();
this.maxStreamsPerEndpoint = config.get(SdkHttpConfigurationOption.MAX_CONNECTIONS);
this.monitoringOptions = resolveHttpMonitoringOptions(builder.getConnectionHealthConfiguration()).orElse(null);
this.maxConnectionIdleInMilliseconds = config.get(SdkHttpConfigurationOption.CONNECTION_MAX_IDLE_TIMEOUT).toMillis();
this.connectionAcquisitionTimeout = config.get(SdkHttpConfigurationOption.CONNECTION_ACQUIRE_TIMEOUT).toMillis();
this.proxyOptions = resolveProxy(builder.getProxyConfiguration(), tlsContext).orElse(null);
}

/**
Expand All @@ -106,7 +111,6 @@ abstract class AwsCrtHttpClientBase implements SdkAutoCloseable {
*/
private <T extends CrtResource> T registerOwnedResource(T subresource) {
if (subresource != null) {
subresource.addRef();
ownedSubResources.push(subresource);
}
return subresource;
Expand All @@ -116,23 +120,44 @@ String clientName() {
return AWS_COMMON_RUNTIME;
}

private HttpClientConnectionManager createConnectionPool(URI uri) {
log.debug(() -> "Creating ConnectionPool for: URI:" + uri + ", MaxConns: " + maxConnectionsPerEndpoint);
private HttpStreamManager createConnectionPool(URI uri) {
log.debug(() -> "Creating ConnectionPool for: URI:" + uri + ", MaxConns: " + maxStreamsPerEndpoint+ ", MaxStreams: " + maxStreamsPerEndpoint);

boolean isHttps = "https".equalsIgnoreCase(uri.getScheme());
TlsContext poolTlsContext = isHttps ? tlsContext : null;

HttpClientConnectionManagerOptions options = new HttpClientConnectionManagerOptions()
HttpClientConnectionManagerOptions h1Options = new HttpClientConnectionManagerOptions()
.withClientBootstrap(bootstrap)
.withSocketOptions(socketOptions)
.withTlsContext(tlsContext)
.withTlsContext(poolTlsContext)
.withUri(uri)
.withWindowSize(readBufferSize)
.withMaxConnections(maxConnectionsPerEndpoint)
.withMaxConnections(maxStreamsPerEndpoint)
.withManualWindowManagement(true)
.withProxyOptions(proxyOptions)
.withMonitoringOptions(monitoringOptions)
.withMaxConnectionIdleInMilliseconds(maxConnectionIdleInMilliseconds)
.withConnectionAcquisitionTimeoutInMilliseconds(connectionAcquisitionTimeout);

return HttpClientConnectionManager.create(options);
HttpStreamManagerOptions options = new HttpStreamManagerOptions()
.withHTTP1ConnectionManagerOptions(h1Options);

if (protocol == Protocol.HTTP2) {
Http2StreamManagerOptions h2Options = new Http2StreamManagerOptions()
.withMaxConcurrentStreams(maxStreamsPerEndpoint)
.withConnectionManagerOptions(h1Options);

if (!isHttps) {
h2Options.withPriorKnowledge(true);
}

options.withHTTP2StreamManagerOptions(h2Options);
options.withExpectedProtocol(HttpVersion.HTTP_2);
} else {
options.withExpectedProtocol(HttpVersion.HTTP_1_1);
}

return HttpStreamManager.create(options);
}

/*
Expand All @@ -150,14 +175,13 @@ private HttpClientConnectionManager createConnectionPool(URI uri) {
* existing pool. If we add all of execute() to the scope, we include, at minimum a JNI call to the native
* pool implementation.
*/
HttpClientConnectionManager getOrCreateConnectionPool(URI uri) {
HttpStreamManager getOrCreateConnectionPool(URI uri) {
synchronized (this) {
if (isClosed) {
throw new IllegalStateException("Client is closed. No more requests can be made with this client.");
}

HttpClientConnectionManager connPool = connectionPools.computeIfAbsent(uri, this::createConnectionPool);
connPool.addRef();
HttpStreamManager connPool = connectionPools.computeIfAbsent(uri, this::createConnectionPool);
return connPool;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,25 @@
package software.amazon.awssdk.http.crt.internal;

import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.crt.http.HttpClientConnectionManager;
import software.amazon.awssdk.crt.http.HttpStreamManager;
import software.amazon.awssdk.http.Protocol;
import software.amazon.awssdk.http.async.AsyncExecuteRequest;
import software.amazon.awssdk.metrics.MetricCollector;

@SdkInternalApi
public final class CrtAsyncRequestContext {
private final AsyncExecuteRequest request;
private final long readBufferSize;
private final HttpClientConnectionManager crtConnPool;
private final HttpStreamManager streamManager;
private final MetricCollector metricCollector;
private final Protocol protocol;

private CrtAsyncRequestContext(Builder builder) {
this.request = builder.request;
this.readBufferSize = builder.readBufferSize;
this.crtConnPool = builder.crtConnPool;
this.streamManager = builder.streamManager;
this.metricCollector = request.metricCollector().orElse(null);
this.protocol = builder.protocol;
}

public static Builder builder() {
Expand All @@ -46,8 +49,12 @@ public long readBufferSize() {
return readBufferSize;
}

public HttpClientConnectionManager crtConnPool() {
return crtConnPool;
public Protocol protocol() {
return protocol;
}

public HttpStreamManager streamManager() {
return streamManager;
}

public MetricCollector metricCollector() {
Expand All @@ -57,7 +64,8 @@ public MetricCollector metricCollector() {
public static final class Builder {
private AsyncExecuteRequest request;
private long readBufferSize;
private HttpClientConnectionManager crtConnPool;
private HttpStreamManager streamManager;
private Protocol protocol;

private Builder() {
}
Expand All @@ -72,13 +80,19 @@ public Builder readBufferSize(long readBufferSize) {
return this;
}

public Builder crtConnPool(HttpClientConnectionManager crtConnPool) {
this.crtConnPool = crtConnPool;
public Builder streamManager(HttpStreamManager streamManager) {
this.streamManager = streamManager;
return this;
}

public Builder protocol(Protocol protocol) {
this.protocol = protocol;
return this;
}

public CrtAsyncRequestContext build() {
return new CrtAsyncRequestContext(this);
}

}
}
Loading
Loading