Description
Summary
- Context: HybridVectorService is responsible for all write operations (upsert, delete, update) and some read operations (count, scroll) in the Qdrant vector store using the gRPC client.
- Bug: The retry logic in doUpsert, doDeleteByUrl, and doCountPointsForUrl is broken because the asynchronous operation (and its associated ListenableFuture) is initiated outside the retry loop.
- Actual vs. expected: When a transient error occurs, the service retries calling .get() on the same failed future object, which immediately re-throws the same exception; it should instead re-initiate the asynchronous call (e.g., qdrantClient.upsertAsync) inside the retry lambda to create a new future for each attempt.
- Impact: Transient network glitches or temporary Qdrant unavailability will cause ingestion and count operations to fail immediately despite being wrapped in retry logic, leading to unstable data synchronization.
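The behavior described above can be reproduced without Qdrant at all. The following is a minimal sketch, assuming a JDK CompletableFuture as a stand-in for the client's ListenableFuture (both cache their terminal result); failingCallAsync and the class name are hypothetical and exist only for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

final class FailedFutureDemo {

    // Hypothetical stand-in for an async client call; every invocation fails.
    static CompletableFuture<Void> failingCallAsync(AtomicInteger started) {
        started.incrementAndGet();
        return CompletableFuture.failedFuture(new IllegalStateException("service unavailable"));
    }

    // Mirrors the bug: the future is created once and awaited on every attempt.
    static int callsStartedWithSharedFuture(int maxAttempts) throws InterruptedException {
        AtomicInteger started = new AtomicInteger();
        CompletableFuture<Void> shared = failingCallAsync(started);
        Throwable firstCause = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                shared.get();
            } catch (ExecutionException e) {
                if (firstCause == null) {
                    firstCause = e.getCause();
                }
                // Every attempt observes the same cached failure object.
                if (e.getCause() != firstCause) {
                    throw new AssertionError("expected the cached failure");
                }
            }
        }
        return started.get();
    }

    // Mirrors the fix: each attempt starts a new call and awaits a new future.
    static int callsStartedWithFreshFuture(int maxAttempts) throws InterruptedException {
        AtomicInteger started = new AtomicInteger();
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                failingCallAsync(started).get();
            } catch (ExecutionException e) {
                // Treated as transient for the purpose of this sketch.
            }
        }
        return started.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("shared future, calls started: " + callsStartedWithSharedFuture(3));
        System.out.println("fresh future, calls started:  " + callsStartedWithFreshFuture(3));
    }
}
```

With three attempts, the shared-future variant starts the simulated call once, while the fresh-future variant starts it three times. That difference is the whole bug: a retry loop around an already-completed future never reaches the network again.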
Code with bug
// src/main/java/com/williamcallahan/javachat/service/HybridVectorService.java
private void doUpsert(String collectionName, List<Document> documents) {
    // ... vector preparation logic ...
    // BUG: Future is created ONCE here, outside the retry block 🔴
    var upsertFuture = qdrantClient.upsertAsync(Objects.requireNonNull(collectionName), points);
    RetrySupport.executeWithRetry(
            () -> QdrantFutureAwaiter.awaitFuture(upsertFuture, UPSERT_TIMEOUT_SECONDS), "Qdrant hybrid upsert");
    log.info("[QDRANT] Upserted {} hybrid points", points.size());
}

// src/main/java/com/williamcallahan/javachat/service/HybridVectorService.java
private void doDeleteByUrl(String collectionName, String url) {
    // ... filter preparation ...
    // BUG: Future is created ONCE here, outside the retry block 🔴
    var deleteFuture =
            qdrantClient.deleteAsync(Objects.requireNonNull(collectionName), Objects.requireNonNull(filter));
    RetrySupport.executeWithRetry(
            () -> QdrantFutureAwaiter.awaitFuture(deleteFuture, DELETE_TIMEOUT_SECONDS), "Qdrant delete by URL");
    log.debug("[QDRANT] Deleted points by URL filter");
}

Evidence
1. Unit Test Reproduction
A reproduction test was created using Mockito to simulate a transient "service unavailable" error from Qdrant. The test confirmed that upsertAsync is only invoked once, even though RetrySupport is configured for 3 attempts and correctly identifies the error as transient.
@Test
void upsertDoesNotRetryAsyncCallBecauseFutureIsCreatedOutsideRetryBlock() {
    // ... setup mock behavior to return failed future ...
    RuntimeException transientError = new RuntimeException("service unavailable");
    when(qdrantClient.upsertAsync(any(), anyList()))
            .thenReturn(Futures.immediateFailedFuture(transientError));
    assertThrows(IllegalStateException.class, () ->
            hybridVectorService.upsert(QdrantCollectionKind.DOCS, documents));
    // This verification fails if we expect 3 calls; it only ever calls once.
    verify(qdrantClient, times(1)).upsertAsync(any(), anyList());
}

2. Contrast with Correct Implementation
Other methods in the same class already place the async call inside the retry lambda, so each attempt gets a new future. Examples are scrollAllUrlsInCollection and updatePayloadByFilter:
// Correct implementation in the same file:
RetrySupport.executeWithRetry(
        () -> QdrantFutureAwaiter.awaitFuture(
                qdrantClient.setPayloadAsync(...), // FIX: Re-creates future on each retry 🟢
                SET_PAYLOAD_TIMEOUT_SECONDS),
        "Qdrant set payload by filter");

Why has this bug gone undetected?
The bug went undetected because Qdrant is typically highly stable in local and production environments, making transient failures rare. When failures do occur, the RetrySupport log messages still appear ("failed with transient error... retrying"), but because future.get() returns the cached failure instantly, the "retries" complete within a few milliseconds without ever hitting the network. In the logs this is indistinguishable from a service that is failing persistently.
Recommended fix
Move the qdrantClient async call into the lambda passed to RetrySupport.executeWithRetry.
private void doUpsert(String collectionName, List<Document> documents) {
    // ...
    RetrySupport.executeWithRetry(
            () -> QdrantFutureAwaiter.awaitFuture(
                    qdrantClient.upsertAsync(collectionName, points), // <-- FIX 🟢
                    UPSERT_TIMEOUT_SECONDS),
            "Qdrant hybrid upsert");
}
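One way to keep this mistake from recurring is to have the retry helper accept a supplier of futures instead of a finished future, so a call site cannot pass in a future that was created once. The following is a rough sketch of that idea, not the project's RetrySupport or QdrantFutureAwaiter: RetryingAwaiter, awaitWithRetry, and flakyCallAsync are hypothetical names, every failure is treated as retryable, and there is no backoff.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class RetryingAwaiter {

    // Hypothetical helper: starts a NEW async call on every attempt via startCall.
    static <T> T awaitWithRetry(Supplier<Future<T>> startCall, int maxAttempts, Duration timeout) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            Future<T> future = startCall.get(); // fresh future per attempt
            try {
                return future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
            } catch (ExecutionException | TimeoutException e) {
                future.cancel(true); // abandon this attempt before starting the next one
                lastFailure = e;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while awaiting future", e);
            }
        }
        throw new IllegalStateException("Failed after " + maxAttempts + " attempts", lastFailure);
    }

    // Hypothetical stand-in for an async client call: fails twice, then succeeds.
    static CompletableFuture<String> flakyCallAsync(AtomicInteger calls) {
        if (calls.incrementAndGet() < 3) {
            return CompletableFuture.failedFuture(new IllegalStateException("service unavailable"));
        }
        return CompletableFuture.completedFuture("ok");
    }

    // Runs the flaky call through the helper with up to three attempts.
    static String demo(AtomicInteger calls) {
        return awaitWithRetry(() -> flakyCallAsync(calls), 3, Duration.ofSeconds(1));
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        System.out.println(demo(calls) + " after " + calls.get() + " calls");
    }
}
```

Because Guava's ListenableFuture extends java.util.concurrent.Future, the same shape should apply to the futures returned by the Qdrant client, though that adaptation is untested here. A regression test for the fix can then assert the invocation count of the async call, as the counter does in this sketch.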