| category | 🔄 Data Operations |
|---|---|
| version | v1.5.0 |
| status | ✅ |
| date | 2026-03-09 |
Event-Stream für Datenänderungen mit SSE/WebSocket-Streaming, Consumer Groups und Enterprise-Integration.
- 📋 Übersicht
- ✨ Features
- 🚀 Schnellstart
- 📖 Detaillierte Dokumentation
- 💡 Best Practices
- 🔧 Troubleshooting
- 📚 Siehe auch
- 📝 Changelog
ThemisDB's Change Data Capture (CDC) implementation provides a production-ready, append-only event log that tracks all data mutations (INSERT/UPDATE/DELETE) in the database. This enables real-time data synchronisation, audit trails, stream processing, and enterprise integration use cases.
Implemented Features:
- Sequence-basiertes Ordering (monoton steigende
sequence) - Automatische Erfassung für alle Schreiboperationen (INSERT/UPDATE/DELETE)
- Inkrementeller Abruf mit Checkpointing (
from_seq) - Filterung per
key_prefix, Collection und Operation-Typ - Ereignisse mit Timestamp,
before_snapshot/after_snapshotund freiemmetadata - SSE-Streaming (
GET /changefeed/stream) und WebSocket-Streaming (/v2/cdc/stream) - Consumer Groups mit dauerhaftem Offset-Tracking (
ConsumerGroupManager) - At-least-once Delivery mit Acknowledgement und Redelivery (
DeliveryTracker) - Dead-Letter Queue für Events, die alle Retry-Versuche erschöpft haben (
DeadLetterQueue) - Kafka-Produzent (Debezium-Envelope-Format, opt-in via
THEMIS_ENABLE_KAFKA) - Transactional Outbox Pattern für atomares CDC + Application-Publishing (
OutboxWriter,OutboxRelay) - Cross-Collection aggregierte Streams mit per-Collection Resume-Cursor
- CDC-gesteuerte inkrementelle Materialized Views (O(1) pro Änderung)
- GDPR-konforme PII-Redaktion; Change Stream Kompression
- Schema Registry Integration (Confluent Wire Format)
Primäre Quellen:
src/cdc/README.md·src/cdc/ARCHITECTURE.md·src/cdc/ROADMAP.md
ChangeEvent Structure:
{
"sequence": 42,
"type": "PUT",
"key": "user:alice",
"value": "{\"name\":\"Alice\",\"email\":\"alice@example.com\"}",
"timestamp_ms": 1730294567123,
"metadata": {
"table": "user",
"pk": "alice"
},
"before_snapshot": "{\"name\":\"Alice\",\"email\":\"alice@old.com\"}",
"after_snapshot": "{\"name\":\"Alice\",\"email\":\"alice@example.com\"}"
}Hinweise zu Feldern:
sequence: Monoton steigende ID (uint64)type:PUToderDELETE(TypenTRANSACTION_COMMIT/TRANSACTION_ROLLBACKsind definiert, werden aktuell aber nicht emittiert)key: Vollständiger Schlüssel, z. B.table:pkvalue: JSON-String bei PUT,nullbei DELETEtimestamp_ms: Millisekunden seit Epochmetadata: Freies JSON (z. B.table,pk)before_snapshot(optional): Zustand des Dokuments vor der Änderung; fehlt bei INSERT (neues Dokument, kein Vorzustand)after_snapshot(optional): Zustand des Dokuments nach der Änderung; fehlt bei DELETE
- Column Family: Default CF (or dedicated CF if configured)
- Key Format:
changefeed:{sequence_number}(zero-padded for lexicographic ordering) - Sequence Tracking: Atomic counter stored at key
changefeed_sequence
Retrieve change events with optional filtering and pagination.
Endpoint: GET /changefeed
Query Parameters:
from_seq(optional): Start after this sequence number (default: 0)limit(optional): Maximum events to return (default: 100)long_poll_ms(optional): Long-poll timeout in milliseconds (default: 0 = immediate)key_prefix(optional): Filter events by key prefix (e.g.,user:)
Request Example:
# Get all events
curl "http://localhost:8765/changefeed?from_seq=0&limit=20"
# Incremental query from checkpoint
curl "http://localhost:8765/changefeed?from_seq=42&limit=10"
# Filter by key prefix
curl "http://localhost:8765/changefeed?from_seq=0&limit=50&key_prefix=user:"
# Long-poll for new events
curl "http://localhost:8765/changefeed?from_seq=100&limit=10&long_poll_ms=5000"Response:
{
"events": [
{
"sequence": 1,
"type": "PUT",
"key": "user:alice",
"value": "{\"name\":\"Alice\",\"email\":\"alice@example.com\"}",
"timestamp_ms": 1730294567123,
"metadata": {"table": "user", "pk": "alice"},
"before_snapshot": "{\"name\":\"Alice\",\"email\":\"alice@old.com\"}",
"after_snapshot": "{\"name\":\"Alice\",\"email\":\"alice@example.com\"}"
},
{
"sequence": 2,
"type": "DELETE",
"key": "user:bob",
"value": null,
"timestamp_ms": 1730294568456,
"metadata": {"table": "user", "pk": "bob"},
"before_snapshot": "{\"name\":\"Bob\",\"email\":\"bob@example.com\"}"
}
],
"count": 2,
"latest_sequence": 42
}Antwortfelder:
events: Liste von ChangeEvent-Objektencount: Anzahl der zurückgegebenen Eventslatest_sequence: Aktuell letzter Sequence-Wert in der DB (für Checkpointing hilfreich)
Status: ✅ PASSED
- Created 4 entities via PUT → 4 CDC PUT events recorded
- Deleted 1 entity → 1 CDC DELETE event recorded
- Result: All mutations automatically tracked without manual intervention
Status: ✅ PASSED
- Full Query: Retrieved all events with
from_seq=0&limit=20 - Incremental Query: Retrieved only new events after checkpoint
- Key Prefix Filter: Successfully filtered events by
user:prefix - Pagination: Limit parameter correctly restricts result size
- Result: Query API fully functional with all parameters
Status: ✅ PASSED
- Event Types: PUT and DELETE events correctly distinguished
- Metadata: Table and PK correctly embedded in each event
- Timestamps: Millisecond precision timestamps present
- Value Handling: PUT events include value, DELETE events have
value: null - Result: Event structure matches specification
Status: ✅ PASSED
Scenario: Consumer reads batch → updates checkpoint → resumes from checkpoint
1. Consume events 1-5, checkpoint = 5
2. New events 6-7 arrive
3. Resume from checkpoint 5, receive events 6-7
Result: Sequential consumption with checkpointing works correctly
Stream database changes to external systems (analytics, search, caching):
let checkpoint = 0;
setInterval(async () => {
const res = await fetch(`/changefeed?from_seq=${checkpoint}&limit=100`);
const data = await res.json();
for (const event of data.events) {
await externalSystem.sync(event);
checkpoint = event.sequence;
}
}, 1000);Track all data mutations for compliance (GDPR, HIPAA):
-- Query all user data modifications
SELECT * FROM changefeed
WHERE key LIKE 'user:%' AND timestamp_ms > '2025-01-01';Maintain denormalized views automatically:
// Maintain user count by role
for (const event of events) {
if (event.key.startsWith('user:')) {
if (event.type === 'PUT') {
const user = JSON.parse(event.value);
roleCountMap[user.role] = (roleCountMap[user.role] || 0) + 1;
} else if (event.type === 'DELETE') {
// Decrement count
}
}
}Rebuild application state from event log:
// Rebuild state from beginning
const events = await fetch('/changefeed?from_seq=0&limit=1000');
const state = {};
for (const event of events.events) {
applyEvent(state, event);
}Aktueller Stand (Production):
- Lock-freier Ring-Buffer pro Tenant (MPSC Queue) mit konfigurierbarem High-Water-Mark
- Backpressure: bei Erreichen des High-Water-Mark werden älteste Events verdrängt + Gap-Marker eingefügt
- Batch-SSE-Frames: mehrere Events werden unter Last in einem Frame gebündelt
- Change Log Compaction: veraltete Log-Segmente werden kompaktiert und archiviert
- Konfiguration:
cdc.buffer.max_events_per_tenant(default 100 000),cdc.buffer.high_watermark_pct(default 80 %)
Für sehr hohe Last (> 100 000 Events/s) empfehlen sich:
- Kafka-Backend (
KafkaCDCProducer) für entkoppeltes Buffering - Dedizierte Column Family für das Change Log (konfigurierbar)
- Mehrere Consumer Groups zur Parallelisierung der Verarbeitung
Aktuell: Admin-Endpoint POST /changefeed/retention mit Body { "before_sequence": <uint64> } löscht Events mit kleinerer Sequence. Statistiken über GET /changefeed/stats.
Zukünftig möglich: TTL-/Zeit-basierte Retention und automatische Bereinigung.
Events, die nach Erschöpfung aller Wiederholungsversuche nicht zugestellt werden konnten, landen automatisch in der Dead-Letter Queue (DeadLetterQueue).
Die DLQ ist RocksDB-gestützt (Schlüsselpräfix dlq:) und teilt dieselbe Datenbankinstanz wie der Changefeed (kein Schlüsselkonflikt: changefeed:* vs. dlq:*).
Jeder DLQ-Eintrag (DLQEntry) enthält:
dlq_sequence— eindeutige Sequenz innerhalb der DLQevent— das ursprünglicheChangeEvent(vollständig erhalten)failure_reason— lesbarer Grund (letzter Fehlermeldung)attempt_count— Anzahl der unternommenen Zustellungsversucheenqueued_at_ms— Zeitstempel der Einreihung (ms seit Epoch)
// DLQ an ChangefeedBuffer hängen (nicht owned)
DeadLetterQueue dlq(db->getDB());
buffer.setDeadLetterQueue(&dlq);Ab diesem Moment werden Events, bei denen alle Retry-Versuche scheitern, automatisch in dlq eingereiht statt verworfen.
// Alle fehlgeschlagenen Events auflisten
for (const auto& entry : dlq.listEntries()) {
std::cout << "dlq_seq=" << entry.dlq_sequence
<< " key=" << entry.event.key
<< " reason=" << entry.failure_reason
<< " attempts=" << entry.attempt_count << "\n";
}
// Einzelnen Eintrag nach Ursachenbehebung erneut zustellen
Changefeed::ChangeEvent re_recorded = dlq.replay(dlq_seq, changefeed);
// replay() entfernt den Eintrag automatisch nach Erfolg
// Alle Einträge verwerfen
dlq.drain();- Events, die aufgrund von Payload-Dekompressionsfehlern verworfen werden, landen nicht in der DLQ (die Daten sind in diesem Fall nicht wiederherstellbar).
- Die DLQ teilt den RocksDB-Namespace mit dem Changefeed; eine separate Column Family ist optional über den
cf-Parameter konfigurierbar.
- At-least-once (nicht SSE): At-least-once Delivery ist via
ConsumerGroupManagerimplementiert; für reine SSE-Verbindungen (ohne Consumer Group) gilt kein At-least-once-Guarantee. - Outbox Relay: In-Flight-State des Outbox-Relay liegt im Memory; FAILED-Records überleben Restarts, PENDING-Records werden nach Restart erneut zugestellt (at-least-once Semantik).
- DLQ: Events, die wegen Payload-Dekompressionsfehlern verworfen werden, landen nicht in der DLQ (Daten sind nicht wiederherstellbar).
- Kafka:
KafkaCDCProducererfordert das Build-FlagTHEMIS_ENABLE_KAFKAund eine librdkafka-Installation. - Change log Retention: Retention-Policies sind zur Laufzeit nicht konfigurierbar (Admin-Endpoint
POST /changefeed/retentionerlaubt manuelles Trimming).
| Feature | Benefit | Cost |
|---|---|---|
| Append-only log | Simple, reliable, never loses events | Linear storage growth |
| Automatic tracking | Zero code changes, transparent | Cannot disable for specific entities |
| Sequence-based | Guaranteed order, easy checkpointing | Sequential bottleneck |
| Default CF storage | Simple deployment | No isolation from application data |
$checkpoint = 0
while ($true) {
$r = Invoke-RestMethod -Uri "http://localhost:8765/changefeed?from_seq=$checkpoint&limit=100"
foreach ($event in $r.events) {
Write-Host "[$($event.sequence)] $($event.type) $($event.key)"
# Process event
if ($event.type -eq "PUT") {
$data = $event.value | ConvertFrom-Json
# Sync to external system
}
$checkpoint = $event.sequence
}
Start-Sleep -Seconds 1
}import requests
import time
checkpoint = 0
while True:
r = requests.get(f'http://localhost:8765/changefeed?from_seq={checkpoint}&limit=100')
data = r.json()
for event in data['events']:
print(f"[{event['sequence']}] {event['type']} {event['key']}")
# Process event
if event['type'] == 'PUT':
value = json.loads(event['value'])
# Sync to external system
checkpoint = event['sequence']
time.sleep(1)let checkpoint = 0;
async function consumeChangefeed() {
while (true) {
const res = await fetch(
`http://localhost:8765/changefeed?from_seq=${checkpoint}&limit=100&long_poll_ms=5000`
);
const data = await res.json();
for (const event of data.events) {
console.log(`[${event.sequence}] ${event.type} ${event.key}`);
// Process event
if (event.type === 'PUT') {
const value = JSON.parse(event.value);
await processEvent(value);
}
checkpoint = event.sequence;
}
// Long-poll handles waiting, no need to sleep
}
}
consumeChangefeed();config/config.json:
{
"features": {
"cdc": true
},
"sse": {
"max_events_per_second": 0
}
}Bemerkungen:
features.cdc: Aktiviert Changefeed und SSE-Streaming-Endpunktesse.max_events_per_second: Server-seitiges Rate-Limit pro SSE-Verbindung (0 = unbegrenzt, empfohlen für Produktion: 0 oder 100–1000 je nach Last)
Check logs on server startup:
[INFO] Changefeed initialized using default CF
[INFO] SSE Connection Manager initialized
Query an endpoint to verify feature is enabled:
curl http://localhost:8765/changefeed?from_seq=0&limit=1
# Should return events, not 404Alle zuvor geplanten CDC-Features sind implementiert. Verbleibende Einschränkungen sind im
Abschnitt Limitations & Trade-offs beschrieben. Für den aktuellen Entwicklungsstand siehe
src/cdc/ROADMAP.md.
Das SSE-Streaming über GET /changefeed/stream ermöglicht nahezu Echtzeit-Konsum der CDC-Events mittels Server-Sent Events.
Parameter (Query):
from_seq(uint64, optional): Start nach dieser Sequence (exklusiv)key_prefix(string, optional): Filter nach Schlüsselpräfix (z. B.user:)keep_alive(bool, defaulttrue): Streaming mit Heartbeats; beifalseeinmalige Batch-Antwortmax_seconds(int, default30): Dauer des Streaming-Zyklus (Testzwecke, 1..60)heartbeat_ms(int, optional): Herzschlag-Intervall (nur Test/Override; min 100ms)retry_ms(int, default3000): SSE-„retry“-Hinweis für Client-Reconnectsmax_events(int, default100): Maximal pro Poll gelesene Events (Backpressure am Endpunkt)
Header (Client → Server):
Last-Event-ID: Startet das Streaming ab der zuletzt verarbeiteten ID; dank exklusivemfrom_seqwerden keine Duplikate erneut gesendet.
Format (Server → Client):
- Vorangestellter Hinweis:
retry: <ms> - Pro Event:
id: <sequence>data: <json>
- Heartbeats als Kommentar:
: heartbeat
Backpressure & Pufferung:
- Jeder Stream hat einen internen Puffer (Standard: 1000 Events). Bei Überlauf gilt Drop-Oldest-Policy (älteste Events werden verworfen).
- Pro-Poll-Kappung via
max_eventsbegrenzt die Eventabnahme je Schleifendurchlauf. - Metrik:
vccdb_sse_dropped_events_totalzählt verwarfene Events (Prometheus unter/metrics). - Optional (serverseitig):
max_events_per_secondRate-Limit pro Verbindung (0 = unbegrenzt).
Client-Reconnect (Beispiel, Browser):
let es;
function connect(fromSeq = 0) {
const headers = fromSeq ? { 'Last-Event-ID': String(fromSeq) } : {};
es = new EventSource('/changefeed/stream?keep_alive=true', { withCredentials: false });
let lastId = fromSeq;
es.onmessage = (ev) => {
const data = JSON.parse(ev.data);
lastId = parseInt(ev.lastEventId || lastId, 10);
handleEvent(data);
};
es.onerror = () => {
es.close();
setTimeout(() => connect(lastId), 3000); // Backoff siehe retry
};
}
connect();Proxy-Guidelines (Nginx):
location /changefeed/stream {
proxy_pass http://themis_backend;
proxy_http_version 1.1;
proxy_set_header Connection '';
proxy_buffering off; # wichtig: keine Pufferung
proxy_cache off;
gzip off; # keine Transformation
chunked_transfer_encoding on; # Streaming erlauben
proxy_read_timeout 1h; # lange Timeouts für SSE
add_header Cache-Control "no-cache, no-transform";
}Proxy-Guidelines (HAProxy):
frontend fe
bind *:443
default_backend be
backend be
server s1 themis:8080 check
http-response add-header Cache-Control "no-cache, no-transform"
option http-keep-alive
timeout server 1h
timeout tunnel 1h
Empfohlene Defaults:
retry_ms=3000,heartbeat_ms≈15000,max_events=100- Proxy: Buffering aus,
no-transform, lange Read-Timeouts, keine Gzip-Kompression - Client: Last-Event-ID persistieren (z. B. lokal), Exponential Backoff bei Fehlern
Tests/Verifikation:
- Integrationstests prüfen: Last-Event-ID-Resume (exklusive Fortsetzung), Backpressure (Metrik > 0 unter Last), per-Poll-Kappung bei kurzer Laufzeit.
Zusammenfassung:
- Sequence-basiertes append-only Log
- Automatisches Tracking für PUT/DELETE
- GET /changefeed mit Filter/Pagination + Long-Poll
- SSE-Streaming (
/changefeed/stream) mit Keep-Alive/Heartbeats, Drop-Oldest-Backpressure, Retry/Resume über Last-Event-ID - Before/After-Snapshots: PUT-Events enthalten
before_snapshot(Dokument vor der Änderung, fehlt bei INSERT) undafter_snapshot(Dokument nach der Änderung); DELETE-Events enthaltenbefore_snapshot
Einsatz: Echtzeit-Sync, Audit-Trails, Event Sourcing – produktionsnah nutzbar; für sehr hohe Last ggf. Erweiterungen einplanen.