feat(filter): transport filter engine — CEL-syntax filters with SIMD Tier 1#33

Merged
catinspace-au merged 7 commits into main from feat/transport-filter-engine on Apr 11, 2026
@catinspace-au
Collaborator

Summary

Auto-enabled CEL-syntax message filtering embedded in every transport (Kafka, gRPC, Memory, File, Pipe, HTTP, Redis). Three performance tiers:

  • Tier 1 (always on, ~50–200 ns/msg) — SIMD field ops detected by regex classification. has(), ==, !=, startsWith, endsWith, contains bypass the CEL engine and use sonic_rs::get_from_slice or a pre-compiled memchr::memmem::Finder for existence checks.
  • Tier 2 (opt-in via expression.allow_cel_filters_in/out, ~500 ns–1 µs/msg) — compound CEL expressions evaluated through cel-interpreter v0.10.0 with extracted field context.
  • Tier 3 (opt-in via expression.allow_complex_filters_in/out, ~5–50 µs/msg) — regex/iteration/time functions. Classified, not rejected by default — the UI surfaces opt-in requirements.

First-match-wins evaluation with drop or dlq actions. Inbound and outbound directions supported. DLQ entries exposed via a new TransportReceiver::take_filtered_dlq_entries() trait method so callers can route them via their own DLQ handles without changing recv() signatures.
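As a rough illustration of first-match-wins evaluation with drop and dlq actions, here is a minimal std-only sketch. The `Rule` struct, the `evaluate` function, the `Disposition` variants, and the two example predicates are hypothetical names chosen for this example; the real engine compiles CEL text rather than taking function pointers.

```rust
/// Action attached to a filter rule (drop or dlq, as described in the PR).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FilterAction {
    Drop,
    Dlq,
}

/// Outcome for one message (hypothetical type for this sketch).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Disposition {
    Pass,
    Drop,
    Dlq,
}

/// A rule pairs a predicate with an action. The predicate is a plain
/// function pointer here; this is an assumption made to stay dependency-free.
pub struct Rule {
    pub matches: fn(&[u8]) -> bool,
    pub action: FilterAction,
}

/// First-match-wins: the first rule whose predicate matches decides the
/// outcome and later rules are never consulted. No match means pass.
pub fn evaluate(rules: &[Rule], payload: &[u8]) -> Disposition {
    for rule in rules {
        if (rule.matches)(payload) {
            return match rule.action {
                FilterAction::Drop => Disposition::Drop,
                FilterAction::Dlq => Disposition::Dlq,
            };
        }
    }
    Disposition::Pass
}

/// Naive byte-substring search; `needle` must be non-empty.
fn contains(haystack: &[u8], needle: &[u8]) -> bool {
    haystack.windows(needle.len()).any(|w| w == needle)
}

fn is_debug(payload: &[u8]) -> bool {
    contains(payload, b"\"level\":\"debug\"")
}

fn has_poison(payload: &[u8]) -> bool {
    contains(payload, b"\"poison\":")
}

/// Example chain: debug messages are dropped first, poison messages go to DLQ.
pub fn example_rules() -> Vec<Rule> {
    vec![
        Rule { matches: is_debug, action: FilterAction::Drop },
        Rule { matches: has_poison, action: FilterAction::Dlq },
    ]
}
```

With `example_rules()`, a payload that is both a debug message and a poison message is dropped, because the drop rule comes first in the chain; reordering the rules would send it to the DLQ instead.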

Zero downstream code changes — config-only activation across all DFE Rust services.

Benchmark results

| Scenario | Before | After | Change |
| --- | --- | --- | --- |
| No filters configured | 3.2 ns | 3.2 ns | baseline |
| has(_table) single filter | 218 ns | 107 ns | -51% |
| 5-filter chain, miss all | 1697 ns | 129 ns | -92% (13x faster) |
| Tier 2 compound CEL | ~800 ns | ~800 ns | unchanged |

Test plan

  • Unit tests in src/transport/filter/ — classifier, compiled filters, profile gates
  • Integration tests in tests/transport_filter.rs — 54 tests covering:
    • Sample data from real DFE pipelines (receiver poison, loader routing, fetcher debug)
    • Adversarial inputs (binary garbage, truncated JSON, Unicode fields, 1 MB payloads, deeply nested paths)
    • Expected failures (invalid CEL, tier rejections without opt-in)
    • FilteredBatch + take_filtered_dlq_entries() drain semantics
    • Python↔Rust classifier parity via shared fixture tests/fixtures/cel_classifier_parity.json
    • Concurrency: 32 tokio tasks × 1000 messages sharing one Arc<TransportFilterEngine>
    • Memmem nested-field limitation documented
  • All 7 transports wire the filter engine and DLQ buffer
  • hyperi-ci check --quick passes (fmt, clippy src/tests, audit, deny, semgrep)
  • 427 lib tests pass
  • dfe-engine companion branch ships /v1/cel/check FastAPI endpoint + Python classifier for live UI validation using the same cel-interpreter engine
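The concurrency item in the test plan can be pictured with the sketch below. It is not the actual test: it swaps the 32 tokio tasks for OS threads so that it needs only the standard library, and `Engine`, `is_filtered`, and `run_shared` are hypothetical stand-ins for `TransportFilterEngine` and its API. The `assert_send_sync` helper shows the usual compile-time pattern for checking that a type is `Send + Sync`.

```rust
use std::sync::Arc;
use std::thread;

/// Hypothetical stand-in for the shared filter engine: immutable after
/// construction, so sharing it behind an Arc needs no locking.
pub struct Engine {
    needle: Vec<u8>,
}

impl Engine {
    pub fn new(field: &str) -> Self {
        Engine { needle: format!("\"{}\":", field).into_bytes() }
    }

    /// True when the payload contains the `"field":` byte pattern.
    pub fn is_filtered(&self, payload: &[u8]) -> bool {
        payload.windows(self.needle.len()).any(|w| w == self.needle.as_slice())
    }
}

/// Compile-time check: this only type-checks if T is Send + Sync.
fn assert_send_sync<T: Send + Sync>() {}

/// Spawn `tasks` threads that share one engine; each classifies `per_task`
/// messages and returns how many were filtered. Returns the grand total.
pub fn run_shared(tasks: usize, per_task: usize) -> usize {
    assert_send_sync::<Engine>();
    let engine = Arc::new(Engine::new("poison"));
    let handles: Vec<_> = (0..tasks)
        .map(|t| {
            let engine = Arc::clone(&engine);
            thread::spawn(move || {
                (0..per_task)
                    .filter(|i| {
                        // Every even-numbered message carries the poison field.
                        let payload = if i % 2 == 0 {
                            format!("{{\"poison\":true,\"task\":{},\"seq\":{}}}", t, i)
                        } else {
                            format!("{{\"task\":{},\"seq\":{}}}", t, i)
                        };
                        engine.is_filtered(payload.as_bytes())
                    })
                    .count()
            })
        })
        .collect();
    handles.into_iter().map(|h| h.join().unwrap()).sum()
}
```

Calling `run_shared(32, 1000)` mirrors the 32 × 1000 shape of the real test; half of the generated messages carry the field, so every worker should report the same count.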

Follow-ups

See docs/TRANSPORT-FILTER-FOLLOWUP.md for items #7-#11:

  • Constant-time comparison for sensitive-field filters
  • Log masking of expression text
  • Pre-quoted bytes fast path for field == "value"
  • MsgPack payload filtering (currently bypassed, documented)
  • Cached compiled filters on hot-reload

None block this merge.

Remove stale docs (CLICKHOUSE_PYTHON_BINDINGS.md, GAP_ANALYSIS.md).

Design spec (local, gitignored) for transport-level message filtering:
- CEL syntax for all filters, SIMD fast-path for simple patterns
- 3-tier model: Tier 1 (SIMD, always on), Tier 2 (CEL, opt-in),
  Tier 3 (complex CEL, opt-in)
- Embedded in every transport via TransportFilterEngine
- Zero downstream code changes — config-only activation
- First-match-wins evaluation, fail-fast at startup
- Spec review findings addressed (DLQ plumbing, commit tokens,
  text-based classification, MsgPack bypass, nested paths)

…SIMD evaluation

Chunk 1 of transport filter engine implementation:

- FilterRule, FilterAction, TransportFilterTierConfig config types
- Tier classification via text pattern matching (regex, not AST walk)
  - Tier 1: has(), ==, !=, startsWith, endsWith, contains
  - Tier 2: compound CEL (detected by exclusion)
  - Tier 3: restricted functions (matches, exists, map, etc.)
- CompiledFilter with Tier 1 SIMD evaluation via sonic_rs::get_from_slice
  - Dotted paths supported (metadata.source splits to path array)
  - MsgPack payloads bypass filters (JSON-only SIMD extraction)
- TransportFilterEngine assembly with startup validation:
  - Tier gate enforcement (reject above enabled tier)
  - Ordering warnings (higher tier before lower tier)
  - Filter count warnings (>20 per direction)
- FilterMetrics counters (transport_filtered_total)
- 56 tests covering config, classification, evaluation, engine assembly
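To make the text-based tier classification above more concrete, here is a minimal sketch under stated assumptions. The commit uses regex patterns; this version substitutes plain string checks so it compiles with the standard library alone. `Tier`, `classify`, and the marker lists are hypothetical names, and the Tier 3 list covers only the functions named in the commit message (matches, exists, map).

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum Tier {
    One = 1,
    Two = 2,
    Three = 3,
}

// Assumed marker lists; the real classifier may recognise more functions.
const TIER3_MARKERS: [&str; 3] = [".matches(", ".exists(", ".map("];
const TIER1_METHODS: [&str; 3] = [".startsWith(", ".endsWith(", ".contains("];

/// True for identifiers joined by dots, e.g. `metadata.source` or `_table`.
fn is_field_path(s: &str) -> bool {
    !s.is_empty()
        && s.split('.').all(|seg| {
            let mut chars = seg.chars();
            matches!(chars.next(), Some(c) if c.is_ascii_alphabetic() || c == '_')
                && chars.all(|c| c.is_ascii_alphanumeric() || c == '_')
        })
}

/// True for a double-quoted literal with no embedded quotes.
fn is_string_literal(s: &str) -> bool {
    s.len() >= 2
        && s.starts_with('"')
        && s.ends_with('"')
        && !s[1..s.len() - 1].contains('"')
}

/// Classify by text shape only (no AST walk). Tier 2 is detected by
/// exclusion: anything that is neither Tier 3 nor a simple Tier 1 form.
/// Deliberately conservative: "&&" inside a string literal also yields Tier 2.
pub fn classify(expr: &str) -> Result<Tier, String> {
    let expr = expr.trim();
    if expr.is_empty() {
        return Err("empty expression".to_string());
    }
    if TIER3_MARKERS.iter().any(|m| expr.contains(*m)) {
        return Ok(Tier::Three);
    }
    if expr.contains("&&") || expr.contains("||") {
        return Ok(Tier::Two);
    }
    // has(field)
    if let Some(inner) = expr.strip_prefix("has(").and_then(|r| r.strip_suffix(')')) {
        if is_field_path(inner.trim()) {
            return Ok(Tier::One);
        }
    }
    // field == "value" and field != "value"
    for op in ["==", "!="].iter() {
        if let Some((lhs, rhs)) = expr.split_once(*op) {
            if is_field_path(lhs.trim()) && is_string_literal(rhs.trim()) {
                return Ok(Tier::One);
            }
        }
    }
    // field.startsWith("value") and friends
    for method in TIER1_METHODS.iter() {
        if let Some((lhs, rhs)) = expr.split_once(*method) {
            if let Some(arg) = rhs.strip_suffix(')') {
                if is_field_path(lhs.trim()) && is_string_literal(arg.trim()) {
                    return Ok(Tier::One);
                }
            }
        }
    }
    Ok(Tier::Two)
}
```

Checking the Tier 3 markers before anything else means an expression such as `host.matches("^prod-.*")` is never mistaken for a simple method call, which is the kind of ordering a text-based classifier has to get right without an AST.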

regex added to transport feature (required for Tier 1 classification).

Chunk 2 (transport integration):

- filters_in/filters_out added to all transport configs (KafkaConfig,
  GrpcConfig, MemoryConfig, FileTransportConfig, PipeTransportConfig,
  HttpTransportConfig, RedisTransportConfig) with #[serde(default)]
- TransportFilterEngine embedded in every transport struct
- Inbound filtering in recv(): retain() removes drop/dlq messages
- Outbound filtering in send(): returns SendResult::Ok (drop) or
  SendResult::FilteredDlq (dlq action)
- SendResult::FilteredDlq variant added to types.rs
- TransportFilterEngine::empty() constructor for graceful fallback
- No SEP: fixed deployment doc-test (missing oci_labels, schema_version)

Zero downstream code changes — all new fields default to empty vecs.
1,234 tests pass with --all-features, clippy clean.

…, sample data

40 new tests in tests/transport_filter.rs:

Integration (MemoryTransport round-trip):
- Inbound filter drops/DLQs matching messages
- Outbound filter blocks send, returns FilteredDlq
- No-filters passthrough (baseline)
- First-match-wins evaluation order
- Mixed Tier 1 filter types (exists, equals, startsWith)

Expected failures:
- Tier 2 rejected without allow_cel_filters_in
- Tier 3 rejected without allow_complex_filters_in
- Tier 3 iteration blocked (exists function)
- Invalid CEL syntax fails at construction
- Empty/whitespace expression fails

Sample data (real DFE payloads):
- Syslog events, nested cloud events, loader routing
- Receiver poison messages, fetcher debug filter

Adversarial:
- Binary garbage, truncated JSON, empty payload
- JSON null/array/string (not objects)
- CEL-significant chars in field values
- 1MB payload, 100 filters, missing fields
- Unicode, deeply nested paths, MsgPack bypass

Engine API: empty engine, direction independence, DLQ detection
Config: YAML deserialization, tier config, empty config

1,274 tests pass with --all-features, clippy clean.

Chunk 4: performance optimisation, CEL evaluation tests, benchmarks, docs.

SIMD/zero-copy review optimisations:
- Pre-compiled memchr::memmem::Finder for FieldExists/FieldNotExists
  with single-segment paths. Detects "field": pattern in raw bytes,
  bypassing JSON parser entirely.
  - has(_table) match: 218ns -> 107ns (51% faster)
  - 5 filters first-match-wins: 1697ns -> 129ns (92% faster)
- Stack-allocated path arrays via with_path_refs (1-4 segments inline)
  eliminates per-message Vec allocation
- extract_string_value uses sonic-rs is_str() + memchr SIMD escape
  detection — zero-copy Cow<str> for the common case (no escapes)
- Fix field extraction in classify.rs for method calls (host.matches
  was extracting "host.matches" instead of "host")

Tier 2/3 CEL evaluation tests (5 new):
- Compound expression: severity > 3 && source != "internal"
- size() function on arrays
- Field-to-field comparison
- Tier 3 regex via host.matches("^prod-.*")
- Missing field safety

Benchmark suite (benches/filter_benchmark.rs):
- No-filter baseline: 3.2 ns (zero overhead confirmed)
- Tier 1 has() match: 107 ns (memmem fast-path)
- Tier 1 ==: 288 ns match, 741 ns no-match (sonic-rs full scan)
- Tier 1 startsWith: 250 ns
- Tier 1 dotted path: 239 ns
- Tier 2/3 CEL benchmarks (feature-gated)

Docs:
- STATE.md: added transport-filter to Key Components
- STATE.md: added TransportFilterEngine decision entry

700 tests pass, clippy clean with --all-features.
memchr added to transport feature for SIMD substring search.

Expose filter-staged DLQ entries via a new default trait method on
TransportReceiver. When inbound filters classify messages as action:dlq,
they are removed from the recv() result and staged in a per-transport
parking_lot::Mutex buffer; callers drain the buffer after each recv()
call and route the entries to their own DLQ handle.
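The stage-then-drain pattern described here might look roughly like the sketch below. It makes several assumptions: `SketchTransport` and `by_prefix` are hypothetical, `recv()` is synchronous, `std::sync::Mutex` stands in for `parking_lot::Mutex`, and a simple loop replaces the `retain()` call. Only the method name `take_filtered_dlq_entries` comes from the PR.

```rust
use std::sync::Mutex;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Disposition {
    Pass,
    Drop,
    Dlq,
}

/// Hypothetical transport holding pending messages and a DLQ staging buffer.
pub struct SketchTransport {
    inbox: Vec<Vec<u8>>,
    classify: fn(&[u8]) -> Disposition,
    dlq_buffer: Mutex<Vec<Vec<u8>>>,
}

impl SketchTransport {
    pub fn new(inbox: Vec<Vec<u8>>, classify: fn(&[u8]) -> Disposition) -> Self {
        SketchTransport { inbox, classify, dlq_buffer: Mutex::new(Vec::new()) }
    }

    /// Returns only passing messages. DLQ-classified messages are staged in
    /// the buffer; dropped messages vanish. The return type is unchanged by
    /// filtering, which is the point of the design.
    pub fn recv(&mut self) -> Vec<Vec<u8>> {
        let batch = std::mem::take(&mut self.inbox);
        let mut passed = Vec::new();
        let mut staged = Vec::new();
        for msg in batch {
            match (self.classify)(&msg) {
                Disposition::Pass => passed.push(msg),
                Disposition::Dlq => staged.push(msg),
                Disposition::Drop => {}
            }
        }
        self.dlq_buffer.lock().unwrap().extend(staged);
        passed
    }

    /// Drains the staging buffer: a second call returns an empty vec.
    pub fn take_filtered_dlq_entries(&self) -> Vec<Vec<u8>> {
        std::mem::take(&mut *self.dlq_buffer.lock().unwrap())
    }
}

/// Toy classifier used only for this example.
pub fn by_prefix(msg: &[u8]) -> Disposition {
    if msg.starts_with(b"dlq:") {
        Disposition::Dlq
    } else if msg.starts_with(b"drop:") {
        Disposition::Drop
    } else {
        Disposition::Pass
    }
}
```

A caller would invoke `recv()`, process the returned messages, then call `take_filtered_dlq_entries()` and forward whatever it returns to its own DLQ handle.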

Also add TransportFilterEngine::partition_batch() — a generic batch API
that returns a FilteredBatch<T> with passing messages, DLQ entries, and
drop count in one pass. Transports can use this to avoid implementing
the retain/stage pattern themselves.
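A single-pass batch partition of this kind could be sketched as below. The field names on `FilteredBatch<T>` and the closure-based signature are assumptions for illustration; the real `partition_batch()` is a method on `TransportFilterEngine` and its exact signature is not shown in this PR text.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Disposition {
    Pass,
    Drop,
    Dlq,
}

/// Result of one pass over a batch (field names are hypothetical).
#[derive(Debug, PartialEq, Eq)]
pub struct FilteredBatch<T> {
    pub passed: Vec<T>,
    pub dlq: Vec<T>,
    pub dropped: usize,
}

/// Walk the batch once, moving each message into the right bucket.
/// Messages are moved, not cloned, so the pass allocates only the output vecs.
pub fn partition_batch<T, F>(batch: Vec<T>, mut classify: F) -> FilteredBatch<T>
where
    F: FnMut(&T) -> Disposition,
{
    let mut out = FilteredBatch {
        passed: Vec::with_capacity(batch.len()),
        dlq: Vec::new(),
        dropped: 0,
    };
    for msg in batch {
        match classify(&msg) {
            Disposition::Pass => out.passed.push(msg),
            Disposition::Dlq => out.dlq.push(msg),
            Disposition::Drop => out.dropped += 1,
        }
    }
    out
}
```

Keeping the function generic over `T` lets each transport pass its own message type without the engine knowing anything about it beyond what the classifier closure inspects.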

All seven transports (memory, kafka, grpc, file, pipe, http, redis)
wire the DLQ buffer and override take_filtered_dlq_entries(). The
existing retain-and-drop behaviour is preserved for action:drop; only
DLQ entries get the new exposure.

Default trait impl returns an empty vec, so transports without filter
support (none currently, but future ones) don't need to override.

Add the final review findings from the pre-merge audit:

* Python<->Rust classifier parity via shared JSON fixture
  (tests/fixtures/cel_classifier_parity.json). The matching
  dfe-engine test loads the same fixture and runs every entry
  through the Python classifier — both sides must agree on tier,
  op kind, op field, op value, and field references.

* Concurrency test: 32 tokio tasks share one Arc<TransportFilterEngine>
  and classify 1000 messages each. Also a compile-time
  assert_send_sync check for FilterRule/Action/Disposition.

* DLQ buffer drain tests covering take_filtered_dlq_entries():
  entries are exposed, draining empties the buffer, drop filters
  don't pollute the buffer, and no filters means zero overhead.

* Memmem nested-field limitation: documents the known false-positive
  case where has(field) matches a nested occurrence (needed for the
  50% performance gain on the common top-level case) and the sound
  case where JSON-escaped quotes prevent false positives inside
  string values.

* Per-transport smoke test for MemoryTransport filter wiring.
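The memmem nested-field limitation listed above can be reproduced with a few lines of std-only Rust. `has_field_fast` is a hypothetical helper that mimics the byte-pattern search with a naive window scan (the real code uses `memchr::memmem::Finder`), and it assumes compact JSON without whitespace before the colon.

```rust
/// Byte-level existence check: looks for the pattern `"field":` anywhere in
/// the payload, with no awareness of JSON nesting.
pub fn has_field_fast(payload: &[u8], field: &str) -> bool {
    let needle = format!("\"{}\":", field);
    payload
        .windows(needle.len())
        .any(|w| w == needle.as_bytes())
}

/// Top-level key: the intended match.
pub const TOP_LEVEL: &[u8] = br#"{"host":"a"}"#;

/// Nested key: also matches, which is the documented false positive for a
/// filter that means "top-level host exists".
pub const NESTED: &[u8] = br#"{"meta":{"host":"a"}}"#;

/// Key text inside a string value: JSON escapes the inner quotes as \",
/// so the raw bytes never contain the unescaped pattern and nothing matches.
pub const IN_STRING: &[u8] = br#"{"msg":"say \"host\": hi"}"#;
```

The nested case is the trade-off the commit accepts in exchange for skipping the JSON parser; the escaped-quote case shows why string values cannot trigger the same false positive.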

Also add docs/TRANSPORT-FILTER-FOLLOWUP.md tracking items #7-#11:
constant-time compare, log masking, pre-quoted equals fast path,
msgpack edges, and expression_text reuse on reload.
@catinspace-au merged commit 9271451 into main on Apr 11, 2026
12 checks passed