8 changes: 7 additions & 1 deletion Cargo.toml
@@ -96,7 +96,7 @@ config-reload = ["config", "parking_lot", "tokio", "tracing"]
config-postgres = ["config", "sqlx", "tokio", "serde_json"]

# Transport features
-transport = ["tokio", "serde_json", "rmp-serde", "chrono", "async-trait"]
+transport = ["tokio", "serde_json", "rmp-serde", "chrono", "async-trait", "regex", "memchr"]
transport-memory = ["transport"]
transport-kafka = ["transport", "rdkafka", "regex", "tokio-util"]
transport-grpc = ["transport", "dep:tonic", "dep:tonic-prost", "dep:prost", "dep:prost-types", "dep:tonic-prost-build", "dep:prost-build"]
@@ -232,6 +232,7 @@ sonic-rs = { version = ">=0.5, <1", optional = true }

# Regex (topic resolver include/exclude filters)
regex = { version = ">=1.11, <2", optional = true }
memchr = { version = ">=2.7", optional = true }

# Async trait (for tiered-sink Sink trait)
async-trait = { version = ">=0.1.88, <0.2", optional = true }
@@ -306,3 +307,8 @@ harness = false
name = "engine_benchmark"
harness = false
required-features = ["worker"]

[[bench]]
name = "filter_benchmark"
harness = false
required-features = ["transport-memory"]
2 changes: 2 additions & 0 deletions STATE.md
@@ -39,6 +39,7 @@ Modular library with feature-gated components. Each module can be enabled/disabl
15. **memory** - MemoryGuard: cgroup-aware memory backpressure with auto-detection
16. **scaling** - ScalingPressure: KEDA autoscaling signal calculation
17. **cli** - DfeApp trait, ServiceRuntime (pre-wired metrics + worker pool + batch engine + memory guard + shutdown)
18. **transport-filter** - TransportFilterEngine: CEL-syntax message filtering embedded in every transport. Tier 1 SIMD field ops (~50-100ns), Tier 2 compiled CEL (opt-in), Tier 3 complex CEL with regex/iteration (opt-in). Inbound/outbound, drop/dlq, first-match-wins.

### Tech Stack

@@ -154,6 +155,7 @@ spool, cache, secrets, HTTP client, DLQ — with zero additional wiring.
- **DFE parallelisation pattern** — split sequential hot loops into parallel (pure `&self` computation via rayon) and sequential (mutable state: buffer push, mark_pending, stats, DLQ) phases. The `BatchProcessor` trait + `BatchPipeline` struct in rustlib provide the common framework. Each DFE app implements `BatchProcessor` for its domain. See `src/pipeline/` module.
- **ServiceRuntime** — pre-built infrastructure for DFE service apps. Created by `run_app()` before `run_service()`. Contains MetricsManager, DfeMetrics, MemoryGuard (optional), shutdown token (with K8s pre-stop delay), worker pool (optional), batch engine (optional), scaling pressure (optional), RuntimeContext. Apps receive it fully wired. See `src/cli/runtime.rs`.
- **BatchEngine** — SIMD-optimised batch processing for DFE pipelines. Two modes: `process_mid_tier()` (parse JSON via sonic-rs + parallel transform via rayon) and `process_raw()` (skip parsing, parallel transform on raw bytes). Transport-wired: `run_async()` / `run_raw_async()` with async sink, sink-managed commit tokens, and optional ticker callback. See `src/worker/engine/`.
- **TransportFilterEngine** — CEL-syntax message filtering embedded in every transport (Kafka, gRPC, Memory, File, Pipe, HTTP, Redis). Three performance tiers: Tier 1 (SIMD field ops via sonic_rs::get_from_slice, ~50-200ns/msg, always enabled), Tier 2 (compiled CEL with extracted fields, requires `expression.allow_cel_filters_in/out`), Tier 3 (CEL with regex/iteration/time, requires `expression.allow_complex_filters_in/out`). Operators write CEL syntax — engine classifies via text pattern matching and bypasses CEL engine entirely for Tier 1. First-match-wins, drop/dlq actions, fail-fast at startup. Zero downstream code changes — config-only activation. See `src/transport/filter/`.
- **RuntimeContext** — rich runtime metadata detected once at startup (pod_name, namespace, node_name, container_id, memory_limit_bytes, cpu_quota_cores). Global singleton via OnceLock. All modules read from this instead of doing their own env var lookups. No-ops on bare metal. See `src/env.rs`.
- **K8s pre-stop compliance** — shutdown handler sleeps `PRESTOP_DELAY_SECS` (default 5 in K8s, 0 elsewhere) before cancelling the token. Prevents traffic routing to a draining pod.
- **Deployment contract CI bridge** — `container-manifest.json` (minimal CI subset), `Dockerfile.runtime` (runtime stage fragment for CI composition), OCI labels (static from contract, dynamic injected by CI), `from_cargo_toml()` for auto-detecting native deps, `schema_version` field.
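
The tier classification and opt-in gating described for TransportFilterEngine can be sketched roughly as below. This is a minimal illustration under stated assumptions, not the code in `src/transport/filter/`: the names `Tier`, `TierFlags`, `classify`, and `check_allowed` are hypothetical, and the text patterns used to pick a tier are guesses based on the expressions exercised by the benchmarks.

```rust
/// Hypothetical tiers mirroring the three tiers described above.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Tier {
    FieldOp,    // Tier 1: single field operation, always enabled
    Cel,        // Tier 2: compiled CEL, opt-in
    ComplexCel, // Tier 3: CEL with regex/iteration, opt-in
}

/// Hypothetical opt-in flags for one direction (inbound or outbound).
#[derive(Debug, Default, Clone, Copy)]
struct TierFlags {
    allow_cel: bool,
    allow_complex: bool,
}

/// Classify an expression by inspecting its text only (assumed heuristics).
/// Limitation of this sketch: markers inside string literals are not skipped.
fn classify(expr: &str) -> Tier {
    let e = expr.trim();
    // Regex or iteration macros push the expression to Tier 3.
    let complex_markers = [".matches(", ".exists(", ".all(", ".map("];
    if complex_markers.iter().any(|m| e.contains(*m)) {
        return Tier::ComplexCel;
    }
    // A single has(), equality, or startsWith() with no boolean combinator
    // is treated as a Tier 1 field operation.
    let compound = e.contains("&&") || e.contains("||");
    let simple = (e.starts_with("has(") && e.ends_with(')'))
        || e.contains(" == ")
        || e.contains(".startsWith(");
    if simple && !compound {
        Tier::FieldOp
    } else {
        Tier::Cel
    }
}

/// Fail fast: reject a rule whose tier has not been enabled.
fn check_allowed(expr: &str, flags: TierFlags) -> Result<Tier, String> {
    let tier = classify(expr);
    let allowed = match tier {
        Tier::FieldOp => true,
        Tier::Cel => flags.allow_cel,
        Tier::ComplexCel => flags.allow_complex,
    };
    if allowed {
        Ok(tier)
    } else {
        Err(format!("filter `{expr}` requires {tier:?}, which is not enabled"))
    }
}

fn main() {
    let defaults = TierFlags::default();
    // Tier 1 expressions load with the default configuration.
    assert_eq!(check_allowed("has(_table)", defaults), Ok(Tier::FieldOp));
    assert_eq!(
        check_allowed(r#"metadata.source == "aws""#, defaults),
        Ok(Tier::FieldOp)
    );
    // A compound expression is rejected until Tier 2 is switched on.
    let compound = r#"severity > 3 && source != "internal""#;
    assert!(check_allowed(compound, defaults).is_err());
    let cel_on = TierFlags { allow_cel: true, ..Default::default() };
    assert_eq!(check_allowed(compound, cel_on), Ok(Tier::Cel));
    // A regex expression needs the Tier 3 flag.
    let complex_on = TierFlags { allow_complex: true, ..Default::default() };
    assert_eq!(
        check_allowed(r#"host.matches("^prod-.*$")"#, complex_on),
        Ok(Tier::ComplexCel)
    );
}
```

Running the check once per rule while the engine is constructed would give the fail-fast behaviour: a misconfigured rule stops startup instead of surfacing on the first message.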
233 changes: 233 additions & 0 deletions benches/filter_benchmark.rs
@@ -0,0 +1,233 @@
// Project: hyperi-rustlib
// File: benches/filter_benchmark.rs
// Purpose: Criterion benchmarks for transport filter engine performance
// Language: Rust
//
// License: FSL-1.1-ALv2
// Copyright: (c) 2026 HYPERI PTY LIMITED

//! Benchmarks for the transport filter engine.
//!
//! Validates the design assumption: Tier 1 filters are ~50-100ns/msg via SIMD,
//! and the no-filter overhead is negligible.

use criterion::{Criterion, Throughput, criterion_group, criterion_main};

use hyperi_rustlib::transport::filter::{
    FilterAction, FilterDisposition, FilterRule, TransportFilterEngine, TransportFilterTierConfig,
};

const SAMPLE_PAYLOAD: &[u8] = br#"{"_table":"events","host":"prod-web01","source_type":"syslog","severity":3,"id":12345,"timestamp":"2026-04-10T12:00:00Z","message":"Sample log event with some realistic padding for benchmarking"}"#;

const POISON_PAYLOAD: &[u8] = br#"{"_table":"events","status":"poison","data":"x"}"#;

fn bench_no_filters_baseline(c: &mut Criterion) {
    let engine = TransportFilterEngine::empty();

    let mut group = c.benchmark_group("filter_no_filters");
    group.throughput(Throughput::Elements(1));
    group.bench_function("apply_inbound_no_filters", |b| {
        b.iter(|| std::hint::black_box(engine.apply_inbound(SAMPLE_PAYLOAD)));
    });
    group.finish();
}

fn bench_tier1_field_exists(c: &mut Criterion) {
    let engine = TransportFilterEngine::new(
        &[FilterRule {
            expression: "has(_table)".into(),
            action: FilterAction::Drop,
        }],
        &[],
        &TransportFilterTierConfig::default(),
    )
    .unwrap();

    let mut group = c.benchmark_group("filter_tier1_field_exists");
    group.throughput(Throughput::Elements(1));
    group.bench_function("match", |b| {
        b.iter(|| std::hint::black_box(engine.apply_inbound(SAMPLE_PAYLOAD)));
    });
    group.finish();
}

fn bench_tier1_field_equals(c: &mut Criterion) {
    let engine = TransportFilterEngine::new(
        &[FilterRule {
            expression: r#"status == "poison""#.into(),
            action: FilterAction::Drop,
        }],
        &[],
        &TransportFilterTierConfig::default(),
    )
    .unwrap();

    let mut group = c.benchmark_group("filter_tier1_field_equals");
    group.throughput(Throughput::Elements(1));
    group.bench_function("no_match_pass", |b| {
        b.iter(|| {
            let result = engine.apply_inbound(SAMPLE_PAYLOAD);
            assert_eq!(result, FilterDisposition::Pass);
            std::hint::black_box(result)
        });
    });
    group.bench_function("match_drop", |b| {
        b.iter(|| {
            let result = engine.apply_inbound(POISON_PAYLOAD);
            assert_eq!(result, FilterDisposition::Drop);
            std::hint::black_box(result)
        });
    });
    group.finish();
}

fn bench_tier1_starts_with(c: &mut Criterion) {
    let engine = TransportFilterEngine::new(
        &[FilterRule {
            expression: r#"host.startsWith("prod-")"#.into(),
            action: FilterAction::Drop,
        }],
        &[],
        &TransportFilterTierConfig::default(),
    )
    .unwrap();

    let mut group = c.benchmark_group("filter_tier1_starts_with");
    group.throughput(Throughput::Elements(1));
    group.bench_function("match", |b| {
        b.iter(|| std::hint::black_box(engine.apply_inbound(SAMPLE_PAYLOAD)));
    });
    group.finish();
}

fn bench_tier1_dotted_path(c: &mut Criterion) {
    let nested_payload =
        br#"{"metadata":{"source":"aws","region":"ap-southeast-2"},"event":"login"}"#;
    let engine = TransportFilterEngine::new(
        &[FilterRule {
            expression: r#"metadata.source == "aws""#.into(),
            action: FilterAction::Drop,
        }],
        &[],
        &TransportFilterTierConfig::default(),
    )
    .unwrap();

    let mut group = c.benchmark_group("filter_tier1_dotted_path");
    group.throughput(Throughput::Elements(1));
    group.bench_function("nested_match", |b| {
        b.iter(|| std::hint::black_box(engine.apply_inbound(nested_payload)));
    });
    group.finish();
}

fn bench_tier1_first_match_wins(c: &mut Criterion) {
    // 5 filters, message matches the third one
    let rules = vec![
        FilterRule {
            expression: "has(no_match_1)".into(),
            action: FilterAction::Drop,
        },
        FilterRule {
            expression: "has(no_match_2)".into(),
            action: FilterAction::Drop,
        },
        FilterRule {
            expression: "has(_table)".into(),
            action: FilterAction::Drop,
        },
        FilterRule {
            expression: "has(no_match_3)".into(),
            action: FilterAction::Drop,
        },
        FilterRule {
            expression: "has(no_match_4)".into(),
            action: FilterAction::Drop,
        },
    ];
    let engine =
        TransportFilterEngine::new(&rules, &[], &TransportFilterTierConfig::default()).unwrap();

    let mut group = c.benchmark_group("filter_tier1_first_match_wins");
    group.throughput(Throughput::Elements(1));
    group.bench_function("match_at_position_3", |b| {
        b.iter(|| std::hint::black_box(engine.apply_inbound(SAMPLE_PAYLOAD)));
    });
    group.finish();
}

#[cfg(feature = "expression")]
fn bench_tier2_compound_cel(c: &mut Criterion) {
    let tier_config = TransportFilterTierConfig {
        allow_cel_filters_in: true,
        ..Default::default()
    };
    let engine = TransportFilterEngine::new(
        &[FilterRule {
            expression: r#"severity > 3 && source != "internal""#.into(),
            action: FilterAction::Drop,
        }],
        &[],
        &tier_config,
    )
    .unwrap();

    let payload = br#"{"severity":5,"source":"external","data":"x"}"#;

    let mut group = c.benchmark_group("filter_tier2_compound_cel");
    group.throughput(Throughput::Elements(1));
    group.bench_function("compound_cel_match", |b| {
        b.iter(|| std::hint::black_box(engine.apply_inbound(payload)));
    });
    group.finish();
}

#[cfg(feature = "expression")]
fn bench_tier3_regex_cel(c: &mut Criterion) {
    let tier_config = TransportFilterTierConfig {
        allow_complex_filters_in: true,
        ..Default::default()
    };
    let engine = TransportFilterEngine::new(
        &[FilterRule {
            expression: r#"host.matches("^prod-.*$")"#.into(),
            action: FilterAction::Drop,
        }],
        &[],
        &tier_config,
    )
    .unwrap();

    let mut group = c.benchmark_group("filter_tier3_regex_cel");
    group.throughput(Throughput::Elements(1));
    group.bench_function("regex_match", |b| {
        b.iter(|| std::hint::black_box(engine.apply_inbound(SAMPLE_PAYLOAD)));
    });
    group.finish();
}

#[cfg(feature = "expression")]
criterion_group!(
    benches,
    bench_no_filters_baseline,
    bench_tier1_field_exists,
    bench_tier1_field_equals,
    bench_tier1_starts_with,
    bench_tier1_dotted_path,
    bench_tier1_first_match_wins,
    bench_tier2_compound_cel,
    bench_tier3_regex_cel,
);

#[cfg(not(feature = "expression"))]
criterion_group!(
    benches,
    bench_no_filters_baseline,
    bench_tier1_field_exists,
    bench_tier1_field_equals,
    bench_tier1_starts_with,
    bench_tier1_dotted_path,
    bench_tier1_first_match_wins,
);

criterion_main!(benches);
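
The first-match-wins behaviour that `bench_tier1_first_match_wins` measures can be illustrated with a small standalone sketch. It does not use the real `TransportFilterEngine`: the `Predicate`, `Rule`, `lookup`, and `apply` names are hypothetical, and a message is modelled as a flat list of already-extracted string fields instead of raw JSON bytes. The point is only to show that evaluation stops at the first matching rule and that an empty rule list passes everything.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Action {
    Drop,
    Dlq,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Disposition {
    Pass,
    Drop,
    Dlq,
}

/// Hypothetical Tier 1 style predicates over string fields.
#[derive(Debug, Clone, Copy)]
enum Predicate<'a> {
    Has(&'a str),
    Equals(&'a str, &'a str),
    StartsWith(&'a str, &'a str),
}

struct Rule<'a> {
    predicate: Predicate<'a>,
    action: Action,
}

/// Linear field lookup; stands in for real JSON field extraction.
fn lookup<'a>(fields: &[(&str, &'a str)], key: &str) -> Option<&'a str> {
    fields.iter().find(|(k, _)| *k == key).map(|(_, v)| *v)
}

impl Predicate<'_> {
    fn matches(&self, fields: &[(&str, &str)]) -> bool {
        match *self {
            Predicate::Has(f) => lookup(fields, f).is_some(),
            Predicate::Equals(f, want) => lookup(fields, f) == Some(want),
            Predicate::StartsWith(f, prefix) => {
                lookup(fields, f).is_some_and(|v| v.starts_with(prefix))
            }
        }
    }
}

/// Returns the disposition plus how many rules were evaluated.
fn apply(rules: &[Rule<'_>], fields: &[(&str, &str)]) -> (Disposition, usize) {
    let mut evaluated = 0;
    for rule in rules {
        evaluated += 1;
        if rule.predicate.matches(fields) {
            // First match wins: later rules are never consulted.
            let disposition = match rule.action {
                Action::Drop => Disposition::Drop,
                Action::Dlq => Disposition::Dlq,
            };
            return (disposition, evaluated);
        }
    }
    (Disposition::Pass, evaluated)
}

fn main() {
    let rules = [
        Rule { predicate: Predicate::Has("no_match_1"), action: Action::Drop },
        Rule { predicate: Predicate::Has("no_match_2"), action: Action::Drop },
        Rule { predicate: Predicate::Has("_table"), action: Action::Drop },
        Rule { predicate: Predicate::StartsWith("host", "prod-"), action: Action::Dlq },
        Rule { predicate: Predicate::Equals("status", "poison"), action: Action::Drop },
    ];

    // Matches rule 3 and rule 4; rule 3 wins because it comes first.
    let msg = [("_table", "events"), ("host", "prod-web01")];
    assert_eq!(apply(&rules, &msg), (Disposition::Drop, 3));

    // Without `_table`, evaluation continues to rule 4.
    let msg = [("host", "prod-web01")];
    assert_eq!(apply(&rules, &msg), (Disposition::Dlq, 4));

    // No rules configured: every message passes with zero evaluations.
    assert_eq!(apply(&[], &msg), (Disposition::Pass, 0));
}
```

Because evaluation is linear in the number of rules checked before a match, rule ordering matters for cost as well as for semantics, which is presumably why the benchmark places the matching rule in the middle of the list.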