diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java index 7d1f67a2e4b..bc57646cce5 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java @@ -941,6 +941,65 @@ public CompletableFuture batchReadAsync(long startEntry, int maxC return future; } + @Override + public CompletableFuture batchReadUnconfirmedAsync(long startEntry, int maxCount, long maxSize) { + // Little sanity check + if (startEntry < 0 || maxCount < 0 || maxSize < 0) { + LOG.error("IncorrectParameterException on ledgerId:{} firstEntry:{} when batch read", ledgerId, startEntry); + return FutureUtils.exception(new BKIncorrectParameterException()); + } + + long lastEntry = startEntry + maxCount - 1; + if (notSupportBatchRead()) { + return readUnconfirmedAsync(startEntry, lastEntry); + } + if (clientCtx.isClientClosed()) { + return FutureUtils.exception(BKException.create(ClientClosedException)); + } + + CompletableFuture future = new CompletableFuture<>(); + batchReadEntriesInternalAsync(startEntry, maxCount, maxSize, false) + .whenCompleteAsync((entries, error) -> completeBatchReadUnconfirmed( + startEntry, lastEntry, entries, error, future), + clientCtx.getMainWorkerPool().chooseThread(ledgerId)); + return future; + } + + private void completeBatchReadUnconfirmed(long startEntry, + long lastEntry, + LedgerEntries entries, + Throwable error, + CompletableFuture future) { + if (error == null) { + future.complete(entries); + return; + } + + if (error instanceof BKException.BKBookieHandleNotAvailableException) { + notSupportBatch = true; + readUnconfirmedAsync(startEntry, lastEntry) + .whenComplete((fallbackEntries, fallbackError) -> + completeBatchReadUnconfirmedResult(fallbackEntries, fallbackError, future)); + return; + } + + completeBatchReadUnconfirmedResult(null, error, future); + } + + private void completeBatchReadUnconfirmedResult(LedgerEntries entries, + Throwable error, + CompletableFuture future) { + if (error == null) { + future.complete(entries); + return; + } + if (error instanceof BKException) { + future.completeExceptionally(error); + return; + } + future.completeExceptionally(BKException.create(Code.UnexpectedConditionException)); + } + private boolean notSupportBatchRead() { if (!clientCtx.getConf().batchReadEnabled) { return true; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ReadHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ReadHandle.java index 8e2e633a35a..18ac4c73fca 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ReadHandle.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ReadHandle.java @@ -91,6 +91,22 @@ default LedgerEntries batchRead(long startEntry, int maxCount, long maxSize) return FutureUtils.result(batchReadAsync(startEntry, maxCount, maxSize), BKException.HANDLER); } + + /** + * Read a sequence of entries in batch asynchronously, allowing to read after the LastAddConfirmed range. + * + * @param startEntry + * start entry id + * @param maxCount + * the total entries count. + * @param maxSize + * the total entries size. + * @return he results of the operation + */ + default CompletableFuture batchReadUnconfirmedAsync(long startEntry, int maxCount, long maxSize) { + return readUnconfirmedAsync(startEntry, startEntry + maxCount - 1); + } + /** * Read a sequence of entries asynchronously, allowing to read after the LastAddConfirmed range. *
This is the same of diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java index b1def134d45..3090cd51349 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java @@ -41,6 +41,7 @@ import java.util.Collections; import java.util.EnumSet; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -247,6 +248,7 @@ public ByteBufAllocator getByteBufAllocator() { setupBookieWatcherForNewEnsemble(); setupBookieWatcherForEnsembleChange(); setupBookieClientReadEntry(); + setupBookieClientBatchReadEntry(); setupBookieClientReadLac(); setupBookieClientAddEntry(); setupBookieClientForceLedger(); @@ -546,6 +548,84 @@ protected void setupBookieClientReadEntry() { any(), anyInt(), any(), anyBoolean()); } + protected void setupBookieClientBatchReadEntry() { + final Stubber stub = doAnswer(invocation -> { + Object[] args = invocation.getArguments(); + BookieId bookieSocketAddress = (BookieId) args[0]; + long ledgerId = (Long) args[1]; + long startEntryId = (Long) args[2]; + int maxCount = (Integer) args[3]; + long maxSize = (Long) args[4]; + BookkeeperInternalCallbacks.BatchedReadEntryCallback callback = + (BookkeeperInternalCallbacks.BatchedReadEntryCallback) args[5]; + Object ctx = args[6]; + boolean fenced = (((Integer) args[7]) & BookieProtocol.FLAG_DO_FENCING) == BookieProtocol.FLAG_DO_FENCING; + + executor.executeOrdered(ledgerId, () -> { + DigestManager macManager = null; + try { + macManager = getDigestType(ledgerId); + } catch (GeneralSecurityException gse){ + LOG.error("Initialize macManager fail", gse); + } + + Map mockEntries = new LinkedHashMap<>(); + for (long entryId = startEntryId; maxCount <= 0 || entryId < startEntryId + maxCount; entryId++) { + MockEntry mockEntry; + try { + mockEntry = getMockLedgerEntry(ledgerId, bookieSocketAddress, entryId); + if (mockEntry == null) { + break; + } + mockEntries.put(entryId, mockEntry); + } catch (BKException bke) { + LOG.info("batchReadEntryAndFenceLedger - occur BKException {}@{} at {}", entryId, ledgerId, + bookieSocketAddress); + callback.readEntriesComplete(bke.getCode(), ledgerId, startEntryId, null, ctx); + return; + } + + } + + if (fenced) { + fencedLedgers.add(ledgerId); + } + + ByteBufList bufList = ByteBufList.get(); + long totalBytes = 0; + if (mockEntries.isEmpty()) { + callback.readEntriesComplete(BKException.Code.NoSuchEntryException, ledgerId, startEntryId, null, + ctx); + } else { + for (Map.Entry kv : mockEntries.entrySet()) { + long entryId = kv.getKey(); + MockEntry mockEntry = kv.getValue(); + ReferenceCounted entry = macManager.computeDigestAndPackageForSending(entryId, + mockEntry.lastAddConfirmed, mockEntry.payload.length, + Unpooled.wrappedBuffer(mockEntry.payload), new byte[20], 0); + ByteBuf entryData = MockBookieClient.copyData(entry); + entry.release(); + totalBytes += entryData.readableBytes(); + if (totalBytes > maxSize) { + entryData.release(); + break; + } + bufList.add(entryData); + } + callback.readEntriesComplete(BKException.Code.OK, ledgerId, startEntryId, bufList, ctx); + } + }); + return null; + }); + + stub.when(bookieClient).batchReadEntries(any(BookieId.class), anyLong(), anyLong(), anyInt(), anyLong(), + any(), any(), anyInt()); + stub.when(bookieClient).batchReadEntries(any(BookieId.class), anyLong(), anyLong(), anyInt(), anyLong(), + any(), any(), anyInt(), any()); + stub.when(bookieClient).batchReadEntries(any(BookieId.class), anyLong(), anyLong(), anyInt(), anyLong(), + any(), any(), anyInt(), any(), anyBoolean()); + } + @SuppressWarnings("unchecked") protected void setupBookieClientReadLac() { final Stubber stub = doAnswer(invocation -> { diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperApiTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperApiTest.java index 75ba8e30b9b..b8eebb51808 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperApiTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperApiTest.java @@ -32,6 +32,11 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.doAnswer; import io.netty.buffer.Unpooled; import java.nio.ByteBuffer; @@ -39,13 +44,18 @@ import java.util.List; import java.util.concurrent.atomic.AtomicLong; import org.apache.bookkeeper.client.BKException; +import org.apache.bookkeeper.client.BKException.BKClientClosedException; import org.apache.bookkeeper.client.BKException.BKDigestMatchException; import org.apache.bookkeeper.client.BKException.BKDuplicateEntryIdException; +import org.apache.bookkeeper.client.BKException.BKIncorrectParameterException; import org.apache.bookkeeper.client.BKException.BKLedgerFencedException; +import org.apache.bookkeeper.client.BKException.BKNoSuchEntryException; import org.apache.bookkeeper.client.BKException.BKNoSuchLedgerExistsOnMetadataServerException; import org.apache.bookkeeper.client.BKException.BKUnauthorizedAccessException; import org.apache.bookkeeper.client.MockBookKeeperTestCase; import org.apache.bookkeeper.conf.ClientConfiguration; +import org.apache.bookkeeper.net.BookieId; +import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks; import org.apache.bookkeeper.util.LoggerOutput; import org.junit.Rule; import org.junit.jupiter.api.Test; @@ -72,6 +82,59 @@ private static void checkEntries(LedgerEntries entries, byte[] data) } } + private long createLedgerWithEntries(int numEntries) throws Exception { + try (WriteHandle writer = result(newCreateLedgerOp() + .withAckQuorumSize(2) + .withWriteQuorumSize(2) + .withEnsembleSize(2) + .withPassword(password) + .execute())) { + long ledgerId = writer.getId(); + for (int i = 0; i < numEntries; i++) { + writer.append(ByteBuffer.wrap(data)); + } + return ledgerId; + } + } + + private ReadHandle openReadHandle(long ledgerId) throws Exception { + return result(newOpenLedgerOp() + .withPassword(password) + .withRecovery(false) + .withLedgerId(ledgerId) + .execute()); + } + + private void failBatchReadRequests(int rc) { + doAnswer(invocation -> { + BookkeeperInternalCallbacks.BatchedReadEntryCallback callback = invocation.getArgument(5); + long ledgerId = invocation.getArgument(1); + long startEntryId = invocation.getArgument(2); + Object ctx = invocation.getArgument(6); + callback.readEntriesComplete(rc, ledgerId, startEntryId, null, ctx); + return null; + }).when(bookieClient).batchReadEntries(any(BookieId.class), anyLong(), anyLong(), anyInt(), anyLong(), + any(), any(), anyInt()); + doAnswer(invocation -> { + BookkeeperInternalCallbacks.BatchedReadEntryCallback callback = invocation.getArgument(5); + long ledgerId = invocation.getArgument(1); + long startEntryId = invocation.getArgument(2); + Object ctx = invocation.getArgument(6); + callback.readEntriesComplete(rc, ledgerId, startEntryId, null, ctx); + return null; + }).when(bookieClient).batchReadEntries(any(BookieId.class), anyLong(), anyLong(), anyInt(), anyLong(), + any(), any(), anyInt(), any()); + doAnswer(invocation -> { + BookkeeperInternalCallbacks.BatchedReadEntryCallback callback = invocation.getArgument(5); + long ledgerId = invocation.getArgument(1); + long startEntryId = invocation.getArgument(2); + Object ctx = invocation.getArgument(6); + callback.readEntriesComplete(rc, ledgerId, startEntryId, null, ctx); + return null; + }).when(bookieClient).batchReadEntries(any(BookieId.class), anyLong(), anyLong(), anyInt(), anyLong(), + any(), any(), anyInt(), any(), anyBoolean()); + } + @Test public void testWriteHandle() throws Exception { try (WriteHandle writer = result(newCreateLedgerOp() @@ -429,6 +492,91 @@ public void testLedgerEntriesIterable() throws Exception { } } + @Test + public void testBatchedReadUnconfirmedAsync() throws Exception { + long lId = createLedgerWithEntries(3); + try (ReadHandle reader = openReadHandle(lId)) { + long lac = reader.getLastAddConfirmed(); + assertEquals(2, lac); + + try (LedgerEntries entries = result(reader.batchReadUnconfirmedAsync(0, 5, 5 * 1024 * 1024))) { + AtomicLong i = new AtomicLong(0); + for (LedgerEntry e : entries) { + assertEquals(i.getAndIncrement(), e.getEntryId()); + assertArrayEquals(data, e.getEntryBytes()); + } + i.set(0); + entries.forEach((e) -> { + assertEquals(i.getAndIncrement(), e.getEntryId()); + assertArrayEquals(data, e.getEntryBytes()); + }); + } + } + } + + @Test + public void testBatchedReadUnconfirmedAsyncRejectsInvalidArguments() throws Exception { + long lId = createLedgerWithEntries(3); + try (ReadHandle reader = openReadHandle(lId)) { + assertThrows(BKIncorrectParameterException.class, + () -> result(reader.batchReadUnconfirmedAsync(-1, 1, 1024))); + assertThrows(BKIncorrectParameterException.class, + () -> result(reader.batchReadUnconfirmedAsync(0, -1, 1024))); + assertThrows(BKIncorrectParameterException.class, + () -> result(reader.batchReadUnconfirmedAsync(0, 1, -1))); + } + } + + @Test + public void testBatchedReadUnconfirmedAsyncPropagatesNoSuchEntryException() throws Exception { + long lId = createLedgerWithEntries(3); + try (ReadHandle reader = openReadHandle(lId)) { + assertThrows(BKNoSuchEntryException.class, + () -> result(reader.batchReadUnconfirmedAsync(5, 1, 1024))); + } + } + + @Test + public void testBatchedReadUnconfirmedAsyncFailsIfClientClosed() throws Exception { + long lId = createLedgerWithEntries(3); + try (ReadHandle reader = openReadHandle(lId)) { + closeBookkeeper(); + assertThrows(BKClientClosedException.class, + () -> result(reader.batchReadUnconfirmedAsync(0, 1, 1024))); + } + } + + @Test + public void testBatchedReadUnconfirmedAsyncFallsBackWhenBatchReadIsUnsupported() throws Exception { + long lId = createLedgerWithEntries(3); + failBatchReadRequests(BKException.Code.BookieHandleNotAvailableException); + + try (ReadHandle reader = openReadHandle(lId); + LedgerEntries entries = result(reader.batchReadUnconfirmedAsync(0, 3, 5 * 1024 * 1024))) { + AtomicLong i = new AtomicLong(0); + for (LedgerEntry e : entries) { + assertEquals(i.getAndIncrement(), e.getEntryId()); + assertArrayEquals(data, e.getEntryBytes()); + } + assertEquals(3, i.get()); + } + } + + @Test + public void testBatchedReadUnconfirmedAsyncTreatsZeroMaxCountAsUnlimited() throws Exception { + long lId = createLedgerWithEntries(3); + + try (ReadHandle reader = openReadHandle(lId); + LedgerEntries entries = result(reader.batchReadUnconfirmedAsync(0, 0, 5 * 1024 * 1024))) { + AtomicLong i = new AtomicLong(0); + for (LedgerEntry e : entries) { + assertEquals(i.getAndIncrement(), e.getEntryId()); + assertArrayEquals(data, e.getEntryBytes()); + } + assertEquals(3, i.get()); + } + } + @Test public void testBKExceptionCodeLogger() { assertEquals("OK: No problem", BKException.codeLogger(0).toString());