From 8d9fdeb8d3352278550728179cc376275bb7eb06 Mon Sep 17 00:00:00 2001 From: "David W. Dougherty" Date: Tue, 24 Mar 2026 11:59:29 -0700 Subject: [PATCH 1/2] DEV: add TCEs to XADD command page --- content/commands/xadd.md | 46 +++-- .../NRedisStack/CmdsStreamExample.cs | 100 +++++++++++ .../cmds_stream/go-redis/cmds_stream_test.go | 170 ++++++++++++++++++ .../cmds_stream/jedis/CmdsStreamExample.java | 106 +++++++++++ .../lettuce-async/CmdsStreamExample.java | 122 +++++++++++++ .../lettuce-reactive/CmdsStreamExample.java | 152 ++++++++++++++++ .../cmds_stream/node-redis/cmds-stream.js | 83 +++++++++ .../cmds_stream/predis/CmdsStreamTest.php | 86 +++++++++ .../cmds_stream/redis-py/cmds_stream.py | 65 +++++++ .../cmds_stream/rust-async/cmds_stream.rs | 148 +++++++++++++++ .../cmds_stream/rust-sync/cmds_stream.rs | 141 +++++++++++++++ 11 files changed, 1206 insertions(+), 13 deletions(-) create mode 100644 local_examples/cmds_stream/NRedisStack/CmdsStreamExample.cs create mode 100644 local_examples/cmds_stream/go-redis/cmds_stream_test.go create mode 100644 local_examples/cmds_stream/jedis/CmdsStreamExample.java create mode 100644 local_examples/cmds_stream/lettuce-async/CmdsStreamExample.java create mode 100644 local_examples/cmds_stream/lettuce-reactive/CmdsStreamExample.java create mode 100644 local_examples/cmds_stream/node-redis/cmds-stream.js create mode 100644 local_examples/cmds_stream/predis/CmdsStreamTest.php create mode 100644 local_examples/cmds_stream/redis-py/cmds_stream.py create mode 100644 local_examples/cmds_stream/rust-async/cmds_stream.rs create mode 100644 local_examples/cmds_stream/rust-sync/cmds_stream.rs diff --git a/content/commands/xadd.md b/content/commands/xadd.md index 8c8c783fc6..806348f14c 100644 --- a/content/commands/xadd.md +++ b/content/commands/xadd.md @@ -306,22 +306,42 @@ For more information about Redis streams, see the ## Examples -{{% redis-cli %}} -XADD mystream * name Sara surname OConnor -XADD mystream * field1 value1 field2 value2 field3 value3 -XLEN mystream -XRANGE mystream - + -{{% /redis-cli %}} +{{< clients-example set="cmds_stream" step="xadd1" description="Basic XADD: Add entries to a stream with auto-generated IDs, check stream th, and read entries" difficulty="beginner" >}} +> XADD mystream * name Sara surname OConnor +4378417975-0" +> XADD mystream * field1 value1 field2 value2 field3 value3 +4378417976-0" +> XLEN mystream +eger) 2 +> XRANGE mystream - + +1) 1) "1774378417975-0" + 2) 1) "name" + 2) "Sara" + 3) "surname" + 4) "OConnor" +2) 1) "1774378417976-0" + 2) 1) "field1" + 2) "value1" + 3) "field2" + 4) "value2" + 5) "field3" + 6) "value3" +{{< /clients-example >}} ### Idempotent message processing examples -{{% redis-cli %}} -XADD mystream IDMP producer1 msg1 * field value -XADD mystream IDMP producer1 msg1 * field different_value -XADD mystream IDMPAUTO producer2 * field value -XADD mystream IDMPAUTO producer2 * field value -XCFGSET mystream IDMP-DURATION 300 IDMP-MAXSIZE 1000 -{{% /redis-cli %}} +{{< clients-example set="cmds_stream" step="xadd2" description="Idempotent XADD (Redis 8.6+): Use IDMP and IDMPAUTO for at-most-once message delivery, preventing duplicate entries" difficulty="intermediate" >}} +> XADD mystream IDMP producer1 msg1 * field value +"1774378417976-0" +> XADD mystream IDMP producer1 msg1 * field different_value +"1774378417976-0" +> XADD mystream IDMPAUTO producer2 * field value +"1774378417977-0" +> XADD mystream IDMPAUTO producer2 * field value +"1774378417977-0" +> XCFGSET mystream IDMP-DURATION 300 IDMP-MAXSIZE 1000 +"OK" +{{< /clients-example >}} ## Redis Software and Redis Cloud compatibility diff --git a/local_examples/cmds_stream/NRedisStack/CmdsStreamExample.cs b/local_examples/cmds_stream/NRedisStack/CmdsStreamExample.cs new file mode 100644 index 0000000000..af779ac67f --- /dev/null +++ b/local_examples/cmds_stream/NRedisStack/CmdsStreamExample.cs @@ -0,0 +1,100 @@ +// EXAMPLE: cmds_stream + +using StackExchange.Redis; +// REMOVE_START +using NRedisStack.Tests; +using Xunit; +// REMOVE_END + +// REMOVE_START +namespace Doc; + +[Collection("DocsTests")] +// REMOVE_END +public class CmdsStreamExample +// REMOVE_START +: AbstractNRedisStackTest, IDisposable +// REMOVE_END +{ + // REMOVE_START + public CmdsStreamExample(EndpointsFixture fixture) : base(fixture) { } + + [Fact] + // REMOVE_END + public void Run() + { + //REMOVE_START + SkipIfTargetConnectionDoesNotExist(EndpointsFixture.Env.Standalone); + var _ = GetCleanDatabase(EndpointsFixture.Env.Standalone); + //REMOVE_END + var muxer = ConnectionMultiplexer.Connect("localhost:6379"); + var db = muxer.GetDatabase(); + + // REMOVE_START + db.KeyDelete("mystream"); + // REMOVE_END + + // STEP_START xadd1 + var res1 = db.StreamAdd("mystream", new NameValueEntry[] { + new NameValueEntry("name", "Sara"), + new NameValueEntry("surname", "OConnor") + }); + Console.WriteLine(res1); // >>> 1726055713866-0 + + var res2 = db.StreamAdd("mystream", new NameValueEntry[] { + new NameValueEntry("field1", "value1"), + new NameValueEntry("field2", "value2"), + new NameValueEntry("field3", "value3") + }); + Console.WriteLine(res2); // >>> 1726055713866-1 + + var res3 = db.StreamLength("mystream"); + Console.WriteLine(res3); // >>> 2 + + var res4 = db.StreamRange("mystream", "-", "+"); + foreach (var entry in res4) + { + Console.WriteLine($"{entry.Id} -> {string.Join(", ", entry.Values.Select(v => $"{v.Name}={v.Value}"))}"); + } + // >>> 1726055713866-0 -> name=Sara, surname=OConnor + // >>> 1726055713866-1 -> field1=value1, field2=value2, field3=value3 + // STEP_END + + // REMOVE_START + Assert.Equal(2, res3); + Assert.Equal(2, res4.Length); + db.KeyDelete("mystream"); + // REMOVE_END + + // STEP_START xadd2 + // Note: IDMP is a Redis 8.6 feature - using Execute for raw command access + var res5 = db.Execute("XADD", "mystream", "IDMP", "producer1", "msg1", "*", "field", "value"); + Console.WriteLine(res5); // >>> 1726055713867-0 + + // Attempting to add the same message again with IDMP returns the original entry ID + var res6 = db.Execute("XADD", "mystream", "IDMP", "producer1", "msg1", "*", "field", "different_value"); + Console.WriteLine(res6); // >>> 1726055713867-0 (same ID as res5, message was deduplicated) + + var res7 = db.Execute("XADD", "mystream", "IDMPAUTO", "producer2", "*", "field", "value"); + Console.WriteLine(res7); // >>> 1726055713867-1 + + // Auto-generated idempotent ID prevents duplicates for same producer+content + var res8 = db.Execute("XADD", "mystream", "IDMPAUTO", "producer2", "*", "field", "value"); + Console.WriteLine(res8); // >>> 1726055713867-1 (same ID as res7, duplicate detected) + + // Configure idempotent message processing settings + var res9 = db.Execute("XCFGSET", "mystream", "IDMP-DURATION", "300", "IDMP-MAXSIZE", "1000"); + Console.WriteLine(res9); // >>> OK + // STEP_END + + // REMOVE_START + Assert.NotNull(res5); + db.KeyDelete("mystream"); + // REMOVE_END + + // HIDE_START + muxer.Close(); + // HIDE_END + } +} + diff --git a/local_examples/cmds_stream/go-redis/cmds_stream_test.go b/local_examples/cmds_stream/go-redis/cmds_stream_test.go new file mode 100644 index 0000000000..f488a09b83 --- /dev/null +++ b/local_examples/cmds_stream/go-redis/cmds_stream_test.go @@ -0,0 +1,170 @@ +// EXAMPLE: cmds_stream +// HIDE_START +package example_commands_test + +import ( + "context" + "fmt" + + "github.com/redis/go-redis/v9" +) + +// HIDE_END + +func ExampleClient_xadd1() { + ctx := context.Background() + + rdb := redis.NewClient(&redis.Options{ + Addr: "localhost:6379", + Password: "", + DB: 0, + }) + + // REMOVE_START + rdb.Del(ctx, "mystream") + // REMOVE_END + + // STEP_START xadd1 + res1, err := rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: "mystream", + Values: map[string]interface{}{ + "name": "Sara", + "surname": "OConnor", + }, + }).Result() + + if err != nil { + panic(err) + } + + fmt.Println(res1 != "") // >>> true + + res2, err := rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: "mystream", + Values: map[string]interface{}{ + "field1": "value1", + "field2": "value2", + "field3": "value3", + }, + }).Result() + + if err != nil { + panic(err) + } + + fmt.Println(res2 != "") // >>> true + + res3, err := rdb.XLen(ctx, "mystream").Result() + + if err != nil { + panic(err) + } + + fmt.Println(res3) // >>> 2 + + res4, err := rdb.XRange(ctx, "mystream", "-", "+").Result() + + if err != nil { + panic(err) + } + + fmt.Println(len(res4)) // >>> 2 + // STEP_END + + // Output: + // true + // true + // 2 + // 2 +} + +func ExampleClient_xadd2() { + ctx := context.Background() + + rdb := redis.NewClient(&redis.Options{ + Addr: "localhost:6379", + Password: "", + DB: 0, + }) + + // REMOVE_START + rdb.Del(ctx, "mystream") + // REMOVE_END + + // STEP_START xadd2 + res5, err := rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: "mystream", + Values: map[string]interface{}{"field": "value"}, + ProducerID: "producer1", + IdempotentID: "msg1", + }).Result() + + if err != nil { + panic(err) + } + + fmt.Println(res5 != "") // >>> true + + // Attempting to add the same message again with IDMP returns the original entry ID + res6, err := rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: "mystream", + Values: map[string]interface{}{"field": "different_value"}, + ProducerID: "producer1", + IdempotentID: "msg1", + }).Result() + + if err != nil { + panic(err) + } + + fmt.Println(res5 == res6) // >>> true (same ID, message was deduplicated) + + res7, err := rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: "mystream", + Values: map[string]interface{}{"field": "value"}, + ProducerID: "producer2", + IdempotentAuto: true, + }).Result() + + if err != nil { + panic(err) + } + + fmt.Println(res7 != "") // >>> true + + // Auto-generated idempotent ID prevents duplicates for same producer+content + res8, err := rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: "mystream", + Values: map[string]interface{}{"field": "value"}, + ProducerID: "producer2", + IdempotentAuto: true, + }).Result() + + if err != nil { + panic(err) + } + + fmt.Println(res7 == res8) // >>> true (same ID, duplicate detected) + + // Configure idempotent message processing settings + res9, err := rdb.XCfgSet(ctx, &redis.XCfgSetArgs{ + Stream: "mystream", + Duration: 300, // 300 seconds + MaxSize: 1000, // 1000 idempotent IDs + }).Result() + + if err != nil { + panic(err) + } + + fmt.Println(res9) // >>> OK + // STEP_END + + // Output: + // true + // true + // true + // true + // OK +} + diff --git a/local_examples/cmds_stream/jedis/CmdsStreamExample.java b/local_examples/cmds_stream/jedis/CmdsStreamExample.java new file mode 100644 index 0000000000..5812bbdfab --- /dev/null +++ b/local_examples/cmds_stream/jedis/CmdsStreamExample.java @@ -0,0 +1,106 @@ +// EXAMPLE: cmds_stream +// REMOVE_START +package io.redis.examples; + +import org.junit.jupiter.api.Test; +// REMOVE_END + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +// HIDE_START +import redis.clients.jedis.JedisPooled; +import redis.clients.jedis.StreamEntryID; +import redis.clients.jedis.params.XAddParams; +import redis.clients.jedis.params.XCfgSetParams; +import redis.clients.jedis.resps.StreamEntry; +// HIDE_END + +import static org.junit.jupiter.api.Assertions.*; + +// HIDE_START +public class CmdsStreamExample { + + @Test + public void run() { + JedisPooled jedis = new JedisPooled("localhost", 6379); + + // REMOVE_START + jedis.del("mystream"); + // REMOVE_END +// HIDE_END + + // STEP_START xadd1 + Map entry1 = new HashMap<>(); + entry1.put("name", "Sara"); + entry1.put("surname", "OConnor"); + StreamEntryID res1 = jedis.xadd("mystream", entry1, XAddParams.xAddParams()); + System.out.println(res1); // >>> 1726055713866-0 + + Map entry2 = new HashMap<>(); + entry2.put("field1", "value1"); + entry2.put("field2", "value2"); + entry2.put("field3", "value3"); + StreamEntryID res2 = jedis.xadd("mystream", entry2, XAddParams.xAddParams()); + System.out.println(res2); // >>> 1726055713866-1 + + long res3 = jedis.xlen("mystream"); + System.out.println(res3); // >>> 2 + + List res4 = jedis.xrange("mystream", "-", "+"); + for (StreamEntry entry : res4) { + System.out.println(entry.getID() + " -> " + entry.getFields()); + } + // >>> 1726055713866-0 -> {name=Sara, surname=OConnor} + // >>> 1726055713866-1 -> {field1=value1, field2=value2, field3=value3} + // STEP_END + // REMOVE_START + assertEquals(2, res3); + assertEquals(2, res4.size()); + jedis.del("mystream"); + // REMOVE_END + + // STEP_START xadd2 + Map idmpEntry1 = new HashMap<>(); + idmpEntry1.put("field", "value"); + StreamEntryID res5 = jedis.xadd("mystream", idmpEntry1, + XAddParams.xAddParams().idmp("producer1", "msg1")); + System.out.println(res5); // >>> 1726055713867-0 + + // Attempting to add the same message again with IDMP returns the original entry ID + Map idmpEntry2 = new HashMap<>(); + idmpEntry2.put("field", "different_value"); + StreamEntryID res6 = jedis.xadd("mystream", idmpEntry2, + XAddParams.xAddParams().idmp("producer1", "msg1")); + System.out.println(res6); // >>> 1726055713867-0 (same ID as res5, message was deduplicated) + + Map idmpAutoEntry1 = new HashMap<>(); + idmpAutoEntry1.put("field", "value"); + StreamEntryID res7 = jedis.xadd("mystream", idmpAutoEntry1, + XAddParams.xAddParams().idmpAuto("producer2")); + System.out.println(res7); // >>> 1726055713867-1 + + // Auto-generated idempotent ID prevents duplicates for same producer+content + Map idmpAutoEntry2 = new HashMap<>(); + idmpAutoEntry2.put("field", "value"); + StreamEntryID res8 = jedis.xadd("mystream", idmpAutoEntry2, + XAddParams.xAddParams().idmpAuto("producer2")); + System.out.println(res8); // >>> 1726055713867-1 (same ID as res7, duplicate detected) + + // Configure idempotent message processing settings + String res9 = jedis.xcfgset("mystream", + XCfgSetParams.xCfgSetParams().idmpDuration(300).idmpMaxsize(1000)); + System.out.println(res9); // >>> OK + // STEP_END + // REMOVE_START + assertNotNull(res5); + jedis.del("mystream"); + // REMOVE_END + +// HIDE_START + jedis.close(); + } +} +// HIDE_END + diff --git a/local_examples/cmds_stream/lettuce-async/CmdsStreamExample.java b/local_examples/cmds_stream/lettuce-async/CmdsStreamExample.java new file mode 100644 index 0000000000..c4a72ddb6c --- /dev/null +++ b/local_examples/cmds_stream/lettuce-async/CmdsStreamExample.java @@ -0,0 +1,122 @@ +// EXAMPLE: cmds_stream +package io.redis.examples.async; + +import io.lettuce.core.*; +import io.lettuce.core.api.async.RedisAsyncCommands; +import io.lettuce.core.api.StatefulRedisConnection; + +// REMOVE_START +import org.junit.jupiter.api.Test; +// REMOVE_END +import java.util.*; +import java.util.concurrent.CompletableFuture; +// REMOVE_START +import static org.assertj.core.api.Assertions.assertThat; +// REMOVE_END + +public class CmdsStreamExample { + + @Test + public void run() { + RedisClient redisClient = RedisClient.create("redis://localhost:6379"); + + try (StatefulRedisConnection connection = redisClient.connect()) { + RedisAsyncCommands asyncCommands = connection.async(); + + // REMOVE_START + CompletableFuture delResult = asyncCommands.del("mystream").toCompletableFuture(); + delResult.join(); + // REMOVE_END + + // STEP_START xadd1 + Map entry1 = new HashMap<>(); + entry1.put("name", "Sara"); + entry1.put("surname", "OConnor"); + + CompletableFuture xadd1Ops = asyncCommands.xadd("mystream", entry1) + .thenCompose(res1 -> { + System.out.println(res1); // >>> 1726055713866-0 + Map entry2 = new HashMap<>(); + entry2.put("field1", "value1"); + entry2.put("field2", "value2"); + entry2.put("field3", "value3"); + return asyncCommands.xadd("mystream", entry2); + }) + .thenCompose(res2 -> { + System.out.println(res2); // >>> 1726055713866-1 + return asyncCommands.xlen("mystream"); + }) + .thenCompose(res3 -> { + System.out.println(res3); // >>> 2 + // REMOVE_START + assertThat(res3).isEqualTo(2L); + // REMOVE_END + return asyncCommands.xrange("mystream", Range.unbounded()); + }) + .thenAccept(res4 -> { + for (var entry : res4) { + System.out.println(entry.getId() + " -> " + entry.getBody()); + } + // >>> 1726055713866-0 -> {name=Sara, surname=OConnor} + // >>> 1726055713866-1 -> {field1=value1, field2=value2, field3=value3} + // REMOVE_START + assertThat(res4).hasSize(2); + // REMOVE_END + }) + .toCompletableFuture(); + + xadd1Ops.join(); + // STEP_END + + // REMOVE_START + asyncCommands.del("mystream").toCompletableFuture().join(); + // REMOVE_END + + // STEP_START xadd2 + Map idmpEntry1 = new HashMap<>(); + idmpEntry1.put("field", "value"); + + CompletableFuture xadd2Ops = asyncCommands.xadd( + "mystream", + XAddArgs.Builder.idmp("producer1", "msg1"), + idmpEntry1) + .thenCompose(res5 -> { + System.out.println(res5); // >>> 1726055713867-0 + // Attempting to add same message again with IDMP returns original entry ID + Map idmpEntry2 = new HashMap<>(); + idmpEntry2.put("field", "different_value"); + return asyncCommands.xadd("mystream", + XAddArgs.Builder.idmp("producer1", "msg1"), idmpEntry2); + }) + .thenCompose(res6 -> { + System.out.println(res6); // >>> 1726055713867-0 (deduplicated) + Map idmpAutoEntry1 = new HashMap<>(); + idmpAutoEntry1.put("field", "value"); + return asyncCommands.xadd("mystream", + XAddArgs.Builder.idmpAuto("producer2"), idmpAutoEntry1); + }) + .thenCompose(res7 -> { + System.out.println(res7); // >>> 1726055713867-1 + Map idmpAutoEntry2 = new HashMap<>(); + idmpAutoEntry2.put("field", "value"); + return asyncCommands.xadd("mystream", + XAddArgs.Builder.idmpAuto("producer2"), idmpAutoEntry2); + }) + .thenAccept(res8 -> { + System.out.println(res8); // >>> 1726055713867-1 (duplicate detected) + }) + .toCompletableFuture(); + + xadd2Ops.join(); + // STEP_END + + // REMOVE_START + asyncCommands.del("mystream").toCompletableFuture().join(); + // REMOVE_END + + } finally { + redisClient.shutdown(); + } + } +} + diff --git a/local_examples/cmds_stream/lettuce-reactive/CmdsStreamExample.java b/local_examples/cmds_stream/lettuce-reactive/CmdsStreamExample.java new file mode 100644 index 0000000000..d2096e4926 --- /dev/null +++ b/local_examples/cmds_stream/lettuce-reactive/CmdsStreamExample.java @@ -0,0 +1,152 @@ +// EXAMPLE: cmds_stream +package io.redis.examples.reactive; + +import io.lettuce.core.*; +import io.lettuce.core.api.reactive.RedisReactiveCommands; +import io.lettuce.core.api.StatefulRedisConnection; + +// REMOVE_START +import org.junit.jupiter.api.Test; +import static org.assertj.core.api.Assertions.assertThat; +// REMOVE_END + +import reactor.core.publisher.Mono; + +import java.util.*; + +public class CmdsStreamExample { + + // REMOVE_START + @Test + // REMOVE_END + public void run() { + RedisClient redisClient = RedisClient.create("redis://localhost:6379"); + + try (StatefulRedisConnection connection = redisClient.connect()) { + RedisReactiveCommands reactiveCommands = connection.reactive(); + + // REMOVE_START + reactiveCommands.del("mystream").block(); + // REMOVE_END + + // STEP_START xadd1 + Map entry1 = new HashMap<>(); + entry1.put("name", "Sara"); + entry1.put("surname", "OConnor"); + + Mono addEntry1 = reactiveCommands.xadd("mystream", entry1) + .doOnNext(res1 -> { + System.out.println(res1); // >>> 1726055713866-0 + }); + + addEntry1.block(); + + Map entry2 = new HashMap<>(); + entry2.put("field1", "value1"); + entry2.put("field2", "value2"); + entry2.put("field3", "value3"); + + Mono addEntry2 = reactiveCommands.xadd("mystream", entry2) + .doOnNext(res2 -> { + System.out.println(res2); // >>> 1726055713866-1 + }); + + addEntry2.block(); + + Mono getLen = reactiveCommands.xlen("mystream") + .doOnNext(res3 -> { + System.out.println(res3); // >>> 2 + // REMOVE_START + assertThat(res3).isEqualTo(2L); + // REMOVE_END + }); + + getLen.block(); + + Mono>> getRange = reactiveCommands + .xrange("mystream", Range.unbounded()) + .collectList() + .doOnNext(res4 -> { + for (var entry : res4) { + System.out.println(entry.getId() + " -> " + entry.getBody()); + } + // >>> 1726055713866-0 -> {name=Sara, surname=OConnor} + // >>> 1726055713866-1 -> {field1=value1, field2=value2, field3=value3} + // REMOVE_START + assertThat(res4).hasSize(2); + // REMOVE_END + }); + + getRange.block(); + // STEP_END + + // REMOVE_START + reactiveCommands.del("mystream").block(); + // REMOVE_END + + // STEP_START xadd2 + Map idmpEntry1 = new HashMap<>(); + idmpEntry1.put("field", "value"); + + Mono addIdmp1 = reactiveCommands.xadd( + "mystream", + XAddArgs.Builder.idmp("producer1", "msg1"), + idmpEntry1) + .doOnNext(res5 -> { + System.out.println(res5); // >>> 1726055713867-0 + }); + + addIdmp1.block(); + + // Attempting to add the same message again with IDMP returns the original entry ID + Map idmpEntry2 = new HashMap<>(); + idmpEntry2.put("field", "different_value"); + + Mono addIdmp2 = reactiveCommands.xadd( + "mystream", + XAddArgs.Builder.idmp("producer1", "msg1"), + idmpEntry2) + .doOnNext(res6 -> { + System.out.println(res6); // >>> 1726055713867-0 (deduplicated) + }); + + addIdmp2.block(); + + Map idmpAutoEntry1 = new HashMap<>(); + idmpAutoEntry1.put("field", "value"); + + Mono addIdmpAuto1 = reactiveCommands.xadd( + "mystream", + XAddArgs.Builder.idmpAuto("producer2"), + idmpAutoEntry1) + .doOnNext(res7 -> { + System.out.println(res7); // >>> 1726055713867-1 + }); + + addIdmpAuto1.block(); + + // Auto-generated idempotent ID prevents duplicates for same producer+content + Map idmpAutoEntry2 = new HashMap<>(); + idmpAutoEntry2.put("field", "value"); + + Mono addIdmpAuto2 = reactiveCommands.xadd( + "mystream", + XAddArgs.Builder.idmpAuto("producer2"), + idmpAutoEntry2) + .doOnNext(res8 -> { + System.out.println(res8); // >>> 1726055713867-1 (duplicate detected) + }); + + addIdmpAuto2.block(); + // STEP_END + + // REMOVE_START + reactiveCommands.del("mystream").block(); + // REMOVE_END + + } finally { + redisClient.shutdown(); + } + } +} + diff --git a/local_examples/cmds_stream/node-redis/cmds-stream.js b/local_examples/cmds_stream/node-redis/cmds-stream.js new file mode 100644 index 0000000000..042b33f535 --- /dev/null +++ b/local_examples/cmds_stream/node-redis/cmds-stream.js @@ -0,0 +1,83 @@ +// EXAMPLE: cmds_stream +// HIDE_START +import assert from 'node:assert'; +import { createClient } from 'redis'; + +const client = createClient(); +await client.connect(); +// HIDE_END +// REMOVE_START +await client.del('mystream'); +// REMOVE_END + +// STEP_START xadd1 +const res1 = await client.xAdd('mystream', '*', { + 'name': 'Sara', + 'surname': 'OConnor' +}); +console.log(res1); // >>> 1726055713866-0 + +const res2 = await client.xAdd('mystream', '*', { + 'field1': 'value1', + 'field2': 'value2', + 'field3': 'value3' +}); +console.log(res2); // >>> 1726055713866-1 + +const res3 = await client.xLen('mystream'); +console.log(res3); // >>> 2 + +const res4 = await client.xRange('mystream', '-', '+'); +console.log(res4); +// >>> [ +// { id: '1726055713866-0', message: { name: 'Sara', surname: 'OConnor' } }, +// { id: '1726055713866-1', message: { field1: 'value1', field2: 'value2', field3: 'value3' } } +// ] +// STEP_END + +// REMOVE_START +assert.equal(res3, 2); +assert.equal(res4.length, 2); +await client.del('mystream'); +// REMOVE_END + +// STEP_START xadd2 +const res5 = await client.xAdd('mystream', '*', { 'field': 'value' }, { + IDMP: { pid: 'producer1', iid: 'msg1' } +}); +console.log(res5); // >>> 1726055713867-0 + +// Attempting to add the same message again with IDMP returns the original entry ID +const res6 = await client.xAdd('mystream', '*', { 'field': 'different_value' }, { + IDMP: { pid: 'producer1', iid: 'msg1' } +}); +console.log(res6); // >>> 1726055713867-0 (same ID as res5, message was deduplicated) + +const res7 = await client.xAdd('mystream', '*', { 'field': 'value' }, { + IDMPAUTO: { pid: 'producer2' } +}); +console.log(res7); // >>> 1726055713867-1 + +// Auto-generated idempotent ID prevents duplicates for same producer+content +const res8 = await client.xAdd('mystream', '*', { 'field': 'value' }, { + IDMPAUTO: { pid: 'producer2' } +}); +console.log(res8); // >>> 1726055713867-1 (same ID as res7, duplicate detected) + +// Configure idempotent message processing settings +const res9 = await client.xCfgSet('mystream', { + IDMP_DURATION: 300, + IDMP_MAXSIZE: 1000 +}); +console.log(res9); // >>> OK +// STEP_END + +// REMOVE_START +assert.ok(res5); +await client.del('mystream'); +// REMOVE_END + +// HIDE_START +await client.quit(); +// HIDE_END + diff --git a/local_examples/cmds_stream/predis/CmdsStreamTest.php b/local_examples/cmds_stream/predis/CmdsStreamTest.php new file mode 100644 index 0000000000..ea625dd8d1 --- /dev/null +++ b/local_examples/cmds_stream/predis/CmdsStreamTest.php @@ -0,0 +1,86 @@ +// EXAMPLE: cmds_stream + 'tcp', + 'host' => '127.0.0.1', + 'port' => 6379, + ]); + // REMOVE_START + $redis->del('mystream'); + // REMOVE_END + + // STEP_START xadd1 + $res1 = $redis->xadd('mystream', ['name' => 'Sara', 'surname' => 'OConnor']); + echo $res1 . PHP_EOL; // >>> 1726055713866-0 + + $res2 = $redis->xadd('mystream', ['field1' => 'value1', 'field2' => 'value2', 'field3' => 'value3']); + echo $res2 . PHP_EOL; // >>> 1726055713866-1 + + $res3 = $redis->xlen('mystream'); + echo $res3 . PHP_EOL; // >>> 2 + + $res4 = $redis->xrange('mystream', '-', '+'); + foreach ($res4 as $id => $fields) { + echo $id . ' -> ' . json_encode($fields) . PHP_EOL; + } + // >>> 1726055713866-0 -> {"name":"Sara","surname":"OConnor"} + // >>> 1726055713866-1 -> {"field1":"value1","field2":"value2","field3":"value3"} + // STEP_END + + // REMOVE_START + $this->assertEquals(2, $res3); + $this->assertCount(2, $res4); + $redis->del('mystream'); + // REMOVE_END + } + + public function testXadd2(): void + { + $redis = new PredisClient([ + 'scheme' => 'tcp', + 'host' => '127.0.0.1', + 'port' => 6379, + ]); + // REMOVE_START + $redis->del('mystream'); + // REMOVE_END + + // STEP_START xadd2 + $res5 = $redis->xadd('mystream', ['field' => 'value'], '*', ['idmp' => ['producer1', 'msg1']]); + echo $res5 . PHP_EOL; // >>> 1726055713867-0 + + // Attempting to add the same message again with IDMP returns the original entry ID + $res6 = $redis->xadd('mystream', ['field' => 'different_value'], '*', ['idmp' => ['producer1', 'msg1']]); + echo $res6 . PHP_EOL; // >>> 1726055713867-0 (same ID as res5, message was deduplicated) + + $res7 = $redis->xadd('mystream', ['field' => 'value'], '*', ['idmpauto' => 'producer2']); + echo $res7 . PHP_EOL; // >>> 1726055713867-1 + + // Auto-generated idempotent ID prevents duplicates for same producer+content + $res8 = $redis->xadd('mystream', ['field' => 'value'], '*', ['idmpauto' => 'producer2']); + echo $res8 . PHP_EOL; // >>> 1726055713867-1 (same ID as res7, duplicate detected) + + // Configure idempotent message processing settings + $res9 = $redis->xcfgset('mystream', 300, 1000); + echo $res9 . PHP_EOL; // >>> OK + // STEP_END + + // REMOVE_START + $this->assertNotNull($res5); + $redis->del('mystream'); + // REMOVE_END + } +} + diff --git a/local_examples/cmds_stream/redis-py/cmds_stream.py b/local_examples/cmds_stream/redis-py/cmds_stream.py new file mode 100644 index 0000000000..5b5b0bcf0a --- /dev/null +++ b/local_examples/cmds_stream/redis-py/cmds_stream.py @@ -0,0 +1,65 @@ +# EXAMPLE: cmds_stream +# HIDE_START +""" +Code samples for XADD command: + https://redis.io/docs/latest/commands/xadd/ +""" + +import redis + +r = redis.Redis(decode_responses=True) +# HIDE_END +# REMOVE_START +r.delete("mystream") +# REMOVE_END + +# STEP_START xadd1 +res1 = r.xadd("mystream", {"name": "Sara", "surname": "OConnor"}) +print(res1) # >>> 1726055713866-0 + +res2 = r.xadd("mystream", {"field1": "value1", "field2": "value2", "field3": "value3"}) +print(res2) # >>> 1726055713866-1 + +res3 = r.xlen("mystream") +print(res3) # >>> 2 + +res4 = r.xrange("mystream", "-", "+") +print(res4) +# >>> [ +# ('1726055713866-0', {'name': 'Sara', 'surname': 'OConnor'}), +# ('1726055713866-1', {'field1': 'value1', 'field2': 'value2', 'field3': 'value3'}) +# ] +# STEP_END + +# REMOVE_START +assert res3 == 2 +assert len(res4) == 2 +r.delete("mystream") +# REMOVE_END + +# STEP_START xadd2 +res5 = r.xadd("mystream", {"field": "value"}, idmp=("producer1", b"msg1")) +print(res5) # >>> 1726055713867-0 + +# Attempting to add the same message again with IDMP returns the original entry ID +res6 = r.xadd("mystream", {"field": "different_value"}, idmp=("producer1", b"msg1")) +print(res6) # >>> 1726055713867-0 (same ID as res5, message was deduplicated) + +res7 = r.xadd("mystream", {"field": "value"}, idmpauto="producer2") +print(res7) # >>> 1726055713867-1 + +# Auto-generated idempotent ID prevents duplicates for same producer+content +res8 = r.xadd("mystream", {"field": "value"}, idmpauto="producer2") +print(res8) # >>> 1726055713867-1 (same ID as res7, duplicate detected) + +# Configure idempotent message processing settings +res9 = r.xcfgset("mystream", idmp_duration=300, idmp_maxsize=1000) +print(res9) # >>> True +# STEP_END + +# REMOVE_START +# Note: IDMP is a Redis 8.6 feature - assertions may need adjustment based on server version +assert res5 is not None +r.delete("mystream") +# REMOVE_END + diff --git a/local_examples/cmds_stream/rust-async/cmds_stream.rs b/local_examples/cmds_stream/rust-async/cmds_stream.rs new file mode 100644 index 0000000000..851b823c9c --- /dev/null +++ b/local_examples/cmds_stream/rust-async/cmds_stream.rs @@ -0,0 +1,148 @@ +// EXAMPLE: cmds_stream + +// HIDE_START +use redis::AsyncCommands; +// HIDE_END + +// REMOVE_START +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_xadd1() { +// REMOVE_END + // STEP_START xadd1 + let client = redis::Client::open("redis://127.0.0.1/").unwrap(); + let mut con = client.get_multiplexed_async_connection().await.unwrap(); + + // REMOVE_START + let _: () = redis::cmd("DEL").arg("mystream").query_async(&mut con).await.unwrap(); + // REMOVE_END + + let res1: String = redis::cmd("XADD") + .arg("mystream") + .arg("*") + .arg("name") + .arg("Sara") + .arg("surname") + .arg("OConnor") + .query_async(&mut con) + .await + .unwrap(); + println!("XADD result: {}", res1); // >>> 1726055713866-0 + + let res2: String = redis::cmd("XADD") + .arg("mystream") + .arg("*") + .arg("field1") + .arg("value1") + .arg("field2") + .arg("value2") + .arg("field3") + .arg("value3") + .query_async(&mut con) + .await + .unwrap(); + println!("XADD result: {}", res2); // >>> 1726055713866-1 + + let res3: i64 = con.xlen("mystream").await.unwrap(); + println!("XLEN result: {}", res3); // >>> 2 + + let res4: redis::streams::StreamRangeReply = con.xrange_all("mystream").await.unwrap(); + for entry in &res4.ids { + println!("{} -> {:?}", entry.id, entry.map); + } + // >>> 1726055713866-0 -> {"name": "Sara", "surname": "OConnor"} + // >>> 1726055713866-1 -> {"field1": "value1", "field2": "value2", "field3": "value3"} + // STEP_END + + // REMOVE_START + assert_eq!(res3, 2); + assert_eq!(res4.ids.len(), 2); + let _: () = redis::cmd("DEL").arg("mystream").query_async(&mut con).await.unwrap(); + } + + #[tokio::test] + async fn test_xadd2() { + let client = redis::Client::open("redis://127.0.0.1/").unwrap(); + let mut con = client.get_multiplexed_async_connection().await.unwrap(); + let _: () = redis::cmd("DEL").arg("idmpstream").query_async(&mut con).await.unwrap(); + // REMOVE_END + + // STEP_START xadd2 + // Note: IDMP is a Redis 8.6 feature - using raw commands + let res5: String = redis::cmd("XADD") + .arg("idmpstream") + .arg("IDMP") + .arg("producer1") + .arg("msg1") + .arg("*") + .arg("field") + .arg("value") + .query_async(&mut con) + .await + .unwrap(); + println!("XADD IDMP result: {}", res5); // >>> 1726055713867-0 + + // Attempting to add the same message again with IDMP returns the original entry ID + let res6: String = redis::cmd("XADD") + .arg("idmpstream") + .arg("IDMP") + .arg("producer1") + .arg("msg1") + .arg("*") + .arg("field") + .arg("different_value") + .query_async(&mut con) + .await + .unwrap(); + println!("XADD IDMP result: {}", res6); // >>> 1726055713867-0 (deduplicated) + + let res7: String = redis::cmd("XADD") + .arg("idmpstream") + .arg("IDMPAUTO") + .arg("producer2") + .arg("*") + .arg("field") + .arg("value") + .query_async(&mut con) + .await + .unwrap(); + println!("XADD IDMPAUTO result: {}", res7); // >>> 1726055713867-1 + + // Auto-generated idempotent ID prevents duplicates for same producer+content + let res8: String = redis::cmd("XADD") + .arg("idmpstream") + .arg("IDMPAUTO") + .arg("producer2") + .arg("*") + .arg("field") + .arg("value") + .query_async(&mut con) + .await + .unwrap(); + println!("XADD IDMPAUTO result: {}", res8); // >>> 1726055713867-1 (duplicate detected) + + // Configure idempotent message processing settings + let res9: String = redis::cmd("XCFGSET") + .arg("idmpstream") + .arg("IDMP-DURATION") + .arg("300") + .arg("IDMP-MAXSIZE") + .arg("1000") + .query_async(&mut con) + .await + .unwrap(); + println!("XCFGSET result: {}", res9); // >>> OK + // STEP_END + + // REMOVE_START + assert!(!res5.is_empty()); + let _: () = redis::cmd("DEL").arg("idmpstream").query_async(&mut con).await.unwrap(); +// REMOVE_END +// REMOVE_START + } +} +// REMOVE_END + diff --git a/local_examples/cmds_stream/rust-sync/cmds_stream.rs b/local_examples/cmds_stream/rust-sync/cmds_stream.rs new file mode 100644 index 0000000000..662c25aa96 --- /dev/null +++ b/local_examples/cmds_stream/rust-sync/cmds_stream.rs @@ -0,0 +1,141 @@ +// EXAMPLE: cmds_stream + +// HIDE_START +use redis::Commands; +// HIDE_END + +// REMOVE_START +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_xadd1() { +// REMOVE_END + // STEP_START xadd1 + let client = redis::Client::open("redis://127.0.0.1/").unwrap(); + let mut con = client.get_connection().unwrap(); + + // REMOVE_START + let _: () = redis::cmd("DEL").arg("mystream").query(&mut con).unwrap(); + // REMOVE_END + + let res1: String = redis::cmd("XADD") + .arg("mystream") + .arg("*") + .arg("name") + .arg("Sara") + .arg("surname") + .arg("OConnor") + .query(&mut con) + .unwrap(); + println!("XADD result: {}", res1); // >>> 1726055713866-0 + + let res2: String = redis::cmd("XADD") + .arg("mystream") + .arg("*") + .arg("field1") + .arg("value1") + .arg("field2") + .arg("value2") + .arg("field3") + .arg("value3") + .query(&mut con) + .unwrap(); + println!("XADD result: {}", res2); // >>> 1726055713866-1 + + let res3: i64 = con.xlen("mystream").unwrap(); + println!("XLEN result: {}", res3); // >>> 2 + + let res4: redis::streams::StreamRangeReply = con.xrange_all("mystream").unwrap(); + for entry in &res4.ids { + println!("{} -> {:?}", entry.id, entry.map); + } + // >>> 1726055713866-0 -> {"name": "Sara", "surname": "OConnor"} + // >>> 1726055713866-1 -> {"field1": "value1", "field2": "value2", "field3": "value3"} + // STEP_END + + // REMOVE_START + assert_eq!(res3, 2); + assert_eq!(res4.ids.len(), 2); + let _: () = redis::cmd("DEL").arg("mystream").query(&mut con).unwrap(); + } + + #[test] + fn test_xadd2() { + let client = redis::Client::open("redis://127.0.0.1/").unwrap(); + let mut con = client.get_connection().unwrap(); + let _: () = redis::cmd("DEL").arg("idmpstream").query(&mut con).unwrap(); + // REMOVE_END + + // STEP_START xadd2 + // Note: IDMP is a Redis 8.6 feature - using raw commands + let res5: String = redis::cmd("XADD") + .arg("idmpstream") + .arg("IDMP") + .arg("producer1") + .arg("msg1") + .arg("*") + .arg("field") + .arg("value") + .query(&mut con) + .unwrap(); + println!("XADD IDMP result: {}", res5); // >>> 1726055713867-0 + + // Attempting to add the same message again with IDMP returns the original entry ID + let res6: String = redis::cmd("XADD") + .arg("idmpstream") + .arg("IDMP") + .arg("producer1") + .arg("msg1") + .arg("*") + .arg("field") + .arg("different_value") + .query(&mut con) + .unwrap(); + println!("XADD IDMP result: {}", res6); // >>> 1726055713867-0 (deduplicated) + + let res7: String = redis::cmd("XADD") + .arg("idmpstream") + .arg("IDMPAUTO") + .arg("producer2") + .arg("*") + .arg("field") + .arg("value") + .query(&mut con) + .unwrap(); + println!("XADD IDMPAUTO result: {}", res7); // >>> 1726055713867-1 + + // Auto-generated idempotent ID prevents duplicates for same producer+content + let res8: String = redis::cmd("XADD") + .arg("idmpstream") + .arg("IDMPAUTO") + .arg("producer2") + .arg("*") + .arg("field") + .arg("value") + .query(&mut con) + .unwrap(); + println!("XADD IDMPAUTO result: {}", res8); // >>> 1726055713867-1 (duplicate detected) + + // Configure idempotent message processing settings + let res9: String = redis::cmd("XCFGSET") + .arg("idmpstream") + .arg("IDMP-DURATION") + .arg("300") + .arg("IDMP-MAXSIZE") + .arg("1000") + .query(&mut con) + .unwrap(); + println!("XCFGSET result: {}", res9); // >>> OK + // STEP_END + + // REMOVE_START + assert!(!res5.is_empty()); + let _: () = redis::cmd("DEL").arg("idmpstream").query(&mut con).unwrap(); +// REMOVE_END +// REMOVE_START + } +} +// REMOVE_END + From 21b1846463471c4f4f878102280cec889ec46187 Mon Sep 17 00:00:00 2001 From: David Dougherty Date: Thu, 26 Mar 2026 12:47:39 -0700 Subject: [PATCH 2/2] Update example description in xadd.md Corrected the description in the clients-example for clarity. --- content/commands/xadd.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/content/commands/xadd.md b/content/commands/xadd.md index 806348f14c..607fb725d9 100644 --- a/content/commands/xadd.md +++ b/content/commands/xadd.md @@ -306,7 +306,7 @@ For more information about Redis streams, see the ## Examples -{{< clients-example set="cmds_stream" step="xadd1" description="Basic XADD: Add entries to a stream with auto-generated IDs, check stream th, and read entries" difficulty="beginner" >}} +{{< clients-example set="cmds_stream" step="xadd1" description="Basic XADD: Add entries to a stream with auto-generated IDs, check stream the stream size, and read entries" difficulty="beginner" >}} > XADD mystream * name Sara surname OConnor 4378417975-0" > XADD mystream * field1 value1 field2 value2 field3 value3