Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ enum SettingsHandshake { READY, TRANSMITTED, ACKED }
private final AtomicInteger connOutputWindow;
private final AtomicInteger outputRequests;
private final H2StreamListener streamListener;
private final KeepAlivePingSupport keepAlivePingSupport;

private final Timeout validateAfterInactivity;
private static final Timeout KEEP_ALIVE_PING_ACK_TIMEOUT = Timeout.ofSeconds(5);

private ConnectionHandshake connState = ConnectionHandshake.READY;
private SettingsHandshake localSettingState = SettingsHandshake.READY;
Expand All @@ -148,7 +152,6 @@ enum SettingsHandshake { READY, TRANSMITTED, ACKED }
private static final long STREAM_TIMEOUT_GRANULARITY_MILLIS = 1000;
private long lastStreamTimeoutCheckMillis;


AbstractH2StreamMultiplexer(
final ProtocolIOSession ioSession,
final FrameFactory frameFactory,
Expand All @@ -157,6 +160,18 @@ enum SettingsHandshake { READY, TRANSMITTED, ACKED }
final CharCodingConfig charCodingConfig,
final H2Config h2Config,
final H2StreamListener streamListener) {
this(ioSession, frameFactory, idGenerator, httpProcessor, charCodingConfig, h2Config, streamListener, null);
}

AbstractH2StreamMultiplexer(
final ProtocolIOSession ioSession,
final FrameFactory frameFactory,
final StreamIdGenerator idGenerator,
final HttpProcessor httpProcessor,
final CharCodingConfig charCodingConfig,
final H2Config h2Config,
final H2StreamListener streamListener,
final Timeout validateAfterInactivity) {
this.ioSession = Args.notNull(ioSession, "IO session");
this.frameFactory = Args.notNull(frameFactory, "Frame factory");
this.httpProcessor = Args.notNull(httpProcessor, "HTTP processor");
Expand All @@ -183,6 +198,12 @@ enum SettingsHandshake { READY, TRANSMITTED, ACKED }

this.lowMark = H2Config.INIT.getInitialWindowSize() / 2;
this.streamListener = streamListener;

this.validateAfterInactivity = validateAfterInactivity;

this.keepAlivePingSupport = this.validateAfterInactivity != null && this.validateAfterInactivity.isEnabled()
? new KeepAlivePingSupport(this.validateAfterInactivity, KEEP_ALIVE_PING_ACK_TIMEOUT)
: null;
}

