
♻️ Zero-Based Review: Lambda Aggregator #337

@sodre

Description

A comprehensive zero-based review of the `zae_limiter_aggregator` package — the Lambda function that processes DynamoDB Streams for usage snapshots, proactive bucket refill, and audit event archival.

The aggregator is a critical production component (1,131 LOC across 4 modules) that runs as an AWS Lambda triggered by DynamoDB Streams. It handles three responsibilities:

  1. Usage snapshot aggregation — extracts consumption deltas from bucket MODIFY events and writes hourly/daily snapshot records
  2. Proactive bucket refill — refills buckets when projected tokens are insufficient for observed consumption rates (#317: ⚡ Aggregator-assisted bucket refill for speculative writes)
  3. Audit event archival — archives TTL-deleted audit events to S3 as gzip-compressed JSONL with Hive-style partitioning

Modules Under Review

| Module | LOC | Responsibility |
| --- | --- | --- |
| `handler.py` | 131 | Lambda entry point, environment config, orchestration |
| `processor.py` | 686 | Stream parsing, delta extraction, snapshot writes, bucket refill |
| `archiver.py` | 291 | Audit event extraction, S3 archival with gzip JSONL |
| `__init__.py` | 23 | Re-exports |

Test Coverage Under Review

| Test File | Tests |
| --- | --- |
| `tests/unit/test_handler.py` | 5 tests — handler orchestration, archival toggle, error aggregation |
| `tests/unit/test_processor.py` | ~45 tests — delta extraction, window keys, snapshots, refill, logging |
| `tests/unit/test_archiver.py` | ~20 tests — audit extraction, deserialization, JSONL, S3 partitioning |

Acceptance Criteria

Code Quality & Structure

  • All functions have docstrings with Args/Returns sections
  • No dead code or unused imports exist in any aggregator module
  • All TODO, FIXME, or HACK comments are resolved or tracked as issues
  • Type annotations are present on all function signatures and return types
  • Dataclass fields have type annotations (no Any where a concrete type is possible)

Error Handling & Resilience

  • All DynamoDB update_item calls have error handling that does not crash the batch
  • ConditionalCheckFailedException in try_refill_bucket is caught and logged at DEBUG (not WARNING or ERROR)
  • S3 put_object failure in archiver does not prevent snapshot processing
  • Extraction errors for individual records do not prevent processing of remaining records
  • No bare except Exception without exc_info=True in log calls
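The severity rules above can be sketched as follows. This is a hypothetical stand-in for `try_refill_bucket`'s error-handling shape (the real signature is not shown in this issue); it only illustrates that a lost optimistic lock is DEBUG-level noise while anything else is an ERROR that still must not crash the batch:

```python
import logging

log = logging.getLogger("zae_limiter_aggregator")

def attempt_refill(client, **update_kwargs) -> bool:
    """Run a conditional update_item without letting any failure
    propagate out of the stream-batch loop."""
    try:
        client.update_item(**update_kwargs)
        return True
    except client.exceptions.ConditionalCheckFailedException:
        # Losing the optimistic lock is an expected race, so it is
        # logged at DEBUG rather than WARNING or ERROR.
        log.debug("refill skipped: optimistic lock lost", exc_info=True)
        return False
    except Exception:
        # Anything else is a real error, but it still must not crash
        # the batch; exc_info=True preserves the traceback.
        log.error("refill update_item failed", exc_info=True)
        return False
```

The `client.exceptions.ConditionalCheckFailedException` attribute is how boto3 exposes per-client modeled exceptions, which also makes the wrapper trivially testable with a stub client.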

DynamoDB Patterns (ADR-111 Compliance)

Stream Record Parsing

  • _parse_bucket_record correctly discovers limits by scanning b_{name}_tc attributes
  • Records with missing entity_id, empty resource, or missing tc counters return None/empty list
  • Only MODIFY events are processed for deltas and refill; REMOVE events are processed for archival
  • Partial counters (only OldImage or only NewImage has tc) are skipped
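A minimal sketch of the counter-discovery rule above, assuming flat `b_<limit>_tc` attributes in DynamoDB wire format. The helper name and shapes are illustrative, not the package's actual API:

```python
import re

# Limits are discovered by scanning attribute names, not a fixed list.
_TC_RE = re.compile(r"^b_(?P<name>.+)_tc$")

def extract_tc_deltas(old_image: dict, new_image: dict) -> dict:
    """Return {limit_name: tc_delta} for counters present in BOTH images.

    Partial counters (present in only one image) are skipped, matching
    the acceptance criterion above.
    """
    deltas = {}
    for attr, new_val in new_image.items():
        m = _TC_RE.match(attr)
        if not m:
            continue
        if attr not in old_image:
            continue  # partial counter: only NewImage has tc
        old_tc = int(old_image[attr]["N"])
        new_tc = int(new_val["N"])
        deltas[m.group("name")] = new_tc - old_tc
    return deltas
```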

Snapshot Aggregation

  • Millitokens are converted to tokens (integer division by 1000) before storage
  • GSI2 keys (GSI2PK, GSI2SK) are set for resource-level aggregation queries
  • TTL is set using if_not_exists (does not overwrite on subsequent ADD operations)
  • Window key functions produce correct ISO timestamps for hourly, daily, and monthly windows
  • get_window_end handles December year rollover and leap years

Proactive Bucket Refill

  • aggregate_bucket_states accumulates tc_delta across multiple events for the same bucket
  • aggregate_bucket_states uses the last event's NewImage fields (tk, cp, bx, ra, rp) and rf timestamp
  • Refill is skipped when refill_delta <= 0 (no elapsed time or already at capacity)
  • Refill is skipped when projected >= consumption_estimate (tokens sufficient for next window)
  • Negative tc_delta (refunds) does not trigger refill (max(0, info.tc_delta) guard)
  • Multiple limits needing refill produce a single UpdateItem call (not one per limit)
  • Optimistic lock on rf uses ConditionExpression: rf = :expected_rf
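The single-UpdateItem criterion might be satisfied by an expression builder along these lines. The builder and the `b_<limit>_tk` token-balance attribute name are assumptions for illustration; only the ADD-plus-SET shape and the `rf` lock come from the criteria above:

```python
def build_refill_update(key: dict, refills: dict, expected_rf: int,
                        new_rf: int) -> dict:
    """Fold every limit needing refill into ONE update_item call,
    guarded by an optimistic lock on the rf timestamp."""
    names = {"#rf": "rf"}
    values = {":expected_rf": {"N": str(expected_rf)},
              ":new_rf": {"N": str(new_rf)}}
    adds = []
    for i, (limit, tokens) in enumerate(sorted(refills.items())):
        names[f"#tk{i}"] = f"b_{limit}_tk"   # assumed attribute name
        values[f":d{i}"] = {"N": str(tokens)}
        adds.append(f"#tk{i} :d{i}")
    return {
        "Key": key,
        # DynamoDB allows combining ADD and SET clauses in one expression.
        "UpdateExpression": "ADD " + ", ".join(adds) + " SET #rf = :new_rf",
        "ConditionExpression": "#rf = :expected_rf",
        "ExpressionAttributeNames": names,
        "ExpressionAttributeValues": values,
    }
```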

Audit Archival

  • Only REMOVE events with AUDIT# PK prefix are extracted
  • Records missing the action field are skipped with a WARNING log
  • DynamoDB wire format deserialization handles S, N, BOOL, NULL, M, and L types
  • JSONL uses compact separators (`separators=(",", ":")`) for minimal S3 storage
  • S3 object key uses Hive-style partitioning (audit/year=YYYY/month=MM/day=DD)
  • Timestamp sanitization removes colons and plus signs from S3 filenames
  • Content-Type is application/x-ndjson and Content-Encoding is gzip

Handler Orchestration

  • Environment variables have sensible defaults (TABLE_NAME=rate-limits, SNAPSHOT_WINDOWS=hourly,daily, SNAPSHOT_TTL_DAYS=90)
  • Archival is skipped when ENABLE_ARCHIVAL=false OR ARCHIVE_BUCKET_NAME is empty
  • S3 client is only created when archival is enabled (no unnecessary boto3 calls)
  • Processing time is measured with time.perf_counter() and logged in milliseconds
  • Empty records list returns early with zero counts (no DynamoDB/S3 calls)
  • Errors from snapshot processing and archival are aggregated into a single response
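The defaults and the archival toggle can be sketched as a config loader. The default for `ENABLE_ARCHIVAL` when unset is an assumption here (treated as enabled unless explicitly `false`); the other defaults come straight from the criteria:

```python
import os
from dataclasses import dataclass

@dataclass(frozen=True)
class AggregatorConfig:
    table_name: str
    snapshot_windows: tuple
    snapshot_ttl_days: int
    archive_bucket: str
    archival_enabled: bool

def load_config(env=None) -> AggregatorConfig:
    env = os.environ if env is None else env
    bucket = env.get("ARCHIVE_BUCKET_NAME", "")
    # Assumed default: archival on unless the flag is explicitly "false".
    flag = env.get("ENABLE_ARCHIVAL", "true").lower() != "false"
    return AggregatorConfig(
        table_name=env.get("TABLE_NAME", "rate-limits"),
        snapshot_windows=tuple(env.get("SNAPSHOT_WINDOWS", "hourly,daily").split(",")),
        snapshot_ttl_days=int(env.get("SNAPSHOT_TTL_DAYS", "90")),
        archive_bucket=bucket,
        # Archival needs BOTH the flag and a non-empty bucket name;
        # the S3 client would only be created when this is True.
        archival_enabled=flag and bool(bucket),
    )
```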

Logging

  • StructuredLogger outputs valid JSON to stdout (CloudWatch Logs Insights compatible)
  • All log entries include timestamp, level, logger, and message fields
  • Error/warning logs include exc_info=True with exception field containing traceback
  • Batch processing logs start and completion with processing_time_ms metric
  • Debug logs for individual snapshot updates include entity_id, resource, limit_name, window
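The required entry shape can be sketched as a one-line-per-entry JSON formatter (the function names are illustrative, not `StructuredLogger`'s real API):

```python
import json
import sys
import traceback
from datetime import datetime, timezone

def format_entry(level: str, logger: str, message: str,
                 exc_info: bool = False, **fields) -> str:
    """Render one log entry as a single JSON line, so CloudWatch Logs
    Insights can query fields directly."""
    entry = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "level": level,
        "logger": logger,
        "message": message,
        **fields,
    }
    if exc_info:
        # Attach the active traceback under a dedicated field.
        entry["exception"] = traceback.format_exc()
    return json.dumps(entry)

def log(level: str, logger: str, message: str, **fields) -> None:
    print(format_entry(level, logger, message, **fields), file=sys.stdout)
```

A batch-completion call would then carry the metric as a structured field, e.g. `log("INFO", "aggregator.handler", "batch complete", processing_time_ms=12.5)`.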

Test Quality

  • Unit tests use mocks (not moto) for DynamoDB and S3 calls
  • Test helpers (_make_record, _make_bucket_record, _make_audit_record) produce flat schema records (ADR-111)
  • Edge cases tested: empty records, zero delta, missing fields, partial counters, invalid data
  • Refill tests verify ADD vs SET in UpdateExpression, optimistic lock, and conditional check failure
  • Archival tests verify gzip decompression, JSONL parsing, S3 headers, and error recovery
  • No test relies on specific DynamoDB wire format that could drift from production schema
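The mocks-over-moto pattern looks roughly like this: the DynamoDB client is a `MagicMock`, so tests assert on the `update_item` kwargs rather than on simulated table state. The `refill_once` stand-in below is hypothetical, written only to give the test something to call:

```python
from unittest import mock

def refill_once(client, key):
    """Stand-in for the production refill path, illustration only."""
    client.update_item(
        Key=key,
        UpdateExpression="ADD #tk0 :d0 SET #rf = :new_rf",
        ConditionExpression="#rf = :expected_rf",
        ExpressionAttributeNames={"#tk0": "b_rpm_tk", "#rf": "rf"},
        ExpressionAttributeValues={":d0": {"N": "500"},
                                   ":new_rf": {"N": "2000"},
                                   ":expected_rf": {"N": "1000"}},
    )

def test_refill_uses_add_and_optimistic_lock():
    client = mock.MagicMock()  # plain mock, no moto table needed
    refill_once(client, {"PK": {"S": "BUCKET#e1"}, "SK": {"S": "api"}})
    kwargs = client.update_item.call_args.kwargs
    assert kwargs["UpdateExpression"].startswith("ADD")
    assert "SET" in kwargs["UpdateExpression"]
    assert kwargs["ConditionExpression"] == "#rf = :expected_rf"
```

Asserting on the wire-level kwargs keeps the tests fast, but note the last criterion above: tests should avoid pinning incidental wire-format details that could drift from the production schema.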
