feat(filter): transport filter engine — CEL-syntax filters with SIMD Tier 1#33
Merged
catinspace-au merged 7 commits intomainfrom Apr 11, 2026
Merged
feat(filter): transport filter engine — CEL-syntax filters with SIMD Tier 1#33catinspace-au merged 7 commits intomainfrom
catinspace-au merged 7 commits intomainfrom
Conversation
Remove stale docs (CLICKHOUSE_PYTHON_BINDINGS.md, GAP_ANALYSIS.md). Design spec (local, gitignored) for transport-level message filtering: - CEL syntax for all filters, SIMD fast-path for simple patterns - 3-tier model: Tier 1 (SIMD, always on), Tier 2 (CEL, opt-in), Tier 3 (complex CEL, opt-in) - Embedded in every transport via TransportFilterEngine - Zero downstream code changes — config-only activation - First-match-wins evaluation, fail-fast at startup - Spec review findings addressed (DLQ plumbing, commit tokens, text-based classification, MsgPack bypass, nested paths)
…SIMD evaluation Chunk 1 of transport filter engine implementation: - FilterRule, FilterAction, TransportFilterTierConfig config types - Tier classification via text pattern matching (regex, not AST walk) - Tier 1: has(), ==, !=, startsWith, endsWith, contains - Tier 2: compound CEL (detected by exclusion) - Tier 3: restricted functions (matches, exists, map, etc.) - CompiledFilter with Tier 1 SIMD evaluation via sonic_rs::get_from_slice - Dotted paths supported (metadata.source splits to path array) - MsgPack payloads bypass filters (JSON-only SIMD extraction) - TransportFilterEngine assembly with startup validation: - Tier gate enforcement (reject above enabled tier) - Ordering warnings (higher tier before lower tier) - Filter count warnings (>20 per direction) - FilterMetrics counters (transport_filtered_total) - 56 tests covering config, classification, evaluation, engine assembly regex added to transport feature (required for Tier 1 classification).
Chunk 2 — transport integration: - filters_in/filters_out added to all transport configs (KafkaConfig, GrpcConfig, MemoryConfig, FileTransportConfig, PipeTransportConfig, HttpTransportConfig, RedisTransportConfig) with #[serde(default)] - TransportFilterEngine embedded in every transport struct - Inbound filtering in recv(): retain() removes drop/dlq messages - Outbound filtering in send(): returns SendResult::Ok (drop) or SendResult::FilteredDlq (dlq action) - SendResult::FilteredDlq variant added to types.rs - TransportFilterEngine::empty() constructor for graceful fallback - No SEP: fixed deployment doc-test (missing oci_labels, schema_version) Zero downstream code changes — all new fields default to empty vecs. 1,234 tests pass with --all-features, clippy clean.
…, sample data 40 new tests in tests/transport_filter.rs: Integration (MemoryTransport round-trip): - Inbound filter drops/DLQs matching messages - Outbound filter blocks send, returns FilteredDlq - No-filters passthrough (baseline) - First-match-wins evaluation order - Mixed Tier 1 filter types (exists, equals, startsWith) Expected failures: - Tier 2 rejected without allow_cel_filters_in - Tier 3 rejected without allow_complex_filters_in - Tier 3 iteration blocked (exists function) - Invalid CEL syntax fails at construction - Empty/whitespace expression fails Sample data (real DFE payloads): - Syslog events, nested cloud events, loader routing - Receiver poison messages, fetcher debug filter Adversarial: - Binary garbage, truncated JSON, empty payload - JSON null/array/string (not objects) - CEL-significant chars in field values - 1MB payload, 100 filters, missing fields - Unicode, deeply nested paths, MsgPack bypass Engine API: empty engine, direction independence, DLQ detection Config: YAML deserialization, tier config, empty config 1,274 tests pass with --all-features, clippy clean.
Chunk 4 — performance optimisation, CEL evaluation tests, benchmarks, docs.
SIMD/zero-copy review optimisations:
- Pre-compiled memchr::memmem::Finder for FieldExists/FieldNotExists
with single-segment paths. Detects "field": pattern in raw bytes,
bypassing JSON parser entirely.
- has(_table) match: 218ns -> 107ns (51% faster)
- 5 filters first-match-wins: 1697ns -> 129ns (92% faster)
- Stack-allocated path arrays via with_path_refs (1-4 segments inline)
eliminates per-message Vec allocation
- extract_string_value uses sonic-rs is_str() + memchr SIMD escape
detection — zero-copy Cow<str> for the common case (no escapes)
- Fix field extraction in classify.rs for method calls (host.matches
was extracting "host.matches" instead of "host")
Tier 2/3 CEL evaluation tests (5 new):
- Compound expression: severity > 3 && source != "internal"
- size() function on arrays
- Field-to-field comparison
- Tier 3 regex via host.matches("^prod-.*")
- Missing field safety
Benchmark suite (benches/filter_benchmark.rs):
- No-filter baseline: 3.2 ns (zero overhead confirmed)
- Tier 1 has() match: 107 ns (memmem fast-path)
- Tier 1 ==: 288 ns match, 741 ns no-match (sonic-rs full scan)
- Tier 1 startsWith: 250 ns
- Tier 1 dotted path: 239 ns
- Tier 2/3 CEL benchmarks (feature-gated)
Docs:
- STATE.md: added transport-filter to Key Components
- STATE.md: added TransportFilterEngine decision entry
700 tests pass, clippy clean with --all-features.
memchr added to transport feature for SIMD substring search.
Expose filter-staged DLQ entries via a new default trait method on TransportReceiver. When inbound filters classify messages as action:dlq, they are removed from the recv() result and staged in a per-transport parking_lot::Mutex buffer; callers drain the buffer after each recv() call and route the entries to their own DLQ handle. Also add TransportFilterEngine::partition_batch() — a generic batch API that returns a FilteredBatch<T> with passing messages, DLQ entries, and drop count in one pass. Transports can use this to avoid implementing the retain/stage pattern themselves. All seven transports (memory, kafka, grpc, file, pipe, http, redis) wire the DLQ buffer and override take_filtered_dlq_entries(). The existing retain-and-drop behaviour is preserved for action:drop; only DLQ entries get the new exposure. Default trait impl returns an empty vec, so transports without filter support (none currently, but future ones) don't need to override.
Add the final review findings from the pre-merge audit: * Python<->Rust classifier parity via shared JSON fixture (tests/fixtures/cel_classifier_parity.json). The matching dfe-engine test loads the same fixture and runs every entry through the Python classifier — both sides must agree on tier, op kind, op field, op value, and field references. * Concurrency test: 32 tokio tasks share one Arc<TransportFilterEngine> and classify 1000 messages each. Also a compile-time assert_send_sync check for FilterRule/Action/Disposition. * DLQ buffer drain tests covering take_filtered_dlq_entries(): entries are exposed, draining empties the buffer, drop filters don't pollute the buffer, and no filters means zero overhead. * Memmem nested-field limitation: documents the known false-positive case where has(field) matches a nested occurrence (needed for the 50% performance gain on the common top-level case) and the sound case where JSON-escaped quotes prevent false positives inside string values. * Per-transport smoke test for MemoryTransport filter wiring. Also add docs/TRANSPORT-FILTER-FOLLOWUP.md tracking items #7-#11: constant-time compare, log masking, pre-quoted equals fast path, msgpack edges, and expression_text reuse on reload.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Auto-enabled CEL-syntax message filtering embedded in every transport (Kafka, gRPC, Memory, File, Pipe, HTTP, Redis). Three performance tiers:
has(),==,!=,startsWith,endsWith,containsbypass the CEL engine and usesonic_rs::get_from_sliceor a pre-compiledmemchr::memmem::Finderfor existence checks.expression.allow_cel_filters_in/out, ~500 ns–1 µs/msg) — compound CEL expressions evaluated throughcel-interpreterv0.10.0 with extracted field context.expression.allow_complex_filters_in/out, ~5–50 µs/msg) — regex/iteration/time functions. Classified, not rejected by default — the UI surfaces opt-in requirements.First-match-wins evaluation with
dropordlqactions. Inbound and outbound directions supported. DLQ entries exposed via a newTransportReceiver::take_filtered_dlq_entries()trait method so callers can route them via their own DLQ handles without changingrecv()signatures.Zero downstream code changes — config-only activation across all DFE Rust services.
Benchmark results
has(_table)single filterTest plan
src/transport/filter/— classifier, compiled filters, profile gatestests/transport_filter.rs— 54 tests covering:take_filtered_dlq_entries()drain semanticstests/fixtures/cel_classifier_parity.jsonArc<TransportFilterEngine>hyperi-ci check --quickpasses (fmt, clippy src/tests, audit, deny, semgrep)/v1/cel/checkFastAPI endpoint + Python classifier for live UI validation using the samecel-interpreterengineFollow-ups
See
docs/TRANSPORT-FILTER-FOLLOWUP.mdfor items #7–#11:field == "value"None block this merge.