From f74fcd844d1605bd292ee4e28e56c519923c22a9 Mon Sep 17 00:00:00 2001 From: William Chong Date: Tue, 27 Jan 2026 12:53:09 +0400 Subject: [PATCH 1/2] readme --- README.md | 1 + src/main/java/io/kurrent/dbclient/AppendToStream.java | 1 + 2 files changed, 2 insertions(+) diff --git a/README.md b/README.md index 3f1f6909..b272af99 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,4 @@ + diff --git a/src/main/java/io/kurrent/dbclient/AppendToStream.java b/src/main/java/io/kurrent/dbclient/AppendToStream.java index 4302420e..99fb7813 100644 --- a/src/main/java/io/kurrent/dbclient/AppendToStream.java +++ b/src/main/java/io/kurrent/dbclient/AppendToStream.java @@ -7,6 +7,7 @@ import io.grpc.ManagedChannel; import io.grpc.stub.StreamObserver; + import java.util.ArrayList; import java.util.Iterator; import java.util.List; From 4ca1758c2ebab57de5ddbc5e1c48f3bf01312ac1 Mon Sep 17 00:00:00 2001 From: William Chong Date: Tue, 3 Feb 2026 21:08:09 +0400 Subject: [PATCH 2/2] fix: missing noStream case to expected revision error handling Handle EXPECTED_NO_STREAM case in AppendToStream error response parsing. Previously this case fell through to the else branch, incorrectly returning a streamRevision state instead of noStream. --- README.md | 1 - .../io/kurrent/dbclient/AppendToStream.java | 2 + .../kurrent/dbclient/streams/AppendTests.java | 82 ++++++++++++++++--- 3 files changed, 71 insertions(+), 14 deletions(-) diff --git a/README.md b/README.md index b272af99..3f1f6909 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,3 @@ - diff --git a/src/main/java/io/kurrent/dbclient/AppendToStream.java b/src/main/java/io/kurrent/dbclient/AppendToStream.java index 99fb7813..e483a7b8 100644 --- a/src/main/java/io/kurrent/dbclient/AppendToStream.java +++ b/src/main/java/io/kurrent/dbclient/AppendToStream.java @@ -70,6 +70,8 @@ private CompletableFuture append(ManagedChannel channel, List Assertions.assertEquals(streamName, first.getStreamId()), - () -> Assertions.assertEquals(eventType, first.getEventType()), + () -> Assertions.assertEquals("TestEvent", first.getEventType()), () -> Assertions.assertEquals(eventId.toString(), first.getEventId().toString()), () -> Assertions.assertEquals(foo, mapper.readValue(first.getEventData(), Foo.class)), () -> Assertions.assertEquals(foo, mapper.readValue(first.getUserMetadata(), Foo.class)), @@ -49,4 +40,69 @@ default void testAppendSingleEventNoStream() throws Throwable { () -> Assertions.assertFalse(userMetadata.has(ClientTelemetryConstants.Metadata.SPAN_ID)) ); } + + @Test + default void testAppendMultipleEventsAtOnce() throws Throwable { + KurrentDBClient client = getDatabase().defaultClient(); + String streamName = generateName(); + int eventCount = 5; + + WriteResult result = client.appendToStream(streamName, + AppendToStreamOptions.get().streamState(StreamState.noStream()), + generateEvents(eventCount, "TestEvent").iterator()).get(); + + Assertions.assertEquals(StreamState.streamRevision(eventCount - 1), result.getNextExpectedRevision()); + Assertions.assertEquals(eventCount, client.readStream(streamName, ReadStreamOptions.get()).get().getEvents().size()); + } + + @Test + default void testStreamStateOptimisticConcurrency() throws Throwable { + KurrentDBClient client = getDatabase().defaultClient(); + + String anyStream = generateName(); + appendEvent(client, anyStream, StreamState.any()); + appendEvent(client, anyStream, StreamState.any()); + Assertions.assertEquals(2, client.readStream(anyStream, ReadStreamOptions.get()).get().getEvents().size()); + + String existsStream = generateName(); + assertWrongExpectedVersion(client, existsStream, StreamState.streamExists(), StreamState.streamExists(), StreamState.noStream()); + appendEvent(client, existsStream, StreamState.noStream()); + appendEvent(client, existsStream, StreamState.streamExists()); + + String noStream = generateName(); + appendEvent(client, noStream, StreamState.noStream()); + assertWrongExpectedVersion(client, noStream, StreamState.noStream(), StreamState.noStream(), StreamState.streamRevision(0)); + + String revStream = generateName(); + appendEvent(client, revStream, StreamState.noStream()); + appendEvent(client, revStream, StreamState.streamRevision(0)); + assertWrongExpectedVersion(client, revStream, StreamState.streamRevision(99), StreamState.streamRevision(99), StreamState.streamRevision(1)); + } + + default EventData createTestEvent() throws Exception { + return EventData.builderAsJson("TestEvent", mapper.writeValueAsBytes(new Foo())) + .eventId(UUID.randomUUID()) + .build(); + } + + default void appendEvent(KurrentDBClient client, String streamName, StreamState state) throws Exception { + client.appendToStream(streamName, AppendToStreamOptions.get().streamState(state), createTestEvent()).get(); + } + + default void assertWrongExpectedVersion(KurrentDBClient client, String streamName, StreamState state, StreamState expectedState, StreamState actualState) { + WrongExpectedVersionException ex = Assertions.assertThrows(WrongExpectedVersionException.class, () -> { + try { + appendEvent(client, streamName, state); + } catch (java.util.concurrent.ExecutionException e) { + if (e.getCause() != null) { + throw e.getCause(); + } + throw e; + } + }); + Assertions.assertEquals(streamName, ex.getStreamName()); + Assertions.assertEquals(expectedState, ex.getExpectedState()); + Assertions.assertEquals(actualState, ex.getActualState()); + } + }