-
Notifications
You must be signed in to change notification settings - Fork 73
MLE-27388: Refactor: Renamed the "checkFirstRequest" stuff #1915
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -120,7 +120,8 @@ public class OkHttpServices implements RESTServices { | |||||
| private static final Set<Integer> RETRYABLE_STATUS_CODES = | ||||||
| Set.of(STATUS_BAD_GATEWAY, STATUS_SERVICE_UNAVAILABLE, STATUS_GATEWAY_TIMEOUT); | ||||||
|
|
||||||
| private boolean checkFirstRequest = true; | ||||||
| // When true (digest auth), ping server before streaming non-resendable content to avoid wasted uploads | ||||||
| private boolean useDigestAuthPing = true; | ||||||
|
|
||||||
| static protected class ThreadState { | ||||||
| boolean isFirstRequest; | ||||||
|
|
@@ -131,7 +132,7 @@ static protected class ThreadState { | |||||
| } | ||||||
|
|
||||||
| private final ThreadLocal<ThreadState> threadState = | ||||||
| ThreadLocal.withInitial(() -> new ThreadState(checkFirstRequest)); | ||||||
| ThreadLocal.withInitial(() -> new ThreadState(useDigestAuthPing)); | ||||||
|
|
||||||
| public record ConnectionConfig(String host, int port, String basePath, String database, | ||||||
| SecurityContext securityContext, List<OkHttpClientConfigurator> clientConfigurators) { | ||||||
|
|
@@ -197,7 +198,7 @@ private OkHttpClient connect(ConnectionConfig config) { | |||||
| throw new IllegalArgumentException("No security context provided"); | ||||||
| } | ||||||
|
|
||||||
| this.checkFirstRequest = config.securityContext instanceof DigestAuthContext; | ||||||
| this.useDigestAuthPing = config.securityContext instanceof DigestAuthContext; | ||||||
| this.database = config.database; | ||||||
| this.baseUri = HttpUrlBuilder.newBaseUrl(config.host, config.port, config.basePath, config.securityContext.getSSLContext()); | ||||||
|
|
||||||
|
|
@@ -283,15 +284,17 @@ private void setFirstRequest(boolean value) { | |||||
| threadState.get().isFirstRequest = value; | ||||||
| } | ||||||
|
|
||||||
| private void checkFirstRequest() { | ||||||
| if (checkFirstRequest) setFirstRequest(true); | ||||||
| private void resetFirstRequestFlag() { | ||||||
| if (useDigestAuthPing) { | ||||||
| setFirstRequest(true); | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| private int makeFirstRequest(int retry) { | ||||||
| return makeFirstRequest(baseUri, "ping", retry); | ||||||
| private int pingServerBeforeStreaming(int retry) { | ||||||
| return pingServer(baseUri, "ping", retry); | ||||||
| } | ||||||
|
|
||||||
| private int makeFirstRequest(HttpUrl requestUri, String path, int retry) { | ||||||
| private int pingServer(HttpUrl requestUri, String path, int retry) { | ||||||
| Response response = sendRequestOnce(setupRequest(requestUri, path, null).head()); | ||||||
| int statusCode = response.code(); | ||||||
| if (!RETRYABLE_STATUS_CODES.contains(statusCode)) { | ||||||
|
|
@@ -515,19 +518,20 @@ private Response sendRequestWithRetry(Request.Builder requestBldr, Function<Requ | |||||
| private Response sendRequestWithRetry( | ||||||
| Request.Builder requestBldr, boolean isRetryable, Function<Request.Builder, Response> doFunction, Consumer<Boolean> resendableConsumer | ||||||
| ) { | ||||||
| RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::checkFirstRequest); | ||||||
| // If the thread is already interrupted, fail fast rather than attempting a request | ||||||
| if (Thread.currentThread().isInterrupted()) { | ||||||
| throw new MarkLogicIOException("Request cancelled: thread was interrupted before request could be sent"); | ||||||
| } | ||||||
|
|
||||||
| RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::resetFirstRequestFlag); | ||||||
| Response response = null; | ||||||
| int status = -1; | ||||||
|
|
||||||
| /* | ||||||
| * This loop is for retrying the request if the service is unavailable | ||||||
| */ | ||||||
| for (; retryContext.shouldContinueRetrying(minRetryAttempts, maxDelayForRetries); retryContext.incrementRetry()) { | ||||||
| try { | ||||||
| retryContext.sleepIfNeeded(); | ||||||
| } catch (InterruptedException e) { | ||||||
| // Ignore interruption | ||||||
| } | ||||||
| retryContext.sleepIfNeeded(); | ||||||
|
|
||||||
| /* | ||||||
| * Execute the function which is passed as an argument | ||||||
|
|
@@ -1191,16 +1195,12 @@ private TemporalDescriptor putPostDocumentImpl(RequestLogger reqlog, String meth | |||||
| } | ||||||
| boolean isResendable = handleBase.isResendable(); | ||||||
|
|
||||||
| RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::checkFirstRequest); | ||||||
| RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::resetFirstRequestFlag); | ||||||
| Response response = null; | ||||||
| int status = -1; | ||||||
| Headers responseHeaders = null; | ||||||
| for (; retryContext.shouldContinueRetrying(minRetryAttempts, maxDelayForRetries); retryContext.incrementRetry()) { | ||||||
| try { | ||||||
| retryContext.sleepIfNeeded(); | ||||||
| } catch (InterruptedException e) { | ||||||
| // Ignore interruption | ||||||
| } | ||||||
| retryContext.sleepIfNeeded(); | ||||||
|
|
||||||
| Object value = handleBase.sendContent(); | ||||||
| if (value == null) { | ||||||
|
|
@@ -1209,7 +1209,7 @@ private TemporalDescriptor putPostDocumentImpl(RequestLogger reqlog, String meth | |||||
| } | ||||||
|
|
||||||
| if (isFirstRequest() && !isResendable && isStreaming(value)) { | ||||||
| int firstRequestDelay = makeFirstRequest(retryContext.getRetry()); | ||||||
| int firstRequestDelay = pingServerBeforeStreaming(retryContext.getRetry()); | ||||||
| if (firstRequestDelay != 0) { | ||||||
| retryContext.calculateNextDelay(0, firstRequestDelay); | ||||||
| continue; | ||||||
|
|
@@ -1247,7 +1247,7 @@ private TemporalDescriptor putPostDocumentImpl(RequestLogger reqlog, String meth | |||||
| closeResponse(response); | ||||||
|
|
||||||
| if (!isResendable) { | ||||||
| checkFirstRequest(); | ||||||
| resetFirstRequestFlag(); | ||||||
| throw new ResourceNotResendableException( | ||||||
| "Cannot retry request for " + | ||||||
| ((uri != null) ? uri : "new document")); | ||||||
|
|
@@ -1348,24 +1348,20 @@ private TemporalDescriptor putPostDocumentImpl(RequestLogger reqlog, String meth | |||||
| requestBldr = addVersionHeader(desc, requestBldr, "If-Match"); | ||||||
| } | ||||||
|
|
||||||
| RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::checkFirstRequest); | ||||||
| RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::resetFirstRequestFlag); | ||||||
| Response response = null; | ||||||
| int status = -1; | ||||||
| Headers responseHeaders = null; | ||||||
| for (; retryContext.shouldContinueRetrying(minRetryAttempts, maxDelayForRetries); retryContext.incrementRetry()) { | ||||||
| try { | ||||||
| retryContext.sleepIfNeeded(); | ||||||
| } catch (InterruptedException e) { | ||||||
| // Ignore interruption | ||||||
| } | ||||||
| retryContext.sleepIfNeeded(); | ||||||
|
|
||||||
| MultipartBody.Builder multiPart = new MultipartBody.Builder(); | ||||||
| boolean hasStreamingPart = addParts(multiPart, reqlog, | ||||||
| new String[]{metadataMimetype, contentMimetype}, | ||||||
| new AbstractWriteHandle[]{metadataHandle, contentHandle}); | ||||||
|
|
||||||
| if (isFirstRequest() && hasStreamingPart) { | ||||||
| int firstRequestDelay = makeFirstRequest(retryContext.getRetry()); | ||||||
| int firstRequestDelay = pingServerBeforeStreaming(retryContext.getRetry()); | ||||||
| if (firstRequestDelay != 0) { | ||||||
| retryContext.calculateNextDelay(0, firstRequestDelay); | ||||||
| continue; | ||||||
|
|
@@ -1923,7 +1919,6 @@ public <T extends SearchReadHandle> T search(RequestLogger reqlog, T searchHandl | |||||
|
|
||||||
| addPointInTimeQueryParam(params, searchHandle); | ||||||
|
|
||||||
| @SuppressWarnings("rawtypes") | ||||||
| HandleImplementation searchBase = HandleAccessor.checkHandle(searchHandle, "search"); | ||||||
|
|
||||||
| Format searchFormat = searchBase.getFormat(); | ||||||
|
|
@@ -2112,15 +2107,11 @@ void init() { | |||||
| } | ||||||
|
|
||||||
| Response getResponse() { | ||||||
| RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, OkHttpServices.this::checkFirstRequest); | ||||||
| RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, OkHttpServices.this::resetFirstRequestFlag); | ||||||
| Response response = null; | ||||||
| int status = -1; | ||||||
| for (; retryContext.shouldContinueRetrying(minRetryAttempts, maxDelayForRetries); retryContext.incrementRetry()) { | ||||||
| try { | ||||||
| retryContext.sleepIfNeeded(); | ||||||
| } catch (InterruptedException e) { | ||||||
| // Ignore interruption | ||||||
| } | ||||||
| retryContext.sleepIfNeeded(); | ||||||
|
|
||||||
| if (queryDef instanceof StructuredQueryDefinition && !(queryDef instanceof RawQueryDefinition)) { | ||||||
| response = doPost(reqlog, requestBldr, structure); | ||||||
|
|
@@ -2644,15 +2635,11 @@ private void putPostValueImpl(RequestLogger reqlog, String method, | |||||
| String connectPath = null; | ||||||
| Request.Builder requestBldr = null; | ||||||
|
|
||||||
| RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::checkFirstRequest); | ||||||
| RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::resetFirstRequestFlag); | ||||||
| Response response = null; | ||||||
| int status = -1; | ||||||
| for (; retryContext.shouldContinueRetrying(minRetryAttempts, maxDelayForRetries); retryContext.incrementRetry()) { | ||||||
| try { | ||||||
| retryContext.sleepIfNeeded(); | ||||||
| } catch (InterruptedException e) { | ||||||
| // Ignore interruption | ||||||
| } | ||||||
| retryContext.sleepIfNeeded(); | ||||||
|
|
||||||
| Object nextValue = (handle != null) ? handle.sendContent() : value; | ||||||
|
|
||||||
|
|
@@ -2673,7 +2660,7 @@ private void putPostValueImpl(RequestLogger reqlog, String method, | |||||
| boolean isResendable = (handle == null) ? !isStreaming : handle.isResendable(); | ||||||
|
|
||||||
| if (isFirstRequest() && !isResendable && isStreaming) { | ||||||
| int firstRequestDelay = makeFirstRequest(retryContext.getRetry()); | ||||||
| int firstRequestDelay = pingServerBeforeStreaming(retryContext.getRetry()); | ||||||
| if (firstRequestDelay != 0) { | ||||||
| retryContext.calculateNextDelay(0, firstRequestDelay); | ||||||
| continue; | ||||||
|
|
@@ -2720,7 +2707,7 @@ private void putPostValueImpl(RequestLogger reqlog, String method, | |||||
| closeResponse(response); | ||||||
|
|
||||||
| if (!isResendable) { | ||||||
| checkFirstRequest(); | ||||||
| resetFirstRequestFlag(); | ||||||
| throw new ResourceNotResendableException( | ||||||
| "Cannot retry request for " + connectPath); | ||||||
| } | ||||||
|
|
@@ -3029,7 +3016,7 @@ public <R extends AbstractReadHandle> R putResource(RequestLogger reqlog, | |||||
|
|
||||||
| Consumer<Boolean> resendableConsumer = (resendable) -> { | ||||||
| if (!isResendable) { | ||||||
| checkFirstRequest(); | ||||||
| resetFirstRequestFlag(); | ||||||
| throw new ResourceNotResendableException( | ||||||
| "Cannot retry request for " + path); | ||||||
| } | ||||||
|
|
@@ -3073,15 +3060,11 @@ public <R extends AbstractReadHandle, W extends AbstractWriteHandle> R putResour | |||||
| String outputMimetype = outputBase.getMimetype(); | ||||||
| Class as = outputBase.receiveAs(); | ||||||
|
|
||||||
| RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::checkFirstRequest); | ||||||
| RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::resetFirstRequestFlag); | ||||||
| Response response = null; | ||||||
| int status = -1; | ||||||
| for (; retryContext.shouldContinueRetrying(minRetryAttempts, maxDelayForRetries); retryContext.incrementRetry()) { | ||||||
| try { | ||||||
| retryContext.sleepIfNeeded(); | ||||||
| } catch (InterruptedException e) { | ||||||
| // Ignore interruption | ||||||
| } | ||||||
| retryContext.sleepIfNeeded(); | ||||||
|
|
||||||
| MultipartBody.Builder multiPart = new MultipartBody.Builder(); | ||||||
| boolean hasStreamingPart = addParts(multiPart, reqlog, input); | ||||||
|
|
@@ -3182,7 +3165,7 @@ public <R extends AbstractReadHandle> R postResource(RequestLogger reqlog, | |||||
| Consumer<Boolean> resendableConsumer = new Consumer<Boolean>() { | ||||||
| public void accept(Boolean resendable) { | ||||||
| if (!isResendable) { | ||||||
| checkFirstRequest(); | ||||||
| resetFirstRequestFlag(); | ||||||
| throw new ResourceNotResendableException("Cannot retry request for " + path); | ||||||
| } | ||||||
| } | ||||||
|
|
@@ -3240,15 +3223,11 @@ public <R extends AbstractReadHandle, W extends AbstractWriteHandle> R postResou | |||||
| String outputMimetype = outputBase != null ? outputBase.getMimetype() : null; | ||||||
| Class as = outputBase != null ? outputBase.receiveAs() : null; | ||||||
|
|
||||||
| RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::checkFirstRequest); | ||||||
| RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::resetFirstRequestFlag); | ||||||
| Response response = null; | ||||||
| int status = -1; | ||||||
| for (; retryContext.shouldContinueRetrying(minRetryAttempts, maxDelayForRetries); retryContext.incrementRetry()) { | ||||||
| try { | ||||||
| retryContext.sleepIfNeeded(); | ||||||
| } catch (InterruptedException e) { | ||||||
| // Ignore interruption | ||||||
| } | ||||||
| retryContext.sleepIfNeeded(); | ||||||
|
|
||||||
| MultipartBody.Builder multiPart = new MultipartBody.Builder(); | ||||||
| boolean hasStreamingPart = addParts(multiPart, reqlog, null, input, requestHeaders); | ||||||
|
|
@@ -3778,7 +3757,7 @@ private <U extends OkHttpResultIterator> U postIteratedResourceImpl( | |||||
|
|
||||||
| Consumer<Boolean> resendableConsumer = resendable -> { | ||||||
| if (!isResendable) { | ||||||
| checkFirstRequest(); | ||||||
| resetFirstRequestFlag(); | ||||||
| throw new ResourceNotResendableException( | ||||||
| "Cannot retry request for " + path); | ||||||
| } | ||||||
|
|
@@ -3843,15 +3822,11 @@ private <W extends AbstractWriteHandle, U extends OkHttpResultIterator> U postIt | |||||
| throws ResourceNotFoundException, ResourceNotResendableException, ForbiddenUserException, FailedRequestException { | ||||||
| if (params == null) params = new RequestParameters(); | ||||||
| if (transaction != null) params.add("txid", transaction.getTransactionId()); | ||||||
| RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::checkFirstRequest); | ||||||
| RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::resetFirstRequestFlag); | ||||||
| Response response = null; | ||||||
| int status = -1; | ||||||
| for (; retryContext.shouldContinueRetrying(minRetryAttempts, maxDelayForRetries); retryContext.incrementRetry()) { | ||||||
| try { | ||||||
| retryContext.sleepIfNeeded(); | ||||||
| } catch (InterruptedException e) { | ||||||
| // Ignore interruption | ||||||
| } | ||||||
| retryContext.sleepIfNeeded(); | ||||||
|
|
||||||
| MultipartBody.Builder multiPart = new MultipartBody.Builder(); | ||||||
| boolean hasStreamingPart = addParts(multiPart, reqlog, input); | ||||||
|
|
@@ -3966,7 +3941,7 @@ private Request.Builder makePutWebResource(String path, | |||||
| private Response doPut(RequestLogger reqlog, Request.Builder requestBldr, Object value) { | ||||||
| if (value == null) throw new IllegalArgumentException("Resource write with null value"); | ||||||
|
|
||||||
| if (isFirstRequest() && isStreaming(value)) makeFirstRequest(0); | ||||||
| if (isFirstRequest() && isStreaming(value)) pingServerBeforeStreaming(0); | ||||||
|
|
||||||
| MediaType mediaType = makeType(requestBldr.build().header(HEADER_CONTENT_TYPE)); | ||||||
| if (value instanceof OutputStreamSender) { | ||||||
|
|
@@ -3987,7 +3962,7 @@ private Response doPut(RequestLogger reqlog, Request.Builder requestBldr, Object | |||||
|
|
||||||
| private Response doPut(Request.Builder requestBldr, | ||||||
| MultipartBody.Builder multiPart, boolean hasStreamingPart) { | ||||||
| if (isFirstRequest() && hasStreamingPart) makeFirstRequest(0); | ||||||
| if (isFirstRequest() && hasStreamingPart) pingServerBeforeStreaming(0); | ||||||
|
|
||||||
| requestBldr = requestBldr.put(multiPart.build()); | ||||||
| Response response = sendRequestOnce(requestBldr); | ||||||
|
|
@@ -4007,7 +3982,7 @@ private Request.Builder makePostWebResource(String path, RequestParameters param | |||||
|
|
||||||
| private Response doPost(RequestLogger reqlog, Request.Builder requestBldr, Object value) { | ||||||
| if (isFirstRequest() && isStreaming(value)) { | ||||||
| makeFirstRequest(0); | ||||||
| pingServerBeforeStreaming(0); | ||||||
| } | ||||||
|
|
||||||
| MediaType mediaType = makeType(requestBldr.build().header(HEADER_CONTENT_TYPE)); | ||||||
|
|
@@ -4034,7 +4009,7 @@ private Response doPost(RequestLogger reqlog, Request.Builder requestBldr, Objec | |||||
|
|
||||||
| private Response doPost(Request.Builder requestBldr, | ||||||
| MultipartBody.Builder multiPart, boolean hasStreamingPart) { | ||||||
| if (isFirstRequest() && hasStreamingPart) makeFirstRequest(0); | ||||||
| if (isFirstRequest() && hasStreamingPart) pingServerBeforeStreaming(0); | ||||||
|
|
||||||
| Response response = sendRequestOnce(requestBldr.post(multiPart.build())); | ||||||
|
|
||||||
|
|
@@ -4953,17 +4928,11 @@ public InputStream match(QueryDefinition queryDef, | |||||
| } | ||||||
| requestBldr = addTelemetryAgentId(requestBldr); | ||||||
|
|
||||||
| MediaType mediaType = makeType(requestBldr.build().header(HEADER_CONTENT_TYPE)); | ||||||
|
|
||||||
| RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::checkFirstRequest); | ||||||
| RetryContext retryContext = new RetryContext(logger, RETRYABLE_STATUS_CODES, this::resetFirstRequestFlag); | ||||||
| Response response = null; | ||||||
| int status = -1; | ||||||
| for (; retryContext.shouldContinueRetrying(minRetryAttempts, maxDelayForRetries); retryContext.incrementRetry()) { | ||||||
| try { | ||||||
| retryContext.sleepIfNeeded(); | ||||||
| } catch (InterruptedException e) { | ||||||
| // Ignore interruption | ||||||
| } | ||||||
| retryContext.sleepIfNeeded(); | ||||||
|
|
||||||
| if (queryDef instanceof StructuredQueryDefinition) { | ||||||
| response = doPost(null, requestBldr, structure); | ||||||
|
|
@@ -5599,14 +5568,32 @@ private void executeRequest(CallResponseImpl responseImpl) { | |||||
| boolean hasStreamingPart = hasStreamingPart(); | ||||||
| Consumer<Boolean> resendableConsumer = resendable -> { | ||||||
| if (hasStreamingPart) { | ||||||
| checkFirstRequest(); | ||||||
| resetFirstRequestFlag(); | ||||||
| throw new ResourceNotResendableException( | ||||||
| "Cannot retry request for " + getEndpoint()); | ||||||
| } | ||||||
| }; | ||||||
|
|
||||||
| Function<Request.Builder, Response> sendRequestFunction = requestBldr -> { | ||||||
| if (isFirstRequest() && hasStreamingPart) makeFirstRequest(callBaseUri, "", 0); | ||||||
| if (isFirstRequest() && hasStreamingPart) { | ||||||
| // Ping the server before streaming; if unavailable, wait and retry the ping | ||||||
| int pingDelay = pingServer(callBaseUri, "", 0); | ||||||
| int pingRetries = 0; | ||||||
| int maxPingRetries = 10; // Prevent infinite loop | ||||||
| while (pingDelay > 0 && pingRetries < maxPingRetries && !Thread.currentThread().isInterrupted()) { | ||||||
| try { | ||||||
| Thread.sleep(pingDelay); | ||||||
| } catch (InterruptedException e) { | ||||||
| Thread.currentThread().interrupt(); | ||||||
| throw new RuntimeException("Interrupted while waiting to ping server before streaming", e); | ||||||
|
||||||
| throw new RuntimeException("Interrupted while waiting to ping server before streaming", e); | |
| throw new MarkLogicIOException("Interrupted while waiting to ping server before streaming", e); |
rjrudin marked this conversation as resolved.
Show resolved
Hide resolved
Copilot
AI
Mar 3, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the ping retry loop, pingServer(callBaseUri, "", 0) is called on every iteration with a constant retry value of 0, so calculateDelay(retry) never backs off across attempts. Consider passing pingRetries (or similar) into pingServer so delay increases with each ping attempt and aligns with the existing retry/backoff behavior elsewhere in this class.
Uh oh!
There was an error while loading. Please reload this page.