- Docker & Docker Compose
- Python 3.12+
- uv (recommended) or pip
```bash
docker compose up -d
```

This starts:
- Firestore emulator on `localhost:8080` (for Firestore storage mode)
- Pub/Sub emulator on `localhost:8085` (for distributed queue mode)
For GCS + BigQuery mode, you can run a GCS emulator:

```bash
docker run -d -p 9023:9023 --name gcs-emulator \
  fsouza/fake-gcs-server -scheme http
export STORAGE_EMULATOR_HOST=http://localhost:9023
```

See tests/integration/README.md for full emulator setup.
```bash
uv sync
```

Option A: GCS + BigQuery Mode (production pattern with emulator)

```bash
export STORAGE_EMULATOR_HOST="http://localhost:9023"
export GCP_PROJECT_ID="test-project"
export GCP_GCS_BUCKET="test-events"
export GCP_BIGQUERY_DATASET="events"
export GCP_BIGQUERY_TABLE="raw_events"
export EVENTKIT_EVENT_STORE="gcs"
export EVENTKIT_WAREHOUSE_ENABLED="true"
export EVENTKIT_QUEUE_MODE="async"
export EVENTKIT_ASYNC_WORKERS="4"
export EVENTKIT_RING_BUFFER_DB_PATH="./eventkit_ring_buffer.db"

uv run uvicorn eventkit.api.app:app --reload --port 8000
```

Option B: Firestore Mode (fast local development)
```bash
export FIRESTORE_EMULATOR_HOST="localhost:8080"
export GCP_PROJECT_ID="test-project"
export EVENTKIT_EVENT_STORE="firestore"
export EVENTKIT_QUEUE_MODE="async"
export EVENTKIT_ASYNC_WORKERS="4"
export EVENTKIT_RING_BUFFER_DB_PATH="./eventkit_ring_buffer.db"

uv run uvicorn eventkit.api.app:app --reload --port 8000
```

Option C: Pub/Sub Queue Mode (distributed workers)
```bash
export FIRESTORE_EMULATOR_HOST="localhost:8080"
export PUBSUB_EMULATOR_HOST="localhost:8085"
export GCP_PROJECT_ID="test-project"
export EVENTKIT_EVENT_STORE="firestore"
export EVENTKIT_QUEUE_MODE="pubsub"
export EVENTKIT_PUBSUB_WORKERS="4"
export EVENTKIT_RING_BUFFER_DB_PATH="./eventkit_ring_buffer.db"

uv run uvicorn eventkit.api.app:app --reload --port 8000
```

Note: The ring buffer (Write-Ahead Log) is always enabled for durability. Events are persisted to SQLite before processing, ensuring no data loss even if the service crashes.
The API will be available at http://localhost:8000.
Health Check:
```bash
curl http://localhost:8000/health
# {"status": "ok"}
```

Send an Event:
```bash
curl -X POST http://localhost:8000/collect \
  -H "Content-Type: application/json" \
  -d '{"type": "track", "event": "button_click", "userId": "user_123"}'
# {"status": "accepted"}
```
```bash
# Start emulators
docker compose up -d

# Run all tests (unit + integration)
export FIRESTORE_EMULATOR_HOST="localhost:8080"
export PUBSUB_EMULATOR_HOST="localhost:8085"
export GCP_PROJECT_ID="test-project"
uv run pytest --cov=src/eventkit

# Run only unit tests (fast, no emulator needed)
uv run pytest tests/unit/

# Run only integration tests
uv run pytest tests/integration/

# Stop emulators
docker compose down
```

Docker emulators keep data in memory between test runs (while containers are running). This is useful for fast iteration, but means:
- First test run: Clean slate ✅
- Second test run (without restarting): Data from previous run persists ⚠️
To get a clean slate for integration tests:
```bash
# Option 1: Restart emulators (fast, keeps containers)
docker compose restart firestore-emulator pubsub-emulator

# Option 2: Full restart (slower, recreates containers)
docker compose down
docker compose up -d --wait
```

In CI: Each workflow run starts fresh containers, so tests are always isolated.
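A third option for per-test isolation: the Firestore emulator exposes an HTTP reset endpoint (`DELETE /emulator/v1/projects/{project}/databases/(default)/documents`) that wipes all documents without restarting the container. A stdlib-only sketch that builds the reset call (the helper name is hypothetical; eventkit's test suite may do this differently):

```python
import os
import urllib.request

def firestore_reset_request(host: str, project: str) -> urllib.request.Request:
    """Build a DELETE request that clears all documents in the emulator."""
    url = f"http://{host}/emulator/v1/projects/{project}/databases/(default)/documents"
    return urllib.request.Request(url, method="DELETE")

req = firestore_reset_request(
    os.environ.get("FIRESTORE_EMULATOR_HOST", "localhost:8080"),
    os.environ.get("GCP_PROJECT_ID", "test-project"),
)
# urllib.request.urlopen(req) performs the reset against a running emulator
```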
See src/eventkit/config.py for all available settings.
Key Settings:
| Variable | Default | Description |
|---|---|---|
| `GCP_PROJECT_ID` | required | GCP project ID |
| `FIRESTORE_EMULATOR_HOST` | - | Firestore emulator address (e.g., `localhost:8080`) |
| `PUBSUB_EMULATOR_HOST` | - | Pub/Sub emulator address (e.g., `localhost:8085`) |
| `STORAGE_EMULATOR_HOST` | - | GCS emulator address (e.g., `http://localhost:9023`) |
| **Storage Mode** | | |
| `EVENTKIT_EVENT_STORE` | `"gcs"` | Storage backend: `gcs`, `firestore` |
| `GCP_GCS_BUCKET` | required for GCS | GCS bucket name for event storage |
| `GCP_BIGQUERY_DATASET` | required for GCS | BigQuery dataset name |
| `GCP_BIGQUERY_TABLE` | required for GCS | BigQuery table name |
| `EVENTKIT_WAREHOUSE_ENABLED` | `true` | Enable background warehouse loader |
| `EVENTKIT_WAREHOUSE_LOADER_INTERVAL` | `300.0` | Seconds between loader polls (5 min) |
| **Queue Mode** | | |
| `EVENTKIT_QUEUE_MODE` | `"async"` | Queue mode: `async`, `pubsub` |
| `EVENTKIT_ASYNC_WORKERS` | `4` | Number of async workers (async mode) |
| `EVENTKIT_PUBSUB_WORKERS` | `4` | Number of Pub/Sub workers (pubsub mode) |
| **Ring Buffer (Write-Ahead Log)** | | |
| `EVENTKIT_RING_BUFFER_MODE` | `"sqlite"` | Ring buffer implementation (currently: `sqlite`) |
| `EVENTKIT_RING_BUFFER_DB_PATH` | `"eventkit_ring_buffer.db"` | Path to SQLite database file |
| `EVENTKIT_RING_BUFFER_MAX_SIZE` | `100000` | Max published events to keep (size-based cleanup) |
| `EVENTKIT_RING_BUFFER_RETENTION_HOURS` | `24` | Max age for published events (time-based cleanup) |
| `EVENTKIT_RING_BUFFER_PUBLISHER_BATCH_SIZE` | `100` | Events per publisher batch |
| `EVENTKIT_RING_BUFFER_PUBLISHER_POLL_INTERVAL` | `0.1` | Seconds between ring buffer polls |
| `EVENTKIT_RING_BUFFER_CLEANUP_INTERVAL` | `3600.0` | Seconds between cleanup runs (1 hour) |
| **EventLoader** | | |
| `EVENTKIT_EVENTLOADER_BATCH_SIZE` | adaptive | Events per batch (1000 for GCS, 100 for Firestore) |
| `EVENTKIT_EVENTLOADER_FLUSH_INTERVAL` | adaptive | Flush interval in seconds (60 for GCS, 5 for Firestore) |
| `EVENTKIT_BUFFER_SIZE` | `100` | Events per partition before flush (deprecated) |
| `EVENTKIT_BUFFER_MAX_SIZE` | `1000` | Hard limit per partition |
| `EVENTKIT_BUFFER_TIMEOUT` | `5.0` | Max seconds before flush (deprecated) |
The ring buffer provides Write-Ahead Log (WAL) durability:
- Events are never lost - Written to SQLite before processing
- Survives crashes - SQLite WAL mode ensures durability
- Automatic cleanup - Old published events are removed based on time/size limits
- Background publisher - Moves events from ring buffer to queue asynchronously
Architecture:

```
API → ring buffer (durable) → publisher → queue → workers → Firestore
```
Why SQLite?
- Local durability (no network calls on hot path)
- WAL mode for concurrent reads/writes
- Zero dependencies (built into Python)
- Production-proven (similar to Lytics' BoltDB approach)
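To make the WAL pattern concrete, here is a minimal, self-contained toy sketch of the idea: append before processing, a publisher reads unpublished rows in order, and cleanup removes old published rows. The table and column names are hypothetical, not eventkit's actual schema:

```python
import json
import sqlite3
import time

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA journal_mode=WAL")  # effective for file-backed databases
conn.execute(
    """CREATE TABLE ring_buffer (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        payload TEXT NOT NULL,
        published_at REAL  -- NULL until the publisher hands it to the queue
    )"""
)

def append(event: dict) -> int:
    """Durably record the event before any processing happens."""
    cur = conn.execute(
        "INSERT INTO ring_buffer (payload) VALUES (?)", (json.dumps(event),)
    )
    conn.commit()
    return cur.lastrowid

def next_batch(size: int) -> list[tuple[int, dict]]:
    """Publisher side: read unpublished events in insertion order."""
    rows = conn.execute(
        "SELECT id, payload FROM ring_buffer "
        "WHERE published_at IS NULL ORDER BY id LIMIT ?",
        (size,),
    ).fetchall()
    return [(rid, json.loads(p)) for rid, p in rows]

def mark_published(ids: list[int]) -> None:
    now = time.time()
    conn.executemany(
        "UPDATE ring_buffer SET published_at = ? WHERE id = ?",
        [(now, i) for i in ids],
    )
    conn.commit()

def cleanup(retention_seconds: float) -> int:
    """Time-based cleanup: drop published events older than the window."""
    cutoff = time.time() - retention_seconds
    cur = conn.execute(
        "DELETE FROM ring_buffer "
        "WHERE published_at IS NOT NULL AND published_at < ?",
        (cutoff,),
    )
    conn.commit()
    return cur.rowcount

append({"type": "track", "event": "button_click"})
batch = next_batch(100)
mark_published([rid for rid, _ in batch])
```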
eventkit uses structured logging via structlog for production observability.
By default, logs are colored and human-readable:
```bash
export EVENTKIT_LOG_LEVEL="DEBUG"  # or INFO (default), WARNING, ERROR
export EVENTKIT_JSON_LOGS="false"  # default

uv run uvicorn eventkit.api.app:app --reload --port 8000
```

Example output:

```
2026-01-12T10:30:45.123Z [info    ] request_received method=POST path=/collect/users event_type=identify request_id=abc-123 stream=users
2026-01-12T10:30:45.125Z [debug   ] event_received event_type=identify stream=users
2026-01-12T10:30:45.126Z [debug   ] event_adapted event_type=identify
2026-01-12T10:30:45.127Z [debug   ] event_sequenced partition=7
2026-01-12T10:30:45.128Z [info    ] request_completed status_code=202 duration_ms=5.23 request_id=abc-123 stream=users
```
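The `event_sequenced partition=7` line above is emitted by the sequencer when it routes an event to a partition. The actual routing scheme lives in eventkit's sequencer code; the sketch below is only an illustrative assumption of stable key-hash routing, with a hash and partition count chosen for the example:

```python
import hashlib

def route_partition(key: str, num_partitions: int = 8) -> int:
    """Deterministically map a key (e.g., a userId) to a partition.

    md5 is used here only as a stable, process-independent hash;
    Python's built-in hash() is salted per process and is not stable
    across runs, so it would break ordering guarantees.
    """
    digest = hashlib.md5(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_partitions

p = route_partition("user_123")
assert p == route_partition("user_123")  # same key, same partition, every run
```

Routing by a stable key keeps all events for one user on one partition, which preserves per-user ordering across workers.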
For log aggregation systems (e.g., GCP Logging, Datadog):
```bash
export EVENTKIT_LOG_LEVEL="INFO"
export EVENTKIT_JSON_LOGS="true"

uv run uvicorn eventkit.api.app:app --port 8000
```

Example output (single-line JSON per log):

```
{"event": "request_received", "method": "POST", "path": "/collect/users", "event_type": "identify", "request_id": "abc-123", "stream": "users", "level": "info", "timestamp": "2026-01-12T10:30:45.123Z", "logger": "eventkit.api.router"}
{"event": "request_completed", "status_code": 202, "duration_ms": 5.23, "request_id": "abc-123", "stream": "users", "level": "info", "timestamp": "2026-01-12T10:30:45.128Z", "logger": "eventkit.api.router"}
```

| Level | When to Use | Example Operations |
|---|---|---|
| DEBUG | Development debugging | Event adaptation details, sequencer routing |
| INFO | Normal operations | API requests, buffer flushes, store writes |
| WARNING | Recoverable errors | Validation failures, retries |
| ERROR | Unrecoverable errors | Store failures, ring buffer failures |
Always Logged:
- API requests: method, path, status_code, duration_ms
- Events: stream, event_type (NOT full payload for PII safety)
- EventLoader flushes: partition, event_count, duration_ms
- Store writes: event_count, duration_ms
Never Logged:
- Full event payloads (PII risk)
- User identifiers in plaintext
- Auth tokens or secrets
All logs within a request include the same request_id and stream for tracing:

```bash
# Example: Trace a single event through the entire pipeline
cat logs.json | jq 'select(.request_id == "abc-123")'
```

```bash
# Stop API server: Ctrl+C

# Stop emulators
docker compose down
```
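If jq is not available, the same request-scoped trace can be pulled from the JSON logs with a few lines of Python (field names taken from the JSON log examples above; the sample records here are illustrative):

```python
import json

def trace(lines: list[str], request_id: str) -> list[dict]:
    """Return all JSON log records belonging to one request, in order."""
    records = (json.loads(line) for line in lines if line.strip())
    return [r for r in records if r.get("request_id") == request_id]

logs = [
    '{"event": "request_received", "request_id": "abc-123", "stream": "users"}',
    '{"event": "request_received", "request_id": "def-456", "stream": "users"}',
    '{"event": "request_completed", "request_id": "abc-123", "status_code": 202}',
]
matched = trace(logs, "abc-123")
# matched holds only the two "abc-123" records, in log order
```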