From bf737fa1bb604f3cd98f65d566bbaa0509036ee8 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Mon, 6 Apr 2026 15:41:52 +0800 Subject: [PATCH 1/2] Expose batchReadUnconfirmedAsync to ReadHandle.java --- .../bookkeeper/client/LedgerHandle.java | 33 ++++++++ .../bookkeeper/client/api/ReadHandle.java | 16 ++++ .../client/MockBookKeeperTestCase.java | 80 +++++++++++++++++++ .../client/api/BookKeeperApiTest.java | 39 +++++++++ 4 files changed, 168 insertions(+) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java index 7d1f67a2e4b..6c88ccfd9d2 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java @@ -941,6 +941,39 @@ public CompletableFuture batchReadAsync(long startEntry, int maxC return future; } + @Override + public CompletableFuture batchReadUnconfirmedAsync(long startEntry, int maxCount, long maxSize) { + // Little sanity check + if (startEntry < 0 || maxCount < 0 || maxSize < 0) { + LOG.error("IncorrectParameterException on ledgerId:{} firstEntry:{} when batch read", ledgerId, startEntry); + return FutureUtils.exception(new BKIncorrectParameterException()); + } + if (notSupportBatchRead()) { + long lastEntry = startEntry + maxCount - 1; + return readUnconfirmedAsync(startEntry, lastEntry); + } + + CompletableFuture future = new CompletableFuture<>(); + if (!clientCtx.isClientClosed()) { + batchReadEntriesInternalAsync(startEntry, maxCount, maxSize, false) + .whenCompleteAsync((entries, t) -> { + if (t != null) { + if (t instanceof BKException) { + BKException bke = (BKException) t; + future.completeExceptionally(bke); + } else { + future.completeExceptionally(BKException.create(Code.UnexpectedConditionException)); + } + } else { + future.complete(entries); + } + }, clientCtx.getMainWorkerPool().chooseThread(ledgerId)); + } else { + future.completeExceptionally(BKException.create(ClientClosedException)); + } + return future; + } + private boolean notSupportBatchRead() { if (!clientCtx.getConf().batchReadEnabled) { return true; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ReadHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ReadHandle.java index 8e2e633a35a..18ac4c73fca 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ReadHandle.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ReadHandle.java @@ -91,6 +91,22 @@ default LedgerEntries batchRead(long startEntry, int maxCount, long maxSize) return FutureUtils.result(batchReadAsync(startEntry, maxCount, maxSize), BKException.HANDLER); } + + /** + * Read a sequence of entries in batch asynchronously, allowing to read after the LastAddConfirmed range. + * + * @param startEntry + * start entry id + * @param maxCount + * the total entries count. + * @param maxSize + * the total entries size. + * @return he results of the operation + */ + default CompletableFuture batchReadUnconfirmedAsync(long startEntry, int maxCount, long maxSize) { + return readUnconfirmedAsync(startEntry, startEntry + maxCount - 1); + } + /** * Read a sequence of entries asynchronously, allowing to read after the LastAddConfirmed range. *
This is the same of diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java index b1def134d45..c58e49b0b61 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java @@ -40,6 +40,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.EnumSet; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -247,6 +248,7 @@ public ByteBufAllocator getByteBufAllocator() { setupBookieWatcherForNewEnsemble(); setupBookieWatcherForEnsembleChange(); setupBookieClientReadEntry(); + setupBookieClientBatchReadEntry(); setupBookieClientReadLac(); setupBookieClientAddEntry(); setupBookieClientForceLedger(); @@ -546,6 +548,84 @@ protected void setupBookieClientReadEntry() { any(), anyInt(), any(), anyBoolean()); } + protected void setupBookieClientBatchReadEntry() { + final Stubber stub = doAnswer(invocation -> { + Object[] args = invocation.getArguments(); + BookieId bookieSocketAddress = (BookieId) args[0]; + long ledgerId = (Long) args[1]; + long startEntryId = (Long) args[2]; + int maxCount = (Integer) args[3]; + long maxSize = (Long) args[4]; + BookkeeperInternalCallbacks.BatchedReadEntryCallback callback = + (BookkeeperInternalCallbacks.BatchedReadEntryCallback) args[5]; + Object ctx = args[6]; + boolean fenced = (((Integer) args[7]) & BookieProtocol.FLAG_DO_FENCING) == BookieProtocol.FLAG_DO_FENCING; + + executor.executeOrdered(ledgerId, () -> { + DigestManager macManager = null; + try { + macManager = getDigestType(ledgerId); + } catch (GeneralSecurityException gse){ + LOG.error("Initialize macManager fail", gse); + } + + Map mockEntries = new HashMap<>(); + for (long entryId = startEntryId; entryId < startEntryId + maxCount; entryId++) { + MockEntry mockEntry; + try { + mockEntry = getMockLedgerEntry(ledgerId, bookieSocketAddress, entryId); + if (mockEntry == null) { + break; + } + mockEntries.put(entryId, mockEntry); + } catch (BKException bke) { + LOG.info("batchReadEntryAndFenceLedger - occur BKException {}@{} at {}", entryId, ledgerId, + bookieSocketAddress); + if (mockEntries.isEmpty()) { + break; + } + callback.readEntriesComplete(bke.getCode(), ledgerId, startEntryId, null, ctx); + } + + } + + if (fenced) { + fencedLedgers.add(ledgerId); + } + + ByteBufList bufList = ByteBufList.get(); + long totalBytes = 0; + if (mockEntries.isEmpty()) { + callback.readEntriesComplete(BKException.Code.NoSuchEntryException, ledgerId, startEntryId, null, + ctx); + } else { + for (Map.Entry kv : mockEntries.entrySet()) { + long entryId = kv.getKey(); + MockEntry mockEntry = kv.getValue(); + ReferenceCounted entry = macManager.computeDigestAndPackageForSending(entryId, + mockEntry.lastAddConfirmed, mockEntry.payload.length, + Unpooled.wrappedBuffer(mockEntry.payload), new byte[20], 0); + ByteBuf entryData = MockBookieClient.copyData(entry); + totalBytes += entryData.readableBytes(); + if (totalBytes > maxSize) { + break; + } + bufList.add(entryData); + } + callback.readEntriesComplete(BKException.Code.OK, ledgerId, startEntryId, bufList, ctx); + } + }); + return null; + }); + + stub.when(bookieClient).batchReadEntries(any(BookieId.class), anyLong(), anyLong(), anyInt(), anyLong(), + any(), any(), anyInt()); + stub.when(bookieClient).batchReadEntries(any(BookieId.class), anyLong(), anyLong(), anyInt(), anyLong(), + any(), any(), anyInt(), any()); + stub.when(bookieClient).batchReadEntries(any(BookieId.class), anyLong(), anyLong(), anyInt(), anyLong(), + any(), any(), anyInt(), any(), anyBoolean()); + } + @SuppressWarnings("unchecked") protected void setupBookieClientReadLac() { final Stubber stub = doAnswer(invocation -> { diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperApiTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperApiTest.java index 75ba8e30b9b..6cd3090d238 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperApiTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperApiTest.java @@ -429,6 +429,45 @@ public void testLedgerEntriesIterable() throws Exception { } } + @Test + public void testBatchedReadUnconfirmedAsync() throws Exception { + long lId; + try (WriteHandle writer = newCreateLedgerOp() + .withAckQuorumSize(2) + .withWriteQuorumSize(2) + .withEnsembleSize(2) + .withPassword(password) + .execute().get()) { + lId = writer.getId(); + // write data and populate LastAddConfirmed + writer.append(ByteBuffer.wrap(data)); + writer.append(ByteBuffer.wrap(data)); + writer.append(ByteBuffer.wrap(data)); + } + + try (ReadHandle reader = newOpenLedgerOp() + .withPassword(password) + .withRecovery(false) + .withLedgerId(lId) + .execute().get()) { + long lac = reader.getLastAddConfirmed(); + assertEquals(2, lac); + + try (LedgerEntries entries = reader.batchReadUnconfirmedAsync(0, 5, 5 * 1024 * 1024).get()) { + AtomicLong i = new AtomicLong(0); + for (LedgerEntry e : entries) { + assertEquals(i.getAndIncrement(), e.getEntryId()); + assertArrayEquals(data, e.getEntryBytes()); + } + i.set(0); + entries.forEach((e) -> { + assertEquals(i.getAndIncrement(), e.getEntryId()); + assertArrayEquals(data, e.getEntryBytes()); + }); + } + } + } + @Test public void testBKExceptionCodeLogger() { assertEquals("OK: No problem", BKException.codeLogger(0).toString()); From a27657309d22a4a4aebc5f99e4e3f009241b3014 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Thu, 9 Apr 2026 15:50:09 +0800 Subject: [PATCH 2/2] Address review comments --- .../bookkeeper/client/LedgerHandle.java | 62 +++++--- .../client/MockBookKeeperTestCase.java | 12 +- .../client/api/BookKeeperApiTest.java | 149 +++++++++++++++--- 3 files changed, 179 insertions(+), 44 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java index 6c88ccfd9d2..bc57646cce5 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java @@ -948,32 +948,58 @@ public CompletableFuture batchReadUnconfirmedAsync(long startEntr LOG.error("IncorrectParameterException on ledgerId:{} firstEntry:{} when batch read", ledgerId, startEntry); return FutureUtils.exception(new BKIncorrectParameterException()); } + + long lastEntry = startEntry + maxCount - 1; if (notSupportBatchRead()) { - long lastEntry = startEntry + maxCount - 1; return readUnconfirmedAsync(startEntry, lastEntry); } + if (clientCtx.isClientClosed()) { + return FutureUtils.exception(BKException.create(ClientClosedException)); + } CompletableFuture future = new CompletableFuture<>(); - if (!clientCtx.isClientClosed()) { - batchReadEntriesInternalAsync(startEntry, maxCount, maxSize, false) - .whenCompleteAsync((entries, t) -> { - if (t != null) { - if (t instanceof BKException) { - BKException bke = (BKException) t; - future.completeExceptionally(bke); - } else { - future.completeExceptionally(BKException.create(Code.UnexpectedConditionException)); - } - } else { - future.complete(entries); - } - }, clientCtx.getMainWorkerPool().chooseThread(ledgerId)); - } else { - future.completeExceptionally(BKException.create(ClientClosedException)); - } + batchReadEntriesInternalAsync(startEntry, maxCount, maxSize, false) + .whenCompleteAsync((entries, error) -> completeBatchReadUnconfirmed( + startEntry, lastEntry, entries, error, future), + clientCtx.getMainWorkerPool().chooseThread(ledgerId)); return future; } + private void completeBatchReadUnconfirmed(long startEntry, + long lastEntry, + LedgerEntries entries, + Throwable error, + CompletableFuture future) { + if (error == null) { + future.complete(entries); + return; + } + + if (error instanceof BKException.BKBookieHandleNotAvailableException) { + notSupportBatch = true; + readUnconfirmedAsync(startEntry, lastEntry) + .whenComplete((fallbackEntries, fallbackError) -> + completeBatchReadUnconfirmedResult(fallbackEntries, fallbackError, future)); + return; + } + + completeBatchReadUnconfirmedResult(null, error, future); + } + + private void completeBatchReadUnconfirmedResult(LedgerEntries entries, + Throwable error, + CompletableFuture future) { + if (error == null) { + future.complete(entries); + return; + } + if (error instanceof BKException) { + future.completeExceptionally(error); + return; + } + future.completeExceptionally(BKException.create(Code.UnexpectedConditionException)); + } + private boolean notSupportBatchRead() { if (!clientCtx.getConf().batchReadEnabled) { return true; diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java index c58e49b0b61..3090cd51349 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java @@ -40,8 +40,8 @@ import java.util.ArrayList; import java.util.Collections; import java.util.EnumSet; -import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -569,8 +569,8 @@ protected void setupBookieClientBatchReadEntry() { LOG.error("Initialize macManager fail", gse); } - Map mockEntries = new HashMap<>(); - for (long entryId = startEntryId; entryId < startEntryId + maxCount; entryId++) { + Map mockEntries = new LinkedHashMap<>(); + for (long entryId = startEntryId; maxCount <= 0 || entryId < startEntryId + maxCount; entryId++) { MockEntry mockEntry; try { mockEntry = getMockLedgerEntry(ledgerId, bookieSocketAddress, entryId); @@ -581,10 +581,8 @@ protected void setupBookieClientBatchReadEntry() { } catch (BKException bke) { LOG.info("batchReadEntryAndFenceLedger - occur BKException {}@{} at {}", entryId, ledgerId, bookieSocketAddress); - if (mockEntries.isEmpty()) { - break; - } callback.readEntriesComplete(bke.getCode(), ledgerId, startEntryId, null, ctx); + return; } } @@ -606,8 +604,10 @@ protected void setupBookieClientBatchReadEntry() { mockEntry.lastAddConfirmed, mockEntry.payload.length, Unpooled.wrappedBuffer(mockEntry.payload), new byte[20], 0); ByteBuf entryData = MockBookieClient.copyData(entry); + entry.release(); totalBytes += entryData.readableBytes(); if (totalBytes > maxSize) { + entryData.release(); break; } bufList.add(entryData); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperApiTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperApiTest.java index 6cd3090d238..b8eebb51808 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperApiTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperApiTest.java @@ -32,6 +32,11 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.doAnswer; import io.netty.buffer.Unpooled; import java.nio.ByteBuffer; @@ -39,13 +44,18 @@ import java.util.List; import java.util.concurrent.atomic.AtomicLong; import org.apache.bookkeeper.client.BKException; +import org.apache.bookkeeper.client.BKException.BKClientClosedException; import org.apache.bookkeeper.client.BKException.BKDigestMatchException; import org.apache.bookkeeper.client.BKException.BKDuplicateEntryIdException; +import org.apache.bookkeeper.client.BKException.BKIncorrectParameterException; import org.apache.bookkeeper.client.BKException.BKLedgerFencedException; +import org.apache.bookkeeper.client.BKException.BKNoSuchEntryException; import org.apache.bookkeeper.client.BKException.BKNoSuchLedgerExistsOnMetadataServerException; import org.apache.bookkeeper.client.BKException.BKUnauthorizedAccessException; import org.apache.bookkeeper.client.MockBookKeeperTestCase; import org.apache.bookkeeper.conf.ClientConfiguration; +import org.apache.bookkeeper.net.BookieId; +import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks; import org.apache.bookkeeper.util.LoggerOutput; import org.junit.Rule; import org.junit.jupiter.api.Test; @@ -72,6 +82,59 @@ private static void checkEntries(LedgerEntries entries, byte[] data) } } + private long createLedgerWithEntries(int numEntries) throws Exception { + try (WriteHandle writer = result(newCreateLedgerOp() + .withAckQuorumSize(2) + .withWriteQuorumSize(2) + .withEnsembleSize(2) + .withPassword(password) + .execute())) { + long ledgerId = writer.getId(); + for (int i = 0; i < numEntries; i++) { + writer.append(ByteBuffer.wrap(data)); + } + return ledgerId; + } + } + + private ReadHandle openReadHandle(long ledgerId) throws Exception { + return result(newOpenLedgerOp() + .withPassword(password) + .withRecovery(false) + .withLedgerId(ledgerId) + .execute()); + } + + private void failBatchReadRequests(int rc) { + doAnswer(invocation -> { + BookkeeperInternalCallbacks.BatchedReadEntryCallback callback = invocation.getArgument(5); + long ledgerId = invocation.getArgument(1); + long startEntryId = invocation.getArgument(2); + Object ctx = invocation.getArgument(6); + callback.readEntriesComplete(rc, ledgerId, startEntryId, null, ctx); + return null; + }).when(bookieClient).batchReadEntries(any(BookieId.class), anyLong(), anyLong(), anyInt(), anyLong(), + any(), any(), anyInt()); + doAnswer(invocation -> { + BookkeeperInternalCallbacks.BatchedReadEntryCallback callback = invocation.getArgument(5); + long ledgerId = invocation.getArgument(1); + long startEntryId = invocation.getArgument(2); + Object ctx = invocation.getArgument(6); + callback.readEntriesComplete(rc, ledgerId, startEntryId, null, ctx); + return null; + }).when(bookieClient).batchReadEntries(any(BookieId.class), anyLong(), anyLong(), anyInt(), anyLong(), + any(), any(), anyInt(), any()); + doAnswer(invocation -> { + BookkeeperInternalCallbacks.BatchedReadEntryCallback callback = invocation.getArgument(5); + long ledgerId = invocation.getArgument(1); + long startEntryId = invocation.getArgument(2); + Object ctx = invocation.getArgument(6); + callback.readEntriesComplete(rc, ledgerId, startEntryId, null, ctx); + return null; + }).when(bookieClient).batchReadEntries(any(BookieId.class), anyLong(), anyLong(), anyInt(), anyLong(), + any(), any(), anyInt(), any(), anyBoolean()); + } + @Test public void testWriteHandle() throws Exception { try (WriteHandle writer = result(newCreateLedgerOp() @@ -431,29 +494,12 @@ public void testLedgerEntriesIterable() throws Exception { @Test public void testBatchedReadUnconfirmedAsync() throws Exception { - long lId; - try (WriteHandle writer = newCreateLedgerOp() - .withAckQuorumSize(2) - .withWriteQuorumSize(2) - .withEnsembleSize(2) - .withPassword(password) - .execute().get()) { - lId = writer.getId(); - // write data and populate LastAddConfirmed - writer.append(ByteBuffer.wrap(data)); - writer.append(ByteBuffer.wrap(data)); - writer.append(ByteBuffer.wrap(data)); - } - - try (ReadHandle reader = newOpenLedgerOp() - .withPassword(password) - .withRecovery(false) - .withLedgerId(lId) - .execute().get()) { + long lId = createLedgerWithEntries(3); + try (ReadHandle reader = openReadHandle(lId)) { long lac = reader.getLastAddConfirmed(); assertEquals(2, lac); - try (LedgerEntries entries = reader.batchReadUnconfirmedAsync(0, 5, 5 * 1024 * 1024).get()) { + try (LedgerEntries entries = result(reader.batchReadUnconfirmedAsync(0, 5, 5 * 1024 * 1024))) { AtomicLong i = new AtomicLong(0); for (LedgerEntry e : entries) { assertEquals(i.getAndIncrement(), e.getEntryId()); @@ -468,6 +514,69 @@ public void testBatchedReadUnconfirmedAsync() throws Exception { } } + @Test + public void testBatchedReadUnconfirmedAsyncRejectsInvalidArguments() throws Exception { + long lId = createLedgerWithEntries(3); + try (ReadHandle reader = openReadHandle(lId)) { + assertThrows(BKIncorrectParameterException.class, + () -> result(reader.batchReadUnconfirmedAsync(-1, 1, 1024))); + assertThrows(BKIncorrectParameterException.class, + () -> result(reader.batchReadUnconfirmedAsync(0, -1, 1024))); + assertThrows(BKIncorrectParameterException.class, + () -> result(reader.batchReadUnconfirmedAsync(0, 1, -1))); + } + } + + @Test + public void testBatchedReadUnconfirmedAsyncPropagatesNoSuchEntryException() throws Exception { + long lId = createLedgerWithEntries(3); + try (ReadHandle reader = openReadHandle(lId)) { + assertThrows(BKNoSuchEntryException.class, + () -> result(reader.batchReadUnconfirmedAsync(5, 1, 1024))); + } + } + + @Test + public void testBatchedReadUnconfirmedAsyncFailsIfClientClosed() throws Exception { + long lId = createLedgerWithEntries(3); + try (ReadHandle reader = openReadHandle(lId)) { + closeBookkeeper(); + assertThrows(BKClientClosedException.class, + () -> result(reader.batchReadUnconfirmedAsync(0, 1, 1024))); + } + } + + @Test + public void testBatchedReadUnconfirmedAsyncFallsBackWhenBatchReadIsUnsupported() throws Exception { + long lId = createLedgerWithEntries(3); + failBatchReadRequests(BKException.Code.BookieHandleNotAvailableException); + + try (ReadHandle reader = openReadHandle(lId); + LedgerEntries entries = result(reader.batchReadUnconfirmedAsync(0, 3, 5 * 1024 * 1024))) { + AtomicLong i = new AtomicLong(0); + for (LedgerEntry e : entries) { + assertEquals(i.getAndIncrement(), e.getEntryId()); + assertArrayEquals(data, e.getEntryBytes()); + } + assertEquals(3, i.get()); + } + } + + @Test + public void testBatchedReadUnconfirmedAsyncTreatsZeroMaxCountAsUnlimited() throws Exception { + long lId = createLedgerWithEntries(3); + + try (ReadHandle reader = openReadHandle(lId); + LedgerEntries entries = result(reader.batchReadUnconfirmedAsync(0, 0, 5 * 1024 * 1024))) { + AtomicLong i = new AtomicLong(0); + for (LedgerEntry e : entries) { + assertEquals(i.getAndIncrement(), e.getEntryId()); + assertArrayEquals(data, e.getEntryBytes()); + } + assertEquals(3, i.get()); + } + } + @Test public void testBKExceptionCodeLogger() { assertEquals("OK: No problem", BKException.codeLogger(0).toString());