Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ public class OkHttpServices implements RESTServices {
private static final Set<Integer> RETRYABLE_STATUS_CODES =
Set.of(STATUS_BAD_GATEWAY, STATUS_SERVICE_UNAVAILABLE, STATUS_GATEWAY_TIMEOUT);

private boolean checkFirstRequest = true;
// When true (digest auth), ping server before streaming non-resendable content to avoid wasted uploads
private boolean useDigestAuthPing = true;

static protected class ThreadState {
boolean isFirstRequest;
Expand All @@ -131,7 +132,7 @@ static protected class ThreadState {
}

private final ThreadLocal<ThreadState> threadState =
ThreadLocal.withInitial(() -> new ThreadState(checkFirstRequest));
ThreadLocal.withInitial(() -> new ThreadState(useDigestAuthPing));

public record ConnectionConfig(String host, int port, String basePath, String database,
SecurityContext securityContext, List<OkHttpClientConfigurator> clientConfigurators) {
Expand Down Expand Up @@ -197,7 +198,7 @@ private OkHttpClient connect(ConnectionConfig config) {
throw new IllegalArgumentException("No security context provided");
}

this.checkFirstRequest = config.securityContext instanceof DigestAuthContext;
this.useDigestAuthPing = config.securityContext instanceof DigestAuthContext;
this.database = config.database;
this.baseUri = HttpUrlBuilder.newBaseUrl(config.host, config.port, config.basePath, config.securityContext.getSSLContext());

Expand Down Expand Up @@ -283,15 +284,17 @@ private void setFirstRequest(boolean value) {
threadState.get().isFirstRequest = value;
}

private void checkFirstRequest() {
if (checkFirstRequest) setFirstRequest(true);
private void resetFirstRequestFlag() {
if (useDigestAuthPing) {
setFirstRequest(true);
}
}

private int makeFirstRequest(int retry) {
return makeFirstRequest(baseUri, "ping", retry);
private int pingServerBeforeStreaming(int retry) {
return pingServer(baseUri, "ping", retry);
}

private int makeFirstRequest(HttpUrl requestUri, String path, int retry) {
private int pingServer(HttpUrl requestUri, String path, int retry) {
Response response = sendRequestOnce(setupRequest(requestUri, path, null).head());
int statusCode = response.code();
if (!RETRYABLE_STATUS_CODES.contains(statusCode)) {
Expand Down Expand Up @@ -515,19 +518,20 @@ private Response sendRequestWithRetry(Request.Builder requestBldr, Function<Requ
private Response sendRequestWithRetry(
Request.Builder requestBldr, boolean isRetryable, Function<Request.Builder, Response> doFunction, Consumer<Boolean> resendableConsumer
) {
RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::checkFirstRequest);
// If the thread is already interrupted, fail fast rather than attempting a request
if (Thread.currentThread().isInterrupted()) {
throw new MarkLogicIOException("Request cancelled: thread was interrupted before request could be sent");
}

RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::resetFirstRequestFlag);
Response response = null;
int status = -1;

/*
* This loop is for retrying the request if the service is unavailable
*/
for (; retryContext.shouldContinueRetrying(minRetryAttempts, maxDelayForRetries); retryContext.incrementRetry()) {
try {
retryContext.sleepIfNeeded();
} catch (InterruptedException e) {
// Ignore interruption
}
retryContext.sleepIfNeeded();

/*
* Execute the function which is passed as an argument
Expand Down Expand Up @@ -1191,16 +1195,12 @@ private TemporalDescriptor putPostDocumentImpl(RequestLogger reqlog, String meth
}
boolean isResendable = handleBase.isResendable();

RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::checkFirstRequest);
RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::resetFirstRequestFlag);
Response response = null;
int status = -1;
Headers responseHeaders = null;
for (; retryContext.shouldContinueRetrying(minRetryAttempts, maxDelayForRetries); retryContext.incrementRetry()) {
try {
retryContext.sleepIfNeeded();
} catch (InterruptedException e) {
// Ignore interruption
}
retryContext.sleepIfNeeded();

Object value = handleBase.sendContent();
if (value == null) {
Expand All @@ -1209,7 +1209,7 @@ private TemporalDescriptor putPostDocumentImpl(RequestLogger reqlog, String meth
}

if (isFirstRequest() && !isResendable && isStreaming(value)) {
int firstRequestDelay = makeFirstRequest(retryContext.getRetry());
int firstRequestDelay = pingServerBeforeStreaming(retryContext.getRetry());
if (firstRequestDelay != 0) {
retryContext.calculateNextDelay(0, firstRequestDelay);
continue;
Expand Down Expand Up @@ -1247,7 +1247,7 @@ private TemporalDescriptor putPostDocumentImpl(RequestLogger reqlog, String meth
closeResponse(response);

if (!isResendable) {
checkFirstRequest();
resetFirstRequestFlag();
throw new ResourceNotResendableException(
"Cannot retry request for " +
((uri != null) ? uri : "new document"));
Expand Down Expand Up @@ -1348,24 +1348,20 @@ private TemporalDescriptor putPostDocumentImpl(RequestLogger reqlog, String meth
requestBldr = addVersionHeader(desc, requestBldr, "If-Match");
}

RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::checkFirstRequest);
RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::resetFirstRequestFlag);
Response response = null;
int status = -1;
Headers responseHeaders = null;
for (; retryContext.shouldContinueRetrying(minRetryAttempts, maxDelayForRetries); retryContext.incrementRetry()) {
try {
retryContext.sleepIfNeeded();
} catch (InterruptedException e) {
// Ignore interruption
}
retryContext.sleepIfNeeded();

MultipartBody.Builder multiPart = new MultipartBody.Builder();
boolean hasStreamingPart = addParts(multiPart, reqlog,
new String[]{metadataMimetype, contentMimetype},
new AbstractWriteHandle[]{metadataHandle, contentHandle});

if (isFirstRequest() && hasStreamingPart) {
int firstRequestDelay = makeFirstRequest(retryContext.getRetry());
int firstRequestDelay = pingServerBeforeStreaming(retryContext.getRetry());
if (firstRequestDelay != 0) {
retryContext.calculateNextDelay(0, firstRequestDelay);
continue;
Expand Down Expand Up @@ -1923,7 +1919,6 @@ public <T extends SearchReadHandle> T search(RequestLogger reqlog, T searchHandl

addPointInTimeQueryParam(params, searchHandle);

@SuppressWarnings("rawtypes")
HandleImplementation searchBase = HandleAccessor.checkHandle(searchHandle, "search");

Format searchFormat = searchBase.getFormat();
Expand Down Expand Up @@ -2112,15 +2107,11 @@ void init() {
}

Response getResponse() {
RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, OkHttpServices.this::checkFirstRequest);
RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, OkHttpServices.this::resetFirstRequestFlag);
Response response = null;
int status = -1;
for (; retryContext.shouldContinueRetrying(minRetryAttempts, maxDelayForRetries); retryContext.incrementRetry()) {
try {
retryContext.sleepIfNeeded();
} catch (InterruptedException e) {
// Ignore interruption
}
retryContext.sleepIfNeeded();

if (queryDef instanceof StructuredQueryDefinition && !(queryDef instanceof RawQueryDefinition)) {
response = doPost(reqlog, requestBldr, structure);
Expand Down Expand Up @@ -2644,15 +2635,11 @@ private void putPostValueImpl(RequestLogger reqlog, String method,
String connectPath = null;
Request.Builder requestBldr = null;

RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::checkFirstRequest);
RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::resetFirstRequestFlag);
Response response = null;
int status = -1;
for (; retryContext.shouldContinueRetrying(minRetryAttempts, maxDelayForRetries); retryContext.incrementRetry()) {
try {
retryContext.sleepIfNeeded();
} catch (InterruptedException e) {
// Ignore interruption
}
retryContext.sleepIfNeeded();

Object nextValue = (handle != null) ? handle.sendContent() : value;

Expand All @@ -2673,7 +2660,7 @@ private void putPostValueImpl(RequestLogger reqlog, String method,
boolean isResendable = (handle == null) ? !isStreaming : handle.isResendable();

if (isFirstRequest() && !isResendable && isStreaming) {
int firstRequestDelay = makeFirstRequest(retryContext.getRetry());
int firstRequestDelay = pingServerBeforeStreaming(retryContext.getRetry());
if (firstRequestDelay != 0) {
retryContext.calculateNextDelay(0, firstRequestDelay);
continue;
Expand Down Expand Up @@ -2720,7 +2707,7 @@ private void putPostValueImpl(RequestLogger reqlog, String method,
closeResponse(response);

if (!isResendable) {
checkFirstRequest();
resetFirstRequestFlag();
throw new ResourceNotResendableException(
"Cannot retry request for " + connectPath);
}
Expand Down Expand Up @@ -3029,7 +3016,7 @@ public <R extends AbstractReadHandle> R putResource(RequestLogger reqlog,

Consumer<Boolean> resendableConsumer = (resendable) -> {
if (!isResendable) {
checkFirstRequest();
resetFirstRequestFlag();
throw new ResourceNotResendableException(
"Cannot retry request for " + path);
}
Expand Down Expand Up @@ -3073,15 +3060,11 @@ public <R extends AbstractReadHandle, W extends AbstractWriteHandle> R putResour
String outputMimetype = outputBase.getMimetype();
Class as = outputBase.receiveAs();

RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::checkFirstRequest);
RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::resetFirstRequestFlag);
Response response = null;
int status = -1;
for (; retryContext.shouldContinueRetrying(minRetryAttempts, maxDelayForRetries); retryContext.incrementRetry()) {
try {
retryContext.sleepIfNeeded();
} catch (InterruptedException e) {
// Ignore interruption
}
retryContext.sleepIfNeeded();

MultipartBody.Builder multiPart = new MultipartBody.Builder();
boolean hasStreamingPart = addParts(multiPart, reqlog, input);
Expand Down Expand Up @@ -3182,7 +3165,7 @@ public <R extends AbstractReadHandle> R postResource(RequestLogger reqlog,
Consumer<Boolean> resendableConsumer = new Consumer<Boolean>() {
public void accept(Boolean resendable) {
if (!isResendable) {
checkFirstRequest();
resetFirstRequestFlag();
throw new ResourceNotResendableException("Cannot retry request for " + path);
}
}
Expand Down Expand Up @@ -3240,15 +3223,11 @@ public <R extends AbstractReadHandle, W extends AbstractWriteHandle> R postResou
String outputMimetype = outputBase != null ? outputBase.getMimetype() : null;
Class as = outputBase != null ? outputBase.receiveAs() : null;

RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::checkFirstRequest);
RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::resetFirstRequestFlag);
Response response = null;
int status = -1;
for (; retryContext.shouldContinueRetrying(minRetryAttempts, maxDelayForRetries); retryContext.incrementRetry()) {
try {
retryContext.sleepIfNeeded();
} catch (InterruptedException e) {
// Ignore interruption
}
retryContext.sleepIfNeeded();

MultipartBody.Builder multiPart = new MultipartBody.Builder();
boolean hasStreamingPart = addParts(multiPart, reqlog, null, input, requestHeaders);
Expand Down Expand Up @@ -3778,7 +3757,7 @@ private <U extends OkHttpResultIterator> U postIteratedResourceImpl(

Consumer<Boolean> resendableConsumer = resendable -> {
if (!isResendable) {
checkFirstRequest();
resetFirstRequestFlag();
throw new ResourceNotResendableException(
"Cannot retry request for " + path);
}
Expand Down Expand Up @@ -3843,15 +3822,11 @@ private <W extends AbstractWriteHandle, U extends OkHttpResultIterator> U postIt
throws ResourceNotFoundException, ResourceNotResendableException, ForbiddenUserException, FailedRequestException {
if (params == null) params = new RequestParameters();
if (transaction != null) params.add("txid", transaction.getTransactionId());
RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::checkFirstRequest);
RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::resetFirstRequestFlag);
Response response = null;
int status = -1;
for (; retryContext.shouldContinueRetrying(minRetryAttempts, maxDelayForRetries); retryContext.incrementRetry()) {
try {
retryContext.sleepIfNeeded();
} catch (InterruptedException e) {
// Ignore interruption
}
retryContext.sleepIfNeeded();

MultipartBody.Builder multiPart = new MultipartBody.Builder();
boolean hasStreamingPart = addParts(multiPart, reqlog, input);
Expand Down Expand Up @@ -3966,7 +3941,7 @@ private Request.Builder makePutWebResource(String path,
private Response doPut(RequestLogger reqlog, Request.Builder requestBldr, Object value) {
if (value == null) throw new IllegalArgumentException("Resource write with null value");

if (isFirstRequest() && isStreaming(value)) makeFirstRequest(0);
if (isFirstRequest() && isStreaming(value)) pingServerBeforeStreaming(0);

MediaType mediaType = makeType(requestBldr.build().header(HEADER_CONTENT_TYPE));
if (value instanceof OutputStreamSender) {
Expand All @@ -3987,7 +3962,7 @@ private Response doPut(RequestLogger reqlog, Request.Builder requestBldr, Object

private Response doPut(Request.Builder requestBldr,
MultipartBody.Builder multiPart, boolean hasStreamingPart) {
if (isFirstRequest() && hasStreamingPart) makeFirstRequest(0);
if (isFirstRequest() && hasStreamingPart) pingServerBeforeStreaming(0);

requestBldr = requestBldr.put(multiPart.build());
Response response = sendRequestOnce(requestBldr);
Expand All @@ -4007,7 +3982,7 @@ private Request.Builder makePostWebResource(String path, RequestParameters param

private Response doPost(RequestLogger reqlog, Request.Builder requestBldr, Object value) {
if (isFirstRequest() && isStreaming(value)) {
makeFirstRequest(0);
pingServerBeforeStreaming(0);
}

MediaType mediaType = makeType(requestBldr.build().header(HEADER_CONTENT_TYPE));
Expand All @@ -4034,7 +4009,7 @@ private Response doPost(RequestLogger reqlog, Request.Builder requestBldr, Objec

private Response doPost(Request.Builder requestBldr,
MultipartBody.Builder multiPart, boolean hasStreamingPart) {
if (isFirstRequest() && hasStreamingPart) makeFirstRequest(0);
if (isFirstRequest() && hasStreamingPart) pingServerBeforeStreaming(0);

Response response = sendRequestOnce(requestBldr.post(multiPart.build()));

Expand Down Expand Up @@ -4953,17 +4928,11 @@ public InputStream match(QueryDefinition queryDef,
}
requestBldr = addTelemetryAgentId(requestBldr);

MediaType mediaType = makeType(requestBldr.build().header(HEADER_CONTENT_TYPE));

RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::checkFirstRequest);
RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::resetFirstRequestFlag);
Response response = null;
int status = -1;
for (; retryContext.shouldContinueRetrying(minRetryAttempts, maxDelayForRetries); retryContext.incrementRetry()) {
try {
retryContext.sleepIfNeeded();
} catch (InterruptedException e) {
// Ignore interruption
}
retryContext.sleepIfNeeded();

if (queryDef instanceof StructuredQueryDefinition) {
response = doPost(null, requestBldr, structure);
Expand Down Expand Up @@ -5599,14 +5568,32 @@ private void executeRequest(CallResponseImpl responseImpl) {
boolean hasStreamingPart = hasStreamingPart();
Consumer<Boolean> resendableConsumer = resendable -> {
if (hasStreamingPart) {
checkFirstRequest();
resetFirstRequestFlag();
throw new ResourceNotResendableException(
"Cannot retry request for " + getEndpoint());
}
};

Function<Request.Builder, Response> sendRequestFunction = requestBldr -> {
if (isFirstRequest() && hasStreamingPart) makeFirstRequest(callBaseUri, "", 0);
if (isFirstRequest() && hasStreamingPart) {
// Ping the server before streaming; if unavailable, wait and retry the ping
int pingDelay = pingServer(callBaseUri, "", 0);
int pingRetries = 0;
int maxPingRetries = 10; // Prevent infinite loop
while (pingDelay > 0 && pingRetries < maxPingRetries && !Thread.currentThread().isInterrupted()) {
try {
Thread.sleep(pingDelay);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted while waiting to ping server before streaming", e);
Copy link

Copilot AI Mar 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Throwing a generic RuntimeException on InterruptedException here changes how interruption/cancellation is surfaced compared to the rest of this class (which typically throws MarkLogicIOException/FailedRequestException types). Consider throwing a consistent client exception type (and preserving the interrupt flag) so callers can reliably handle cancellation.

Suggested change
throw new RuntimeException("Interrupted while waiting to ping server before streaming", e);
throw new MarkLogicIOException("Interrupted while waiting to ping server before streaming", e);

Copilot uses AI. Check for mistakes.
}
pingRetries++;
pingDelay = pingServer(callBaseUri, "", 0);
}
Comment on lines +5580 to +5592
Copy link

Copilot AI Mar 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the ping retry loop, pingServer(callBaseUri, "", 0) is called on every iteration with a constant retry value of 0, so calculateDelay(retry) never backs off across attempts. Consider passing pingRetries (or similar) into pingServer so delay increases with each ping attempt and aligns with the existing retry/backoff behavior elsewhere in this class.

Copilot uses AI. Check for mistakes.
if (pingRetries >= maxPingRetries) {
logger.warn("Server still unavailable after {} ping attempts before streaming", maxPingRetries);
}
}
Response response = sendRequestOnce(requestBldr);
if (isFirstRequest()) setFirstRequest(false);
return response;
Expand Down
Loading