
Local Development Guide

Quick Start

Prerequisites

  • Docker & Docker Compose
  • Python 3.12+
  • uv (recommended) or pip

1. Start Emulators

docker compose up -d

This starts:

  • Firestore emulator on localhost:8080 (for Firestore storage mode)
  • Pub/Sub emulator on localhost:8085 (for distributed queue mode)

For GCS + BigQuery mode, you can run a GCS emulator:

docker run -d -p 9023:9023 --name gcs-emulator \
  fsouza/fake-gcs-server -scheme http
export STORAGE_EMULATOR_HOST=http://localhost:9023

See tests/integration/README.md for full emulator setup.
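Before starting the API server, it can help to confirm the emulators are actually listening. A minimal Python sketch (the helper name and port list are ours, taken from the ports in this guide):

```python
import socket

def is_port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# Emulator ports used throughout this guide
for name, port in [("firestore", 8080), ("pubsub", 8085), ("gcs", 9023)]:
    status = "up" if is_port_open("localhost", port) else "down"
    print(f"{name}: {status}")
```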

2. Install Dependencies

uv sync

3. Run the API Server

Option A: GCS + BigQuery Mode (production pattern with emulator)

export STORAGE_EMULATOR_HOST="http://localhost:9023"
export GCP_PROJECT_ID="test-project"
export GCP_GCS_BUCKET="test-events"
export GCP_BIGQUERY_DATASET="events"
export GCP_BIGQUERY_TABLE="raw_events"
export EVENTKIT_EVENT_STORE="gcs"
export EVENTKIT_WAREHOUSE_ENABLED="true"
export EVENTKIT_QUEUE_MODE="async"
export EVENTKIT_ASYNC_WORKERS="4"
export EVENTKIT_RING_BUFFER_DB_PATH="./eventkit_ring_buffer.db"

uv run uvicorn eventkit.api.app:app --reload --port 8000

Option B: Firestore Mode (fast local development)

export FIRESTORE_EMULATOR_HOST="localhost:8080"
export GCP_PROJECT_ID="test-project"
export EVENTKIT_EVENT_STORE="firestore"
export EVENTKIT_QUEUE_MODE="async"
export EVENTKIT_ASYNC_WORKERS="4"
export EVENTKIT_RING_BUFFER_DB_PATH="./eventkit_ring_buffer.db"

uv run uvicorn eventkit.api.app:app --reload --port 8000

Option C: Pub/Sub Queue Mode (distributed workers)

export FIRESTORE_EMULATOR_HOST="localhost:8080"
export PUBSUB_EMULATOR_HOST="localhost:8085"
export GCP_PROJECT_ID="test-project"
export EVENTKIT_EVENT_STORE="firestore"
export EVENTKIT_QUEUE_MODE="pubsub"
export EVENTKIT_PUBSUB_WORKERS="4"
export EVENTKIT_RING_BUFFER_DB_PATH="./eventkit_ring_buffer.db"

uv run uvicorn eventkit.api.app:app --reload --port 8000

Note: The ring buffer (Write-Ahead Log) is always enabled for durability. Events are persisted to SQLite before processing, ensuring no data loss even if the service crashes.
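The write-ahead pattern this note describes can be sketched in a few lines of SQLite. The schema and function names below are illustrative, not eventkit's actual code:

```python
import json
import sqlite3

# Persist each event to SQLite BEFORE handing it to the queue, so a crash
# after accept cannot lose it. Table/column names are illustrative only.
conn = sqlite3.connect("ring_buffer_demo.db")
conn.execute("PRAGMA journal_mode=WAL")  # durable, concurrent-read-friendly
conn.execute(
    """CREATE TABLE IF NOT EXISTS events (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        payload TEXT NOT NULL,
        published INTEGER NOT NULL DEFAULT 0
    )"""
)

def append(event: dict) -> int:
    """Durably persist an event; returns its ring-buffer id."""
    with conn:  # committed before returning to the caller
        cur = conn.execute(
            "INSERT INTO events (payload) VALUES (?)", (json.dumps(event),)
        )
    return cur.lastrowid

def unpublished(batch_size: int = 100) -> list[tuple[int, dict]]:
    """Fetch the next batch for the background publisher to drain."""
    rows = conn.execute(
        "SELECT id, payload FROM events WHERE published = 0 ORDER BY id LIMIT ?",
        (batch_size,),
    ).fetchall()
    return [(rid, json.loads(p)) for rid, p in rows]
```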

The API will be available at http://localhost:8000.

4. Test the API

Health Check:

curl http://localhost:8000/health
# {"status": "ok"}

Send an Event:

curl -X POST http://localhost:8000/collect \
  -H "Content-Type: application/json" \
  -d '{"type": "track", "event": "button_click", "userId": "user_123"}'
# {"status": "accepted"}
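The same request can be issued from Python using only the standard library; a small sketch (helper names are ours, the endpoint and payload match the curl example above):

```python
import json
import urllib.request

def build_collect_request(
    event: dict, base_url: str = "http://localhost:8000"
) -> urllib.request.Request:
    """Build the POST /collect request the curl example sends."""
    return urllib.request.Request(
        f"{base_url}/collect",
        data=json.dumps(event).encode(),
        headers={"Content-Type": "application/json"},
        method="POST",
    )

def send_event(event: dict, base_url: str = "http://localhost:8000") -> dict:
    """Send an event and return the decoded JSON response."""
    with urllib.request.urlopen(build_collect_request(event, base_url)) as resp:
        return json.loads(resp.read())

# With the server running:
# send_event({"type": "track", "event": "button_click", "userId": "user_123"})
```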

Running Tests

# Start emulators
docker compose up -d

# Run all tests (unit + integration)
export FIRESTORE_EMULATOR_HOST="localhost:8080"
export PUBSUB_EMULATOR_HOST="localhost:8085"
export GCP_PROJECT_ID="test-project"
uv run pytest --cov=src/eventkit

# Run only unit tests (fast, no emulator needed)
uv run pytest tests/unit/

# Run only integration tests
uv run pytest tests/integration/

# Stop emulators
docker compose down

Test Isolation

Docker emulators keep data in memory between test runs (while containers are running). This is useful for fast iteration, but means:

  • First test run: Clean slate ✅
  • Second test run (without restarting): Data from previous run persists ⚠️

To get a clean slate for integration tests:

# Option 1: Restart emulators (fast, keeps containers)
docker compose restart firestore-emulator pubsub-emulator

# Option 2: Full restart (slower, recreates containers)
docker compose down
docker compose up -d --wait

In CI: Each workflow run starts fresh containers, so tests are always isolated.


Configuration

See src/eventkit/config.py for all available settings.

Key Settings:

| Variable | Default | Description |
| --- | --- | --- |
| GCP_PROJECT_ID | required | GCP project ID |
| FIRESTORE_EMULATOR_HOST | - | Firestore emulator address (e.g., localhost:8080) |
| PUBSUB_EMULATOR_HOST | - | Pub/Sub emulator address (e.g., localhost:8085) |
| STORAGE_EMULATOR_HOST | - | GCS emulator address (e.g., http://localhost:9023) |

Storage Mode:

| Variable | Default | Description |
| --- | --- | --- |
| EVENTKIT_EVENT_STORE | "gcs" | Storage backend: gcs, firestore |
| GCP_GCS_BUCKET | required for GCS | GCS bucket name for event storage |
| GCP_BIGQUERY_DATASET | required for GCS | BigQuery dataset name |
| GCP_BIGQUERY_TABLE | required for GCS | BigQuery table name |
| EVENTKIT_WAREHOUSE_ENABLED | true | Enable background warehouse loader |
| EVENTKIT_WAREHOUSE_LOADER_INTERVAL | 300.0 | Seconds between loader polls (5 min) |

Queue Mode:

| Variable | Default | Description |
| --- | --- | --- |
| EVENTKIT_QUEUE_MODE | "async" | Queue mode: async, pubsub |
| EVENTKIT_ASYNC_WORKERS | 4 | Number of async workers (async mode) |
| EVENTKIT_PUBSUB_WORKERS | 4 | Number of Pub/Sub workers (pubsub mode) |

Ring Buffer (Write-Ahead Log):

| Variable | Default | Description |
| --- | --- | --- |
| EVENTKIT_RING_BUFFER_MODE | "sqlite" | Ring buffer implementation (currently: sqlite) |
| EVENTKIT_RING_BUFFER_DB_PATH | "eventkit_ring_buffer.db" | Path to SQLite database file |
| EVENTKIT_RING_BUFFER_MAX_SIZE | 100000 | Max published events to keep (size-based cleanup) |
| EVENTKIT_RING_BUFFER_RETENTION_HOURS | 24 | Max age for published events (time-based cleanup) |
| EVENTKIT_RING_BUFFER_PUBLISHER_BATCH_SIZE | 100 | Events per publisher batch |
| EVENTKIT_RING_BUFFER_PUBLISHER_POLL_INTERVAL | 0.1 | Seconds between ring buffer polls |
| EVENTKIT_RING_BUFFER_CLEANUP_INTERVAL | 3600.0 | Seconds between cleanup runs (1 hour) |

EventLoader:

| Variable | Default | Description |
| --- | --- | --- |
| EVENTKIT_EVENTLOADER_BATCH_SIZE | adaptive | Events per batch (1000 for GCS, 100 for Firestore) |
| EVENTKIT_EVENTLOADER_FLUSH_INTERVAL | adaptive | Flush interval in seconds (60 for GCS, 5 for Firestore) |
| EVENTKIT_BUFFER_SIZE | 100 | Events per partition before flush (deprecated) |
| EVENTKIT_BUFFER_MAX_SIZE | 1000 | Hard limit per partition |
| EVENTKIT_BUFFER_TIMEOUT | 5.0 | Max seconds before flush (deprecated) |
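As an illustration of how a subset of these variables maps from the environment, a hypothetical dataclass sketch (the authoritative definitions live in src/eventkit/config.py; field and class names here are ours):

```python
import os
from dataclasses import dataclass

@dataclass
class QueueSettings:
    # Defaults mirror the table above; this is a sketch, not eventkit's code.
    queue_mode: str = "async"
    async_workers: int = 4
    ring_buffer_db_path: str = "eventkit_ring_buffer.db"

    @classmethod
    def from_env(cls) -> "QueueSettings":
        return cls(
            queue_mode=os.environ.get("EVENTKIT_QUEUE_MODE", cls.queue_mode),
            async_workers=int(
                os.environ.get("EVENTKIT_ASYNC_WORKERS", cls.async_workers)
            ),
            ring_buffer_db_path=os.environ.get(
                "EVENTKIT_RING_BUFFER_DB_PATH", cls.ring_buffer_db_path
            ),
        )
```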

Ring Buffer (Durability Layer)

The ring buffer provides Write-Ahead Log (WAL) durability:

  • Events are never lost - Written to SQLite before processing
  • Survives crashes - SQLite WAL mode ensures durability
  • Automatic cleanup - Old published events are removed based on time/size limits
  • Background publisher - Moves events from ring buffer to queue asynchronously

Architecture:

API → ring buffer (durable) → publisher → queue → workers → Firestore

Why SQLite?

  • Local durability (no network calls on hot path)
  • WAL mode for concurrent reads/writes
  • Zero dependencies (built into Python)
  • Production-proven (similar to Lytics' BoltDB approach)
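The two cleanup policies (time- and size-based) can be sketched as a single SQLite pass, again against an illustrative schema rather than eventkit's actual one:

```python
import sqlite3
import time

def cleanup(conn: sqlite3.Connection, max_size: int = 100_000,
            retention_hours: float = 24.0) -> None:
    """Delete published events that are too old or beyond the size cap.

    Assumes an illustrative schema: events(id, published, published_at).
    """
    cutoff = time.time() - retention_hours * 3600
    with conn:
        # Time-based: drop published events older than the retention window.
        conn.execute(
            "DELETE FROM events WHERE published = 1 AND published_at < ?",
            (cutoff,),
        )
        # Size-based: keep only the newest max_size published events.
        conn.execute(
            """DELETE FROM events WHERE published = 1 AND id NOT IN (
                 SELECT id FROM events WHERE published = 1
                 ORDER BY id DESC LIMIT ?)""",
            (max_size,),
        )
```

Unpublished events are never touched by either rule; only events the publisher has already moved to the queue are eligible for deletion.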

Logging

eventkit uses structured logging via structlog for production observability.

Development (Pretty Logs)

By default, logs are colored and human-readable:

export EVENTKIT_LOG_LEVEL="DEBUG"  # or INFO (default), WARNING, ERROR
export EVENTKIT_JSON_LOGS="false"  # default

uv run uvicorn eventkit.api.app:app --reload --port 8000

Example output:

2026-01-12T10:30:45.123Z [info     ] request_received          method=POST path=/collect/users event_type=identify request_id=abc-123 stream=users
2026-01-12T10:30:45.125Z [debug    ] event_received            event_type=identify stream=users
2026-01-12T10:30:45.126Z [debug    ] event_adapted             event_type=identify
2026-01-12T10:30:45.127Z [debug    ] event_sequenced           partition=7
2026-01-12T10:30:45.128Z [info     ] request_completed         status_code=202 duration_ms=5.23 request_id=abc-123 stream=users

Production (JSON Logs)

For log aggregation systems (e.g., GCP Logging, Datadog):

export EVENTKIT_LOG_LEVEL="INFO"
export EVENTKIT_JSON_LOGS="true"

uv run uvicorn eventkit.api.app:app --port 8000

Example output (single-line JSON per log):

{"event": "request_received", "method": "POST", "path": "/collect/users", "event_type": "identify", "request_id": "abc-123", "stream": "users", "level": "info", "timestamp": "2026-01-12T10:30:45.123Z", "logger": "eventkit.api.router"}
{"event": "request_completed", "status_code": 202, "duration_ms": 5.23, "request_id": "abc-123", "stream": "users", "level": "info", "timestamp": "2026-01-12T10:30:45.128Z", "logger": "eventkit.api.router"}

Log Levels

| Level | When to Use | Example Operations |
| --- | --- | --- |
| DEBUG | Development debugging | Event adaptation details, sequencer routing |
| INFO | Normal operations | API requests, buffer flushes, store writes |
| WARNING | Recoverable errors | Validation failures, retries |
| ERROR | Unrecoverable errors | Store failures, ring buffer failures |

What's Logged

Always Logged:

  • API requests: method, path, status_code, duration_ms
  • Events: stream, event_type (NOT full payload for PII safety)
  • EventLoader flushes: partition, event_count, duration_ms
  • Store writes: event_count, duration_ms

Never Logged:

  • Full event payloads (PII risk)
  • User identifiers in plaintext
  • Auth tokens or secrets

Context Propagation

All logs within a request include the same request_id and stream for tracing:

# Example: Trace a single event through the entire pipeline
cat logs.json | jq 'select(.request_id == "abc-123")'
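The same trace can be done in Python when jq is unavailable (helper name is ours):

```python
import json

def trace_request(lines: list[str], request_id: str) -> list[dict]:
    """Keep only the JSON log records belonging to one request."""
    records = []
    for line in lines:
        try:
            rec = json.loads(line)
        except json.JSONDecodeError:
            continue  # skip non-JSON lines (e.g., startup banners)
        if rec.get("request_id") == request_id:
            records.append(rec)
    return records
```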

Stopping Services

# Stop API server: Ctrl+C

# Stop emulators
docker compose down