From f3366f862faea4f691e3d4a99e9d693f3edb8afb Mon Sep 17 00:00:00 2001 From: Rob Rudin Date: Tue, 3 Mar 2026 11:23:36 -0500 Subject: [PATCH] MLE-27388: Refactor: Renamed the "checkFirstRequest" stuff The new names actually convey what is going on as opposed to making you wonder what "check" might mean. Also modified sleepIfNeeded to catch the interrupted exception instead of repeating that catch. --- .../marklogic/client/impl/OkHttpServices.java | 145 ++++++++---------- .../marklogic/client/impl/RetryContext.java | 13 +- 2 files changed, 77 insertions(+), 81 deletions(-) diff --git a/marklogic-client-api/src/main/java/com/marklogic/client/impl/OkHttpServices.java b/marklogic-client-api/src/main/java/com/marklogic/client/impl/OkHttpServices.java index 8ad5cec52..bc2906909 100644 --- a/marklogic-client-api/src/main/java/com/marklogic/client/impl/OkHttpServices.java +++ b/marklogic-client-api/src/main/java/com/marklogic/client/impl/OkHttpServices.java @@ -120,7 +120,8 @@ public class OkHttpServices implements RESTServices { private static final Set RETRYABLE_STATUS_CODES = Set.of(STATUS_BAD_GATEWAY, STATUS_SERVICE_UNAVAILABLE, STATUS_GATEWAY_TIMEOUT); - private boolean checkFirstRequest = true; + // When true (digest auth), ping server before streaming non-resendable content to avoid wasted uploads + private boolean useDigestAuthPing = true; static protected class ThreadState { boolean isFirstRequest; @@ -131,7 +132,7 @@ static protected class ThreadState { } private final ThreadLocal threadState = - ThreadLocal.withInitial(() -> new ThreadState(checkFirstRequest)); + ThreadLocal.withInitial(() -> new ThreadState(useDigestAuthPing)); public record ConnectionConfig(String host, int port, String basePath, String database, SecurityContext securityContext, List clientConfigurators) { @@ -197,7 +198,7 @@ private OkHttpClient connect(ConnectionConfig config) { throw new IllegalArgumentException("No security context provided"); } - this.checkFirstRequest = config.securityContext instanceof DigestAuthContext; + this.useDigestAuthPing = config.securityContext instanceof DigestAuthContext; this.database = config.database; this.baseUri = HttpUrlBuilder.newBaseUrl(config.host, config.port, config.basePath, config.securityContext.getSSLContext()); @@ -283,15 +284,17 @@ private void setFirstRequest(boolean value) { threadState.get().isFirstRequest = value; } - private void checkFirstRequest() { - if (checkFirstRequest) setFirstRequest(true); + private void resetFirstRequestFlag() { + if (useDigestAuthPing) { + setFirstRequest(true); + } } - private int makeFirstRequest(int retry) { - return makeFirstRequest(baseUri, "ping", retry); + private int pingServerBeforeStreaming(int retry) { + return pingServer(baseUri, "ping", retry); } - private int makeFirstRequest(HttpUrl requestUri, String path, int retry) { + private int pingServer(HttpUrl requestUri, String path, int retry) { Response response = sendRequestOnce(setupRequest(requestUri, path, null).head()); int statusCode = response.code(); if (!RETRYABLE_STATUS_CODES.contains(statusCode)) { @@ -515,7 +518,12 @@ private Response sendRequestWithRetry(Request.Builder requestBldr, Function doFunction, Consumer resendableConsumer ) { - RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::checkFirstRequest); + // If the thread is already interrupted, fail fast rather than attempting a request + if (Thread.currentThread().isInterrupted()) { + throw new MarkLogicIOException("Request cancelled: thread was interrupted before request could be sent"); + } + + RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::resetFirstRequestFlag); Response response = null; int status = -1; @@ -523,11 +531,7 @@ private Response sendRequestWithRetry( * This loop is for retrying the request if the service is unavailable */ for (; retryContext.shouldContinueRetrying(minRetryAttempts, maxDelayForRetries); retryContext.incrementRetry()) { - try { - retryContext.sleepIfNeeded(); - } catch (InterruptedException e) { - // Ignore interruption - } + retryContext.sleepIfNeeded(); /* * Execute the function which is passed as an argument @@ -1191,16 +1195,12 @@ private TemporalDescriptor putPostDocumentImpl(RequestLogger reqlog, String meth } boolean isResendable = handleBase.isResendable(); - RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::checkFirstRequest); + RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::resetFirstRequestFlag); Response response = null; int status = -1; Headers responseHeaders = null; for (; retryContext.shouldContinueRetrying(minRetryAttempts, maxDelayForRetries); retryContext.incrementRetry()) { - try { - retryContext.sleepIfNeeded(); - } catch (InterruptedException e) { - // Ignore interruption - } + retryContext.sleepIfNeeded(); Object value = handleBase.sendContent(); if (value == null) { @@ -1209,7 +1209,7 @@ private TemporalDescriptor putPostDocumentImpl(RequestLogger reqlog, String meth } if (isFirstRequest() && !isResendable && isStreaming(value)) { - int firstRequestDelay = makeFirstRequest(retryContext.getRetry()); + int firstRequestDelay = pingServerBeforeStreaming(retryContext.getRetry()); if (firstRequestDelay != 0) { retryContext.calculateNextDelay(0, firstRequestDelay); continue; @@ -1247,7 +1247,7 @@ private TemporalDescriptor putPostDocumentImpl(RequestLogger reqlog, String meth closeResponse(response); if (!isResendable) { - checkFirstRequest(); + resetFirstRequestFlag(); throw new ResourceNotResendableException( "Cannot retry request for " + ((uri != null) ? uri : "new document")); @@ -1348,16 +1348,12 @@ private TemporalDescriptor putPostDocumentImpl(RequestLogger reqlog, String meth requestBldr = addVersionHeader(desc, requestBldr, "If-Match"); } - RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::checkFirstRequest); + RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::resetFirstRequestFlag); Response response = null; int status = -1; Headers responseHeaders = null; for (; retryContext.shouldContinueRetrying(minRetryAttempts, maxDelayForRetries); retryContext.incrementRetry()) { - try { - retryContext.sleepIfNeeded(); - } catch (InterruptedException e) { - // Ignore interruption - } + retryContext.sleepIfNeeded(); MultipartBody.Builder multiPart = new MultipartBody.Builder(); boolean hasStreamingPart = addParts(multiPart, reqlog, @@ -1365,7 +1361,7 @@ private TemporalDescriptor putPostDocumentImpl(RequestLogger reqlog, String meth new AbstractWriteHandle[]{metadataHandle, contentHandle}); if (isFirstRequest() && hasStreamingPart) { - int firstRequestDelay = makeFirstRequest(retryContext.getRetry()); + int firstRequestDelay = pingServerBeforeStreaming(retryContext.getRetry()); if (firstRequestDelay != 0) { retryContext.calculateNextDelay(0, firstRequestDelay); continue; @@ -1923,7 +1919,6 @@ public T search(RequestLogger reqlog, T searchHandl addPointInTimeQueryParam(params, searchHandle); - @SuppressWarnings("rawtypes") HandleImplementation searchBase = HandleAccessor.checkHandle(searchHandle, "search"); Format searchFormat = searchBase.getFormat(); @@ -2112,15 +2107,11 @@ void init() { } Response getResponse() { - RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, OkHttpServices.this::checkFirstRequest); + RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, OkHttpServices.this::resetFirstRequestFlag); Response response = null; int status = -1; for (; retryContext.shouldContinueRetrying(minRetryAttempts, maxDelayForRetries); retryContext.incrementRetry()) { - try { - retryContext.sleepIfNeeded(); - } catch (InterruptedException e) { - // Ignore interruption - } + retryContext.sleepIfNeeded(); if (queryDef instanceof StructuredQueryDefinition && !(queryDef instanceof RawQueryDefinition)) { response = doPost(reqlog, requestBldr, structure); @@ -2644,15 +2635,11 @@ private void putPostValueImpl(RequestLogger reqlog, String method, String connectPath = null; Request.Builder requestBldr = null; - RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::checkFirstRequest); + RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::resetFirstRequestFlag); Response response = null; int status = -1; for (; retryContext.shouldContinueRetrying(minRetryAttempts, maxDelayForRetries); retryContext.incrementRetry()) { - try { - retryContext.sleepIfNeeded(); - } catch (InterruptedException e) { - // Ignore interruption - } + retryContext.sleepIfNeeded(); Object nextValue = (handle != null) ? handle.sendContent() : value; @@ -2673,7 +2660,7 @@ private void putPostValueImpl(RequestLogger reqlog, String method, boolean isResendable = (handle == null) ? !isStreaming : handle.isResendable(); if (isFirstRequest() && !isResendable && isStreaming) { - int firstRequestDelay = makeFirstRequest(retryContext.getRetry()); + int firstRequestDelay = pingServerBeforeStreaming(retryContext.getRetry()); if (firstRequestDelay != 0) { retryContext.calculateNextDelay(0, firstRequestDelay); continue; @@ -2720,7 +2707,7 @@ private void putPostValueImpl(RequestLogger reqlog, String method, closeResponse(response); if (!isResendable) { - checkFirstRequest(); + resetFirstRequestFlag(); throw new ResourceNotResendableException( "Cannot retry request for " + connectPath); } @@ -3029,7 +3016,7 @@ public R putResource(RequestLogger reqlog, Consumer resendableConsumer = (resendable) -> { if (!isResendable) { - checkFirstRequest(); + resetFirstRequestFlag(); throw new ResourceNotResendableException( "Cannot retry request for " + path); } @@ -3073,15 +3060,11 @@ public R putResour String outputMimetype = outputBase.getMimetype(); Class as = outputBase.receiveAs(); - RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::checkFirstRequest); + RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::resetFirstRequestFlag); Response response = null; int status = -1; for (; retryContext.shouldContinueRetrying(minRetryAttempts, maxDelayForRetries); retryContext.incrementRetry()) { - try { - retryContext.sleepIfNeeded(); - } catch (InterruptedException e) { - // Ignore interruption - } + retryContext.sleepIfNeeded(); MultipartBody.Builder multiPart = new MultipartBody.Builder(); boolean hasStreamingPart = addParts(multiPart, reqlog, input); @@ -3182,7 +3165,7 @@ public R postResource(RequestLogger reqlog, Consumer resendableConsumer = new Consumer() { public void accept(Boolean resendable) { if (!isResendable) { - checkFirstRequest(); + resetFirstRequestFlag(); throw new ResourceNotResendableException("Cannot retry request for " + path); } } @@ -3240,15 +3223,11 @@ public R postResou String outputMimetype = outputBase != null ? outputBase.getMimetype() : null; Class as = outputBase != null ? outputBase.receiveAs() : null; - RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::checkFirstRequest); + RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::resetFirstRequestFlag); Response response = null; int status = -1; for (; retryContext.shouldContinueRetrying(minRetryAttempts, maxDelayForRetries); retryContext.incrementRetry()) { - try { - retryContext.sleepIfNeeded(); - } catch (InterruptedException e) { - // Ignore interruption - } + retryContext.sleepIfNeeded(); MultipartBody.Builder multiPart = new MultipartBody.Builder(); boolean hasStreamingPart = addParts(multiPart, reqlog, null, input, requestHeaders); @@ -3778,7 +3757,7 @@ private U postIteratedResourceImpl( Consumer resendableConsumer = resendable -> { if (!isResendable) { - checkFirstRequest(); + resetFirstRequestFlag(); throw new ResourceNotResendableException( "Cannot retry request for " + path); } @@ -3843,15 +3822,11 @@ private U postIt throws ResourceNotFoundException, ResourceNotResendableException, ForbiddenUserException, FailedRequestException { if (params == null) params = new RequestParameters(); if (transaction != null) params.add("txid", transaction.getTransactionId()); - RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::checkFirstRequest); + RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::resetFirstRequestFlag); Response response = null; int status = -1; for (; retryContext.shouldContinueRetrying(minRetryAttempts, maxDelayForRetries); retryContext.incrementRetry()) { - try { - retryContext.sleepIfNeeded(); - } catch (InterruptedException e) { - // Ignore interruption - } + retryContext.sleepIfNeeded(); MultipartBody.Builder multiPart = new MultipartBody.Builder(); boolean hasStreamingPart = addParts(multiPart, reqlog, input); @@ -3966,7 +3941,7 @@ private Request.Builder makePutWebResource(String path, private Response doPut(RequestLogger reqlog, Request.Builder requestBldr, Object value) { if (value == null) throw new IllegalArgumentException("Resource write with null value"); - if (isFirstRequest() && isStreaming(value)) makeFirstRequest(0); + if (isFirstRequest() && isStreaming(value)) pingServerBeforeStreaming(0); MediaType mediaType = makeType(requestBldr.build().header(HEADER_CONTENT_TYPE)); if (value instanceof OutputStreamSender) { @@ -3987,7 +3962,7 @@ private Response doPut(RequestLogger reqlog, Request.Builder requestBldr, Object private Response doPut(Request.Builder requestBldr, MultipartBody.Builder multiPart, boolean hasStreamingPart) { - if (isFirstRequest() && hasStreamingPart) makeFirstRequest(0); + if (isFirstRequest() && hasStreamingPart) pingServerBeforeStreaming(0); requestBldr = requestBldr.put(multiPart.build()); Response response = sendRequestOnce(requestBldr); @@ -4007,7 +3982,7 @@ private Request.Builder makePostWebResource(String path, RequestParameters param private Response doPost(RequestLogger reqlog, Request.Builder requestBldr, Object value) { if (isFirstRequest() && isStreaming(value)) { - makeFirstRequest(0); + pingServerBeforeStreaming(0); } MediaType mediaType = makeType(requestBldr.build().header(HEADER_CONTENT_TYPE)); @@ -4034,7 +4009,7 @@ private Response doPost(RequestLogger reqlog, Request.Builder requestBldr, Objec private Response doPost(Request.Builder requestBldr, MultipartBody.Builder multiPart, boolean hasStreamingPart) { - if (isFirstRequest() && hasStreamingPart) makeFirstRequest(0); + if (isFirstRequest() && hasStreamingPart) pingServerBeforeStreaming(0); Response response = sendRequestOnce(requestBldr.post(multiPart.build())); @@ -4953,17 +4928,11 @@ public InputStream match(QueryDefinition queryDef, } requestBldr = addTelemetryAgentId(requestBldr); - MediaType mediaType = makeType(requestBldr.build().header(HEADER_CONTENT_TYPE)); - - RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::checkFirstRequest); + RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::resetFirstRequestFlag); Response response = null; int status = -1; for (; retryContext.shouldContinueRetrying(minRetryAttempts, maxDelayForRetries); retryContext.incrementRetry()) { - try { - retryContext.sleepIfNeeded(); - } catch (InterruptedException e) { - // Ignore interruption - } + retryContext.sleepIfNeeded(); if (queryDef instanceof StructuredQueryDefinition) { response = doPost(null, requestBldr, structure); @@ -5599,14 +5568,32 @@ private void executeRequest(CallResponseImpl responseImpl) { boolean hasStreamingPart = hasStreamingPart(); Consumer resendableConsumer = resendable -> { if (hasStreamingPart) { - checkFirstRequest(); + resetFirstRequestFlag(); throw new ResourceNotResendableException( "Cannot retry request for " + getEndpoint()); } }; Function sendRequestFunction = requestBldr -> { - if (isFirstRequest() && hasStreamingPart) makeFirstRequest(callBaseUri, "", 0); + if (isFirstRequest() && hasStreamingPart) { + // Ping the server before streaming; if unavailable, wait and retry the ping + int pingDelay = pingServer(callBaseUri, "", 0); + int pingRetries = 0; + int maxPingRetries = 10; // Prevent infinite loop + while (pingDelay > 0 && pingRetries < maxPingRetries && !Thread.currentThread().isInterrupted()) { + try { + Thread.sleep(pingDelay); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted while waiting to ping server before streaming", e); + } + pingRetries++; + pingDelay = pingServer(callBaseUri, "", 0); + } + if (pingRetries >= maxPingRetries) { + logger.warn("Server still unavailable after {} ping attempts before streaming", maxPingRetries); + } + } Response response = sendRequestOnce(requestBldr); if (isFirstRequest()) setFirstRequest(false); return response; diff --git a/marklogic-client-api/src/main/java/com/marklogic/client/impl/RetryContext.java b/marklogic-client-api/src/main/java/com/marklogic/client/impl/RetryContext.java index 4f0632f34..fe17af67d 100644 --- a/marklogic-client-api/src/main/java/com/marklogic/client/impl/RetryContext.java +++ b/marklogic-client-api/src/main/java/com/marklogic/client/impl/RetryContext.java @@ -33,15 +33,24 @@ class RetryContext { } boolean shouldContinueRetrying(int minAttempts, int maxDelay) { + // Stop retrying if thread has been interrupted + if (Thread.currentThread().isInterrupted()) { + return false; + } return retry < minAttempts || (System.currentTimeMillis() - startTime) < maxDelay; } - void sleepIfNeeded() throws InterruptedException { + void sleepIfNeeded() { if (nextDelay > 0) { if (logger.isDebugEnabled()) { logger.debug("Retrying request after {} ms delay (attempt {})", nextDelay, retry); } - Thread.sleep(nextDelay); + try { + Thread.sleep(nextDelay); + } catch (InterruptedException e) { + // Restore interrupt status for higher-level cancellation/shutdown logic + Thread.currentThread().interrupt(); + } } }