From 6224ba51e61d9bf23ce3edd917170ec65e94d76b Mon Sep 17 00:00:00 2001 From: Arturo Bernal Date: Sun, 25 Jan 2026 19:15:01 +0100 Subject: [PATCH 1/2] Add HTTP/2 keep-alive PING policy. Emit PINGs on otherwise idle connections once SETTINGS are ACKed, and close the session if the peer does not ACK within the configured timeout. --- .../hc/core5/http2/config/H2Config.java | 35 +- .../hc/core5/http2/config/H2PingPolicy.java | 103 +++++ .../impl/nio/AbstractH2StreamMultiplexer.java | 167 ++++++++ .../H2KeepAlivePingClientExample.java | 257 +++++++++++++ .../nio/TestAbstractH2StreamMultiplexer.java | 345 +++++++++++++++-- .../nio/TestH2KeepAlivePingPolicyIT.java | 357 ++++++++++++++++++ 6 files changed, 1238 insertions(+), 26 deletions(-) create mode 100644 httpcore5-h2/src/main/java/org/apache/hc/core5/http2/config/H2PingPolicy.java create mode 100644 httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2KeepAlivePingClientExample.java create mode 100644 httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/TestH2KeepAlivePingPolicyIT.java diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/config/H2Config.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/config/H2Config.java index a801cc51fe..e8ef0dd4b4 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/config/H2Config.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/config/H2Config.java @@ -51,10 +51,11 @@ public class H2Config { private final int maxHeaderListSize; private final boolean compressionEnabled; private final int maxContinuations; + private final H2PingPolicy pingPolicy; H2Config(final int headerTableSize, final boolean pushEnabled, final int maxConcurrentStreams, final int initialWindowSize, final int maxFrameSize, final int maxHeaderListSize, - final boolean compressionEnabled, final int maxContinuations) { + final boolean compressionEnabled, final int maxContinuations, final H2PingPolicy pingPolicy) { super(); this.headerTableSize = headerTableSize; this.pushEnabled = pushEnabled; @@ -64,6 +65,7 @@ public class H2Config { this.maxHeaderListSize = maxHeaderListSize; this.compressionEnabled = compressionEnabled; this.maxContinuations = maxContinuations; + this.pingPolicy = pingPolicy; } public int getHeaderTableSize() { @@ -98,6 +100,15 @@ public int getMaxContinuations() { return maxContinuations; } + /** + * Optional keep-alive PING policy. + * + * @since 5.5 + */ + public H2PingPolicy getPingPolicy() { + return pingPolicy; + } + @Override public String toString() { final StringBuilder builder = new StringBuilder(); @@ -109,6 +120,7 @@ public String toString() { .append(", maxHeaderListSize=").append(this.maxHeaderListSize) .append(", compressionEnabled=").append(this.compressionEnabled) .append(", maxContinuations=").append(this.maxContinuations) + .append(", pingPolicy=").append(this.pingPolicy) .append("]"); return builder.toString(); } @@ -142,7 +154,9 @@ public static H2Config.Builder copy(final H2Config config) { .setInitialWindowSize(config.getInitialWindowSize()) .setMaxFrameSize(config.getMaxFrameSize()) .setMaxHeaderListSize(config.getMaxHeaderListSize()) - .setCompressionEnabled(config.isCompressionEnabled()); + .setCompressionEnabled(config.isCompressionEnabled()) + .setMaxContinuations(config.getMaxContinuations()) + .setPingPolicy(config.getPingPolicy()); } public static class Builder { @@ -155,6 +169,7 @@ public static class Builder { private int maxHeaderListSize; private boolean compressionEnabled; private int maxContinuations; + private H2PingPolicy pingPolicy; Builder() { this.headerTableSize = INIT_HEADER_TABLE_SIZE * 2; @@ -165,6 +180,7 @@ public static class Builder { this.maxHeaderListSize = FrameConsts.MAX_FRAME_SIZE; this.compressionEnabled = true; this.maxContinuations = 100; + this.pingPolicy = null; } public Builder setHeaderTableSize(final int headerTableSize) { @@ -211,7 +227,7 @@ public Builder setCompressionEnabled(final boolean compressionEnabled) { * Sets max limit on number of continuations. *

value zero represents no limit

* - * @since 5,4 + * @since 5.4 */ public Builder setMaxContinuations(final int maxContinuations) { Args.positive(maxContinuations, "Max continuations"); @@ -219,6 +235,16 @@ public Builder setMaxContinuations(final int maxContinuations) { return this; } + /** + * Sets optional keep-alive PING policy. + * + * @since 5.5 + */ + public Builder setPingPolicy(final H2PingPolicy pingPolicy) { + this.pingPolicy = pingPolicy; + return this; + } + public H2Config build() { return new H2Config( headerTableSize, @@ -228,7 +254,8 @@ public H2Config build() { maxFrameSize, maxHeaderListSize, compressionEnabled, - maxContinuations); + maxContinuations, + pingPolicy); } } diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/config/H2PingPolicy.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/config/H2PingPolicy.java new file mode 100644 index 0000000000..fc81908fd6 --- /dev/null +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/config/H2PingPolicy.java @@ -0,0 +1,103 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ + +package org.apache.hc.core5.http2.config; + +import org.apache.hc.core5.util.Args; +import org.apache.hc.core5.util.TimeValue; +import org.apache.hc.core5.util.Timeout; + +/** + * HTTP/2 keep-alive ping policy. + * + * @since 5.5 + */ +public final class H2PingPolicy { + + private static final H2PingPolicy DISABLED = new H2PingPolicy(Timeout.DISABLED, Timeout.DISABLED); + + private final Timeout idleTime; + private final Timeout ackTimeout; + + private H2PingPolicy(final Timeout idleTime, final Timeout ackTimeout) { + this.idleTime = idleTime; + this.ackTimeout = ackTimeout; + } + + public static H2PingPolicy disabled() { + return DISABLED; + } + + public static Builder custom() { + return new Builder(); + } + + public Timeout getIdleTime() { + return idleTime; + } + + public Timeout getAckTimeout() { + return ackTimeout; + } + + public boolean isEnabled() { + return isActive(idleTime) && isActive(ackTimeout); + } + + private static boolean isActive(final Timeout timeout) { + return timeout != null && timeout.isEnabled() && TimeValue.isPositive(timeout); + } + + public static final class Builder { + + private Timeout idleTime; + private Timeout ackTimeout; + + private Builder() { + this.idleTime = Timeout.DISABLED; + this.ackTimeout = Timeout.DISABLED; + } + + public Builder setIdleTime(final Timeout idleTime) { + this.idleTime = Args.notNull(idleTime, "idleTime"); + return this; + } + + public Builder setAckTimeout(final Timeout ackTimeout) { + this.ackTimeout = Args.notNull(ackTimeout, "ackTimeout"); + return this; + } + + public H2PingPolicy build() { + if (isActive(idleTime)) { + Args.check(isActive(ackTimeout), "ackTimeout must be positive when idleTime is enabled"); + } + return new H2PingPolicy(idleTime, ackTimeout); + } + } + +} diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java index 5c4b5b3971..5cf88ecf4e 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java @@ -77,6 +77,7 @@ import org.apache.hc.core5.http2.H2StreamTimeoutException; import org.apache.hc.core5.http2.config.H2Config; import org.apache.hc.core5.http2.config.H2Param; +import org.apache.hc.core5.http2.config.H2PingPolicy; import org.apache.hc.core5.http2.config.H2Setting; import org.apache.hc.core5.http2.frame.FrameFactory; import org.apache.hc.core5.http2.frame.FrameFlag; @@ -125,6 +126,7 @@ enum SettingsHandshake { READY, TRANSMITTED, ACKED } private final AtomicInteger connOutputWindow; private final AtomicInteger outputRequests; private final H2StreamListener streamListener; + private final KeepAlivePingSupport keepAlivePingSupport; private ConnectionHandshake connState = ConnectionHandshake.READY; private SettingsHandshake localSettingState = SettingsHandshake.READY; @@ -183,6 +185,11 @@ enum SettingsHandshake { READY, TRANSMITTED, ACKED } this.lowMark = H2Config.INIT.getInitialWindowSize() / 2; this.streamListener = streamListener; + + final H2PingPolicy pingPolicy = this.localConfig.getPingPolicy(); + this.keepAlivePingSupport = pingPolicy != null && pingPolicy.isEnabled() + ? new KeepAlivePingSupport(pingPolicy) + : null; } @Override @@ -444,6 +451,9 @@ public final void onInput(final ByteBuffer src) throws HttpException, IOExceptio for (;;) { final RawFrame frame = inputBuffer.read(src, ioSession); if (frame != null) { + if (keepAlivePingSupport != null) { + keepAlivePingSupport.onFrameInput(frame); + } if (streamListener != null) { streamListener.onFrameInput(this, frame.getStreamId(), frame); } @@ -487,6 +497,10 @@ public final void onOutput() throws HttpException, IOException { ioSession.getLock().unlock(); } + if (keepAlivePingSupport != null) { + keepAlivePingSupport.activateIfReady(); + } + if (connState.compareTo(ConnectionHandshake.SHUTDOWN) < 0) { if (connOutputWindow.get() > 0 && remoteSettingState == SettingsHandshake.ACKED) { @@ -592,6 +606,15 @@ public final void onOutput() throws HttpException, IOException { } public final void onTimeout(final Timeout timeout) throws HttpException, IOException { + if (keepAlivePingSupport != null + && connState.compareTo(ConnectionHandshake.ACTIVE) <= 0 + && localSettingState == SettingsHandshake.ACKED + && remoteSettingState == SettingsHandshake.ACKED) { + if (keepAlivePingSupport.onTimeout(timeout)) { + return; + } + } + connState = ConnectionHandshake.SHUTDOWN; final RawFrame goAway; @@ -888,6 +911,9 @@ private void consumeFrame(final RawFrame frame) throws HttpException, IOExceptio throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid PING frame payload"); } if (frame.isFlagSet(FrameFlag.ACK)) { + if (keepAlivePingSupport != null && keepAlivePingSupport.consumePingAck(ping)) { + break; + } final AsyncPingHandler pingHandler = pingHandlers.poll(); if (pingHandler != null) { pingHandler.consumeResponse(ping); @@ -910,6 +936,9 @@ private void consumeFrame(final RawFrame frame) throws HttpException, IOExceptio localSettingState = SettingsHandshake.ACKED; ioSession.setEvent(SelectionKey.OP_WRITE); applyLocalSettings(); + if (keepAlivePingSupport != null) { + keepAlivePingSupport.activateIfReady(); + } } } else { final ByteBuffer payload = frame.getPayload(); @@ -923,6 +952,9 @@ private void consumeFrame(final RawFrame frame) throws HttpException, IOExceptio final RawFrame response = frameFactory.createSettingsAck(); commitFrame(response); remoteSettingState = SettingsHandshake.ACKED; + if (keepAlivePingSupport != null) { + keepAlivePingSupport.activateIfReady(); + } } } break; @@ -1328,6 +1360,141 @@ void appendState(final StringBuilder buf) { .append(", streams.lastRemote=").append(streams.getLastRemoteId()); } + private final class KeepAlivePingSupport { + + private static final int PING_DATA_LEN = 8; + + private final Timeout idleTime; + private final Timeout ackTimeout; + + private boolean active; + private boolean awaitingAck; + + private long lastActivityNanos; + private long pingSeq; + private long expectedAckSeq; + + KeepAlivePingSupport(final H2PingPolicy policy) { + Args.notNull(policy, "PING policy"); + this.idleTime = policy.getIdleTime(); + this.ackTimeout = policy.getAckTimeout(); + this.active = false; + this.awaitingAck = false; + this.lastActivityNanos = System.nanoTime(); + this.pingSeq = 0L; + this.expectedAckSeq = 0L; + } + + void activateIfReady() { + if (active) { + return; + } + if (localSettingState == SettingsHandshake.ACKED && remoteSettingState == SettingsHandshake.ACKED) { + active = true; + awaitingAck = false; + lastActivityNanos = System.nanoTime(); + ioSession.setSocketTimeout(idleTime); + } + } + + void onFrameInput(final RawFrame frame) { + if (!active) { + return; + } + lastActivityNanos = System.nanoTime(); + if (awaitingAck) { + if (!(frame.getType() == FrameType.PING.getValue() && frame.isFlagSet(FrameFlag.ACK))) { + awaitingAck = false; + } + } + ioSession.setSocketTimeout(idleTime); + } + + void onActivity() { + if (!active) { + return; + } + lastActivityNanos = System.nanoTime(); + if (awaitingAck) { + awaitingAck = false; + } + ioSession.setSocketTimeout(idleTime); + } + + boolean consumePingAck(final ByteBuffer payload) { + if (!active || !awaitingAck) { + return false; + } + if (payload == null || payload.remaining() != PING_DATA_LEN) { + return false; + } + final long ack = payload.getLong(payload.position()); + if (ack != expectedAckSeq) { + return false; + } + onActivity(); + return true; + } + + boolean onTimeout(final Timeout timeout) throws IOException { + activateIfReady(); + if (!active) { + return false; + } + + if (awaitingAck) { + shutdownKeepAlive(timeout); + return true; + } + + final long idleNanos = idleTime.toMilliseconds() * 1_000_000L; + if (idleNanos > 0L) { + final long elapsed = System.nanoTime() - lastActivityNanos; + if (elapsed < idleNanos) { + final long remainingMs = Math.max(1L, (idleNanos - elapsed) / 1_000_000L); + ioSession.setSocketTimeout(Timeout.ofMilliseconds(remainingMs)); + return true; + } + } + + awaitingAck = true; + sendPing(); + ioSession.setSocketTimeout(ackTimeout); + return true; + } + + private void sendPing() throws IOException { + final long v = ++pingSeq; + expectedAckSeq = v; + + final ByteBuffer payload = ByteBuffer.allocate(PING_DATA_LEN); + payload.putLong(v); + payload.flip(); + + final RawFrame ping = frameFactory.createPing(payload); + commitFrame(ping); + } + + private void shutdownKeepAlive(final Timeout timeout) throws IOException { + connState = ConnectionHandshake.SHUTDOWN; + + final RawFrame goAway = frameFactory.createGoAway( + streams.getLastRemoteId(), + H2Error.NO_ERROR, + "Ping response timeout (" + timeout + ")"); + commitFrame(goAway); + + for (final Iterator it = streams.iterator(); it.hasNext(); ) { + final H2Stream stream = it.next(); + stream.fail(new H2StreamResetException( + H2Error.NO_ERROR, + "Ping response timeout (" + timeout + ")")); + } + streams.shutdownAndReleaseAll(); + } + + } + private static class Continuation { final int streamId; diff --git a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2KeepAlivePingClientExample.java b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2KeepAlivePingClientExample.java new file mode 100644 index 0000000000..6fb7505d8c --- /dev/null +++ b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2KeepAlivePingClientExample.java @@ -0,0 +1,257 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.core5.http2.examples; + +import java.io.IOException; +import java.net.URI; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hc.core5.http.EntityDetails; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.HttpConnection; +import org.apache.hc.core5.http.HttpException; +import org.apache.hc.core5.http.HttpResponse; +import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester; +import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler; +import org.apache.hc.core5.http.nio.AsyncRequestProducer; +import org.apache.hc.core5.http.nio.CapacityChannel; +import org.apache.hc.core5.http.nio.DataStreamChannel; +import org.apache.hc.core5.http.nio.RequestChannel; +import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer; +import org.apache.hc.core5.http.nio.support.AsyncRequestBuilder; +import org.apache.hc.core5.http.nio.support.BasicResponseConsumer; +import org.apache.hc.core5.http.protocol.HttpContext; +import org.apache.hc.core5.http.protocol.HttpCoreContext; +import org.apache.hc.core5.http2.HttpVersionPolicy; +import org.apache.hc.core5.http2.config.H2Config; +import org.apache.hc.core5.http2.config.H2PingPolicy; +import org.apache.hc.core5.http2.frame.FrameFlag; +import org.apache.hc.core5.http2.frame.FrameType; +import org.apache.hc.core5.http2.frame.RawFrame; +import org.apache.hc.core5.http2.impl.nio.H2StreamListener; +import org.apache.hc.core5.http2.impl.nio.bootstrap.H2RequesterBootstrap; +import org.apache.hc.core5.io.CloseMode; +import org.apache.hc.core5.reactor.IOReactorConfig; +import org.apache.hc.core5.util.Timeout; + +/** + * Minimal example demonstrating HTTP/2 connection keepalive using {@link H2PingPolicy}. + *

+ * The client configures an idle timeout and an ACK timeout. When the underlying HTTP/2 + * connection becomes idle, the I/O reactor triggers a keepalive {@code PING}. If the + * peer responds with {@code PING[ACK]} within the configured ACK timeout, the connection + * remains usable; otherwise the connection is considered dead and is terminated by the + * transport. + *

+ *

+ * This example performs a single request to establish the connection and then waits + * long enough for one keepalive round-trip. It prints: + *

+ *
    + *
  • the remote endpoint once,
  • + *
  • {@code >> PING} when a keepalive PING is emitted,
  • + *
  • {@code << PING[ACK]} when the ACK is received,
  • + *
  • a final counter line {@code keepalive: pingsOut=..., pingAcksIn=...}.
  • + *
+ *

+ * Notes: + *

+ *
    + *
  • This is intentionally not a unit test; it is a runnable sanity-check and usage example.
  • + *
  • Keepalive requires HTTP/2 settings negotiation to complete; PINGs may not be emitted + * immediately on startup.
  • + *
  • Timing is inherently environment-dependent; adjust {@code idleTime}/{@code ackTimeout} + * if running on a slow or heavily loaded machine.
  • + *
+ * @since 5.5 + */ + +public class H2KeepAlivePingClientExample { + + public static void main(final String[] args) throws Exception { + + final Timeout idleTime = Timeout.ofSeconds(1); + final Timeout ackTimeout = Timeout.ofSeconds(2); + + final IOReactorConfig ioReactorConfig = IOReactorConfig.custom() + .setSoTimeout(5, TimeUnit.SECONDS) + .build(); + + final H2PingPolicy pingPolicy = H2PingPolicy.custom() + .setIdleTime(idleTime) + .setAckTimeout(ackTimeout) + .build(); + + final H2Config h2Config = H2Config.custom() + .setPushEnabled(false) + .setMaxConcurrentStreams(100) + .setPingPolicy(pingPolicy) + .build(); + + final AtomicBoolean remotePrinted = new AtomicBoolean(false); + final AtomicInteger pingsOut = new AtomicInteger(0); + final AtomicInteger pingAcksIn = new AtomicInteger(0); + final CountDownLatch pingAckLatch = new CountDownLatch(1); + + final HttpAsyncRequester requester = H2RequesterBootstrap.bootstrap() + .setIOReactorConfig(ioReactorConfig) + .setH2Config(h2Config) + .setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_2) + .setStreamListener(new H2StreamListener() { + + private void printRemoteOnce(final HttpConnection connection) { + if (remotePrinted.compareAndSet(false, true)) { + System.out.println("remote=" + connection.getRemoteAddress()); + } + } + + @Override + public void onHeaderInput(final HttpConnection connection, final int streamId, final List headers) { + } + + @Override + public void onHeaderOutput(final HttpConnection connection, final int streamId, final List headers) { + } + + @Override + public void onFrameInput(final HttpConnection connection, final int streamId, final RawFrame frame) { + printRemoteOnce(connection); + if (FrameType.valueOf(frame.getType()) == FrameType.PING && frame.isFlagSet(FrameFlag.ACK)) { + System.out.println("<< PING[ACK]"); + pingAcksIn.incrementAndGet(); + pingAckLatch.countDown(); + } + } + + @Override + public void onFrameOutput(final HttpConnection connection, final int streamId, final RawFrame frame) { + printRemoteOnce(connection); + if (FrameType.valueOf(frame.getType()) == FrameType.PING && !frame.isFlagSet(FrameFlag.ACK)) { + System.out.println(">> PING"); + pingsOut.incrementAndGet(); + } + } + + @Override + public void onInputFlowControl(final HttpConnection connection, final int streamId, final int delta, final int actualSize) { + } + + @Override + public void onOutputFlowControl(final HttpConnection connection, final int streamId, final int delta, final int actualSize) { + } + + }) + .create(); + + Runtime.getRuntime().addShutdownHook(new Thread(() -> requester.close(CloseMode.GRACEFUL))); + + requester.start(); + + final URI requestUri = new URI("http://nghttp2.org/httpbin/post"); + final AsyncRequestProducer requestProducer = AsyncRequestBuilder.post(requestUri) + .setEntity("stuff") + .build(); + final BasicResponseConsumer responseConsumer = new BasicResponseConsumer<>(new StringAsyncEntityConsumer()); + + final CountDownLatch exchangeLatch = new CountDownLatch(1); + + requester.execute(new AsyncClientExchangeHandler() { + + @Override + public void releaseResources() { + requestProducer.releaseResources(); + responseConsumer.releaseResources(); + exchangeLatch.countDown(); + } + + @Override + public void cancel() { + exchangeLatch.countDown(); + } + + @Override + public void failed(final Exception cause) { + exchangeLatch.countDown(); + } + + @Override + public void produceRequest(final RequestChannel channel, final HttpContext httpContext) throws HttpException, IOException { + requestProducer.sendRequest(channel, httpContext); + } + + @Override + public int available() { + return requestProducer.available(); + } + + @Override + public void produce(final DataStreamChannel channel) throws IOException { + requestProducer.produce(channel); + } + + @Override + public void consumeInformation(final HttpResponse response, final HttpContext httpContext) { + } + + @Override + public void consumeResponse(final HttpResponse response, final EntityDetails entityDetails, final HttpContext httpContext) throws HttpException, IOException { + responseConsumer.consumeResponse(response, entityDetails, httpContext, null); + } + + @Override + public void updateCapacity(final CapacityChannel capacityChannel) throws IOException { + responseConsumer.updateCapacity(capacityChannel); + } + + @Override + public void consume(final ByteBuffer src) throws IOException { + responseConsumer.consume(src); + } + + @Override + public void streamEnd(final List trailers) throws HttpException, IOException { + responseConsumer.streamEnd(trailers); + } + + }, Timeout.ofSeconds(30), HttpCoreContext.create()); + + exchangeLatch.await(); + + final long waitMs = idleTime.toMilliseconds() + ackTimeout.toMilliseconds() + 500L; + pingAckLatch.await(waitMs, TimeUnit.MILLISECONDS); + + System.out.println("keepalive: pingsOut=" + pingsOut.get() + ", pingAcksIn=" + pingAcksIn.get()); + + requester.close(CloseMode.GRACEFUL); + } + +} diff --git a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/TestAbstractH2StreamMultiplexer.java b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/TestAbstractH2StreamMultiplexer.java index c85d10aa9a..150382440a 100644 --- a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/TestAbstractH2StreamMultiplexer.java +++ b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/TestAbstractH2StreamMultiplexer.java @@ -57,10 +57,12 @@ import org.apache.hc.core5.http2.WritableByteChannelMock; import org.apache.hc.core5.http2.config.H2Config; import org.apache.hc.core5.http2.config.H2Param; +import org.apache.hc.core5.http2.config.H2PingPolicy; import org.apache.hc.core5.http2.config.H2Setting; import org.apache.hc.core5.http2.frame.DefaultFrameFactory; import org.apache.hc.core5.http2.frame.FrameConsts; import org.apache.hc.core5.http2.frame.FrameFactory; +import org.apache.hc.core5.http2.frame.FrameFlag; import org.apache.hc.core5.http2.frame.FrameType; import org.apache.hc.core5.http2.frame.RawFrame; import org.apache.hc.core5.http2.frame.StreamIdGenerator; @@ -846,28 +848,6 @@ static final class PriorityHeaderSender implements H2StreamHandler { @Override public void releaseResources() { } } - // Small struct + parser to decode the frames we capture from writes - private static final class FrameStub { - final int type; - final int streamId; - FrameStub(final int type, final int streamId) { this.type = type; this.streamId = streamId; } - } - private static List parseFrames(final byte[] all) { - final List out = new ArrayList<>(); - int p = 0; - while (p + 9 <= all.length) { - final int len = ((all[p] & 0xff) << 16) | ((all[p + 1] & 0xff) << 8) | (all[p + 2] & 0xff); - final int type = all[p + 3] & 0xff; - final int sid = ((all[p + 5] & 0x7f) << 24) | ((all[p + 6] & 0xff) << 16) - | ((all[p + 7] & 0xff) << 8) | (all[p + 8] & 0xff); - p += 9; - if (p + len > all.length) break; - out.add(new FrameStub(type, sid)); - p += len; - } - return out; - } - // 2) Client emits PRIORITY_UPDATE BEFORE HEADERS when Priority header present @Test void testSubmitWithPriorityHeaderEmitsPriorityUpdateBeforeHeaders() throws Exception { @@ -1083,5 +1063,326 @@ void testStreamIdleTimeoutTriggersH2StreamTimeoutException() throws Exception { Assertions.assertEquals(1, timeoutEx.getStreamId()); } + private static byte[] encodeFrame(final RawFrame frame) throws IOException { + final WritableByteChannelMock writableChannel = new WritableByteChannelMock(256); + final FrameOutputBuffer outBuffer = new FrameOutputBuffer(16 * 1024); + outBuffer.write(frame, writableChannel); + return writableChannel.toByteArray(); + } + + private static void feedFrame(final AbstractH2StreamMultiplexer mux, final RawFrame frame) throws Exception { + mux.onInput(ByteBuffer.wrap(encodeFrame(frame))); + } + + private static void completeSettingsHandshake(final AbstractH2StreamMultiplexer mux) throws Exception { + // Remote SETTINGS (non-ACK) -> mux replies with SETTINGS ACK and marks remoteSettingState ACKED + final RawFrame remoteSettings = FRAME_FACTORY.createSettings(new H2Setting[] { + new H2Setting(H2Param.MAX_FRAME_SIZE, FrameConsts.MIN_FRAME_SIZE) + }); + feedFrame(mux, remoteSettings); + + // Remote ACK of our SETTINGS -> localSettingState ACKED + feedFrame(mux, new RawFrame(FrameType.SETTINGS.getValue(), FrameFlag.ACK.getValue(), 0, null)); + } + + private static final class FrameStub { + final int type; + final int flags; + final int streamId; + final byte[] payload; + + FrameStub(final int type, final int flags, final int streamId, final byte[] payload) { + this.type = type; + this.flags = flags; + this.streamId = streamId; + this.payload = payload; + } + + boolean isPing() { + return type == FrameType.PING.getValue(); + } + + boolean isGoAway() { + return type == FrameType.GOAWAY.getValue(); + } + + boolean isAck() { + return (flags & FrameFlag.ACK.getValue()) != 0; + } + } + + private static List parseFrames(final byte[] all) { + final List out = new ArrayList<>(); + int p = 0; + while (p + 9 <= all.length) { + final int len = ((all[p] & 0xff) << 16) | ((all[p + 1] & 0xff) << 8) | (all[p + 2] & 0xff); + final int type = all[p + 3] & 0xff; + final int flags = all[p + 4] & 0xff; + final int sid = ((all[p + 5] & 0x7f) << 24) | ((all[p + 6] & 0xff) << 16) + | ((all[p + 7] & 0xff) << 8) | (all[p + 8] & 0xff); + p += 9; + if (p + len > all.length) { + break; + } + final byte[] payload = new byte[len]; + System.arraycopy(all, p, payload, 0, len); + out.add(new FrameStub(type, flags, sid, payload)); + p += len; + } + return out; + } + + private static byte[] concat(final List writes) { + final int total = writes.stream().mapToInt(a -> a.length).sum(); + final byte[] all = new byte[total]; + int p = 0; + for (final byte[] a : writes) { + System.arraycopy(a, 0, all, p, a.length); + p += a.length; + } + return all; + } + + + @Test + void testKeepAliveNotActiveBeforeSettingsHandshake() throws Exception { + final List writes = new ArrayList<>(); + Mockito.when(protocolIOSession.write(ArgumentMatchers.any(ByteBuffer.class))) + .thenAnswer(inv -> { + final ByteBuffer b = inv.getArgument(0, ByteBuffer.class); + final byte[] copy = new byte[b.remaining()]; + b.get(copy); + writes.add(copy); + return copy.length; + }); + Mockito.doNothing().when(protocolIOSession).setEvent(ArgumentMatchers.anyInt()); + Mockito.doNothing().when(protocolIOSession).clearEvent(ArgumentMatchers.anyInt()); + + final Timeout idleTime = Timeout.ofMilliseconds(5); + final Timeout ackTimeout = Timeout.ofMilliseconds(5); + + final H2Config h2Config = H2Config.custom() + .setPingPolicy(H2PingPolicy.custom() + .setIdleTime(idleTime) + .setAckTimeout(ackTimeout) + .build()) + .build(); + + final AbstractH2StreamMultiplexer mux = new H2StreamMultiplexerImpl( + protocolIOSession, FRAME_FACTORY, StreamIdGenerator.ODD, + httpProcessor, CharCodingConfig.DEFAULT, h2Config, h2StreamListener, () -> streamHandler); + + mux.onConnect(); + writes.clear(); + + // BEFORE SETTINGS handshake is fully ACKed, keepalive must NOT run + mux.onTimeout(idleTime); + + final List frames = parseFrames(concat(writes)); + Assertions.assertTrue(frames.stream().noneMatch(FrameStub::isPing), "Must not emit PING before handshake"); + Assertions.assertTrue(frames.stream().anyMatch(FrameStub::isGoAway), "Default timeout path must emit GOAWAY"); + } + + @Test + void testKeepAliveActivatesAfterSettingsAckedSetsIdleTimeout() throws Exception { + Mockito.doNothing().when(protocolIOSession).setEvent(ArgumentMatchers.anyInt()); + Mockito.doNothing().when(protocolIOSession).clearEvent(ArgumentMatchers.anyInt()); + Mockito.when(protocolIOSession.write(ArgumentMatchers.any(ByteBuffer.class))).thenReturn(0); + + final Timeout idleTime = Timeout.ofMilliseconds(50); + final Timeout ackTimeout = Timeout.ofMilliseconds(20); + + final H2Config h2Config = H2Config.custom() + .setPingPolicy(H2PingPolicy.custom() + .setIdleTime(idleTime) + .setAckTimeout(ackTimeout) + .build()) + .build(); + + final AbstractH2StreamMultiplexer mux = new H2StreamMultiplexerImpl( + protocolIOSession, FRAME_FACTORY, StreamIdGenerator.ODD, + httpProcessor, CharCodingConfig.DEFAULT, h2Config, h2StreamListener, () -> streamHandler); + + mux.onConnect(); + completeSettingsHandshake(mux); + + Mockito.verify(protocolIOSession, Mockito.atLeastOnce()).setSocketTimeout(ArgumentMatchers.eq(idleTime)); + Mockito.verify(protocolIOSession, Mockito.never()).setSocketTimeout(ArgumentMatchers.eq(ackTimeout)); + } + + @Test + void testKeepAliveIdleTimeoutSendsPingAndSetsAckTimeout() throws Exception { + final List writes = new ArrayList<>(); + Mockito.when(protocolIOSession.write(ArgumentMatchers.any(ByteBuffer.class))) + .thenAnswer(inv -> { + final ByteBuffer b = inv.getArgument(0, ByteBuffer.class); + final byte[] copy = new byte[b.remaining()]; + b.get(copy); + writes.add(copy); + return copy.length; + }); + Mockito.doNothing().when(protocolIOSession).setEvent(ArgumentMatchers.anyInt()); + Mockito.doNothing().when(protocolIOSession).clearEvent(ArgumentMatchers.anyInt()); + + final Timeout idleTime = Timeout.ofMilliseconds(5); + final Timeout ackTimeout = Timeout.ofMilliseconds(50); + + final H2Config h2Config = H2Config.custom() + .setPingPolicy(H2PingPolicy.custom() + .setIdleTime(idleTime) + .setAckTimeout(ackTimeout) + .build()) + .build(); + + final AbstractH2StreamMultiplexer mux = new H2StreamMultiplexerImpl( + protocolIOSession, FRAME_FACTORY, StreamIdGenerator.ODD, + httpProcessor, CharCodingConfig.DEFAULT, h2Config, h2StreamListener, () -> streamHandler); + + mux.onConnect(); + completeSettingsHandshake(mux); + + writes.clear(); + Thread.sleep(idleTime.toMilliseconds() + 10); + + mux.onTimeout(idleTime); + + Mockito.verify(protocolIOSession, Mockito.atLeastOnce()).setSocketTimeout(ArgumentMatchers.eq(ackTimeout)); + + final List frames = parseFrames(concat(writes)); + Assertions.assertTrue(frames.stream().anyMatch(f -> f.isPing() && !f.isAck()), "Must emit keepalive PING"); + Assertions.assertTrue(mux.isOpen(), "Connection should still be open after sending PING"); + } + + @Test + void testKeepAlivePingAckReturnsToIdleTimeout() throws Exception { + final List writes = new ArrayList<>(); + Mockito.when(protocolIOSession.write(ArgumentMatchers.any(ByteBuffer.class))) + .thenAnswer(inv -> { + final ByteBuffer b = inv.getArgument(0, ByteBuffer.class); + final byte[] copy = new byte[b.remaining()]; + b.get(copy); + writes.add(copy); + return copy.length; + }); + Mockito.doNothing().when(protocolIOSession).setEvent(ArgumentMatchers.anyInt()); + Mockito.doNothing().when(protocolIOSession).clearEvent(ArgumentMatchers.anyInt()); + + final Timeout idleTime = Timeout.ofMilliseconds(5); + final Timeout ackTimeout = Timeout.ofMilliseconds(50); + + final H2Config h2Config = H2Config.custom() + .setPingPolicy(H2PingPolicy.custom() + .setIdleTime(idleTime) + .setAckTimeout(ackTimeout) + .build()) + .build(); + + final AbstractH2StreamMultiplexer mux = new H2StreamMultiplexerImpl( + protocolIOSession, FRAME_FACTORY, StreamIdGenerator.ODD, + httpProcessor, CharCodingConfig.DEFAULT, h2Config, h2StreamListener, () -> streamHandler); + + mux.onConnect(); + completeSettingsHandshake(mux); + + writes.clear(); + Thread.sleep(idleTime.toMilliseconds() + 10); + mux.onTimeout(idleTime); + + final List frames = parseFrames(concat(writes)); + final FrameStub ping = frames.stream().filter(f -> f.isPing() && !f.isAck()).findFirst().orElse(null); + Assertions.assertNotNull(ping, "Expected a keepalive PING frame"); + Assertions.assertEquals(8, ping.payload.length, "PING payload must be 8 bytes"); + + // Feed an ACK with the same 8 bytes + final RawFrame pingAck = new RawFrame(FrameType.PING.getValue(), FrameFlag.ACK.getValue(), 0, ByteBuffer.wrap(ping.payload)); + feedFrame(mux, pingAck); + + Mockito.verify(protocolIOSession, Mockito.atLeastOnce()).setSocketTimeout(ArgumentMatchers.eq(idleTime)); + } + + @Test + void testKeepAliveAckTimeoutShutsDownAndFailsStreams() throws Exception { + final List writes = new ArrayList<>(); + Mockito.when(protocolIOSession.write(ArgumentMatchers.any(ByteBuffer.class))) + .thenAnswer(inv -> { + final ByteBuffer b = inv.getArgument(0, ByteBuffer.class); + final byte[] copy = new byte[b.remaining()]; + b.get(copy); + writes.add(copy); + return copy.length; + }); + Mockito.doNothing().when(protocolIOSession).setEvent(ArgumentMatchers.anyInt()); + Mockito.doNothing().when(protocolIOSession).clearEvent(ArgumentMatchers.anyInt()); + + final Timeout idleTime = Timeout.ofMilliseconds(5); + final Timeout ackTimeout = Timeout.ofMilliseconds(20); + + final H2Config h2Config = H2Config.custom() + .setPingPolicy(H2PingPolicy.custom() + .setIdleTime(idleTime) + .setAckTimeout(ackTimeout) + .build()) + .build(); + + final AbstractH2StreamMultiplexer mux = new H2StreamMultiplexerImpl( + protocolIOSession, FRAME_FACTORY, StreamIdGenerator.ODD, + httpProcessor, CharCodingConfig.DEFAULT, h2Config, h2StreamListener, () -> streamHandler); + + mux.onConnect(); + completeSettingsHandshake(mux); + + // Ensure at least one live stream to be failed + final H2StreamChannel channel = mux.createChannel(1); + mux.createStream(channel, streamHandler); + + writes.clear(); + Thread.sleep(idleTime.toMilliseconds() + 10); + mux.onTimeout(idleTime); // send PING, awaiting ACK + writes.clear(); + + // No ACK arrives -> next timeout closes via keepalive path (GOAWAY + fail streams) + mux.onTimeout(ackTimeout); + + final List frames = parseFrames(concat(writes)); + Assertions.assertTrue(frames.stream().anyMatch(FrameStub::isGoAway), "Must emit GOAWAY on ping ACK timeout"); + + Mockito.verify(streamHandler, Mockito.atLeastOnce()).failed(exceptionCaptor.capture()); + Assertions.assertInstanceOf(H2StreamResetException.class, exceptionCaptor.getValue()); + + Assertions.assertFalse(mux.isOpen(), "Connection must not be open after keepalive shutdown"); + } + + @Test + void testKeepAliveDisabledNeverEmitsPing() throws Exception { + final List writes = new ArrayList<>(); + Mockito.when(protocolIOSession.write(ArgumentMatchers.any(ByteBuffer.class))) + .thenAnswer(inv -> { + final ByteBuffer b = inv.getArgument(0, ByteBuffer.class); + final byte[] copy = new byte[b.remaining()]; + b.get(copy); + writes.add(copy); + return copy.length; + }); + Mockito.doNothing().when(protocolIOSession).setEvent(ArgumentMatchers.anyInt()); + Mockito.doNothing().when(protocolIOSession).clearEvent(ArgumentMatchers.anyInt()); + + final H2Config h2Config = H2Config.custom() + .setPingPolicy(H2PingPolicy.disabled()) + .build(); + + final AbstractH2StreamMultiplexer mux = new H2StreamMultiplexerImpl( + protocolIOSession, FRAME_FACTORY, StreamIdGenerator.ODD, + httpProcessor, CharCodingConfig.DEFAULT, h2Config, h2StreamListener, () -> streamHandler); + + mux.onConnect(); + writes.clear(); + + mux.onTimeout(Timeout.ofMilliseconds(1)); + + final List frames = parseFrames(concat(writes)); + Assertions.assertTrue(frames.stream().noneMatch(FrameStub::isPing), "Disabled policy must never emit PING"); + } + + } diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/TestH2KeepAlivePingPolicyIT.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/TestH2KeepAlivePingPolicyIT.java new file mode 100644 index 0000000000..3968c42c43 --- /dev/null +++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/TestH2KeepAlivePingPolicyIT.java @@ -0,0 +1,357 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ + +package org.apache.hc.core5.testing.nio; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.locks.LockSupport; + +import org.apache.hc.core5.http.ContentType; +import org.apache.hc.core5.http.HttpException; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.HttpRequest; +import org.apache.hc.core5.http.HttpResponse; +import org.apache.hc.core5.http.HttpStatus; +import org.apache.hc.core5.http.Message; +import org.apache.hc.core5.http.URIScheme; +import org.apache.hc.core5.http.nio.AsyncServerRequestHandler; +import org.apache.hc.core5.http.nio.entity.DiscardingEntityConsumer; +import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer; +import org.apache.hc.core5.http.nio.support.AsyncResponseBuilder; +import org.apache.hc.core5.http.nio.support.BasicRequestProducer; +import org.apache.hc.core5.http.nio.support.BasicResponseConsumer; +import org.apache.hc.core5.http.protocol.HttpContext; +import org.apache.hc.core5.http.support.BasicRequestBuilder; +import org.apache.hc.core5.http2.config.H2Config; +import org.apache.hc.core5.http2.config.H2PingPolicy; +import org.apache.hc.core5.http2.nio.AsyncPingHandler; +import org.apache.hc.core5.http2.nio.command.PingCommand; +import org.apache.hc.core5.reactor.Command; +import org.apache.hc.core5.reactor.IOSession; +import org.apache.hc.core5.testing.extension.nio.H2TestResources; +import org.apache.hc.core5.util.Timeout; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +public class TestH2KeepAlivePingPolicyIT { + + private static final Timeout TIMEOUT = Timeout.ofSeconds(30); + + @RegisterExtension + private final H2TestResources resources = new H2TestResources(URIScheme.HTTP, TIMEOUT); + + @Test + void keepAlivePing_keepsConnectionOpenPastIdleTimeout() throws Exception { + final H2TestServer server = resources.server(); + final H2TestClient client = resources.client(); + + server.register("/hello", () -> new MessageExchangeHandler(new DiscardingEntityConsumer()) { + @Override + protected void handle( + final Message request, + final AsyncServerRequestHandler.ResponseTrigger responseTrigger, + final HttpContext context) throws IOException, HttpException { + responseTrigger.submitResponse( + AsyncResponseBuilder.create(HttpStatus.SC_OK) + .setEntity("OK", ContentType.TEXT_PLAIN) + .build(), + context); + } + }); + + final Timeout idleTime = Timeout.ofMilliseconds(200); + final Timeout ackTimeout = Timeout.ofSeconds(2); + + final H2PingPolicy pingPolicy = H2PingPolicy.custom() + .setIdleTime(idleTime) + .setAckTimeout(ackTimeout) + .build(); + + final H2Config h2Config = H2Config.custom() + .setPushEnabled(false) + .setPingPolicy(pingPolicy) + .build(); + + server.configure(h2Config); + final InetSocketAddress serverEndpoint = server.start(); + + client.configure(h2Config); + client.start(); + + final IOSession ioSession = client.requestSession( + new HttpHost("localhost", serverEndpoint.getPort()), + TIMEOUT, + null).get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()); + + // Make the inactivity timeout aggressive; keep-alive must prevent it from killing the session. + ioSession.setSocketTimeout(idleTime); + + try (final ClientSessionEndpoint streamEndpoint = new ClientSessionEndpoint(ioSession)) { + final HttpHost target = new HttpHost(URIScheme.HTTP.id, "localhost", serverEndpoint.getPort()); + + final Message r1 = executeHello(streamEndpoint, target); + Assertions.assertEquals(200, r1.getHead().getCode()); + Assertions.assertEquals("OK", r1.getBody()); + + parkAtLeast(idleTime.toMilliseconds() * 6L); + + Assertions.assertTrue(ioSession.isOpen(), "Expected session to stay open with keep-alive enabled"); + + final Message r2 = executeHello(streamEndpoint, target); + Assertions.assertEquals(200, r2.getHead().getCode()); + Assertions.assertEquals("OK", r2.getBody()); + } + } + + @Test + void keepAlivePing_disabled_connectionClosesOnIdleTimeout() throws Exception { + final H2TestServer server = resources.server(); + final H2TestClient client = resources.client(); + + server.register("/hello", () -> new MessageExchangeHandler(new DiscardingEntityConsumer()) { + @Override + protected void handle( + final Message request, + final AsyncServerRequestHandler.ResponseTrigger responseTrigger, + final HttpContext context) throws IOException, HttpException { + responseTrigger.submitResponse( + AsyncResponseBuilder.create(HttpStatus.SC_OK) + .setEntity("OK", ContentType.TEXT_PLAIN) + .build(), + context); + } + }); + + final H2Config h2Config = H2Config.custom() + .setPushEnabled(false) + // pingPolicy is intentionally not set (disabled) + .build(); + + server.configure(h2Config); + final InetSocketAddress serverEndpoint = server.start(); + + client.configure(h2Config); + client.start(); + + final IOSession ioSession = client.requestSession( + new HttpHost("localhost", serverEndpoint.getPort()), + TIMEOUT, + null).get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()); + + final Timeout idleTimeout = Timeout.ofMilliseconds(200); + ioSession.setSocketTimeout(idleTimeout); + + try (final ClientSessionEndpoint streamEndpoint = new ClientSessionEndpoint(ioSession)) { + final HttpHost target = new HttpHost(URIScheme.HTTP.id, "localhost", serverEndpoint.getPort()); + + final Message r1 = executeHello(streamEndpoint, target); + Assertions.assertEquals(200, r1.getHead().getCode()); + Assertions.assertEquals("OK", r1.getBody()); + + awaitTrue(() -> !ioSession.isOpen(), Timeout.ofSeconds(5), "Expected session to close without keep-alive"); + + Assertions.assertFalse(ioSession.isOpen(), "Expected session to close without keep-alive"); + + final Future> f = executeHelloAsync(streamEndpoint, target); + Assertions.assertThrows(ExecutionException.class, () -> f.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit())); + } + } + + @Test + void keepAlivePing_enabled_doesNotStealAckFromExplicitPingCommand() throws Exception { + final H2TestServer server = resources.server(); + final H2TestClient client = resources.client(); + + server.register("/hello", () -> new MessageExchangeHandler(new DiscardingEntityConsumer()) { + @Override + protected void handle( + final Message request, + final AsyncServerRequestHandler.ResponseTrigger responseTrigger, + final HttpContext context) throws IOException, HttpException { + responseTrigger.submitResponse( + AsyncResponseBuilder.create(HttpStatus.SC_OK) + .setEntity("OK", ContentType.TEXT_PLAIN) + .build(), + context); + } + }); + + final Timeout idleTime = Timeout.ofMilliseconds(100); + final Timeout ackTimeout = Timeout.ofSeconds(2); + + final H2PingPolicy pingPolicy = H2PingPolicy.custom() + .setIdleTime(idleTime) + .setAckTimeout(ackTimeout) + .build(); + + final H2Config h2Config = H2Config.custom() + .setPushEnabled(false) + .setPingPolicy(pingPolicy) + .build(); + + server.configure(h2Config); + final InetSocketAddress serverEndpoint = server.start(); + + client.configure(h2Config); + client.start(); + + final IOSession ioSession = client.requestSession( + new HttpHost("localhost", serverEndpoint.getPort()), + TIMEOUT, + null).get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()); + + ioSession.setSocketTimeout(idleTime); + + try (final ClientSessionEndpoint streamEndpoint = new ClientSessionEndpoint(ioSession)) { + final HttpHost target = new HttpHost(URIScheme.HTTP.id, "localhost", serverEndpoint.getPort()); + + // Warm-up to complete the HTTP/2 session & SETTINGS handshake. + final Message r1 = executeHello(streamEndpoint, target); + Assertions.assertEquals(200, r1.getHead().getCode()); + Assertions.assertEquals("OK", r1.getBody()); + + // Give the keep-alive logic a chance to become active (no hard assumptions about socket timeout changes). + parkAtLeast(idleTime.toMilliseconds() * 3L); + + final byte[] expected = new byte[]{0x55, 0x55, 0x55, 0x55, 0x55, 0x55, 0x55, 0x55}; + final CompletableFuture acked = new CompletableFuture<>(); + + final AsyncPingHandler handler = new AsyncPingHandler() { + + @Override + public ByteBuffer getData() { + return ByteBuffer.wrap(expected).asReadOnlyBuffer(); + } + + @Override + public void consumeResponse(final ByteBuffer feedback) throws IOException, HttpException { + if (feedback == null || feedback.remaining() != expected.length) { + acked.completeExceptionally(new AssertionError("Unexpected ping ACK payload")); + return; + } + final ByteBuffer dup = feedback.slice(); + final byte[] actual = new byte[expected.length]; + dup.get(actual); + for (int i = 0; i < expected.length; i++) { + if (actual[i] != expected[i]) { + acked.completeExceptionally(new AssertionError("Ping ACK payload mismatch")); + return; + } + } + acked.complete(null); + } + + @Override + public void failed(final Exception cause) { + acked.completeExceptionally(cause); + } + + @Override + public void cancel() { + acked.cancel(false); + } + }; + + ioSession.enqueue(new PingCommand(handler), Command.Priority.NORMAL); + ioSession.setEvent(SelectionKey.OP_WRITE); + + try { + acked.get(5, TimeUnit.SECONDS); + } catch (final TimeoutException ex) { + Assertions.fail("Timed out waiting for explicit PING ACK"); + } + + // Still usable. + Assertions.assertTrue(ioSession.isOpen(), "Expected session to stay open"); + final Message r2 = executeHello(streamEndpoint, target); + Assertions.assertEquals(200, r2.getHead().getCode()); + Assertions.assertEquals("OK", r2.getBody()); + } + } + + @Test + void keepAlivePingPolicy_rejectsDisabledAckTimeoutWhenIdleEnabled() { + Assertions.assertThrows(IllegalArgumentException.class, () -> H2PingPolicy.custom() + .setIdleTime(Timeout.ofSeconds(1)) + .setAckTimeout(Timeout.DISABLED) + .build()); + } + + private static Message executeHello( + final ClientSessionEndpoint endpoint, + final HttpHost target) throws Exception { + final Future> f = executeHelloAsync(endpoint, target); + return f.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()); + } + + private static Future> executeHelloAsync( + final ClientSessionEndpoint endpoint, + final HttpHost target) { + + final org.apache.hc.core5.http.message.BasicHttpRequest request = BasicRequestBuilder.get() + .setHttpHost(target) + .setPath("/hello") + .build(); + + return endpoint.execute( + new BasicRequestProducer(request, null), + new BasicResponseConsumer(new StringAsyncEntityConsumer()), + null); + } + + private static void parkAtLeast(final long millis) { + final long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(millis); + while (System.nanoTime() < deadlineNanos) { + LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(10)); + } + } + + private interface Condition { + boolean get(); + } + + private static void awaitTrue(final Condition condition, final Timeout timeout, final String message) { + final long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout.toMilliseconds()); + while (System.nanoTime() < deadlineNanos) { + if (condition.get()) { + return; + } + LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(1)); + } + Assertions.fail(message); + } + +} From 7a89923ba5ed29dbfb4797a2c6b34c3d2f53c88d Mon Sep 17 00:00:00 2001 From: Arturo Bernal Date: Sun, 1 Feb 2026 10:53:18 +0100 Subject: [PATCH 2/2] Update H2 keepalive to use validateAfterInactivity --- .../hc/core5/http2/config/H2Config.java | 35 +- .../hc/core5/http2/config/H2PingPolicy.java | 103 ----- .../impl/nio/AbstractH2StreamMultiplexer.java | 118 ++++-- .../impl/nio/ClientH2StreamMultiplexer.java | 20 +- .../nio/ClientH2StreamMultiplexerFactory.java | 35 +- .../bootstrap/H2MultiplexingRequester.java | 10 + .../H2MultiplexingRequesterBootstrap.java | 7 +- .../H2KeepAlivePingClientExample.java | 321 +++++++++------- .../nio/TestAbstractH2StreamMultiplexer.java | 83 ++-- .../nio/TestH2KeepAlivePingPolicyIT.java | 357 ------------------ 10 files changed, 360 insertions(+), 729 deletions(-) delete mode 100644 httpcore5-h2/src/main/java/org/apache/hc/core5/http2/config/H2PingPolicy.java delete mode 100644 httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/TestH2KeepAlivePingPolicyIT.java diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/config/H2Config.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/config/H2Config.java index e8ef0dd4b4..a801cc51fe 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/config/H2Config.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/config/H2Config.java @@ -51,11 +51,10 @@ public class H2Config { private final int maxHeaderListSize; private final boolean compressionEnabled; private final int maxContinuations; - private final H2PingPolicy pingPolicy; H2Config(final int headerTableSize, final boolean pushEnabled, final int maxConcurrentStreams, final int initialWindowSize, final int maxFrameSize, final int maxHeaderListSize, - final boolean compressionEnabled, final int maxContinuations, final H2PingPolicy pingPolicy) { + final boolean compressionEnabled, final int maxContinuations) { super(); this.headerTableSize = headerTableSize; this.pushEnabled = pushEnabled; @@ -65,7 +64,6 @@ public class H2Config { this.maxHeaderListSize = maxHeaderListSize; this.compressionEnabled = compressionEnabled; this.maxContinuations = maxContinuations; - this.pingPolicy = pingPolicy; } public int getHeaderTableSize() { @@ -100,15 +98,6 @@ public int getMaxContinuations() { return maxContinuations; } - /** - * Optional keep-alive PING policy. - * - * @since 5.5 - */ - public H2PingPolicy getPingPolicy() { - return pingPolicy; - } - @Override public String toString() { final StringBuilder builder = new StringBuilder(); @@ -120,7 +109,6 @@ public String toString() { .append(", maxHeaderListSize=").append(this.maxHeaderListSize) .append(", compressionEnabled=").append(this.compressionEnabled) .append(", maxContinuations=").append(this.maxContinuations) - .append(", pingPolicy=").append(this.pingPolicy) .append("]"); return builder.toString(); } @@ -154,9 +142,7 @@ public static H2Config.Builder copy(final H2Config config) { .setInitialWindowSize(config.getInitialWindowSize()) .setMaxFrameSize(config.getMaxFrameSize()) .setMaxHeaderListSize(config.getMaxHeaderListSize()) - .setCompressionEnabled(config.isCompressionEnabled()) - .setMaxContinuations(config.getMaxContinuations()) - .setPingPolicy(config.getPingPolicy()); + .setCompressionEnabled(config.isCompressionEnabled()); } public static class Builder { @@ -169,7 +155,6 @@ public static class Builder { private int maxHeaderListSize; private boolean compressionEnabled; private int maxContinuations; - private H2PingPolicy pingPolicy; Builder() { this.headerTableSize = INIT_HEADER_TABLE_SIZE * 2; @@ -180,7 +165,6 @@ public static class Builder { this.maxHeaderListSize = FrameConsts.MAX_FRAME_SIZE; this.compressionEnabled = true; this.maxContinuations = 100; - this.pingPolicy = null; } public Builder setHeaderTableSize(final int headerTableSize) { @@ -227,7 +211,7 @@ public Builder setCompressionEnabled(final boolean compressionEnabled) { * Sets max limit on number of continuations. *

value zero represents no limit

* - * @since 5.4 + * @since 5,4 */ public Builder setMaxContinuations(final int maxContinuations) { Args.positive(maxContinuations, "Max continuations"); @@ -235,16 +219,6 @@ public Builder setMaxContinuations(final int maxContinuations) { return this; } - /** - * Sets optional keep-alive PING policy. - * - * @since 5.5 - */ - public Builder setPingPolicy(final H2PingPolicy pingPolicy) { - this.pingPolicy = pingPolicy; - return this; - } - public H2Config build() { return new H2Config( headerTableSize, @@ -254,8 +228,7 @@ public H2Config build() { maxFrameSize, maxHeaderListSize, compressionEnabled, - maxContinuations, - pingPolicy); + maxContinuations); } } diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/config/H2PingPolicy.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/config/H2PingPolicy.java deleted file mode 100644 index fc81908fd6..0000000000 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/config/H2PingPolicy.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * ==================================================================== - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * ==================================================================== - * - * This software consists of voluntary contributions made by many - * individuals on behalf of the Apache Software Foundation. For more - * information on the Apache Software Foundation, please see - * . - * - */ - -package org.apache.hc.core5.http2.config; - -import org.apache.hc.core5.util.Args; -import org.apache.hc.core5.util.TimeValue; -import org.apache.hc.core5.util.Timeout; - -/** - * HTTP/2 keep-alive ping policy. - * - * @since 5.5 - */ -public final class H2PingPolicy { - - private static final H2PingPolicy DISABLED = new H2PingPolicy(Timeout.DISABLED, Timeout.DISABLED); - - private final Timeout idleTime; - private final Timeout ackTimeout; - - private H2PingPolicy(final Timeout idleTime, final Timeout ackTimeout) { - this.idleTime = idleTime; - this.ackTimeout = ackTimeout; - } - - public static H2PingPolicy disabled() { - return DISABLED; - } - - public static Builder custom() { - return new Builder(); - } - - public Timeout getIdleTime() { - return idleTime; - } - - public Timeout getAckTimeout() { - return ackTimeout; - } - - public boolean isEnabled() { - return isActive(idleTime) && isActive(ackTimeout); - } - - private static boolean isActive(final Timeout timeout) { - return timeout != null && timeout.isEnabled() && TimeValue.isPositive(timeout); - } - - public static final class Builder { - - private Timeout idleTime; - private Timeout ackTimeout; - - private Builder() { - this.idleTime = Timeout.DISABLED; - this.ackTimeout = Timeout.DISABLED; - } - - public Builder setIdleTime(final Timeout idleTime) { - this.idleTime = Args.notNull(idleTime, "idleTime"); - return this; - } - - public Builder setAckTimeout(final Timeout ackTimeout) { - this.ackTimeout = Args.notNull(ackTimeout, "ackTimeout"); - return this; - } - - public H2PingPolicy build() { - if (isActive(idleTime)) { - Args.check(isActive(ackTimeout), "ackTimeout must be positive when idleTime is enabled"); - } - return new H2PingPolicy(idleTime, ackTimeout); - } - } - -} diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java index 5cf88ecf4e..1a97fb7bd5 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java @@ -77,7 +77,6 @@ import org.apache.hc.core5.http2.H2StreamTimeoutException; import org.apache.hc.core5.http2.config.H2Config; import org.apache.hc.core5.http2.config.H2Param; -import org.apache.hc.core5.http2.config.H2PingPolicy; import org.apache.hc.core5.http2.config.H2Setting; import org.apache.hc.core5.http2.frame.FrameFactory; import org.apache.hc.core5.http2.frame.FrameFlag; @@ -128,6 +127,9 @@ enum SettingsHandshake { READY, TRANSMITTED, ACKED } private final H2StreamListener streamListener; private final KeepAlivePingSupport keepAlivePingSupport; + private final Timeout validateAfterInactivity; + private static final Timeout KEEP_ALIVE_PING_ACK_TIMEOUT = Timeout.ofSeconds(5); + private ConnectionHandshake connState = ConnectionHandshake.READY; private SettingsHandshake localSettingState = SettingsHandshake.READY; private SettingsHandshake remoteSettingState = SettingsHandshake.READY; @@ -150,7 +152,6 @@ enum SettingsHandshake { READY, TRANSMITTED, ACKED } private static final long STREAM_TIMEOUT_GRANULARITY_MILLIS = 1000; private long lastStreamTimeoutCheckMillis; - AbstractH2StreamMultiplexer( final ProtocolIOSession ioSession, final FrameFactory frameFactory, @@ -159,6 +160,18 @@ enum SettingsHandshake { READY, TRANSMITTED, ACKED } final CharCodingConfig charCodingConfig, final H2Config h2Config, final H2StreamListener streamListener) { + this(ioSession, frameFactory, idGenerator, httpProcessor, charCodingConfig, h2Config, streamListener, null); + } + + AbstractH2StreamMultiplexer( + final ProtocolIOSession ioSession, + final FrameFactory frameFactory, + final StreamIdGenerator idGenerator, + final HttpProcessor httpProcessor, + final CharCodingConfig charCodingConfig, + final H2Config h2Config, + final H2StreamListener streamListener, + final Timeout validateAfterInactivity) { this.ioSession = Args.notNull(ioSession, "IO session"); this.frameFactory = Args.notNull(frameFactory, "Frame factory"); this.httpProcessor = Args.notNull(httpProcessor, "HTTP processor"); @@ -186,9 +199,10 @@ enum SettingsHandshake { READY, TRANSMITTED, ACKED } this.lowMark = H2Config.INIT.getInitialWindowSize() / 2; this.streamListener = streamListener; - final H2PingPolicy pingPolicy = this.localConfig.getPingPolicy(); - this.keepAlivePingSupport = pingPolicy != null && pingPolicy.isEnabled() - ? new KeepAlivePingSupport(pingPolicy) + this.validateAfterInactivity = validateAfterInactivity; + + this.keepAlivePingSupport = this.validateAfterInactivity != null && this.validateAfterInactivity.isEnabled() + ? new KeepAlivePingSupport(this.validateAfterInactivity, KEEP_ALIVE_PING_ACK_TIMEOUT) : null; } @@ -659,13 +673,17 @@ private void executeShutdown(final ShutdownCommand shutdownCommand) throws IOExc } } - private void executePing(final PingCommand pingCommand) throws IOException { - final AsyncPingHandler handler = pingCommand.getHandler(); - pingHandlers.add(handler); - final RawFrame ping = frameFactory.createPing(handler.getData()); + private void sendPing(final AsyncPingHandler handler) throws IOException { + final AsyncPingHandler pingHandler = Args.notNull(handler, "PING handler"); + pingHandlers.add(pingHandler); + final RawFrame ping = frameFactory.createPing(pingHandler.getData()); commitFrame(ping); } + private void executePing(final PingCommand pingCommand) throws IOException { + sendPing(pingCommand.getHandler()); + } + private void executeStaleCheck(final StaleCheckCommand staleCheckCommand) { final Consumer callback = staleCheckCommand.getCallback(); callback.accept(ioSession.isOpen() && @@ -1374,15 +1392,17 @@ private final class KeepAlivePingSupport { private long pingSeq; private long expectedAckSeq; - KeepAlivePingSupport(final H2PingPolicy policy) { - Args.notNull(policy, "PING policy"); - this.idleTime = policy.getIdleTime(); - this.ackTimeout = policy.getAckTimeout(); + private AsyncPingHandler keepAliveHandler; + + KeepAlivePingSupport(final Timeout idleTime, final Timeout ackTimeout) { + this.idleTime = Args.notNull(idleTime, "Idle time"); + this.ackTimeout = Args.notNull(ackTimeout, "ACK timeout"); this.active = false; this.awaitingAck = false; this.lastActivityNanos = System.nanoTime(); this.pingSeq = 0L; this.expectedAckSeq = 0L; + this.keepAliveHandler = null; } void activateIfReady() { @@ -1405,22 +1425,12 @@ void onFrameInput(final RawFrame frame) { if (awaitingAck) { if (!(frame.getType() == FrameType.PING.getValue() && frame.isFlagSet(FrameFlag.ACK))) { awaitingAck = false; + clearKeepAliveHandler(); } } ioSession.setSocketTimeout(idleTime); } - void onActivity() { - if (!active) { - return; - } - lastActivityNanos = System.nanoTime(); - if (awaitingAck) { - awaitingAck = false; - } - ioSession.setSocketTimeout(idleTime); - } - boolean consumePingAck(final ByteBuffer payload) { if (!active || !awaitingAck) { return false; @@ -1432,7 +1442,10 @@ boolean consumePingAck(final ByteBuffer payload) { if (ack != expectedAckSeq) { return false; } - onActivity(); + awaitingAck = false; + clearKeepAliveHandler(); + lastActivityNanos = System.nanoTime(); + ioSession.setSocketTimeout(idleTime); return true; } @@ -1447,7 +1460,7 @@ boolean onTimeout(final Timeout timeout) throws IOException { return true; } - final long idleNanos = idleTime.toMilliseconds() * 1_000_000L; + final long idleNanos = idleTime.toNanoseconds(); if (idleNanos > 0L) { final long elapsed = System.nanoTime() - lastActivityNanos; if (elapsed < idleNanos) { @@ -1458,24 +1471,59 @@ boolean onTimeout(final Timeout timeout) throws IOException { } awaitingAck = true; - sendPing(); + sendKeepAlivePing(); ioSession.setSocketTimeout(ackTimeout); return true; } - private void sendPing() throws IOException { + private void sendKeepAlivePing() throws IOException { final long v = ++pingSeq; expectedAckSeq = v; - final ByteBuffer payload = ByteBuffer.allocate(PING_DATA_LEN); - payload.putLong(v); - payload.flip(); + final ByteBuffer data = ByteBuffer.allocate(PING_DATA_LEN); + data.putLong(v); + data.flip(); + + final ByteBuffer ro = data.asReadOnlyBuffer(); + + keepAliveHandler = new AsyncPingHandler() { + + @Override + public ByteBuffer getData() { + return ro.duplicate(); + } + + @Override + public void consumeResponse(final ByteBuffer payload) { + // Keep-alive ACK is consumed by consumePingAck(...) before the handler queue is polled. + } + + @Override + public void failed(final Exception cause) { + // No-op; connection error handling is performed elsewhere. + } + + @Override + public void cancel() { + // No-op. + } + + }; - final RawFrame ping = frameFactory.createPing(payload); - commitFrame(ping); + // Use the same PING sending path as PingCommand (existing command path). + sendPing(keepAliveHandler); + } + + private void clearKeepAliveHandler() { + final AsyncPingHandler handler = keepAliveHandler; + keepAliveHandler = null; + if (handler != null) { + pingHandlers.remove(handler); + } } private void shutdownKeepAlive(final Timeout timeout) throws IOException { + clearKeepAliveHandler(); connState = ConnectionHandshake.SHUTDOWN; final RawFrame goAway = frameFactory.createGoAway( @@ -1759,6 +1807,10 @@ public String toString() { return buf.toString(); } + public boolean isLocalReset() { + return localResetTime > 0; + } + } private void checkStreamTimeouts(final long nowNanos) throws IOException { diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientH2StreamMultiplexer.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientH2StreamMultiplexer.java index ab8ecc48a0..7b313f8ee4 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientH2StreamMultiplexer.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientH2StreamMultiplexer.java @@ -46,6 +46,7 @@ import org.apache.hc.core5.http2.frame.FrameFactory; import org.apache.hc.core5.http2.frame.StreamIdGenerator; import org.apache.hc.core5.reactor.ProtocolIOSession; +import org.apache.hc.core5.util.Timeout; /** * I/O event handler for events fired by {@link ProtocolIOSession} that implements @@ -66,18 +67,31 @@ public ClientH2StreamMultiplexer( final HandlerFactory pushHandlerFactory, final H2Config h2Config, final CharCodingConfig charCodingConfig, - final H2StreamListener streamListener) { - super(ioSession, frameFactory, StreamIdGenerator.ODD, httpProcessor, charCodingConfig, h2Config, streamListener); + final H2StreamListener streamListener, + final Timeout validateAfterInactivity) { + super(ioSession, frameFactory, StreamIdGenerator.ODD, httpProcessor, charCodingConfig, h2Config, streamListener, + validateAfterInactivity); this.pushHandlerFactory = pushHandlerFactory; } + public ClientH2StreamMultiplexer( + final ProtocolIOSession ioSession, + final FrameFactory frameFactory, + final HttpProcessor httpProcessor, + final HandlerFactory pushHandlerFactory, + final H2Config h2Config, + final CharCodingConfig charCodingConfig, + final H2StreamListener streamListener) { + this(ioSession, frameFactory, httpProcessor, pushHandlerFactory, h2Config, charCodingConfig, streamListener, null); + } + public ClientH2StreamMultiplexer( final ProtocolIOSession ioSession, final HttpProcessor httpProcessor, final HandlerFactory pushHandlerFactory, final H2Config h2Config, final CharCodingConfig charCodingConfig) { - this(ioSession, DefaultFrameFactory.INSTANCE, httpProcessor, pushHandlerFactory, h2Config, charCodingConfig, null); + this(ioSession, DefaultFrameFactory.INSTANCE, httpProcessor, pushHandlerFactory, h2Config, charCodingConfig, null, null); } public ClientH2StreamMultiplexer( diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientH2StreamMultiplexerFactory.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientH2StreamMultiplexerFactory.java index c2b9afeea3..a2315554ee 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientH2StreamMultiplexerFactory.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientH2StreamMultiplexerFactory.java @@ -30,6 +30,7 @@ import org.apache.hc.core5.annotation.Contract; import org.apache.hc.core5.annotation.Internal; import org.apache.hc.core5.annotation.ThreadingBehavior; +import org.apache.hc.core5.function.Supplier; import org.apache.hc.core5.http.config.CharCodingConfig; import org.apache.hc.core5.http.nio.AsyncPushConsumer; import org.apache.hc.core5.http.nio.HandlerFactory; @@ -39,6 +40,8 @@ import org.apache.hc.core5.http2.frame.FrameFactory; import org.apache.hc.core5.reactor.ProtocolIOSession; import org.apache.hc.core5.util.Args; +import org.apache.hc.core5.util.TimeValue; +import org.apache.hc.core5.util.Timeout; /** * {@link ClientH2StreamMultiplexer} factory. @@ -55,6 +58,7 @@ public final class ClientH2StreamMultiplexerFactory { private final CharCodingConfig charCodingConfig; private final H2StreamListener streamListener; private final FrameFactory frameFactory; + private final Supplier validateAfterInactivitySupplier; public ClientH2StreamMultiplexerFactory( final HttpProcessor httpProcessor, @@ -62,13 +66,25 @@ public ClientH2StreamMultiplexerFactory( final H2Config h2Config, final CharCodingConfig charCodingConfig, final H2StreamListener streamListener, - final FrameFactory frameFactory) { + final FrameFactory frameFactory, + final Supplier validateAfterInactivitySupplier) { this.httpProcessor = Args.notNull(httpProcessor, "HTTP processor"); this.pushHandlerFactory = pushHandlerFactory; this.h2Config = h2Config != null ? h2Config : H2Config.DEFAULT; this.charCodingConfig = charCodingConfig != null ? charCodingConfig : CharCodingConfig.DEFAULT; this.streamListener = streamListener; this.frameFactory = frameFactory != null ? frameFactory : DefaultFrameFactory.INSTANCE; + this.validateAfterInactivitySupplier = validateAfterInactivitySupplier; + } + + public ClientH2StreamMultiplexerFactory( + final HttpProcessor httpProcessor, + final HandlerFactory pushHandlerFactory, + final H2Config h2Config, + final CharCodingConfig charCodingConfig, + final H2StreamListener streamListener, + final FrameFactory frameFactory) { + this(httpProcessor, pushHandlerFactory, h2Config, charCodingConfig, streamListener, frameFactory, null); } public ClientH2StreamMultiplexerFactory( @@ -78,14 +94,14 @@ public ClientH2StreamMultiplexerFactory( final CharCodingConfig charCodingConfig, final H2StreamListener streamListener ) { - this(httpProcessor, pushHandlerFactory, h2Config, charCodingConfig, streamListener, null); + this(httpProcessor, pushHandlerFactory, h2Config, charCodingConfig, streamListener, null, null); } public ClientH2StreamMultiplexerFactory( final HttpProcessor httpProcessor, final HandlerFactory pushHandlerFactory, final H2StreamListener streamListener) { - this(httpProcessor, pushHandlerFactory, null, null, streamListener, null); + this(httpProcessor, pushHandlerFactory, null, null, streamListener, null, null); } public ClientH2StreamMultiplexerFactory( @@ -96,7 +112,18 @@ public ClientH2StreamMultiplexerFactory( public ClientH2StreamMultiplexer create(final ProtocolIOSession ioSession) { return new ClientH2StreamMultiplexer(ioSession, frameFactory, httpProcessor, - pushHandlerFactory, h2Config, charCodingConfig, streamListener); + pushHandlerFactory, h2Config, charCodingConfig, streamListener, resolveValidateAfterInactivity()); + } + + private Timeout resolveValidateAfterInactivity() { + if (validateAfterInactivitySupplier == null) { + return null; + } + final TimeValue timeValue = validateAfterInactivitySupplier.get(); + if (!TimeValue.isNonNegative(timeValue)) { + return null; + } + return timeValue.toTimeout(); } } diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2MultiplexingRequester.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2MultiplexingRequester.java index 476ecacbd2..f00b612be8 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2MultiplexingRequester.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2MultiplexingRequester.java @@ -32,6 +32,7 @@ import java.nio.ByteBuffer; import java.util.List; import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; @@ -87,6 +88,7 @@ public class H2MultiplexingRequester extends AsyncRequester { private final H2ConnPool connPool; + private final AtomicReference validateAfterInactivityRef; /** * Hard cap on per-connection queued / in-flight commands. @@ -108,11 +110,16 @@ public H2MultiplexingRequester( final TlsStrategy tlsStrategy, final IOReactorMetricsListener threadPoolListener, final IOWorkerSelector workerSelector, + final AtomicReference validateAfterInactivityRef, final int maxCommandsPerConnection) { super(eventHandlerFactory, ioReactorConfig, ioSessionDecorator, exceptionCallback, sessionListener, ShutdownCommand.GRACEFUL_IMMEDIATE_CALLBACK, DefaultAddressResolver.INSTANCE, threadPoolListener, workerSelector); this.connPool = new H2ConnPool(this, addressResolver, tlsStrategy); + this.validateAfterInactivityRef = validateAfterInactivityRef; + if (this.validateAfterInactivityRef != null) { + this.validateAfterInactivityRef.set(this.connPool.getValidateAfterInactivity()); + } this.maxCommandsPerConnection = maxCommandsPerConnection; } @@ -130,6 +137,9 @@ public TimeValue getValidateAfterInactivity() { public void setValidateAfterInactivity(final TimeValue timeValue) { connPool.setValidateAfterInactivity(timeValue); + if (validateAfterInactivityRef != null) { + validateAfterInactivityRef.set(timeValue); + } } /** diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2MultiplexingRequesterBootstrap.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2MultiplexingRequesterBootstrap.java index f48900317b..c4523c8000 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2MultiplexingRequesterBootstrap.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2MultiplexingRequesterBootstrap.java @@ -28,6 +28,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.concurrent.atomic.AtomicReference; import org.apache.hc.core5.annotation.Experimental; import org.apache.hc.core5.function.Callback; @@ -53,6 +54,7 @@ import org.apache.hc.core5.reactor.IOSession; import org.apache.hc.core5.reactor.IOSessionListener; import org.apache.hc.core5.util.Args; +import org.apache.hc.core5.util.TimeValue; /** * {@link H2MultiplexingRequester} bootstrap. @@ -277,13 +279,15 @@ public final H2MultiplexingRequesterBootstrap registerVirtual(final String hostn public H2MultiplexingRequester create() { final RequestRouter> requestRouter = RequestRouter.create( null, uriPatternType, routeEntries, RequestRouter.LOCAL_AUTHORITY_RESOLVER, null); + final AtomicReference validateAfterInactivityRef = new AtomicReference<>(TimeValue.NEG_ONE_MILLISECOND); final ClientH2StreamMultiplexerFactory http2StreamHandlerFactory = new ClientH2StreamMultiplexerFactory( httpProcessor != null ? httpProcessor : H2Processors.client(), new DefaultAsyncPushConsumerFactory(requestRouter), h2Config != null ? h2Config : H2Config.DEFAULT, charCodingConfig != null ? charCodingConfig : CharCodingConfig.DEFAULT, streamListener, - frameFactory); + frameFactory, + validateAfterInactivityRef::get); return new H2MultiplexingRequester( ioReactorConfig, (ioSession, attachment) -> @@ -295,6 +299,7 @@ public H2MultiplexingRequester create() { tlsStrategy != null ? tlsStrategy : new H2ClientTlsStrategy(), threadPoolListener, null, + validateAfterInactivityRef, maxCommandsPerConnection); } diff --git a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2KeepAlivePingClientExample.java b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2KeepAlivePingClientExample.java index 6fb7505d8c..396a08af98 100644 --- a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2KeepAlivePingClientExample.java +++ b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2KeepAlivePingClientExample.java @@ -26,232 +26,263 @@ */ package org.apache.hc.core5.http2.examples; -import java.io.IOException; import java.net.URI; -import java.nio.ByteBuffer; +import java.time.Instant; import java.util.List; -import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.hc.core5.http.EntityDetails; import org.apache.hc.core5.http.Header; -import org.apache.hc.core5.http.HttpConnection; -import org.apache.hc.core5.http.HttpException; import org.apache.hc.core5.http.HttpResponse; -import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester; -import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler; +import org.apache.hc.core5.http.Message; import org.apache.hc.core5.http.nio.AsyncRequestProducer; -import org.apache.hc.core5.http.nio.CapacityChannel; -import org.apache.hc.core5.http.nio.DataStreamChannel; -import org.apache.hc.core5.http.nio.RequestChannel; import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer; import org.apache.hc.core5.http.nio.support.AsyncRequestBuilder; import org.apache.hc.core5.http.nio.support.BasicResponseConsumer; -import org.apache.hc.core5.http.protocol.HttpContext; import org.apache.hc.core5.http.protocol.HttpCoreContext; -import org.apache.hc.core5.http2.HttpVersionPolicy; import org.apache.hc.core5.http2.config.H2Config; -import org.apache.hc.core5.http2.config.H2PingPolicy; import org.apache.hc.core5.http2.frame.FrameFlag; import org.apache.hc.core5.http2.frame.FrameType; import org.apache.hc.core5.http2.frame.RawFrame; import org.apache.hc.core5.http2.impl.nio.H2StreamListener; -import org.apache.hc.core5.http2.impl.nio.bootstrap.H2RequesterBootstrap; +import org.apache.hc.core5.http2.impl.nio.bootstrap.H2MultiplexingRequester; +import org.apache.hc.core5.http2.impl.nio.bootstrap.H2MultiplexingRequesterBootstrap; import org.apache.hc.core5.io.CloseMode; import org.apache.hc.core5.reactor.IOReactorConfig; +import org.apache.hc.core5.reactor.IOSession; +import org.apache.hc.core5.reactor.IOSessionListener; +import org.apache.hc.core5.util.TimeValue; import org.apache.hc.core5.util.Timeout; /** - * Minimal example demonstrating HTTP/2 connection keepalive using {@link H2PingPolicy}. - *

- * The client configures an idle timeout and an ACK timeout. When the underlying HTTP/2 - * connection becomes idle, the I/O reactor triggers a keepalive {@code PING}. If the - * peer responds with {@code PING[ACK]} within the configured ACK timeout, the connection - * remains usable; otherwise the connection is considered dead and is terminated by the - * transport. - *

- *

- * This example performs a single request to establish the connection and then waits - * long enough for one keepalive round-trip. It prints: - *

- *
    - *
  • the remote endpoint once,
  • - *
  • {@code >> PING} when a keepalive PING is emitted,
  • - *
  • {@code << PING[ACK]} when the ACK is received,
  • - *
  • a final counter line {@code keepalive: pingsOut=..., pingAcksIn=...}.
  • - *
- *

- * Notes: - *

- *
    - *
  • This is intentionally not a unit test; it is a runnable sanity-check and usage example.
  • - *
  • Keepalive requires HTTP/2 settings negotiation to complete; PINGs may not be emitted - * immediately on startup.
  • - *
  • Timing is inherently environment-dependent; adjust {@code idleTime}/{@code ackTimeout} - * if running on a slow or heavily loaded machine.
  • - *
+ * HTTP/2 keepalive sanity-check for HttpComponents Core (no HttpClient classes). + * + * This version enables keep-alive by setting validateAfterInactivity, which is what + * AbstractH2StreamMultiplexer currently uses to arm KeepAlivePingSupport. + * * @since 5.5 */ +public final class H2KeepAlivePingClientExample { -public class H2KeepAlivePingClientExample { + private static final URI TARGET = URI.create("https://nghttp2.org/httpbin/get"); - public static void main(final String[] args) throws Exception { + private static final class Counters { + final AtomicInteger sessionsConnected = new AtomicInteger(0); + final AtomicInteger reactorTimeoutEvents = new AtomicInteger(0); + final AtomicInteger pingsOut = new AtomicInteger(0); + final AtomicInteger pingAcksIn = new AtomicInteger(0); + final AtomicInteger goAwayIn = new AtomicInteger(0); + final AtomicInteger rstStreamIn = new AtomicInteger(0); + final AtomicInteger exceptions = new AtomicInteger(0); + } - final Timeout idleTime = Timeout.ofSeconds(1); - final Timeout ackTimeout = Timeout.ofSeconds(2); + public static void main(final String[] args) throws Exception { + // Base socket timeout BEFORE keepalive activates. final IOReactorConfig ioReactorConfig = IOReactorConfig.custom() - .setSoTimeout(5, TimeUnit.SECONDS) + .setSoTimeout(Timeout.ofSeconds(30)) .build(); - final H2PingPolicy pingPolicy = H2PingPolicy.custom() - .setIdleTime(idleTime) - .setAckTimeout(ackTimeout) - .build(); + // Keep-alive idle time (what your multiplexer currently wires via validateAfterInactivity). + final TimeValue validateAfterInactivity = TimeValue.ofSeconds(3); final H2Config h2Config = H2Config.custom() .setPushEnabled(false) .setMaxConcurrentStreams(100) - .setPingPolicy(pingPolicy) .build(); - final AtomicBoolean remotePrinted = new AtomicBoolean(false); - final AtomicInteger pingsOut = new AtomicInteger(0); - final AtomicInteger pingAcksIn = new AtomicInteger(0); - final CountDownLatch pingAckLatch = new CountDownLatch(1); + final Counters counters = new Counters(); - final HttpAsyncRequester requester = H2RequesterBootstrap.bootstrap() + final H2MultiplexingRequester requester = H2MultiplexingRequesterBootstrap.bootstrap() .setIOReactorConfig(ioReactorConfig) .setH2Config(h2Config) - .setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_2) - .setStreamListener(new H2StreamListener() { + .setIOSessionListener(new IOSessionListener() { - private void printRemoteOnce(final HttpConnection connection) { - if (remotePrinted.compareAndSet(false, true)) { - System.out.println("remote=" + connection.getRemoteAddress()); - } + @Override + public void connected(final IOSession session) { + counters.sessionsConnected.incrementAndGet(); + log("session.connected id=" + safeId(session) + + " remote=" + session.getRemoteAddress() + + " soTimeout=" + session.getSocketTimeout()); } @Override - public void onHeaderInput(final HttpConnection connection, final int streamId, final List headers) { + public void startTls(final IOSession session) { + log("session.startTls id=" + safeId(session) + + " soTimeout=" + session.getSocketTimeout()); } @Override - public void onHeaderOutput(final HttpConnection connection, final int streamId, final List headers) { + public void inputReady(final IOSession session) { + // no-op } @Override - public void onFrameInput(final HttpConnection connection, final int streamId, final RawFrame frame) { - printRemoteOnce(connection); - if (FrameType.valueOf(frame.getType()) == FrameType.PING && frame.isFlagSet(FrameFlag.ACK)) { - System.out.println("<< PING[ACK]"); - pingAcksIn.incrementAndGet(); - pingAckLatch.countDown(); - } + public void outputReady(final IOSession session) { + // no-op } @Override - public void onFrameOutput(final HttpConnection connection, final int streamId, final RawFrame frame) { - printRemoteOnce(connection); - if (FrameType.valueOf(frame.getType()) == FrameType.PING && !frame.isFlagSet(FrameFlag.ACK)) { - System.out.println(">> PING"); - pingsOut.incrementAndGet(); - } + public void timeout(final IOSession session) { + // Expected: keep-alive uses the reactor timer. + counters.reactorTimeoutEvents.incrementAndGet(); + log("session.timeout (reactor) id=" + safeId(session) + + " soTimeout=" + session.getSocketTimeout()); } @Override - public void onInputFlowControl(final HttpConnection connection, final int streamId, final int delta, final int actualSize) { + public void exception(final IOSession session, final Exception ex) { + counters.exceptions.incrementAndGet(); + log("session.exception id=" + safeId(session) + " ex=" + ex); } @Override - public void onOutputFlowControl(final HttpConnection connection, final int streamId, final int delta, final int actualSize) { + public void disconnected(final IOSession session) { + log("session.disconnected id=" + safeId(session)); } - }) - .create(); - - Runtime.getRuntime().addShutdownHook(new Thread(() -> requester.close(CloseMode.GRACEFUL))); + private String safeId(final IOSession session) { + try { + return session.getId(); + } catch (final RuntimeException ignore) { + return "n/a"; + } + } - requester.start(); + }) + .setStreamListener(new H2StreamListener() { - final URI requestUri = new URI("http://nghttp2.org/httpbin/post"); - final AsyncRequestProducer requestProducer = AsyncRequestBuilder.post(requestUri) - .setEntity("stuff") - .build(); - final BasicResponseConsumer responseConsumer = new BasicResponseConsumer<>(new StringAsyncEntityConsumer()); + @Override + public void onHeaderInput( + final org.apache.hc.core5.http.HttpConnection connection, + final int streamId, + final List headers) { + // no-op + } - final CountDownLatch exchangeLatch = new CountDownLatch(1); + @Override + public void onHeaderOutput( + final org.apache.hc.core5.http.HttpConnection connection, + final int streamId, + final List headers) { + // no-op + } - requester.execute(new AsyncClientExchangeHandler() { + @Override + public void onFrameInput( + final org.apache.hc.core5.http.HttpConnection connection, + final int streamId, + final RawFrame frame) { + + final FrameType type = FrameType.valueOf(frame.getType()); + + if (type == FrameType.PING && frame.isFlagSet(FrameFlag.ACK)) { + counters.pingAcksIn.incrementAndGet(); + log("<< PING[ACK]"); + } else if (type == FrameType.GOAWAY) { + counters.goAwayIn.incrementAndGet(); + log("<< GOAWAY"); + } else if (type == FrameType.RST_STREAM) { + counters.rstStreamIn.incrementAndGet(); + log("<< RST_STREAM streamId=" + streamId); + } + } - @Override - public void releaseResources() { - requestProducer.releaseResources(); - responseConsumer.releaseResources(); - exchangeLatch.countDown(); - } + @Override + public void onFrameOutput( + final org.apache.hc.core5.http.HttpConnection connection, + final int streamId, + final RawFrame frame) { - @Override - public void cancel() { - exchangeLatch.countDown(); - } + final FrameType type = FrameType.valueOf(frame.getType()); - @Override - public void failed(final Exception cause) { - exchangeLatch.countDown(); - } + if (type == FrameType.PING && !frame.isFlagSet(FrameFlag.ACK)) { + counters.pingsOut.incrementAndGet(); + log(">> PING"); + } + } - @Override - public void produceRequest(final RequestChannel channel, final HttpContext httpContext) throws HttpException, IOException { - requestProducer.sendRequest(channel, httpContext); - } + @Override + public void onInputFlowControl( + final org.apache.hc.core5.http.HttpConnection connection, + final int streamId, + final int delta, + final int actualSize) { + // no-op + } - @Override - public int available() { - return requestProducer.available(); - } + @Override + public void onOutputFlowControl( + final org.apache.hc.core5.http.HttpConnection connection, + final int streamId, + final int delta, + final int actualSize) { + // no-op + } - @Override - public void produce(final DataStreamChannel channel) throws IOException { - requestProducer.produce(channel); - } + }) + .create(); - @Override - public void consumeInformation(final HttpResponse response, final HttpContext httpContext) { - } + // IMPORTANT FIX: arm keep-alive before any connection is created. + requester.setValidateAfterInactivity(validateAfterInactivity); - @Override - public void consumeResponse(final HttpResponse response, final EntityDetails entityDetails, final HttpContext httpContext) throws HttpException, IOException { - responseConsumer.consumeResponse(response, entityDetails, httpContext, null); - } + requester.start(); + try { + log("requester.validateAfterInactivity=" + requester.getValidateAfterInactivity()); - @Override - public void updateCapacity(final CapacityChannel capacityChannel) throws IOException { - responseConsumer.updateCapacity(capacityChannel); - } + final Message m1 = executeSimpleGet(requester, TARGET); + log("response1=" + m1.getHead().getCode()); - @Override - public void consume(final ByteBuffer src) throws IOException { - responseConsumer.consume(src); - } + Thread.sleep(250); - @Override - public void streamEnd(final List trailers) throws HttpException, IOException { - responseConsumer.streamEnd(trailers); - } + // Wait long enough to force keepalive activity. + final long waitMs = 13_000L; + log("waiting=" + waitMs + "ms for keepalive..."); + Thread.sleep(waitMs); - }, Timeout.ofSeconds(30), HttpCoreContext.create()); + final Message m2 = executeSimpleGet(requester, TARGET); + log("response2=" + m2.getHead().getCode()); - exchangeLatch.await(); + log("stats: sessionsConnected=" + counters.sessionsConnected.get() + + ", pingsOut=" + counters.pingsOut.get() + + ", pingAcksIn=" + counters.pingAcksIn.get() + + ", goAwayIn=" + counters.goAwayIn.get() + + ", rstStreamIn=" + counters.rstStreamIn.get() + + ", reactorTimeoutEvents=" + counters.reactorTimeoutEvents.get() + + ", exceptions=" + counters.exceptions.get()); - final long waitMs = idleTime.toMilliseconds() + ackTimeout.toMilliseconds() + 500L; - pingAckLatch.await(waitMs, TimeUnit.MILLISECONDS); + } finally { + requester.close(CloseMode.GRACEFUL); + } + } - System.out.println("keepalive: pingsOut=" + pingsOut.get() + ", pingAcksIn=" + pingAcksIn.get()); + private static Message executeSimpleGet( + final H2MultiplexingRequester requester, + final URI uri) throws Exception { + + final Timeout timeout = Timeout.ofSeconds(30); + + final AsyncRequestProducer requestProducer = AsyncRequestBuilder.get(uri).build(); + final BasicResponseConsumer responseConsumer = + new BasicResponseConsumer<>(new StringAsyncEntityConsumer()); + + try { + final Future> f = requester.execute( + requestProducer, + responseConsumer, + timeout, + HttpCoreContext.create(), + null); + return f.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); + } finally { + requestProducer.releaseResources(); + responseConsumer.releaseResources(); + } + } - requester.close(CloseMode.GRACEFUL); + private static void log(final String s) { + System.out.println(Instant.now() + " " + s); } } diff --git a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/TestAbstractH2StreamMultiplexer.java b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/TestAbstractH2StreamMultiplexer.java index 150382440a..d89ff6106d 100644 --- a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/TestAbstractH2StreamMultiplexer.java +++ b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/TestAbstractH2StreamMultiplexer.java @@ -57,7 +57,6 @@ import org.apache.hc.core5.http2.WritableByteChannelMock; import org.apache.hc.core5.http2.config.H2Config; import org.apache.hc.core5.http2.config.H2Param; -import org.apache.hc.core5.http2.config.H2PingPolicy; import org.apache.hc.core5.http2.config.H2Setting; import org.apache.hc.core5.http2.frame.DefaultFrameFactory; import org.apache.hc.core5.http2.frame.FrameConsts; @@ -118,7 +117,22 @@ public H2StreamMultiplexerImpl( final H2Config h2Config, final H2StreamListener streamListener, final Supplier streamHandlerSupplier) { - super(ioSession, frameFactory, idGenerator, httpProcessor, charCodingConfig, h2Config, streamListener); + this(ioSession, frameFactory, idGenerator, httpProcessor, charCodingConfig, h2Config, streamListener, + streamHandlerSupplier, null); + } + + public H2StreamMultiplexerImpl( + final ProtocolIOSession ioSession, + final FrameFactory frameFactory, + final StreamIdGenerator idGenerator, + final HttpProcessor httpProcessor, + final CharCodingConfig charCodingConfig, + final H2Config h2Config, + final H2StreamListener streamListener, + final Supplier streamHandlerSupplier, + final Timeout validateAfterInactivity) { + super(ioSession, frameFactory, idGenerator, httpProcessor, charCodingConfig, h2Config, streamListener, + validateAfterInactivity); this.streamHandlerSupplier = streamHandlerSupplier; } @@ -1159,18 +1173,11 @@ void testKeepAliveNotActiveBeforeSettingsHandshake() throws Exception { Mockito.doNothing().when(protocolIOSession).clearEvent(ArgumentMatchers.anyInt()); final Timeout idleTime = Timeout.ofMilliseconds(5); - final Timeout ackTimeout = Timeout.ofMilliseconds(5); - - final H2Config h2Config = H2Config.custom() - .setPingPolicy(H2PingPolicy.custom() - .setIdleTime(idleTime) - .setAckTimeout(ackTimeout) - .build()) - .build(); final AbstractH2StreamMultiplexer mux = new H2StreamMultiplexerImpl( protocolIOSession, FRAME_FACTORY, StreamIdGenerator.ODD, - httpProcessor, CharCodingConfig.DEFAULT, h2Config, h2StreamListener, () -> streamHandler); + httpProcessor, CharCodingConfig.DEFAULT, H2Config.DEFAULT, h2StreamListener, () -> streamHandler, + idleTime); mux.onConnect(); writes.clear(); @@ -1190,18 +1197,12 @@ void testKeepAliveActivatesAfterSettingsAckedSetsIdleTimeout() throws Exception Mockito.when(protocolIOSession.write(ArgumentMatchers.any(ByteBuffer.class))).thenReturn(0); final Timeout idleTime = Timeout.ofMilliseconds(50); - final Timeout ackTimeout = Timeout.ofMilliseconds(20); - - final H2Config h2Config = H2Config.custom() - .setPingPolicy(H2PingPolicy.custom() - .setIdleTime(idleTime) - .setAckTimeout(ackTimeout) - .build()) - .build(); + final Timeout ackTimeout = Timeout.ofSeconds(5); final AbstractH2StreamMultiplexer mux = new H2StreamMultiplexerImpl( protocolIOSession, FRAME_FACTORY, StreamIdGenerator.ODD, - httpProcessor, CharCodingConfig.DEFAULT, h2Config, h2StreamListener, () -> streamHandler); + httpProcessor, CharCodingConfig.DEFAULT, H2Config.DEFAULT, h2StreamListener, () -> streamHandler, + idleTime); mux.onConnect(); completeSettingsHandshake(mux); @@ -1225,18 +1226,12 @@ void testKeepAliveIdleTimeoutSendsPingAndSetsAckTimeout() throws Exception { Mockito.doNothing().when(protocolIOSession).clearEvent(ArgumentMatchers.anyInt()); final Timeout idleTime = Timeout.ofMilliseconds(5); - final Timeout ackTimeout = Timeout.ofMilliseconds(50); - - final H2Config h2Config = H2Config.custom() - .setPingPolicy(H2PingPolicy.custom() - .setIdleTime(idleTime) - .setAckTimeout(ackTimeout) - .build()) - .build(); + final Timeout ackTimeout = Timeout.ofSeconds(5); final AbstractH2StreamMultiplexer mux = new H2StreamMultiplexerImpl( protocolIOSession, FRAME_FACTORY, StreamIdGenerator.ODD, - httpProcessor, CharCodingConfig.DEFAULT, h2Config, h2StreamListener, () -> streamHandler); + httpProcessor, CharCodingConfig.DEFAULT, H2Config.DEFAULT, h2StreamListener, () -> streamHandler, + idleTime); mux.onConnect(); completeSettingsHandshake(mux); @@ -1268,18 +1263,11 @@ void testKeepAlivePingAckReturnsToIdleTimeout() throws Exception { Mockito.doNothing().when(protocolIOSession).clearEvent(ArgumentMatchers.anyInt()); final Timeout idleTime = Timeout.ofMilliseconds(5); - final Timeout ackTimeout = Timeout.ofMilliseconds(50); - - final H2Config h2Config = H2Config.custom() - .setPingPolicy(H2PingPolicy.custom() - .setIdleTime(idleTime) - .setAckTimeout(ackTimeout) - .build()) - .build(); final AbstractH2StreamMultiplexer mux = new H2StreamMultiplexerImpl( protocolIOSession, FRAME_FACTORY, StreamIdGenerator.ODD, - httpProcessor, CharCodingConfig.DEFAULT, h2Config, h2StreamListener, () -> streamHandler); + httpProcessor, CharCodingConfig.DEFAULT, H2Config.DEFAULT, h2StreamListener, () -> streamHandler, + idleTime); mux.onConnect(); completeSettingsHandshake(mux); @@ -1315,18 +1303,12 @@ void testKeepAliveAckTimeoutShutsDownAndFailsStreams() throws Exception { Mockito.doNothing().when(protocolIOSession).clearEvent(ArgumentMatchers.anyInt()); final Timeout idleTime = Timeout.ofMilliseconds(5); - final Timeout ackTimeout = Timeout.ofMilliseconds(20); - - final H2Config h2Config = H2Config.custom() - .setPingPolicy(H2PingPolicy.custom() - .setIdleTime(idleTime) - .setAckTimeout(ackTimeout) - .build()) - .build(); + final Timeout ackTimeout = Timeout.ofSeconds(5); final AbstractH2StreamMultiplexer mux = new H2StreamMultiplexerImpl( protocolIOSession, FRAME_FACTORY, StreamIdGenerator.ODD, - httpProcessor, CharCodingConfig.DEFAULT, h2Config, h2StreamListener, () -> streamHandler); + httpProcessor, CharCodingConfig.DEFAULT, H2Config.DEFAULT, h2StreamListener, () -> streamHandler, + idleTime); mux.onConnect(); completeSettingsHandshake(mux); @@ -1366,13 +1348,10 @@ void testKeepAliveDisabledNeverEmitsPing() throws Exception { Mockito.doNothing().when(protocolIOSession).setEvent(ArgumentMatchers.anyInt()); Mockito.doNothing().when(protocolIOSession).clearEvent(ArgumentMatchers.anyInt()); - final H2Config h2Config = H2Config.custom() - .setPingPolicy(H2PingPolicy.disabled()) - .build(); - final AbstractH2StreamMultiplexer mux = new H2StreamMultiplexerImpl( protocolIOSession, FRAME_FACTORY, StreamIdGenerator.ODD, - httpProcessor, CharCodingConfig.DEFAULT, h2Config, h2StreamListener, () -> streamHandler); + httpProcessor, CharCodingConfig.DEFAULT, H2Config.DEFAULT, h2StreamListener, () -> streamHandler, + Timeout.DISABLED); mux.onConnect(); writes.clear(); diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/TestH2KeepAlivePingPolicyIT.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/TestH2KeepAlivePingPolicyIT.java deleted file mode 100644 index 3968c42c43..0000000000 --- a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/TestH2KeepAlivePingPolicyIT.java +++ /dev/null @@ -1,357 +0,0 @@ -/* - * ==================================================================== - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * ==================================================================== - * - * This software consists of voluntary contributions made by many - * individuals on behalf of the Apache Software Foundation. For more - * information on the Apache Software Foundation, please see - * . - * - */ - -package org.apache.hc.core5.testing.nio; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.nio.ByteBuffer; -import java.nio.channels.SelectionKey; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.locks.LockSupport; - -import org.apache.hc.core5.http.ContentType; -import org.apache.hc.core5.http.HttpException; -import org.apache.hc.core5.http.HttpHost; -import org.apache.hc.core5.http.HttpRequest; -import org.apache.hc.core5.http.HttpResponse; -import org.apache.hc.core5.http.HttpStatus; -import org.apache.hc.core5.http.Message; -import org.apache.hc.core5.http.URIScheme; -import org.apache.hc.core5.http.nio.AsyncServerRequestHandler; -import org.apache.hc.core5.http.nio.entity.DiscardingEntityConsumer; -import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer; -import org.apache.hc.core5.http.nio.support.AsyncResponseBuilder; -import org.apache.hc.core5.http.nio.support.BasicRequestProducer; -import org.apache.hc.core5.http.nio.support.BasicResponseConsumer; -import org.apache.hc.core5.http.protocol.HttpContext; -import org.apache.hc.core5.http.support.BasicRequestBuilder; -import org.apache.hc.core5.http2.config.H2Config; -import org.apache.hc.core5.http2.config.H2PingPolicy; -import org.apache.hc.core5.http2.nio.AsyncPingHandler; -import org.apache.hc.core5.http2.nio.command.PingCommand; -import org.apache.hc.core5.reactor.Command; -import org.apache.hc.core5.reactor.IOSession; -import org.apache.hc.core5.testing.extension.nio.H2TestResources; -import org.apache.hc.core5.util.Timeout; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.RegisterExtension; - -public class TestH2KeepAlivePingPolicyIT { - - private static final Timeout TIMEOUT = Timeout.ofSeconds(30); - - @RegisterExtension - private final H2TestResources resources = new H2TestResources(URIScheme.HTTP, TIMEOUT); - - @Test - void keepAlivePing_keepsConnectionOpenPastIdleTimeout() throws Exception { - final H2TestServer server = resources.server(); - final H2TestClient client = resources.client(); - - server.register("/hello", () -> new MessageExchangeHandler(new DiscardingEntityConsumer()) { - @Override - protected void handle( - final Message request, - final AsyncServerRequestHandler.ResponseTrigger responseTrigger, - final HttpContext context) throws IOException, HttpException { - responseTrigger.submitResponse( - AsyncResponseBuilder.create(HttpStatus.SC_OK) - .setEntity("OK", ContentType.TEXT_PLAIN) - .build(), - context); - } - }); - - final Timeout idleTime = Timeout.ofMilliseconds(200); - final Timeout ackTimeout = Timeout.ofSeconds(2); - - final H2PingPolicy pingPolicy = H2PingPolicy.custom() - .setIdleTime(idleTime) - .setAckTimeout(ackTimeout) - .build(); - - final H2Config h2Config = H2Config.custom() - .setPushEnabled(false) - .setPingPolicy(pingPolicy) - .build(); - - server.configure(h2Config); - final InetSocketAddress serverEndpoint = server.start(); - - client.configure(h2Config); - client.start(); - - final IOSession ioSession = client.requestSession( - new HttpHost("localhost", serverEndpoint.getPort()), - TIMEOUT, - null).get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()); - - // Make the inactivity timeout aggressive; keep-alive must prevent it from killing the session. - ioSession.setSocketTimeout(idleTime); - - try (final ClientSessionEndpoint streamEndpoint = new ClientSessionEndpoint(ioSession)) { - final HttpHost target = new HttpHost(URIScheme.HTTP.id, "localhost", serverEndpoint.getPort()); - - final Message r1 = executeHello(streamEndpoint, target); - Assertions.assertEquals(200, r1.getHead().getCode()); - Assertions.assertEquals("OK", r1.getBody()); - - parkAtLeast(idleTime.toMilliseconds() * 6L); - - Assertions.assertTrue(ioSession.isOpen(), "Expected session to stay open with keep-alive enabled"); - - final Message r2 = executeHello(streamEndpoint, target); - Assertions.assertEquals(200, r2.getHead().getCode()); - Assertions.assertEquals("OK", r2.getBody()); - } - } - - @Test - void keepAlivePing_disabled_connectionClosesOnIdleTimeout() throws Exception { - final H2TestServer server = resources.server(); - final H2TestClient client = resources.client(); - - server.register("/hello", () -> new MessageExchangeHandler(new DiscardingEntityConsumer()) { - @Override - protected void handle( - final Message request, - final AsyncServerRequestHandler.ResponseTrigger responseTrigger, - final HttpContext context) throws IOException, HttpException { - responseTrigger.submitResponse( - AsyncResponseBuilder.create(HttpStatus.SC_OK) - .setEntity("OK", ContentType.TEXT_PLAIN) - .build(), - context); - } - }); - - final H2Config h2Config = H2Config.custom() - .setPushEnabled(false) - // pingPolicy is intentionally not set (disabled) - .build(); - - server.configure(h2Config); - final InetSocketAddress serverEndpoint = server.start(); - - client.configure(h2Config); - client.start(); - - final IOSession ioSession = client.requestSession( - new HttpHost("localhost", serverEndpoint.getPort()), - TIMEOUT, - null).get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()); - - final Timeout idleTimeout = Timeout.ofMilliseconds(200); - ioSession.setSocketTimeout(idleTimeout); - - try (final ClientSessionEndpoint streamEndpoint = new ClientSessionEndpoint(ioSession)) { - final HttpHost target = new HttpHost(URIScheme.HTTP.id, "localhost", serverEndpoint.getPort()); - - final Message r1 = executeHello(streamEndpoint, target); - Assertions.assertEquals(200, r1.getHead().getCode()); - Assertions.assertEquals("OK", r1.getBody()); - - awaitTrue(() -> !ioSession.isOpen(), Timeout.ofSeconds(5), "Expected session to close without keep-alive"); - - Assertions.assertFalse(ioSession.isOpen(), "Expected session to close without keep-alive"); - - final Future> f = executeHelloAsync(streamEndpoint, target); - Assertions.assertThrows(ExecutionException.class, () -> f.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit())); - } - } - - @Test - void keepAlivePing_enabled_doesNotStealAckFromExplicitPingCommand() throws Exception { - final H2TestServer server = resources.server(); - final H2TestClient client = resources.client(); - - server.register("/hello", () -> new MessageExchangeHandler(new DiscardingEntityConsumer()) { - @Override - protected void handle( - final Message request, - final AsyncServerRequestHandler.ResponseTrigger responseTrigger, - final HttpContext context) throws IOException, HttpException { - responseTrigger.submitResponse( - AsyncResponseBuilder.create(HttpStatus.SC_OK) - .setEntity("OK", ContentType.TEXT_PLAIN) - .build(), - context); - } - }); - - final Timeout idleTime = Timeout.ofMilliseconds(100); - final Timeout ackTimeout = Timeout.ofSeconds(2); - - final H2PingPolicy pingPolicy = H2PingPolicy.custom() - .setIdleTime(idleTime) - .setAckTimeout(ackTimeout) - .build(); - - final H2Config h2Config = H2Config.custom() - .setPushEnabled(false) - .setPingPolicy(pingPolicy) - .build(); - - server.configure(h2Config); - final InetSocketAddress serverEndpoint = server.start(); - - client.configure(h2Config); - client.start(); - - final IOSession ioSession = client.requestSession( - new HttpHost("localhost", serverEndpoint.getPort()), - TIMEOUT, - null).get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()); - - ioSession.setSocketTimeout(idleTime); - - try (final ClientSessionEndpoint streamEndpoint = new ClientSessionEndpoint(ioSession)) { - final HttpHost target = new HttpHost(URIScheme.HTTP.id, "localhost", serverEndpoint.getPort()); - - // Warm-up to complete the HTTP/2 session & SETTINGS handshake. - final Message r1 = executeHello(streamEndpoint, target); - Assertions.assertEquals(200, r1.getHead().getCode()); - Assertions.assertEquals("OK", r1.getBody()); - - // Give the keep-alive logic a chance to become active (no hard assumptions about socket timeout changes). - parkAtLeast(idleTime.toMilliseconds() * 3L); - - final byte[] expected = new byte[]{0x55, 0x55, 0x55, 0x55, 0x55, 0x55, 0x55, 0x55}; - final CompletableFuture acked = new CompletableFuture<>(); - - final AsyncPingHandler handler = new AsyncPingHandler() { - - @Override - public ByteBuffer getData() { - return ByteBuffer.wrap(expected).asReadOnlyBuffer(); - } - - @Override - public void consumeResponse(final ByteBuffer feedback) throws IOException, HttpException { - if (feedback == null || feedback.remaining() != expected.length) { - acked.completeExceptionally(new AssertionError("Unexpected ping ACK payload")); - return; - } - final ByteBuffer dup = feedback.slice(); - final byte[] actual = new byte[expected.length]; - dup.get(actual); - for (int i = 0; i < expected.length; i++) { - if (actual[i] != expected[i]) { - acked.completeExceptionally(new AssertionError("Ping ACK payload mismatch")); - return; - } - } - acked.complete(null); - } - - @Override - public void failed(final Exception cause) { - acked.completeExceptionally(cause); - } - - @Override - public void cancel() { - acked.cancel(false); - } - }; - - ioSession.enqueue(new PingCommand(handler), Command.Priority.NORMAL); - ioSession.setEvent(SelectionKey.OP_WRITE); - - try { - acked.get(5, TimeUnit.SECONDS); - } catch (final TimeoutException ex) { - Assertions.fail("Timed out waiting for explicit PING ACK"); - } - - // Still usable. - Assertions.assertTrue(ioSession.isOpen(), "Expected session to stay open"); - final Message r2 = executeHello(streamEndpoint, target); - Assertions.assertEquals(200, r2.getHead().getCode()); - Assertions.assertEquals("OK", r2.getBody()); - } - } - - @Test - void keepAlivePingPolicy_rejectsDisabledAckTimeoutWhenIdleEnabled() { - Assertions.assertThrows(IllegalArgumentException.class, () -> H2PingPolicy.custom() - .setIdleTime(Timeout.ofSeconds(1)) - .setAckTimeout(Timeout.DISABLED) - .build()); - } - - private static Message executeHello( - final ClientSessionEndpoint endpoint, - final HttpHost target) throws Exception { - final Future> f = executeHelloAsync(endpoint, target); - return f.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()); - } - - private static Future> executeHelloAsync( - final ClientSessionEndpoint endpoint, - final HttpHost target) { - - final org.apache.hc.core5.http.message.BasicHttpRequest request = BasicRequestBuilder.get() - .setHttpHost(target) - .setPath("/hello") - .build(); - - return endpoint.execute( - new BasicRequestProducer(request, null), - new BasicResponseConsumer(new StringAsyncEntityConsumer()), - null); - } - - private static void parkAtLeast(final long millis) { - final long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(millis); - while (System.nanoTime() < deadlineNanos) { - LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(10)); - } - } - - private interface Condition { - boolean get(); - } - - private static void awaitTrue(final Condition condition, final Timeout timeout, final String message) { - final long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout.toMilliseconds()); - while (System.nanoTime() < deadlineNanos) { - if (condition.get()) { - return; - } - LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(1)); - } - Assertions.fail(message); - } - -}