## Description

A comprehensive zero-base review of the `zae_limiter_aggregator` package — the Lambda function that processes DynamoDB Streams for usage snapshots, proactive bucket refill, and audit event archival.

The aggregator is a critical production component (1,131 LOC across 4 modules) that runs as an AWS Lambda triggered by DynamoDB Streams. It handles three responsibilities:

- **Usage snapshot aggregation** — extracts consumption deltas from bucket MODIFY events and writes hourly/daily snapshot records
- **Proactive bucket refill** — tops up bucket tokens via a single conditional `UpdateItem` when the projected balance will not cover the next window's estimated consumption
- **Audit event archival** — extracts `AUDIT#` stream records and archives them to S3 as gzipped NDJSON
## Modules Under Review

- `handler.py`
- `processor.py`
- `archiver.py`
- `__init__.py`

## Test Coverage Under Review

- `tests/unit/test_handler.py`
- `tests/unit/test_processor.py`
- `tests/unit/test_archiver.py`

## Acceptance Criteria
### Code Quality & Structure

- [ ] `TODO`, `FIXME`, or `HACK` comments are resolved or tracked as issues
- [ ] No `Any` annotations remain (anywhere a concrete type is possible)
### Error Handling & Resilience

- [ ] `update_item` calls have error handling that does not crash the batch (see the sketch below)
- [ ] `ConditionalCheckFailedException` in `try_refill_bucket` is caught and logged at DEBUG (not WARNING or ERROR)
- [ ] `put_object` failure in the archiver does not prevent snapshot processing
- [ ] No `except Exception` without `exc_info=True` in log calls
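For reference, a minimal sketch of the non-crashing pattern these criteria describe, assuming the boto3 resource API; the `safe_update` helper, its arguments, and the table name are illustrative, not the module's actual API:

```python
import logging

import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger("aggregator")
table = boto3.resource("dynamodb").Table("rate-limits")


def safe_update(key: dict, update_kwargs: dict) -> None:
    """Apply one update_item call without letting a failure crash the batch."""
    try:
        table.update_item(Key=key, **update_kwargs)
    except ClientError as exc:
        if exc.response["Error"]["Code"] == "ConditionalCheckFailedException":
            # Losing an optimistic-lock race is expected; DEBUG only.
            logger.debug("conditional check failed for key %s", key)
        else:
            # Unexpected failure: record the traceback, keep processing.
            logger.error("update_item failed for key %s", key, exc_info=True)
```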
### DynamoDB Patterns (ADR-111 Compliance)

- [ ] Flat attribute schema throughout (no nested `data.M` paths)
- [ ] Reserved words (`resource`, `window`, `window_start`, `ttl`) use `ExpressionAttributeNames` aliases
- [ ] `update_snapshot` uses SET + ADD without overlapping document paths (#168: 🐛 fix(aggregator): DynamoDB UpdateExpression path overlap in `update_snapshot()`)
- [ ] `UpdateExpression` uses ADD for tokens (commutative with speculative writes) and SET for `rf` (optimistic lock); see the sketch below
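To make the SET/ADD split concrete, an illustrative snapshot write in the shape these criteria require; the key schema, the `tc_total` counter name, and all values are assumptions for the example:

```python
import boto3

table = boto3.resource("dynamodb").Table("rate-limits")

table.update_item(
    # Key schema and values are assumed for illustration only.
    Key={"pk": "USAGE#acct-1#api", "sk": "WINDOW#hourly#2024-06-01T10"},
    # ADD owns the commutative counter; SET (via if_not_exists) fills
    # metadata once, and neither clause touches the other's paths.
    UpdateExpression=(
        "ADD tc_total :delta "
        "SET #res = if_not_exists(#res, :res), "
        "#win = if_not_exists(#win, :win), "
        "#ws = if_not_exists(#ws, :ws), "
        "#ttl = if_not_exists(#ttl, :ttl)"
    ),
    # Aliases for the reserved attribute names listed above.
    ExpressionAttributeNames={
        "#res": "resource",
        "#win": "window",
        "#ws": "window_start",
        "#ttl": "ttl",
    },
    ExpressionAttributeValues={
        ":delta": 42,
        ":res": "api",
        ":win": "hourly",
        ":ws": "2024-06-01T10:00:00Z",
        ":ttl": 1725148800,
    },
)
```

The refill write would keep the same division of labor: ADD for token balances, SET for the `rf` lock timestamp.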
### Stream Record Parsing

- [ ] `_parse_bucket_record` correctly discovers limits by scanning `b_{name}_tc` attributes (see the sketch below)
- [ ] Missing `entity_id`, empty resource, or missing `tc` counters return `None`/empty list
- [ ] Limits with unchanged `tc` counters (no delta) are skipped
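A sketch of the discovery scan as I read these criteria; `discover_limit_deltas` is a hypothetical stand-in for the relevant slice of `_parse_bucket_record`:

```python
def discover_limit_deltas(record: dict):
    """Yield (limit_name, tc_delta) pairs from a bucket MODIFY record."""
    new_image = record.get("dynamodb", {}).get("NewImage", {})
    old_image = record.get("dynamodb", {}).get("OldImage", {})
    for attr, value in new_image.items():
        if not (attr.startswith("b_") and attr.endswith("_tc")):
            continue
        limit_name = attr[len("b_"):-len("_tc")]
        # Stream images carry DynamoDB-typed values, e.g. {"N": "17"}.
        new_tc = int(value["N"])
        old_tc = int(old_image.get(attr, {}).get("N", "0"))
        if new_tc != old_tc:  # unchanged counters are skipped
            yield limit_name, new_tc - old_tc
```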
### Snapshot Aggregation

- [ ] Metadata written via `if_not_exists(...)` does not overwrite on subsequent ADD operations
- [ ] `get_window_end` handles December year rollover and leap years (see the sketch below)
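One way `get_window_end` can satisfy both cases is to lean on `timedelta` arithmetic, which rolls December 31 into January 1 and lands on February 29 in leap years without special-casing; a sketch assuming only the default `hourly`/`daily` windows:

```python
from datetime import datetime, timedelta, timezone

WINDOW_DELTAS = {"hourly": timedelta(hours=1), "daily": timedelta(days=1)}


def get_window_end(window_start: datetime, window: str) -> datetime:
    try:
        return window_start + WINDOW_DELTAS[window]
    except KeyError:
        raise ValueError(f"unknown window: {window}") from None


# December year rollover comes for free:
print(get_window_end(datetime(2023, 12, 31, 23, tzinfo=timezone.utc), "hourly"))
# 2024-01-01 00:00:00+00:00
```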
### Proactive Bucket Refill

- [ ] `aggregate_bucket_states` accumulates `tc_delta` across multiple events for the same bucket
- [ ] `aggregate_bucket_states` uses the last event's NewImage fields (`tk`, `cp`, `bx`, `ra`, `rp`) and `rf` timestamp
- [ ] Refill is skipped when `refill_delta <= 0` (no elapsed time or already at capacity); see the decision sketch below
- [ ] Refill is skipped when `projected >= consumption_estimate` (tokens sufficient for the next window)
- [ ] Negative `tc_delta` (refunds) does not trigger refill (`max(0, info.tc_delta)` guard)
- [ ] All limits are refilled in a single `UpdateItem` call (not one per limit)
- [ ] The optimistic lock on `rf` uses `ConditionExpression: rf = :expected_rf`
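A sketch of the skip logic as these criteria describe it; the field names follow the checklist, but the dataclass and the `refill_delta`/`projected` formulas are an illustrative reading, not the verified implementation:

```python
import time
from dataclasses import dataclass


@dataclass
class BucketInfo:
    tk: float        # tokens remaining (last NewImage)
    cp: float        # capacity
    ra: float        # refill amount per period
    rp: float        # refill period, seconds
    rf: float        # last refill timestamp, epoch seconds
    tc_delta: float  # consumption accumulated across the batch


def should_refill(info: BucketInfo, now: float | None = None) -> bool:
    now = time.time() if now is None else now
    refill_delta = min(info.cp - info.tk, (now - info.rf) / info.rp * info.ra)
    if refill_delta <= 0:
        return False  # no elapsed time, or already at capacity
    consumption_estimate = max(0.0, info.tc_delta)  # refunds never trigger refill
    projected = info.tk + refill_delta
    return projected < consumption_estimate  # skip when tokens are sufficient
```

When a refill is warranted, the single `UpdateItem` that follows would carry `ConditionExpression="rf = :expected_rf"`, so a concurrent refill fails the condition quietly and lands in the DEBUG-level catch above.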
### Audit Archival

- [ ] Only records with the `AUDIT#` PK prefix are extracted
- [ ] Events missing the `action` field are skipped with a WARNING log
- [ ] JSON is serialized with compact separators (`(",", ":")`) for minimal S3 storage
- [ ] S3 keys are date-partitioned (`audit/year=YYYY/month=MM/day=DD`)
- [ ] Content-Type is `application/x-ndjson` and Content-Encoding is `gzip` (see the sketch below)
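A sketch of an archival write that satisfies all five criteria; the key layout beyond the stated partition prefix and the trailing filename are assumptions:

```python
import gzip
import json
from datetime import datetime, timezone

import boto3

s3 = boto3.client("s3")


def archive_events(events: list[dict], bucket: str) -> str:
    now = datetime.now(timezone.utc)
    # Date-partitioned key; the filename component is assumed.
    key = f"audit/year={now:%Y}/month={now:%m}/day={now:%d}/{now:%H%M%S}.ndjson.gz"
    # Compact separators keep each NDJSON line as small as possible.
    body = "\n".join(json.dumps(e, separators=(",", ":")) for e in events)
    s3.put_object(
        Bucket=bucket,
        Key=key,
        Body=gzip.compress(body.encode("utf-8")),
        ContentType="application/x-ndjson",
        ContentEncoding="gzip",
    )
    return key
```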
### Handler Orchestration

- [ ] Environment variables have sensible defaults (`TABLE_NAME=rate-limits`, `SNAPSHOT_WINDOWS=hourly,daily`, `SNAPSHOT_TTL_DAYS=90`)
- [ ] Archival is skipped when `ENABLE_ARCHIVAL=false` OR `ARCHIVE_BUCKET_NAME` is empty
- [ ] Processing time is measured with `time.perf_counter()` and logged in milliseconds (see the sketch below)
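An illustrative configuration block with the defaults listed above, plus the timing pattern; the `ENABLE_ARCHIVAL` default and the module-level structure are assumptions:

```python
import os
import time

TABLE_NAME = os.environ.get("TABLE_NAME", "rate-limits")
SNAPSHOT_WINDOWS = os.environ.get("SNAPSHOT_WINDOWS", "hourly,daily").split(",")
SNAPSHOT_TTL_DAYS = int(os.environ.get("SNAPSHOT_TTL_DAYS", "90"))

# Archival needs both the flag on and a non-empty bucket name.
ARCHIVE_BUCKET_NAME = os.environ.get("ARCHIVE_BUCKET_NAME", "")
ENABLE_ARCHIVAL = (
    os.environ.get("ENABLE_ARCHIVAL", "true").lower() == "true"
    and bool(ARCHIVE_BUCKET_NAME)
)

start = time.perf_counter()
# ... process the stream batch ...
processing_time_ms = (time.perf_counter() - start) * 1000.0
```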
### Logging

- [ ] `StructuredLogger` outputs valid JSON to stdout (CloudWatch Logs Insights compatible); see the sketch below
- [ ] Every entry includes `timestamp`, `level`, `logger`, and `message` fields
- [ ] Errors are logged with `exc_info=True` and an `exception` field containing the traceback
- [ ] The handler emits a `processing_time_ms` metric
- [ ] Context fields are attached where available: `entity_id`, `resource`, `limit_name`, `window`
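A minimal `StructuredLogger` sketch matching these criteria; the real class almost certainly differs in detail:

```python
import json
import sys
import traceback
from datetime import datetime, timezone


class StructuredLogger:
    def __init__(self, name: str):
        self.name = name

    def _emit(self, level: str, message: str, exc_info: bool = False, **fields):
        entry = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "level": level,
            "logger": self.name,
            "message": message,
            **fields,  # e.g. entity_id, resource, limit_name, window
        }
        if exc_info:
            entry["exception"] = traceback.format_exc()
        # One JSON object per line on stdout, as Logs Insights expects.
        sys.stdout.write(json.dumps(entry) + "\n")

    def info(self, message: str, **fields):
        self._emit("INFO", message, **fields)

    def error(self, message: str, exc_info: bool = False, **fields):
        self._emit("ERROR", message, exc_info=exc_info, **fields)
```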
### Test Quality

- [ ] Record factories (`_make_record`, `_make_bucket_record`, `_make_audit_record`) produce flat schema records (ADR-111); see the sketch below
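A hypothetical shape for one of those factories; the signature and attribute names are assumed, but the record stays flat per ADR-111:

```python
def _make_bucket_record(entity_id="acct-1", resource="api", old_tc=0, new_tc=5):
    """Build a DynamoDB Streams MODIFY record with flat bucket attributes."""
    def image(tc: int) -> dict:
        return {
            "pk": {"S": f"ENTITY#{entity_id}"},
            "sk": {"S": f"BUCKET#{resource}"},
            # Flat attributes at the top level -- no nested data.M map.
            "b_default_tc": {"N": str(tc)},
            "b_default_tk": {"N": "100"},
        }

    return {
        "eventName": "MODIFY",
        "dynamodb": {"OldImage": image(old_tc), "NewImage": image(new_tc)},
    }
```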