diff --git a/marklogic-client-api/src/main/java/com/marklogic/client/impl/OkHttpServices.java b/marklogic-client-api/src/main/java/com/marklogic/client/impl/OkHttpServices.java index 8ad5cec52..bc2906909 100644 --- a/marklogic-client-api/src/main/java/com/marklogic/client/impl/OkHttpServices.java +++ b/marklogic-client-api/src/main/java/com/marklogic/client/impl/OkHttpServices.java @@ -120,7 +120,8 @@ public class OkHttpServices implements RESTServices { private static final Set RETRYABLE_STATUS_CODES = Set.of(STATUS_BAD_GATEWAY, STATUS_SERVICE_UNAVAILABLE, STATUS_GATEWAY_TIMEOUT); - private boolean checkFirstRequest = true; + // When true (digest auth), ping server before streaming non-resendable content to avoid wasted uploads + private boolean useDigestAuthPing = true; static protected class ThreadState { boolean isFirstRequest; @@ -131,7 +132,7 @@ static protected class ThreadState { } private final ThreadLocal threadState = - ThreadLocal.withInitial(() -> new ThreadState(checkFirstRequest)); + ThreadLocal.withInitial(() -> new ThreadState(useDigestAuthPing)); public record ConnectionConfig(String host, int port, String basePath, String database, SecurityContext securityContext, List clientConfigurators) { @@ -197,7 +198,7 @@ private OkHttpClient connect(ConnectionConfig config) { throw new IllegalArgumentException("No security context provided"); } - this.checkFirstRequest = config.securityContext instanceof DigestAuthContext; + this.useDigestAuthPing = config.securityContext instanceof DigestAuthContext; this.database = config.database; this.baseUri = HttpUrlBuilder.newBaseUrl(config.host, config.port, config.basePath, config.securityContext.getSSLContext()); @@ -283,15 +284,17 @@ private void setFirstRequest(boolean value) { threadState.get().isFirstRequest = value; } - private void checkFirstRequest() { - if (checkFirstRequest) setFirstRequest(true); + private void resetFirstRequestFlag() { + if (useDigestAuthPing) { + setFirstRequest(true); + } } - private int makeFirstRequest(int retry) { - return makeFirstRequest(baseUri, "ping", retry); + private int pingServerBeforeStreaming(int retry) { + return pingServer(baseUri, "ping", retry); } - private int makeFirstRequest(HttpUrl requestUri, String path, int retry) { + private int pingServer(HttpUrl requestUri, String path, int retry) { Response response = sendRequestOnce(setupRequest(requestUri, path, null).head()); int statusCode = response.code(); if (!RETRYABLE_STATUS_CODES.contains(statusCode)) { @@ -515,7 +518,12 @@ private Response sendRequestWithRetry(Request.Builder requestBldr, Function doFunction, Consumer resendableConsumer ) { - RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::checkFirstRequest); + // If the thread is already interrupted, fail fast rather than attempting a request + if (Thread.currentThread().isInterrupted()) { + throw new MarkLogicIOException("Request cancelled: thread was interrupted before request could be sent"); + } + + RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::resetFirstRequestFlag); Response response = null; int status = -1; @@ -523,11 +531,7 @@ private Response sendRequestWithRetry( * This loop is for retrying the request if the service is unavailable */ for (; retryContext.shouldContinueRetrying(minRetryAttempts, maxDelayForRetries); retryContext.incrementRetry()) { - try { - retryContext.sleepIfNeeded(); - } catch (InterruptedException e) { - // Ignore interruption - } + retryContext.sleepIfNeeded(); /* * Execute the function which is passed as an argument @@ -1191,16 +1195,12 @@ private TemporalDescriptor putPostDocumentImpl(RequestLogger reqlog, String meth } boolean isResendable = handleBase.isResendable(); - RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::checkFirstRequest); + RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::resetFirstRequestFlag); Response response = null; int status = -1; Headers responseHeaders = null; for (; retryContext.shouldContinueRetrying(minRetryAttempts, maxDelayForRetries); retryContext.incrementRetry()) { - try { - retryContext.sleepIfNeeded(); - } catch (InterruptedException e) { - // Ignore interruption - } + retryContext.sleepIfNeeded(); Object value = handleBase.sendContent(); if (value == null) { @@ -1209,7 +1209,7 @@ private TemporalDescriptor putPostDocumentImpl(RequestLogger reqlog, String meth } if (isFirstRequest() && !isResendable && isStreaming(value)) { - int firstRequestDelay = makeFirstRequest(retryContext.getRetry()); + int firstRequestDelay = pingServerBeforeStreaming(retryContext.getRetry()); if (firstRequestDelay != 0) { retryContext.calculateNextDelay(0, firstRequestDelay); continue; @@ -1247,7 +1247,7 @@ private TemporalDescriptor putPostDocumentImpl(RequestLogger reqlog, String meth closeResponse(response); if (!isResendable) { - checkFirstRequest(); + resetFirstRequestFlag(); throw new ResourceNotResendableException( "Cannot retry request for " + ((uri != null) ? uri : "new document")); @@ -1348,16 +1348,12 @@ private TemporalDescriptor putPostDocumentImpl(RequestLogger reqlog, String meth requestBldr = addVersionHeader(desc, requestBldr, "If-Match"); } - RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::checkFirstRequest); + RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::resetFirstRequestFlag); Response response = null; int status = -1; Headers responseHeaders = null; for (; retryContext.shouldContinueRetrying(minRetryAttempts, maxDelayForRetries); retryContext.incrementRetry()) { - try { - retryContext.sleepIfNeeded(); - } catch (InterruptedException e) { - // Ignore interruption - } + retryContext.sleepIfNeeded(); MultipartBody.Builder multiPart = new MultipartBody.Builder(); boolean hasStreamingPart = addParts(multiPart, reqlog, @@ -1365,7 +1361,7 @@ private TemporalDescriptor putPostDocumentImpl(RequestLogger reqlog, String meth new AbstractWriteHandle[]{metadataHandle, contentHandle}); if (isFirstRequest() && hasStreamingPart) { - int firstRequestDelay = makeFirstRequest(retryContext.getRetry()); + int firstRequestDelay = pingServerBeforeStreaming(retryContext.getRetry()); if (firstRequestDelay != 0) { retryContext.calculateNextDelay(0, firstRequestDelay); continue; @@ -1923,7 +1919,6 @@ public T search(RequestLogger reqlog, T searchHandl addPointInTimeQueryParam(params, searchHandle); - @SuppressWarnings("rawtypes") HandleImplementation searchBase = HandleAccessor.checkHandle(searchHandle, "search"); Format searchFormat = searchBase.getFormat(); @@ -2112,15 +2107,11 @@ void init() { } Response getResponse() { - RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, OkHttpServices.this::checkFirstRequest); + RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, OkHttpServices.this::resetFirstRequestFlag); Response response = null; int status = -1; for (; retryContext.shouldContinueRetrying(minRetryAttempts, maxDelayForRetries); retryContext.incrementRetry()) { - try { - retryContext.sleepIfNeeded(); - } catch (InterruptedException e) { - // Ignore interruption - } + retryContext.sleepIfNeeded(); if (queryDef instanceof StructuredQueryDefinition && !(queryDef instanceof RawQueryDefinition)) { response = doPost(reqlog, requestBldr, structure); @@ -2644,15 +2635,11 @@ private void putPostValueImpl(RequestLogger reqlog, String method, String connectPath = null; Request.Builder requestBldr = null; - RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::checkFirstRequest); + RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::resetFirstRequestFlag); Response response = null; int status = -1; for (; retryContext.shouldContinueRetrying(minRetryAttempts, maxDelayForRetries); retryContext.incrementRetry()) { - try { - retryContext.sleepIfNeeded(); - } catch (InterruptedException e) { - // Ignore interruption - } + retryContext.sleepIfNeeded(); Object nextValue = (handle != null) ? handle.sendContent() : value; @@ -2673,7 +2660,7 @@ private void putPostValueImpl(RequestLogger reqlog, String method, boolean isResendable = (handle == null) ? !isStreaming : handle.isResendable(); if (isFirstRequest() && !isResendable && isStreaming) { - int firstRequestDelay = makeFirstRequest(retryContext.getRetry()); + int firstRequestDelay = pingServerBeforeStreaming(retryContext.getRetry()); if (firstRequestDelay != 0) { retryContext.calculateNextDelay(0, firstRequestDelay); continue; @@ -2720,7 +2707,7 @@ private void putPostValueImpl(RequestLogger reqlog, String method, closeResponse(response); if (!isResendable) { - checkFirstRequest(); + resetFirstRequestFlag(); throw new ResourceNotResendableException( "Cannot retry request for " + connectPath); } @@ -3029,7 +3016,7 @@ public R putResource(RequestLogger reqlog, Consumer resendableConsumer = (resendable) -> { if (!isResendable) { - checkFirstRequest(); + resetFirstRequestFlag(); throw new ResourceNotResendableException( "Cannot retry request for " + path); } @@ -3073,15 +3060,11 @@ public R putResour String outputMimetype = outputBase.getMimetype(); Class as = outputBase.receiveAs(); - RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::checkFirstRequest); + RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::resetFirstRequestFlag); Response response = null; int status = -1; for (; retryContext.shouldContinueRetrying(minRetryAttempts, maxDelayForRetries); retryContext.incrementRetry()) { - try { - retryContext.sleepIfNeeded(); - } catch (InterruptedException e) { - // Ignore interruption - } + retryContext.sleepIfNeeded(); MultipartBody.Builder multiPart = new MultipartBody.Builder(); boolean hasStreamingPart = addParts(multiPart, reqlog, input); @@ -3182,7 +3165,7 @@ public R postResource(RequestLogger reqlog, Consumer resendableConsumer = new Consumer() { public void accept(Boolean resendable) { if (!isResendable) { - checkFirstRequest(); + resetFirstRequestFlag(); throw new ResourceNotResendableException("Cannot retry request for " + path); } } @@ -3240,15 +3223,11 @@ public R postResou String outputMimetype = outputBase != null ? outputBase.getMimetype() : null; Class as = outputBase != null ? outputBase.receiveAs() : null; - RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::checkFirstRequest); + RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::resetFirstRequestFlag); Response response = null; int status = -1; for (; retryContext.shouldContinueRetrying(minRetryAttempts, maxDelayForRetries); retryContext.incrementRetry()) { - try { - retryContext.sleepIfNeeded(); - } catch (InterruptedException e) { - // Ignore interruption - } + retryContext.sleepIfNeeded(); MultipartBody.Builder multiPart = new MultipartBody.Builder(); boolean hasStreamingPart = addParts(multiPart, reqlog, null, input, requestHeaders); @@ -3778,7 +3757,7 @@ private U postIteratedResourceImpl( Consumer resendableConsumer = resendable -> { if (!isResendable) { - checkFirstRequest(); + resetFirstRequestFlag(); throw new ResourceNotResendableException( "Cannot retry request for " + path); } @@ -3843,15 +3822,11 @@ private U postIt throws ResourceNotFoundException, ResourceNotResendableException, ForbiddenUserException, FailedRequestException { if (params == null) params = new RequestParameters(); if (transaction != null) params.add("txid", transaction.getTransactionId()); - RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::checkFirstRequest); + RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::resetFirstRequestFlag); Response response = null; int status = -1; for (; retryContext.shouldContinueRetrying(minRetryAttempts, maxDelayForRetries); retryContext.incrementRetry()) { - try { - retryContext.sleepIfNeeded(); - } catch (InterruptedException e) { - // Ignore interruption - } + retryContext.sleepIfNeeded(); MultipartBody.Builder multiPart = new MultipartBody.Builder(); boolean hasStreamingPart = addParts(multiPart, reqlog, input); @@ -3966,7 +3941,7 @@ private Request.Builder makePutWebResource(String path, private Response doPut(RequestLogger reqlog, Request.Builder requestBldr, Object value) { if (value == null) throw new IllegalArgumentException("Resource write with null value"); - if (isFirstRequest() && isStreaming(value)) makeFirstRequest(0); + if (isFirstRequest() && isStreaming(value)) pingServerBeforeStreaming(0); MediaType mediaType = makeType(requestBldr.build().header(HEADER_CONTENT_TYPE)); if (value instanceof OutputStreamSender) { @@ -3987,7 +3962,7 @@ private Response doPut(RequestLogger reqlog, Request.Builder requestBldr, Object private Response doPut(Request.Builder requestBldr, MultipartBody.Builder multiPart, boolean hasStreamingPart) { - if (isFirstRequest() && hasStreamingPart) makeFirstRequest(0); + if (isFirstRequest() && hasStreamingPart) pingServerBeforeStreaming(0); requestBldr = requestBldr.put(multiPart.build()); Response response = sendRequestOnce(requestBldr); @@ -4007,7 +3982,7 @@ private Request.Builder makePostWebResource(String path, RequestParameters param private Response doPost(RequestLogger reqlog, Request.Builder requestBldr, Object value) { if (isFirstRequest() && isStreaming(value)) { - makeFirstRequest(0); + pingServerBeforeStreaming(0); } MediaType mediaType = makeType(requestBldr.build().header(HEADER_CONTENT_TYPE)); @@ -4034,7 +4009,7 @@ private Response doPost(RequestLogger reqlog, Request.Builder requestBldr, Objec private Response doPost(Request.Builder requestBldr, MultipartBody.Builder multiPart, boolean hasStreamingPart) { - if (isFirstRequest() && hasStreamingPart) makeFirstRequest(0); + if (isFirstRequest() && hasStreamingPart) pingServerBeforeStreaming(0); Response response = sendRequestOnce(requestBldr.post(multiPart.build())); @@ -4953,17 +4928,11 @@ public InputStream match(QueryDefinition queryDef, } requestBldr = addTelemetryAgentId(requestBldr); - MediaType mediaType = makeType(requestBldr.build().header(HEADER_CONTENT_TYPE)); - - RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::checkFirstRequest); + RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::resetFirstRequestFlag); Response response = null; int status = -1; for (; retryContext.shouldContinueRetrying(minRetryAttempts, maxDelayForRetries); retryContext.incrementRetry()) { - try { - retryContext.sleepIfNeeded(); - } catch (InterruptedException e) { - // Ignore interruption - } + retryContext.sleepIfNeeded(); if (queryDef instanceof StructuredQueryDefinition) { response = doPost(null, requestBldr, structure); @@ -5599,14 +5568,32 @@ private void executeRequest(CallResponseImpl responseImpl) { boolean hasStreamingPart = hasStreamingPart(); Consumer resendableConsumer = resendable -> { if (hasStreamingPart) { - checkFirstRequest(); + resetFirstRequestFlag(); throw new ResourceNotResendableException( "Cannot retry request for " + getEndpoint()); } }; Function sendRequestFunction = requestBldr -> { - if (isFirstRequest() && hasStreamingPart) makeFirstRequest(callBaseUri, "", 0); + if (isFirstRequest() && hasStreamingPart) { + // Ping the server before streaming; if unavailable, wait and retry the ping + int pingDelay = pingServer(callBaseUri, "", 0); + int pingRetries = 0; + int maxPingRetries = 10; // Prevent infinite loop + while (pingDelay > 0 && pingRetries < maxPingRetries && !Thread.currentThread().isInterrupted()) { + try { + Thread.sleep(pingDelay); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted while waiting to ping server before streaming", e); + } + pingRetries++; + pingDelay = pingServer(callBaseUri, "", 0); + } + if (pingRetries >= maxPingRetries) { + logger.warn("Server still unavailable after {} ping attempts before streaming", maxPingRetries); + } + } Response response = sendRequestOnce(requestBldr); if (isFirstRequest()) setFirstRequest(false); return response; diff --git a/marklogic-client-api/src/main/java/com/marklogic/client/impl/RetryContext.java b/marklogic-client-api/src/main/java/com/marklogic/client/impl/RetryContext.java index 4f0632f34..fe17af67d 100644 --- a/marklogic-client-api/src/main/java/com/marklogic/client/impl/RetryContext.java +++ b/marklogic-client-api/src/main/java/com/marklogic/client/impl/RetryContext.java @@ -33,15 +33,24 @@ class RetryContext { } boolean shouldContinueRetrying(int minAttempts, int maxDelay) { + // Stop retrying if thread has been interrupted + if (Thread.currentThread().isInterrupted()) { + return false; + } return retry < minAttempts || (System.currentTimeMillis() - startTime) < maxDelay; } - void sleepIfNeeded() throws InterruptedException { + void sleepIfNeeded() { if (nextDelay > 0) { if (logger.isDebugEnabled()) { logger.debug("Retrying request after {} ms delay (attempt {})", nextDelay, retry); } - Thread.sleep(nextDelay); + try { + Thread.sleep(nextDelay); + } catch (InterruptedException e) { + // Restore interrupt status for higher-level cancellation/shutdown logic + Thread.currentThread().interrupt(); + } } }