diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java index 5c4b5b397..1a97fb7bd 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java @@ -125,6 +125,10 @@ enum SettingsHandshake { READY, TRANSMITTED, ACKED } private final AtomicInteger connOutputWindow; private final AtomicInteger outputRequests; private final H2StreamListener streamListener; + private final KeepAlivePingSupport keepAlivePingSupport; + + private final Timeout validateAfterInactivity; + private static final Timeout KEEP_ALIVE_PING_ACK_TIMEOUT = Timeout.ofSeconds(5); private ConnectionHandshake connState = ConnectionHandshake.READY; private SettingsHandshake localSettingState = SettingsHandshake.READY; @@ -148,7 +152,6 @@ enum SettingsHandshake { READY, TRANSMITTED, ACKED } private static final long STREAM_TIMEOUT_GRANULARITY_MILLIS = 1000; private long lastStreamTimeoutCheckMillis; - AbstractH2StreamMultiplexer( final ProtocolIOSession ioSession, final FrameFactory frameFactory, @@ -157,6 +160,18 @@ enum SettingsHandshake { READY, TRANSMITTED, ACKED } final CharCodingConfig charCodingConfig, final H2Config h2Config, final H2StreamListener streamListener) { + this(ioSession, frameFactory, idGenerator, httpProcessor, charCodingConfig, h2Config, streamListener, null); + } + + AbstractH2StreamMultiplexer( + final ProtocolIOSession ioSession, + final FrameFactory frameFactory, + final StreamIdGenerator idGenerator, + final HttpProcessor httpProcessor, + final CharCodingConfig charCodingConfig, + final H2Config h2Config, + final H2StreamListener streamListener, + final Timeout validateAfterInactivity) { this.ioSession = Args.notNull(ioSession, "IO session"); this.frameFactory = Args.notNull(frameFactory, "Frame factory"); this.httpProcessor = Args.notNull(httpProcessor, "HTTP processor"); @@ -183,6 +198,12 @@ enum SettingsHandshake { READY, TRANSMITTED, ACKED } this.lowMark = H2Config.INIT.getInitialWindowSize() / 2; this.streamListener = streamListener; + + this.validateAfterInactivity = validateAfterInactivity; + + this.keepAlivePingSupport = this.validateAfterInactivity != null && this.validateAfterInactivity.isEnabled() + ? new KeepAlivePingSupport(this.validateAfterInactivity, KEEP_ALIVE_PING_ACK_TIMEOUT) + : null; } @Override @@ -444,6 +465,9 @@ public final void onInput(final ByteBuffer src) throws HttpException, IOExceptio for (;;) { final RawFrame frame = inputBuffer.read(src, ioSession); if (frame != null) { + if (keepAlivePingSupport != null) { + keepAlivePingSupport.onFrameInput(frame); + } if (streamListener != null) { streamListener.onFrameInput(this, frame.getStreamId(), frame); } @@ -487,6 +511,10 @@ public final void onOutput() throws HttpException, IOException { ioSession.getLock().unlock(); } + if (keepAlivePingSupport != null) { + keepAlivePingSupport.activateIfReady(); + } + if (connState.compareTo(ConnectionHandshake.SHUTDOWN) < 0) { if (connOutputWindow.get() > 0 && remoteSettingState == SettingsHandshake.ACKED) { @@ -592,6 +620,15 @@ public final void onOutput() throws HttpException, IOException { } public final void onTimeout(final Timeout timeout) throws HttpException, IOException { + if (keepAlivePingSupport != null + && connState.compareTo(ConnectionHandshake.ACTIVE) <= 0 + && localSettingState == SettingsHandshake.ACKED + && remoteSettingState == SettingsHandshake.ACKED) { + if (keepAlivePingSupport.onTimeout(timeout)) { + return; + } + } + connState = ConnectionHandshake.SHUTDOWN; final RawFrame goAway; @@ -636,13 +673,17 @@ private void executeShutdown(final ShutdownCommand shutdownCommand) throws IOExc } } - private void executePing(final PingCommand pingCommand) throws IOException { - final AsyncPingHandler handler = pingCommand.getHandler(); - pingHandlers.add(handler); - final RawFrame ping = frameFactory.createPing(handler.getData()); + private void sendPing(final AsyncPingHandler handler) throws IOException { + final AsyncPingHandler pingHandler = Args.notNull(handler, "PING handler"); + pingHandlers.add(pingHandler); + final RawFrame ping = frameFactory.createPing(pingHandler.getData()); commitFrame(ping); } + private void executePing(final PingCommand pingCommand) throws IOException { + sendPing(pingCommand.getHandler()); + } + private void executeStaleCheck(final StaleCheckCommand staleCheckCommand) { final Consumer callback = staleCheckCommand.getCallback(); callback.accept(ioSession.isOpen() && @@ -888,6 +929,9 @@ private void consumeFrame(final RawFrame frame) throws HttpException, IOExceptio throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid PING frame payload"); } if (frame.isFlagSet(FrameFlag.ACK)) { + if (keepAlivePingSupport != null && keepAlivePingSupport.consumePingAck(ping)) { + break; + } final AsyncPingHandler pingHandler = pingHandlers.poll(); if (pingHandler != null) { pingHandler.consumeResponse(ping); @@ -910,6 +954,9 @@ private void consumeFrame(final RawFrame frame) throws HttpException, IOExceptio localSettingState = SettingsHandshake.ACKED; ioSession.setEvent(SelectionKey.OP_WRITE); applyLocalSettings(); + if (keepAlivePingSupport != null) { + keepAlivePingSupport.activateIfReady(); + } } } else { final ByteBuffer payload = frame.getPayload(); @@ -923,6 +970,9 @@ private void consumeFrame(final RawFrame frame) throws HttpException, IOExceptio final RawFrame response = frameFactory.createSettingsAck(); commitFrame(response); remoteSettingState = SettingsHandshake.ACKED; + if (keepAlivePingSupport != null) { + keepAlivePingSupport.activateIfReady(); + } } } break; @@ -1328,6 +1378,171 @@ void appendState(final StringBuilder buf) { .append(", streams.lastRemote=").append(streams.getLastRemoteId()); } + private final class KeepAlivePingSupport { + + private static final int PING_DATA_LEN = 8; + + private final Timeout idleTime; + private final Timeout ackTimeout; + + private boolean active; + private boolean awaitingAck; + + private long lastActivityNanos; + private long pingSeq; + private long expectedAckSeq; + + private AsyncPingHandler keepAliveHandler; + + KeepAlivePingSupport(final Timeout idleTime, final Timeout ackTimeout) { + this.idleTime = Args.notNull(idleTime, "Idle time"); + this.ackTimeout = Args.notNull(ackTimeout, "ACK timeout"); + this.active = false; + this.awaitingAck = false; + this.lastActivityNanos = System.nanoTime(); + this.pingSeq = 0L; + this.expectedAckSeq = 0L; + this.keepAliveHandler = null; + } + + void activateIfReady() { + if (active) { + return; + } + if (localSettingState == SettingsHandshake.ACKED && remoteSettingState == SettingsHandshake.ACKED) { + active = true; + awaitingAck = false; + lastActivityNanos = System.nanoTime(); + ioSession.setSocketTimeout(idleTime); + } + } + + void onFrameInput(final RawFrame frame) { + if (!active) { + return; + } + lastActivityNanos = System.nanoTime(); + if (awaitingAck) { + if (!(frame.getType() == FrameType.PING.getValue() && frame.isFlagSet(FrameFlag.ACK))) { + awaitingAck = false; + clearKeepAliveHandler(); + } + } + ioSession.setSocketTimeout(idleTime); + } + + boolean consumePingAck(final ByteBuffer payload) { + if (!active || !awaitingAck) { + return false; + } + if (payload == null || payload.remaining() != PING_DATA_LEN) { + return false; + } + final long ack = payload.getLong(payload.position()); + if (ack != expectedAckSeq) { + return false; + } + awaitingAck = false; + clearKeepAliveHandler(); + lastActivityNanos = System.nanoTime(); + ioSession.setSocketTimeout(idleTime); + return true; + } + + boolean onTimeout(final Timeout timeout) throws IOException { + activateIfReady(); + if (!active) { + return false; + } + + if (awaitingAck) { + shutdownKeepAlive(timeout); + return true; + } + + final long idleNanos = idleTime.toNanoseconds(); + if (idleNanos > 0L) { + final long elapsed = System.nanoTime() - lastActivityNanos; + if (elapsed < idleNanos) { + final long remainingMs = Math.max(1L, (idleNanos - elapsed) / 1_000_000L); + ioSession.setSocketTimeout(Timeout.ofMilliseconds(remainingMs)); + return true; + } + } + + awaitingAck = true; + sendKeepAlivePing(); + ioSession.setSocketTimeout(ackTimeout); + return true; + } + + private void sendKeepAlivePing() throws IOException { + final long v = ++pingSeq; + expectedAckSeq = v; + + final ByteBuffer data = ByteBuffer.allocate(PING_DATA_LEN); + data.putLong(v); + data.flip(); + + final ByteBuffer ro = data.asReadOnlyBuffer(); + + keepAliveHandler = new AsyncPingHandler() { + + @Override + public ByteBuffer getData() { + return ro.duplicate(); + } + + @Override + public void consumeResponse(final ByteBuffer payload) { + // Keep-alive ACK is consumed by consumePingAck(...) before the handler queue is polled. + } + + @Override + public void failed(final Exception cause) { + // No-op; connection error handling is performed elsewhere. + } + + @Override + public void cancel() { + // No-op. + } + + }; + + // Use the same PING sending path as PingCommand (existing command path). + sendPing(keepAliveHandler); + } + + private void clearKeepAliveHandler() { + final AsyncPingHandler handler = keepAliveHandler; + keepAliveHandler = null; + if (handler != null) { + pingHandlers.remove(handler); + } + } + + private void shutdownKeepAlive(final Timeout timeout) throws IOException { + clearKeepAliveHandler(); + connState = ConnectionHandshake.SHUTDOWN; + + final RawFrame goAway = frameFactory.createGoAway( + streams.getLastRemoteId(), + H2Error.NO_ERROR, + "Ping response timeout (" + timeout + ")"); + commitFrame(goAway); + + for (final Iterator it = streams.iterator(); it.hasNext(); ) { + final H2Stream stream = it.next(); + stream.fail(new H2StreamResetException( + H2Error.NO_ERROR, + "Ping response timeout (" + timeout + ")")); + } + streams.shutdownAndReleaseAll(); + } + + } + private static class Continuation { final int streamId; @@ -1592,6 +1807,10 @@ public String toString() { return buf.toString(); } + public boolean isLocalReset() { + return localResetTime > 0; + } + } private void checkStreamTimeouts(final long nowNanos) throws IOException { diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientH2StreamMultiplexer.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientH2StreamMultiplexer.java index ab8ecc48a..7b313f8ee 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientH2StreamMultiplexer.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientH2StreamMultiplexer.java @@ -46,6 +46,7 @@ import org.apache.hc.core5.http2.frame.FrameFactory; import org.apache.hc.core5.http2.frame.StreamIdGenerator; import org.apache.hc.core5.reactor.ProtocolIOSession; +import org.apache.hc.core5.util.Timeout; /** * I/O event handler for events fired by {@link ProtocolIOSession} that implements @@ -66,18 +67,31 @@ public ClientH2StreamMultiplexer( final HandlerFactory pushHandlerFactory, final H2Config h2Config, final CharCodingConfig charCodingConfig, - final H2StreamListener streamListener) { - super(ioSession, frameFactory, StreamIdGenerator.ODD, httpProcessor, charCodingConfig, h2Config, streamListener); + final H2StreamListener streamListener, + final Timeout validateAfterInactivity) { + super(ioSession, frameFactory, StreamIdGenerator.ODD, httpProcessor, charCodingConfig, h2Config, streamListener, + validateAfterInactivity); this.pushHandlerFactory = pushHandlerFactory; } + public ClientH2StreamMultiplexer( + final ProtocolIOSession ioSession, + final FrameFactory frameFactory, + final HttpProcessor httpProcessor, + final HandlerFactory pushHandlerFactory, + final H2Config h2Config, + final CharCodingConfig charCodingConfig, + final H2StreamListener streamListener) { + this(ioSession, frameFactory, httpProcessor, pushHandlerFactory, h2Config, charCodingConfig, streamListener, null); + } + public ClientH2StreamMultiplexer( final ProtocolIOSession ioSession, final HttpProcessor httpProcessor, final HandlerFactory pushHandlerFactory, final H2Config h2Config, final CharCodingConfig charCodingConfig) { - this(ioSession, DefaultFrameFactory.INSTANCE, httpProcessor, pushHandlerFactory, h2Config, charCodingConfig, null); + this(ioSession, DefaultFrameFactory.INSTANCE, httpProcessor, pushHandlerFactory, h2Config, charCodingConfig, null, null); } public ClientH2StreamMultiplexer( diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientH2StreamMultiplexerFactory.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientH2StreamMultiplexerFactory.java index c2b9afeea..a2315554e 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientH2StreamMultiplexerFactory.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientH2StreamMultiplexerFactory.java @@ -30,6 +30,7 @@ import org.apache.hc.core5.annotation.Contract; import org.apache.hc.core5.annotation.Internal; import org.apache.hc.core5.annotation.ThreadingBehavior; +import org.apache.hc.core5.function.Supplier; import org.apache.hc.core5.http.config.CharCodingConfig; import org.apache.hc.core5.http.nio.AsyncPushConsumer; import org.apache.hc.core5.http.nio.HandlerFactory; @@ -39,6 +40,8 @@ import org.apache.hc.core5.http2.frame.FrameFactory; import org.apache.hc.core5.reactor.ProtocolIOSession; import org.apache.hc.core5.util.Args; +import org.apache.hc.core5.util.TimeValue; +import org.apache.hc.core5.util.Timeout; /** * {@link ClientH2StreamMultiplexer} factory. @@ -55,6 +58,7 @@ public final class ClientH2StreamMultiplexerFactory { private final CharCodingConfig charCodingConfig; private final H2StreamListener streamListener; private final FrameFactory frameFactory; + private final Supplier validateAfterInactivitySupplier; public ClientH2StreamMultiplexerFactory( final HttpProcessor httpProcessor, @@ -62,13 +66,25 @@ public ClientH2StreamMultiplexerFactory( final H2Config h2Config, final CharCodingConfig charCodingConfig, final H2StreamListener streamListener, - final FrameFactory frameFactory) { + final FrameFactory frameFactory, + final Supplier validateAfterInactivitySupplier) { this.httpProcessor = Args.notNull(httpProcessor, "HTTP processor"); this.pushHandlerFactory = pushHandlerFactory; this.h2Config = h2Config != null ? h2Config : H2Config.DEFAULT; this.charCodingConfig = charCodingConfig != null ? charCodingConfig : CharCodingConfig.DEFAULT; this.streamListener = streamListener; this.frameFactory = frameFactory != null ? frameFactory : DefaultFrameFactory.INSTANCE; + this.validateAfterInactivitySupplier = validateAfterInactivitySupplier; + } + + public ClientH2StreamMultiplexerFactory( + final HttpProcessor httpProcessor, + final HandlerFactory pushHandlerFactory, + final H2Config h2Config, + final CharCodingConfig charCodingConfig, + final H2StreamListener streamListener, + final FrameFactory frameFactory) { + this(httpProcessor, pushHandlerFactory, h2Config, charCodingConfig, streamListener, frameFactory, null); } public ClientH2StreamMultiplexerFactory( @@ -78,14 +94,14 @@ public ClientH2StreamMultiplexerFactory( final CharCodingConfig charCodingConfig, final H2StreamListener streamListener ) { - this(httpProcessor, pushHandlerFactory, h2Config, charCodingConfig, streamListener, null); + this(httpProcessor, pushHandlerFactory, h2Config, charCodingConfig, streamListener, null, null); } public ClientH2StreamMultiplexerFactory( final HttpProcessor httpProcessor, final HandlerFactory pushHandlerFactory, final H2StreamListener streamListener) { - this(httpProcessor, pushHandlerFactory, null, null, streamListener, null); + this(httpProcessor, pushHandlerFactory, null, null, streamListener, null, null); } public ClientH2StreamMultiplexerFactory( @@ -96,7 +112,18 @@ public ClientH2StreamMultiplexerFactory( public ClientH2StreamMultiplexer create(final ProtocolIOSession ioSession) { return new ClientH2StreamMultiplexer(ioSession, frameFactory, httpProcessor, - pushHandlerFactory, h2Config, charCodingConfig, streamListener); + pushHandlerFactory, h2Config, charCodingConfig, streamListener, resolveValidateAfterInactivity()); + } + + private Timeout resolveValidateAfterInactivity() { + if (validateAfterInactivitySupplier == null) { + return null; + } + final TimeValue timeValue = validateAfterInactivitySupplier.get(); + if (!TimeValue.isNonNegative(timeValue)) { + return null; + } + return timeValue.toTimeout(); } } diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2MultiplexingRequester.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2MultiplexingRequester.java index 476ecacbd..f00b612be 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2MultiplexingRequester.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2MultiplexingRequester.java @@ -32,6 +32,7 @@ import java.nio.ByteBuffer; import java.util.List; import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; @@ -87,6 +88,7 @@ public class H2MultiplexingRequester extends AsyncRequester { private final H2ConnPool connPool; + private final AtomicReference validateAfterInactivityRef; /** * Hard cap on per-connection queued / in-flight commands. @@ -108,11 +110,16 @@ public H2MultiplexingRequester( final TlsStrategy tlsStrategy, final IOReactorMetricsListener threadPoolListener, final IOWorkerSelector workerSelector, + final AtomicReference validateAfterInactivityRef, final int maxCommandsPerConnection) { super(eventHandlerFactory, ioReactorConfig, ioSessionDecorator, exceptionCallback, sessionListener, ShutdownCommand.GRACEFUL_IMMEDIATE_CALLBACK, DefaultAddressResolver.INSTANCE, threadPoolListener, workerSelector); this.connPool = new H2ConnPool(this, addressResolver, tlsStrategy); + this.validateAfterInactivityRef = validateAfterInactivityRef; + if (this.validateAfterInactivityRef != null) { + this.validateAfterInactivityRef.set(this.connPool.getValidateAfterInactivity()); + } this.maxCommandsPerConnection = maxCommandsPerConnection; } @@ -130,6 +137,9 @@ public TimeValue getValidateAfterInactivity() { public void setValidateAfterInactivity(final TimeValue timeValue) { connPool.setValidateAfterInactivity(timeValue); + if (validateAfterInactivityRef != null) { + validateAfterInactivityRef.set(timeValue); + } } /** diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2MultiplexingRequesterBootstrap.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2MultiplexingRequesterBootstrap.java index f48900317..c4523c800 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2MultiplexingRequesterBootstrap.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2MultiplexingRequesterBootstrap.java @@ -28,6 +28,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.concurrent.atomic.AtomicReference; import org.apache.hc.core5.annotation.Experimental; import org.apache.hc.core5.function.Callback; @@ -53,6 +54,7 @@ import org.apache.hc.core5.reactor.IOSession; import org.apache.hc.core5.reactor.IOSessionListener; import org.apache.hc.core5.util.Args; +import org.apache.hc.core5.util.TimeValue; /** * {@link H2MultiplexingRequester} bootstrap. @@ -277,13 +279,15 @@ public final H2MultiplexingRequesterBootstrap registerVirtual(final String hostn public H2MultiplexingRequester create() { final RequestRouter> requestRouter = RequestRouter.create( null, uriPatternType, routeEntries, RequestRouter.LOCAL_AUTHORITY_RESOLVER, null); + final AtomicReference validateAfterInactivityRef = new AtomicReference<>(TimeValue.NEG_ONE_MILLISECOND); final ClientH2StreamMultiplexerFactory http2StreamHandlerFactory = new ClientH2StreamMultiplexerFactory( httpProcessor != null ? httpProcessor : H2Processors.client(), new DefaultAsyncPushConsumerFactory(requestRouter), h2Config != null ? h2Config : H2Config.DEFAULT, charCodingConfig != null ? charCodingConfig : CharCodingConfig.DEFAULT, streamListener, - frameFactory); + frameFactory, + validateAfterInactivityRef::get); return new H2MultiplexingRequester( ioReactorConfig, (ioSession, attachment) -> @@ -295,6 +299,7 @@ public H2MultiplexingRequester create() { tlsStrategy != null ? tlsStrategy : new H2ClientTlsStrategy(), threadPoolListener, null, + validateAfterInactivityRef, maxCommandsPerConnection); } diff --git a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2KeepAlivePingClientExample.java b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2KeepAlivePingClientExample.java new file mode 100644 index 000000000..396a08af9 --- /dev/null +++ b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2KeepAlivePingClientExample.java @@ -0,0 +1,288 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.core5.http2.examples; + +import java.net.URI; +import java.time.Instant; +import java.util.List; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.HttpResponse; +import org.apache.hc.core5.http.Message; +import org.apache.hc.core5.http.nio.AsyncRequestProducer; +import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer; +import org.apache.hc.core5.http.nio.support.AsyncRequestBuilder; +import org.apache.hc.core5.http.nio.support.BasicResponseConsumer; +import org.apache.hc.core5.http.protocol.HttpCoreContext; +import org.apache.hc.core5.http2.config.H2Config; +import org.apache.hc.core5.http2.frame.FrameFlag; +import org.apache.hc.core5.http2.frame.FrameType; +import org.apache.hc.core5.http2.frame.RawFrame; +import org.apache.hc.core5.http2.impl.nio.H2StreamListener; +import org.apache.hc.core5.http2.impl.nio.bootstrap.H2MultiplexingRequester; +import org.apache.hc.core5.http2.impl.nio.bootstrap.H2MultiplexingRequesterBootstrap; +import org.apache.hc.core5.io.CloseMode; +import org.apache.hc.core5.reactor.IOReactorConfig; +import org.apache.hc.core5.reactor.IOSession; +import org.apache.hc.core5.reactor.IOSessionListener; +import org.apache.hc.core5.util.TimeValue; +import org.apache.hc.core5.util.Timeout; + +/** + * HTTP/2 keepalive sanity-check for HttpComponents Core (no HttpClient classes). + * + * This version enables keep-alive by setting validateAfterInactivity, which is what + * AbstractH2StreamMultiplexer currently uses to arm KeepAlivePingSupport. + * + * @since 5.5 + */ +public final class H2KeepAlivePingClientExample { + + private static final URI TARGET = URI.create("https://nghttp2.org/httpbin/get"); + + private static final class Counters { + final AtomicInteger sessionsConnected = new AtomicInteger(0); + final AtomicInteger reactorTimeoutEvents = new AtomicInteger(0); + final AtomicInteger pingsOut = new AtomicInteger(0); + final AtomicInteger pingAcksIn = new AtomicInteger(0); + final AtomicInteger goAwayIn = new AtomicInteger(0); + final AtomicInteger rstStreamIn = new AtomicInteger(0); + final AtomicInteger exceptions = new AtomicInteger(0); + } + + public static void main(final String[] args) throws Exception { + + // Base socket timeout BEFORE keepalive activates. + final IOReactorConfig ioReactorConfig = IOReactorConfig.custom() + .setSoTimeout(Timeout.ofSeconds(30)) + .build(); + + // Keep-alive idle time (what your multiplexer currently wires via validateAfterInactivity). + final TimeValue validateAfterInactivity = TimeValue.ofSeconds(3); + + final H2Config h2Config = H2Config.custom() + .setPushEnabled(false) + .setMaxConcurrentStreams(100) + .build(); + + final Counters counters = new Counters(); + + final H2MultiplexingRequester requester = H2MultiplexingRequesterBootstrap.bootstrap() + .setIOReactorConfig(ioReactorConfig) + .setH2Config(h2Config) + .setIOSessionListener(new IOSessionListener() { + + @Override + public void connected(final IOSession session) { + counters.sessionsConnected.incrementAndGet(); + log("session.connected id=" + safeId(session) + + " remote=" + session.getRemoteAddress() + + " soTimeout=" + session.getSocketTimeout()); + } + + @Override + public void startTls(final IOSession session) { + log("session.startTls id=" + safeId(session) + + " soTimeout=" + session.getSocketTimeout()); + } + + @Override + public void inputReady(final IOSession session) { + // no-op + } + + @Override + public void outputReady(final IOSession session) { + // no-op + } + + @Override + public void timeout(final IOSession session) { + // Expected: keep-alive uses the reactor timer. + counters.reactorTimeoutEvents.incrementAndGet(); + log("session.timeout (reactor) id=" + safeId(session) + + " soTimeout=" + session.getSocketTimeout()); + } + + @Override + public void exception(final IOSession session, final Exception ex) { + counters.exceptions.incrementAndGet(); + log("session.exception id=" + safeId(session) + " ex=" + ex); + } + + @Override + public void disconnected(final IOSession session) { + log("session.disconnected id=" + safeId(session)); + } + + private String safeId(final IOSession session) { + try { + return session.getId(); + } catch (final RuntimeException ignore) { + return "n/a"; + } + } + + }) + .setStreamListener(new H2StreamListener() { + + @Override + public void onHeaderInput( + final org.apache.hc.core5.http.HttpConnection connection, + final int streamId, + final List headers) { + // no-op + } + + @Override + public void onHeaderOutput( + final org.apache.hc.core5.http.HttpConnection connection, + final int streamId, + final List headers) { + // no-op + } + + @Override + public void onFrameInput( + final org.apache.hc.core5.http.HttpConnection connection, + final int streamId, + final RawFrame frame) { + + final FrameType type = FrameType.valueOf(frame.getType()); + + if (type == FrameType.PING && frame.isFlagSet(FrameFlag.ACK)) { + counters.pingAcksIn.incrementAndGet(); + log("<< PING[ACK]"); + } else if (type == FrameType.GOAWAY) { + counters.goAwayIn.incrementAndGet(); + log("<< GOAWAY"); + } else if (type == FrameType.RST_STREAM) { + counters.rstStreamIn.incrementAndGet(); + log("<< RST_STREAM streamId=" + streamId); + } + } + + @Override + public void onFrameOutput( + final org.apache.hc.core5.http.HttpConnection connection, + final int streamId, + final RawFrame frame) { + + final FrameType type = FrameType.valueOf(frame.getType()); + + if (type == FrameType.PING && !frame.isFlagSet(FrameFlag.ACK)) { + counters.pingsOut.incrementAndGet(); + log(">> PING"); + } + } + + @Override + public void onInputFlowControl( + final org.apache.hc.core5.http.HttpConnection connection, + final int streamId, + final int delta, + final int actualSize) { + // no-op + } + + @Override + public void onOutputFlowControl( + final org.apache.hc.core5.http.HttpConnection connection, + final int streamId, + final int delta, + final int actualSize) { + // no-op + } + + }) + .create(); + + // IMPORTANT FIX: arm keep-alive before any connection is created. + requester.setValidateAfterInactivity(validateAfterInactivity); + + requester.start(); + try { + log("requester.validateAfterInactivity=" + requester.getValidateAfterInactivity()); + + final Message m1 = executeSimpleGet(requester, TARGET); + log("response1=" + m1.getHead().getCode()); + + Thread.sleep(250); + + // Wait long enough to force keepalive activity. + final long waitMs = 13_000L; + log("waiting=" + waitMs + "ms for keepalive..."); + Thread.sleep(waitMs); + + final Message m2 = executeSimpleGet(requester, TARGET); + log("response2=" + m2.getHead().getCode()); + + log("stats: sessionsConnected=" + counters.sessionsConnected.get() + + ", pingsOut=" + counters.pingsOut.get() + + ", pingAcksIn=" + counters.pingAcksIn.get() + + ", goAwayIn=" + counters.goAwayIn.get() + + ", rstStreamIn=" + counters.rstStreamIn.get() + + ", reactorTimeoutEvents=" + counters.reactorTimeoutEvents.get() + + ", exceptions=" + counters.exceptions.get()); + + } finally { + requester.close(CloseMode.GRACEFUL); + } + } + + private static Message executeSimpleGet( + final H2MultiplexingRequester requester, + final URI uri) throws Exception { + + final Timeout timeout = Timeout.ofSeconds(30); + + final AsyncRequestProducer requestProducer = AsyncRequestBuilder.get(uri).build(); + final BasicResponseConsumer responseConsumer = + new BasicResponseConsumer<>(new StringAsyncEntityConsumer()); + + try { + final Future> f = requester.execute( + requestProducer, + responseConsumer, + timeout, + HttpCoreContext.create(), + null); + return f.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); + } finally { + requestProducer.releaseResources(); + responseConsumer.releaseResources(); + } + } + + private static void log(final String s) { + System.out.println(Instant.now() + " " + s); + } + +} diff --git a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/TestAbstractH2StreamMultiplexer.java b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/TestAbstractH2StreamMultiplexer.java index c85d10aa9..d89ff6106 100644 --- a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/TestAbstractH2StreamMultiplexer.java +++ b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/TestAbstractH2StreamMultiplexer.java @@ -61,6 +61,7 @@ import org.apache.hc.core5.http2.frame.DefaultFrameFactory; import org.apache.hc.core5.http2.frame.FrameConsts; import org.apache.hc.core5.http2.frame.FrameFactory; +import org.apache.hc.core5.http2.frame.FrameFlag; import org.apache.hc.core5.http2.frame.FrameType; import org.apache.hc.core5.http2.frame.RawFrame; import org.apache.hc.core5.http2.frame.StreamIdGenerator; @@ -116,7 +117,22 @@ public H2StreamMultiplexerImpl( final H2Config h2Config, final H2StreamListener streamListener, final Supplier streamHandlerSupplier) { - super(ioSession, frameFactory, idGenerator, httpProcessor, charCodingConfig, h2Config, streamListener); + this(ioSession, frameFactory, idGenerator, httpProcessor, charCodingConfig, h2Config, streamListener, + streamHandlerSupplier, null); + } + + public H2StreamMultiplexerImpl( + final ProtocolIOSession ioSession, + final FrameFactory frameFactory, + final StreamIdGenerator idGenerator, + final HttpProcessor httpProcessor, + final CharCodingConfig charCodingConfig, + final H2Config h2Config, + final H2StreamListener streamListener, + final Supplier streamHandlerSupplier, + final Timeout validateAfterInactivity) { + super(ioSession, frameFactory, idGenerator, httpProcessor, charCodingConfig, h2Config, streamListener, + validateAfterInactivity); this.streamHandlerSupplier = streamHandlerSupplier; } @@ -846,28 +862,6 @@ static final class PriorityHeaderSender implements H2StreamHandler { @Override public void releaseResources() { } } - // Small struct + parser to decode the frames we capture from writes - private static final class FrameStub { - final int type; - final int streamId; - FrameStub(final int type, final int streamId) { this.type = type; this.streamId = streamId; } - } - private static List parseFrames(final byte[] all) { - final List out = new ArrayList<>(); - int p = 0; - while (p + 9 <= all.length) { - final int len = ((all[p] & 0xff) << 16) | ((all[p + 1] & 0xff) << 8) | (all[p + 2] & 0xff); - final int type = all[p + 3] & 0xff; - final int sid = ((all[p + 5] & 0x7f) << 24) | ((all[p + 6] & 0xff) << 16) - | ((all[p + 7] & 0xff) << 8) | (all[p + 8] & 0xff); - p += 9; - if (p + len > all.length) break; - out.add(new FrameStub(type, sid)); - p += len; - } - return out; - } - // 2) Client emits PRIORITY_UPDATE BEFORE HEADERS when Priority header present @Test void testSubmitWithPriorityHeaderEmitsPriorityUpdateBeforeHeaders() throws Exception { @@ -1083,5 +1077,291 @@ void testStreamIdleTimeoutTriggersH2StreamTimeoutException() throws Exception { Assertions.assertEquals(1, timeoutEx.getStreamId()); } + private static byte[] encodeFrame(final RawFrame frame) throws IOException { + final WritableByteChannelMock writableChannel = new WritableByteChannelMock(256); + final FrameOutputBuffer outBuffer = new FrameOutputBuffer(16 * 1024); + outBuffer.write(frame, writableChannel); + return writableChannel.toByteArray(); + } + + private static void feedFrame(final AbstractH2StreamMultiplexer mux, final RawFrame frame) throws Exception { + mux.onInput(ByteBuffer.wrap(encodeFrame(frame))); + } + + private static void completeSettingsHandshake(final AbstractH2StreamMultiplexer mux) throws Exception { + // Remote SETTINGS (non-ACK) -> mux replies with SETTINGS ACK and marks remoteSettingState ACKED + final RawFrame remoteSettings = FRAME_FACTORY.createSettings(new H2Setting[] { + new H2Setting(H2Param.MAX_FRAME_SIZE, FrameConsts.MIN_FRAME_SIZE) + }); + feedFrame(mux, remoteSettings); + + // Remote ACK of our SETTINGS -> localSettingState ACKED + feedFrame(mux, new RawFrame(FrameType.SETTINGS.getValue(), FrameFlag.ACK.getValue(), 0, null)); + } + + private static final class FrameStub { + final int type; + final int flags; + final int streamId; + final byte[] payload; + + FrameStub(final int type, final int flags, final int streamId, final byte[] payload) { + this.type = type; + this.flags = flags; + this.streamId = streamId; + this.payload = payload; + } + + boolean isPing() { + return type == FrameType.PING.getValue(); + } + + boolean isGoAway() { + return type == FrameType.GOAWAY.getValue(); + } + + boolean isAck() { + return (flags & FrameFlag.ACK.getValue()) != 0; + } + } + + private static List parseFrames(final byte[] all) { + final List out = new ArrayList<>(); + int p = 0; + while (p + 9 <= all.length) { + final int len = ((all[p] & 0xff) << 16) | ((all[p + 1] & 0xff) << 8) | (all[p + 2] & 0xff); + final int type = all[p + 3] & 0xff; + final int flags = all[p + 4] & 0xff; + final int sid = ((all[p + 5] & 0x7f) << 24) | ((all[p + 6] & 0xff) << 16) + | ((all[p + 7] & 0xff) << 8) | (all[p + 8] & 0xff); + p += 9; + if (p + len > all.length) { + break; + } + final byte[] payload = new byte[len]; + System.arraycopy(all, p, payload, 0, len); + out.add(new FrameStub(type, flags, sid, payload)); + p += len; + } + return out; + } + + private static byte[] concat(final List writes) { + final int total = writes.stream().mapToInt(a -> a.length).sum(); + final byte[] all = new byte[total]; + int p = 0; + for (final byte[] a : writes) { + System.arraycopy(a, 0, all, p, a.length); + p += a.length; + } + return all; + } + + + @Test + void testKeepAliveNotActiveBeforeSettingsHandshake() throws Exception { + final List writes = new ArrayList<>(); + Mockito.when(protocolIOSession.write(ArgumentMatchers.any(ByteBuffer.class))) + .thenAnswer(inv -> { + final ByteBuffer b = inv.getArgument(0, ByteBuffer.class); + final byte[] copy = new byte[b.remaining()]; + b.get(copy); + writes.add(copy); + return copy.length; + }); + Mockito.doNothing().when(protocolIOSession).setEvent(ArgumentMatchers.anyInt()); + Mockito.doNothing().when(protocolIOSession).clearEvent(ArgumentMatchers.anyInt()); + + final Timeout idleTime = Timeout.ofMilliseconds(5); + + final AbstractH2StreamMultiplexer mux = new H2StreamMultiplexerImpl( + protocolIOSession, FRAME_FACTORY, StreamIdGenerator.ODD, + httpProcessor, CharCodingConfig.DEFAULT, H2Config.DEFAULT, h2StreamListener, () -> streamHandler, + idleTime); + + mux.onConnect(); + writes.clear(); + + // BEFORE SETTINGS handshake is fully ACKed, keepalive must NOT run + mux.onTimeout(idleTime); + + final List frames = parseFrames(concat(writes)); + Assertions.assertTrue(frames.stream().noneMatch(FrameStub::isPing), "Must not emit PING before handshake"); + Assertions.assertTrue(frames.stream().anyMatch(FrameStub::isGoAway), "Default timeout path must emit GOAWAY"); + } + + @Test + void testKeepAliveActivatesAfterSettingsAckedSetsIdleTimeout() throws Exception { + Mockito.doNothing().when(protocolIOSession).setEvent(ArgumentMatchers.anyInt()); + Mockito.doNothing().when(protocolIOSession).clearEvent(ArgumentMatchers.anyInt()); + Mockito.when(protocolIOSession.write(ArgumentMatchers.any(ByteBuffer.class))).thenReturn(0); + + final Timeout idleTime = Timeout.ofMilliseconds(50); + final Timeout ackTimeout = Timeout.ofSeconds(5); + + final AbstractH2StreamMultiplexer mux = new H2StreamMultiplexerImpl( + protocolIOSession, FRAME_FACTORY, StreamIdGenerator.ODD, + httpProcessor, CharCodingConfig.DEFAULT, H2Config.DEFAULT, h2StreamListener, () -> streamHandler, + idleTime); + + mux.onConnect(); + completeSettingsHandshake(mux); + + Mockito.verify(protocolIOSession, Mockito.atLeastOnce()).setSocketTimeout(ArgumentMatchers.eq(idleTime)); + Mockito.verify(protocolIOSession, Mockito.never()).setSocketTimeout(ArgumentMatchers.eq(ackTimeout)); + } + + @Test + void testKeepAliveIdleTimeoutSendsPingAndSetsAckTimeout() throws Exception { + final List writes = new ArrayList<>(); + Mockito.when(protocolIOSession.write(ArgumentMatchers.any(ByteBuffer.class))) + .thenAnswer(inv -> { + final ByteBuffer b = inv.getArgument(0, ByteBuffer.class); + final byte[] copy = new byte[b.remaining()]; + b.get(copy); + writes.add(copy); + return copy.length; + }); + Mockito.doNothing().when(protocolIOSession).setEvent(ArgumentMatchers.anyInt()); + Mockito.doNothing().when(protocolIOSession).clearEvent(ArgumentMatchers.anyInt()); + + final Timeout idleTime = Timeout.ofMilliseconds(5); + final Timeout ackTimeout = Timeout.ofSeconds(5); + + final AbstractH2StreamMultiplexer mux = new H2StreamMultiplexerImpl( + protocolIOSession, FRAME_FACTORY, StreamIdGenerator.ODD, + httpProcessor, CharCodingConfig.DEFAULT, H2Config.DEFAULT, h2StreamListener, () -> streamHandler, + idleTime); + + mux.onConnect(); + completeSettingsHandshake(mux); + + writes.clear(); + Thread.sleep(idleTime.toMilliseconds() + 10); + + mux.onTimeout(idleTime); + + Mockito.verify(protocolIOSession, Mockito.atLeastOnce()).setSocketTimeout(ArgumentMatchers.eq(ackTimeout)); + + final List frames = parseFrames(concat(writes)); + Assertions.assertTrue(frames.stream().anyMatch(f -> f.isPing() && !f.isAck()), "Must emit keepalive PING"); + Assertions.assertTrue(mux.isOpen(), "Connection should still be open after sending PING"); + } + + @Test + void testKeepAlivePingAckReturnsToIdleTimeout() throws Exception { + final List writes = new ArrayList<>(); + Mockito.when(protocolIOSession.write(ArgumentMatchers.any(ByteBuffer.class))) + .thenAnswer(inv -> { + final ByteBuffer b = inv.getArgument(0, ByteBuffer.class); + final byte[] copy = new byte[b.remaining()]; + b.get(copy); + writes.add(copy); + return copy.length; + }); + Mockito.doNothing().when(protocolIOSession).setEvent(ArgumentMatchers.anyInt()); + Mockito.doNothing().when(protocolIOSession).clearEvent(ArgumentMatchers.anyInt()); + + final Timeout idleTime = Timeout.ofMilliseconds(5); + + final AbstractH2StreamMultiplexer mux = new H2StreamMultiplexerImpl( + protocolIOSession, FRAME_FACTORY, StreamIdGenerator.ODD, + httpProcessor, CharCodingConfig.DEFAULT, H2Config.DEFAULT, h2StreamListener, () -> streamHandler, + idleTime); + + mux.onConnect(); + completeSettingsHandshake(mux); + + writes.clear(); + Thread.sleep(idleTime.toMilliseconds() + 10); + mux.onTimeout(idleTime); + + final List frames = parseFrames(concat(writes)); + final FrameStub ping = frames.stream().filter(f -> f.isPing() && !f.isAck()).findFirst().orElse(null); + Assertions.assertNotNull(ping, "Expected a keepalive PING frame"); + Assertions.assertEquals(8, ping.payload.length, "PING payload must be 8 bytes"); + + // Feed an ACK with the same 8 bytes + final RawFrame pingAck = new RawFrame(FrameType.PING.getValue(), FrameFlag.ACK.getValue(), 0, ByteBuffer.wrap(ping.payload)); + feedFrame(mux, pingAck); + + Mockito.verify(protocolIOSession, Mockito.atLeastOnce()).setSocketTimeout(ArgumentMatchers.eq(idleTime)); + } + + @Test + void testKeepAliveAckTimeoutShutsDownAndFailsStreams() throws Exception { + final List writes = new ArrayList<>(); + Mockito.when(protocolIOSession.write(ArgumentMatchers.any(ByteBuffer.class))) + .thenAnswer(inv -> { + final ByteBuffer b = inv.getArgument(0, ByteBuffer.class); + final byte[] copy = new byte[b.remaining()]; + b.get(copy); + writes.add(copy); + return copy.length; + }); + Mockito.doNothing().when(protocolIOSession).setEvent(ArgumentMatchers.anyInt()); + Mockito.doNothing().when(protocolIOSession).clearEvent(ArgumentMatchers.anyInt()); + + final Timeout idleTime = Timeout.ofMilliseconds(5); + final Timeout ackTimeout = Timeout.ofSeconds(5); + + final AbstractH2StreamMultiplexer mux = new H2StreamMultiplexerImpl( + protocolIOSession, FRAME_FACTORY, StreamIdGenerator.ODD, + httpProcessor, CharCodingConfig.DEFAULT, H2Config.DEFAULT, h2StreamListener, () -> streamHandler, + idleTime); + + mux.onConnect(); + completeSettingsHandshake(mux); + + // Ensure at least one live stream to be failed + final H2StreamChannel channel = mux.createChannel(1); + mux.createStream(channel, streamHandler); + + writes.clear(); + Thread.sleep(idleTime.toMilliseconds() + 10); + mux.onTimeout(idleTime); // send PING, awaiting ACK + writes.clear(); + + // No ACK arrives -> next timeout closes via keepalive path (GOAWAY + fail streams) + mux.onTimeout(ackTimeout); + + final List frames = parseFrames(concat(writes)); + Assertions.assertTrue(frames.stream().anyMatch(FrameStub::isGoAway), "Must emit GOAWAY on ping ACK timeout"); + + Mockito.verify(streamHandler, Mockito.atLeastOnce()).failed(exceptionCaptor.capture()); + Assertions.assertInstanceOf(H2StreamResetException.class, exceptionCaptor.getValue()); + + Assertions.assertFalse(mux.isOpen(), "Connection must not be open after keepalive shutdown"); + } + + @Test + void testKeepAliveDisabledNeverEmitsPing() throws Exception { + final List writes = new ArrayList<>(); + Mockito.when(protocolIOSession.write(ArgumentMatchers.any(ByteBuffer.class))) + .thenAnswer(inv -> { + final ByteBuffer b = inv.getArgument(0, ByteBuffer.class); + final byte[] copy = new byte[b.remaining()]; + b.get(copy); + writes.add(copy); + return copy.length; + }); + Mockito.doNothing().when(protocolIOSession).setEvent(ArgumentMatchers.anyInt()); + Mockito.doNothing().when(protocolIOSession).clearEvent(ArgumentMatchers.anyInt()); + + final AbstractH2StreamMultiplexer mux = new H2StreamMultiplexerImpl( + protocolIOSession, FRAME_FACTORY, StreamIdGenerator.ODD, + httpProcessor, CharCodingConfig.DEFAULT, H2Config.DEFAULT, h2StreamListener, () -> streamHandler, + Timeout.DISABLED); + + mux.onConnect(); + writes.clear(); + + mux.onTimeout(Timeout.ofMilliseconds(1)); + + final List frames = parseFrames(concat(writes)); + Assertions.assertTrue(frames.stream().noneMatch(FrameStub::isPing), "Disabled policy must never emit PING"); + } + + }