@Override
Expand Down Expand Up @@ -444,6 +465,9 @@ public final void onInput(final ByteBuffer src) throws HttpException, IOExceptio
for (;;) {
final RawFrame frame = inputBuffer.read(src, ioSession);
if (frame != null) {
if (keepAlivePingSupport != null) {
keepAlivePingSupport.onFrameInput(frame);
}
if (streamListener != null) {
streamListener.onFrameInput(this, frame.getStreamId(), frame);
}
Expand Down Expand Up @@ -487,6 +511,10 @@ public final void onOutput() throws HttpException, IOException {
ioSession.getLock().unlock();
}

if (keepAlivePingSupport != null) {
keepAlivePingSupport.activateIfReady();
}

if (connState.compareTo(ConnectionHandshake.SHUTDOWN) < 0) {

if (connOutputWindow.get() > 0 && remoteSettingState == SettingsHandshake.ACKED) {
Expand Down Expand Up @@ -592,6 +620,15 @@ public final void onOutput() throws HttpException, IOException {
}

public final void onTimeout(final Timeout timeout) throws HttpException, IOException {
if (keepAlivePingSupport != null
&& connState.compareTo(ConnectionHandshake.ACTIVE) <= 0
&& localSettingState == SettingsHandshake.ACKED
&& remoteSettingState == SettingsHandshake.ACKED) {
if (keepAlivePingSupport.onTimeout(timeout)) {
return;
}
}

connState = ConnectionHandshake.SHUTDOWN;

final RawFrame goAway;
Expand Down Expand Up @@ -636,13 +673,17 @@ private void executeShutdown(final ShutdownCommand shutdownCommand) throws IOExc
}
}

private void executePing(final PingCommand pingCommand) throws IOException {
final AsyncPingHandler handler = pingCommand.getHandler();
pingHandlers.add(handler);
final RawFrame ping = frameFactory.createPing(handler.getData());
private void sendPing(final AsyncPingHandler handler) throws IOException {
final AsyncPingHandler pingHandler = Args.notNull(handler, "PING handler");
pingHandlers.add(pingHandler);
final RawFrame ping = frameFactory.createPing(pingHandler.getData());
commitFrame(ping);
}

private void executePing(final PingCommand pingCommand) throws IOException {
sendPing(pingCommand.getHandler());
}

private void executeStaleCheck(final StaleCheckCommand staleCheckCommand) {
final Consumer<Boolean> callback = staleCheckCommand.getCallback();
callback.accept(ioSession.isOpen() &&
Expand Down Expand Up @@ -888,6 +929,9 @@ private void consumeFrame(final RawFrame frame) throws HttpException, IOExceptio
throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid PING frame payload");
}
if (frame.isFlagSet(FrameFlag.ACK)) {
if (keepAlivePingSupport != null && keepAlivePingSupport.consumePingAck(ping)) {
break;
}
final AsyncPingHandler pingHandler = pingHandlers.poll();
if (pingHandler != null) {
pingHandler.consumeResponse(ping);
Expand All @@ -910,6 +954,9 @@ private void consumeFrame(final RawFrame frame) throws HttpException, IOExceptio
localSettingState = SettingsHandshake.ACKED;
ioSession.setEvent(SelectionKey.OP_WRITE);
applyLocalSettings();
if (keepAlivePingSupport != null) {
keepAlivePingSupport.activateIfReady();
}
}
} else {
final ByteBuffer payload = frame.getPayload();
Expand All @@ -923,6 +970,9 @@ private void consumeFrame(final RawFrame frame) throws HttpException, IOExceptio
final RawFrame response = frameFactory.createSettingsAck();
commitFrame(response);
remoteSettingState = SettingsHandshake.ACKED;
if (keepAlivePingSupport != null) {
keepAlivePingSupport.activateIfReady();
}
}
}
break;
Expand Down Expand Up @@ -1328,6 +1378,171 @@ void appendState(final StringBuilder buf) {
.append(", streams.lastRemote=").append(streams.getLastRemoteId());
}

private final class KeepAlivePingSupport {

private static final int PING_DATA_LEN = 8;

private final Timeout idleTime;
private final Timeout ackTimeout;

private boolean active;
private boolean awaitingAck;

private long lastActivityNanos;
private long pingSeq;
private long expectedAckSeq;

private AsyncPingHandler keepAliveHandler;

KeepAlivePingSupport(final Timeout idleTime, final Timeout ackTimeout) {
this.idleTime = Args.notNull(idleTime, "Idle time");
this.ackTimeout = Args.notNull(ackTimeout, "ACK timeout");
this.active = false;
this.awaitingAck = false;
this.lastActivityNanos = System.nanoTime();
this.pingSeq = 0L;
this.expectedAckSeq = 0L;
this.keepAliveHandler = null;
}

void activateIfReady() {
if (active) {
return;
}
if (localSettingState == SettingsHandshake.ACKED && remoteSettingState == SettingsHandshake.ACKED) {
active = true;
awaitingAck = false;
lastActivityNanos = System.nanoTime();
ioSession.setSocketTimeout(idleTime);
}
}

void onFrameInput(final RawFrame frame) {
if (!active) {
return;
}
lastActivityNanos = System.nanoTime();
if (awaitingAck) {
if (!(frame.getType() == FrameType.PING.getValue() && frame.isFlagSet(FrameFlag.ACK))) {
awaitingAck = false;
clearKeepAliveHandler();
}
}
ioSession.setSocketTimeout(idleTime);
}

boolean consumePingAck(final ByteBuffer payload) {
if (!active || !awaitingAck) {
return false;
}
if (payload == null || payload.remaining() != PING_DATA_LEN) {
return false;
}
final long ack = payload.getLong(payload.position());
if (ack != expectedAckSeq) {
return false;
}
awaitingAck = false;
clearKeepAliveHandler();
lastActivityNanos = System.nanoTime();
ioSession.setSocketTimeout(idleTime);
return true;
}

boolean onTimeout(final Timeout timeout) throws IOException {
activateIfReady();
if (!active) {
return false;
}

if (awaitingAck) {
shutdownKeepAlive(timeout);
return true;
}

final long idleNanos = idleTime.toNanoseconds();
if (idleNanos > 0L) {
final long elapsed = System.nanoTime() - lastActivityNanos;
if (elapsed < idleNanos) {
final long remainingMs = Math.max(1L, (idleNanos - elapsed) / 1_000_000L);
ioSession.setSocketTimeout(Timeout.ofMilliseconds(remainingMs));
return true;
}
}

awaitingAck = true;
sendKeepAlivePing();
ioSession.setSocketTimeout(ackTimeout);
return true;
}

private void sendKeepAlivePing() throws IOException {
final long v = ++pingSeq;
expectedAckSeq = v;

final ByteBuffer data = ByteBuffer.allocate(PING_DATA_LEN);
data.putLong(v);
data.flip();

final ByteBuffer ro = data.asReadOnlyBuffer();

keepAliveHandler = new AsyncPingHandler() {

@Override
public ByteBuffer getData() {
return ro.duplicate();
}

@Override
public void consumeResponse(final ByteBuffer payload) {
// Keep-alive ACK is consumed by consumePingAck(...) before the handler queue is polled.
}

@Override
public void failed(final Exception cause) {
// No-op; connection error handling is performed elsewhere.
}

@Override
public void cancel() {
// No-op.
}

};

// Use the same PING sending path as PingCommand (existing command path).
sendPing(keepAliveHandler);
}

private void clearKeepAliveHandler() {
final AsyncPingHandler handler = keepAliveHandler;
keepAliveHandler = null;
if (handler != null) {
pingHandlers.remove(handler);
}
}

private void shutdownKeepAlive(final Timeout timeout) throws IOException {
clearKeepAliveHandler();
connState = ConnectionHandshake.SHUTDOWN;

final RawFrame goAway = frameFactory.createGoAway(
streams.getLastRemoteId(),
H2Error.NO_ERROR,
"Ping response timeout (" + timeout + ")");
commitFrame(goAway);

for (final Iterator<H2Stream> it = streams.iterator(); it.hasNext(); ) {
final H2Stream stream = it.next();
stream.fail(new H2StreamResetException(
H2Error.NO_ERROR,
"Ping response timeout (" + timeout + ")"));
}
streams.shutdownAndReleaseAll();
}

}

private static class Continuation {

final int streamId;
Expand Down Expand Up @@ -1592,6 +1807,10 @@ public String toString() {
return buf.toString();
}

public boolean isLocalReset() {
return localResetTime > 0;
}

}

private void checkStreamTimeouts(final long nowNanos) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.hc.core5.http2.frame.FrameFactory;
import org.apache.hc.core5.http2.frame.StreamIdGenerator;
import org.apache.hc.core5.reactor.ProtocolIOSession;
import org.apache.hc.core5.util.Timeout;

/**
* I/O event handler for events fired by {@link ProtocolIOSession} that implements
Expand All @@ -66,18 +67,31 @@ public ClientH2StreamMultiplexer(
final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
final H2Config h2Config,
final CharCodingConfig charCodingConfig,
final H2StreamListener streamListener) {
super(ioSession, frameFactory, StreamIdGenerator.ODD, httpProcessor, charCodingConfig, h2Config, streamListener);
final H2StreamListener streamListener,
final Timeout validateAfterInactivity) {
super(ioSession, frameFactory, StreamIdGenerator.ODD, httpProcessor, charCodingConfig, h2Config, streamListener,
validateAfterInactivity);
this.pushHandlerFactory = pushHandlerFactory;
}

public ClientH2StreamMultiplexer(
final ProtocolIOSession ioSession,
final FrameFactory frameFactory,
final HttpProcessor httpProcessor,
final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
final H2Config h2Config,
final CharCodingConfig charCodingConfig,
final H2StreamListener streamListener) {
this(ioSession, frameFactory, httpProcessor, pushHandlerFactory, h2Config, charCodingConfig, streamListener, null);
}

public ClientH2StreamMultiplexer(
final ProtocolIOSession ioSession,
final HttpProcessor httpProcessor,
final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
final H2Config h2Config,
final CharCodingConfig charCodingConfig) {
this(ioSession, DefaultFrameFactory.INSTANCE, httpProcessor, pushHandlerFactory, h2Config, charCodingConfig, null);
this(ioSession, DefaultFrameFactory.INSTANCE, httpProcessor, pushHandlerFactory, h2Config, charCodingConfig, null, null);
}

public ClientH2StreamMultiplexer(
Expand Down
Loading
Loading