From a9734ce7ab698f64b7441731148a9644a9a78276 Mon Sep 17 00:00:00 2001 From: Theo Butler Date: Mon, 30 Mar 2026 11:27:45 -0600 Subject: [PATCH] feat(server): allow clients to set microbatch_max_interval per query Replace the server-wide `server_microbatch_max_interval` config with a per-query option using the existing SQL `SETTINGS` clause: ```sql SELECT * FROM eth.logs SETTINGS stream = true, microbatch_max_interval = 5000 ``` When not specified, the server falls back to the configured `microbatch_max_interval`. --- crates/bin/ampd/src/server_cmd.rs | 2 +- crates/config/src/config_file.rs | 15 +-- crates/config/src/lib.rs | 5 +- crates/services/server/src/config.rs | 5 +- crates/services/server/src/flight.rs | 10 +- crates/services/server/src/helpers.rs | 108 ++++++++++++++++++++ docs/config.sample.toml | 3 +- docs/feat/app-ampd-server.md | 4 +- docs/feat/query-sql-streaming.md | 17 ++- docs/schemas/config/ampd.spec.json | 9 +- tests/src/testlib/fixtures/daemon_server.rs | 2 +- 11 files changed, 142 insertions(+), 38 deletions(-) diff --git a/crates/bin/ampd/src/server_cmd.rs b/crates/bin/ampd/src/server_cmd.rs index 763c5282d..88a0ddca7 100644 --- a/crates/bin/ampd/src/server_cmd.rs +++ b/crates/bin/ampd/src/server_cmd.rs @@ -163,7 +163,7 @@ pub enum Error { /// Convert common config to server-specific config pub fn config_from_common(config: &CommonConfig) -> ServerConfig { ServerConfig { - server_microbatch_max_interval: config.server_microbatch_max_interval, + microbatch_max_interval: config.microbatch_max_interval, keep_alive_interval: config.keep_alive_interval, max_mem_mb: config.max_mem_mb, query_max_mem_mb: config.query_max_mem_mb, diff --git a/crates/config/src/config_file.rs b/crates/config/src/config_file.rs index a78e8de76..f6585d62c 100644 --- a/crates/config/src/config_file.rs +++ b/crates/config/src/config_file.rs @@ -49,9 +49,6 @@ pub const DEFAULT_MANIFESTS_DIRNAME: &str = "manifests"; /// Default maximum interval for derived dataset dump microbatches (in blocks) pub const DEFAULT_MICROBATCH_MAX_INTERVAL: u64 = 100_000; -/// Default maximum interval for streaming server microbatches (in blocks) -pub const DEFAULT_SERVER_MICROBATCH_MAX_INTERVAL: u64 = 1_000; - /// Default keep-alive interval for streaming server (in seconds) pub const DEFAULT_KEEP_ALIVE_INTERVAL: u64 = 30; @@ -176,12 +173,11 @@ pub struct ConfigFile { /// Polling interval for new blocks during dump in seconds (default: 1.0) #[serde(default)] pub poll_interval_secs: ConfigDuration<1>, - /// Max interval for derived dataset dump microbatches in blocks (default: 100000) + /// Max interval for microbatches in blocks (default: 100000). + /// Used for derived dataset dumps and as the default for streaming server queries + /// when the client does not override via the SQL `SETTINGS` clause. #[serde(default = "default_microbatch_max_interval")] pub microbatch_max_interval: u64, - /// Max interval for streaming server microbatches in blocks (default: 1000) - #[serde(default = "default_server_microbatch_max_interval")] - pub server_microbatch_max_interval: u64, /// Keep-alive interval for streaming server in seconds (default: 30; min: 30) #[serde(default = "default_keep_alive_interval")] pub keep_alive_interval: u64, @@ -247,11 +243,6 @@ fn default_microbatch_max_interval() -> u64 { DEFAULT_MICROBATCH_MAX_INTERVAL } -/// Serde default for [`ConfigFile::server_microbatch_max_interval`]. Returns [`DEFAULT_SERVER_MICROBATCH_MAX_INTERVAL`]. -fn default_server_microbatch_max_interval() -> u64 { - DEFAULT_SERVER_MICROBATCH_MAX_INTERVAL -} - /// Serde default for [`ConfigFile::keep_alive_interval`]. Returns [`DEFAULT_KEEP_ALIVE_INTERVAL`]. fn default_keep_alive_interval() -> u64 { DEFAULT_KEEP_ALIVE_INTERVAL diff --git a/crates/config/src/lib.rs b/crates/config/src/lib.rs index 64dcb6ff8..859c19240 100644 --- a/crates/config/src/lib.rs +++ b/crates/config/src/lib.rs @@ -18,7 +18,7 @@ pub use self::{ config_file::{ ConfigDefaultsOverride, ConfigFile, DEFAULT_DATA_DIRNAME, DEFAULT_KEEP_ALIVE_INTERVAL, DEFAULT_MANIFESTS_DIRNAME, DEFAULT_MICROBATCH_MAX_INTERVAL, DEFAULT_PROVIDERS_DIRNAME, - DEFAULT_SERVER_MICROBATCH_MAX_INTERVAL, no_defaults_override, + no_defaults_override, }, metadb::{DEFAULT_METADB_CONN_POOL_SIZE, DEFAULT_METADB_DIRNAME, MetadataDbConfig}, redacted::Redacted, @@ -103,7 +103,6 @@ fn resolve_config( query_max_mem_mb: config_file.query_max_mem_mb, spill_location: config_file.spill_location, microbatch_max_interval: config_file.microbatch_max_interval, - server_microbatch_max_interval: config_file.server_microbatch_max_interval, parquet: config_file.writer, opentelemetry: config_file.opentelemetry, server_addrs, @@ -213,8 +212,6 @@ pub struct Config { pub spill_location: Vec, /// Maximum interval for derived dataset dump microbatches (in blocks). pub microbatch_max_interval: u64, - /// Maximum interval for streaming server microbatches (in blocks). - pub server_microbatch_max_interval: u64, /// OpenTelemetry observability configuration. pub opentelemetry: Option, /// Network addresses for query server endpoints. diff --git a/crates/services/server/src/config.rs b/crates/services/server/src/config.rs index 0313bde4e..bd99f96ea 100644 --- a/crates/services/server/src/config.rs +++ b/crates/services/server/src/config.rs @@ -6,8 +6,9 @@ use std::path::PathBuf; /// for query execution and streaming. #[derive(Debug, Clone)] pub struct Config { - /// Maximum interval for streaming server microbatches (in blocks) - pub server_microbatch_max_interval: u64, + /// Default maximum interval for streaming microbatches (in blocks). + /// Used when the client does not specify `microbatch_max_interval` in the SQL `SETTINGS` clause. + pub microbatch_max_interval: u64, /// Keep-alive interval for streaming server (in seconds) pub keep_alive_interval: u64, /// Maximum memory the server can use (in MB, 0 = unlimited) diff --git a/crates/services/server/src/flight.rs b/crates/services/server/src/flight.rs index 429c9f7eb..fca0b20ac 100644 --- a/crates/services/server/src/flight.rs +++ b/crates/services/server/src/flight.rs @@ -156,8 +156,11 @@ impl Service { .map_err(Error::PlanSql)?; let is_streaming = is_streaming.unwrap_or_else(|| crate::helpers::is_streaming(&query)); + let microbatch_max_interval = crate::helpers::microbatch_max_interval(&query); let dataset_labels = catalog_dataset_labels(&catalog); - let result = self.execute_plan(catalog, plan, is_streaming, cursor).await; + let result = self + .execute_plan(catalog, plan, is_streaming, cursor, microbatch_max_interval) + .await; // Record execution error, once per dataset if result.is_err() @@ -183,6 +186,7 @@ impl Service { plan: DetachedLogicalPlan, is_streaming: bool, cursor: Option, + microbatch_max_interval: Option, ) -> Result { let query_start_time = std::time::Instant::now(); let dataset_labels = catalog_dataset_labels(&catalog); @@ -308,7 +312,9 @@ impl Service { cursor, &self.notification_multiplexer, None, - self.config.server_microbatch_max_interval, + microbatch_max_interval + .unwrap_or(self.config.microbatch_max_interval) + .max(1), self.config.keep_alive_interval, None, // no job_id for query path ) diff --git a/crates/services/server/src/helpers.rs b/crates/services/server/src/helpers.rs index 328e0bb73..62b46e104 100644 --- a/crates/services/server/src/helpers.rs +++ b/crates/services/server/src/helpers.rs @@ -6,6 +6,9 @@ use datafusion::{ /// `SETTINGS` key that enables streaming mode. const STREAM_SETTING: &str = "stream"; +/// `SETTINGS` key that overrides the microbatch max interval. +const MICROBATCH_MAX_INTERVAL_SETTING: &str = "microbatch_max_interval"; + /// Check whether a parsed statement contains `SETTINGS stream = true`. /// /// ```sql @@ -37,6 +40,33 @@ pub fn is_streaming(stmt: &sql::parser::Statement) -> bool { is_streaming } +/// Extract `microbatch_max_interval` from `SETTINGS microbatch_max_interval = `. +/// +/// ```sql +/// SELECT * FROM eth.logs SETTINGS stream = true, microbatch_max_interval = 5000 +/// ``` +pub fn microbatch_max_interval(stmt: &sql::parser::Statement) -> Option { + let sql::parser::Statement::Statement(box_stmt) = stmt else { + return None; + }; + let ast::Statement::Query(query) = box_stmt.as_ref() else { + return None; + }; + let settings = query.settings.as_ref()?; + let setting = settings.iter().find(|s| { + s.key + .value + .eq_ignore_ascii_case(MICROBATCH_MAX_INTERVAL_SETTING) + })?; + let Expr::Value(v) = &setting.value else { + return None; + }; + match &v.value { + ast::Value::Number(n, _) => n.parse::().ok(), + _ => None, + } +} + #[cfg(test)] mod tests { use common::sql_str::SqlStr; @@ -112,6 +142,84 @@ mod tests { assert!(!result, "query without SETTINGS should not be streaming"); } + #[test] + fn microbatch_max_interval_with_valid_value_returns_some() { + //* Given + let stmt = + parse_stmt("SELECT * FROM test SETTINGS stream = true, microbatch_max_interval = 5000"); + + //* When + let result = microbatch_max_interval(&stmt); + + //* Then + assert_eq!( + result, + Some(5000), + "should extract microbatch_max_interval from SETTINGS" + ); + } + + #[test] + fn microbatch_max_interval_with_uppercase_key_returns_value() { + //* Given + let stmt = parse_stmt("SELECT * FROM test SETTINGS MICROBATCH_MAX_INTERVAL = 200"); + + //* When + let result = microbatch_max_interval(&stmt); + + //* Then + assert_eq!( + result, + Some(200), + "case-insensitive key should be recognized" + ); + } + + #[test] + fn microbatch_max_interval_without_setting_returns_none() { + //* Given + let stmt = parse_stmt("SELECT * FROM test SETTINGS stream = true"); + + //* When + let result = microbatch_max_interval(&stmt); + + //* Then + assert_eq!( + result, None, + "missing microbatch_max_interval setting should return None" + ); + } + + #[test] + fn microbatch_max_interval_without_settings_clause_returns_none() { + //* Given + let stmt = parse_stmt("SELECT * FROM test"); + + //* When + let result = microbatch_max_interval(&stmt); + + //* Then + assert_eq!( + result, None, + "query without SETTINGS clause should return None" + ); + } + + #[test] + fn microbatch_max_interval_with_boolean_value_returns_none() { + //* Given + let stmt = parse_stmt("SELECT * FROM test SETTINGS microbatch_max_interval = true"); + + //* When + let result = microbatch_max_interval(&stmt); + + //* Then + assert_eq!( + result, None, + "boolean value should not be parsed as microbatch_max_interval" + ); + } + fn parse_stmt(sql: &str) -> sql::parser::Statement { let sql_str: SqlStr = sql.parse().expect("sql should be valid SqlStr"); common::sql::parse(&sql_str).expect("sql should parse successfully") diff --git a/docs/config.sample.toml b/docs/config.sample.toml index b403214bb..7f7080077 100644 --- a/docs/config.sample.toml +++ b/docs/config.sample.toml @@ -35,8 +35,7 @@ manifests_dir = "manifests" # Operational timing # poll_interval_secs = 1.0 # Polling interval for new blocks during dump in seconds (default: 1.0) -# microbatch_max_interval = 100000 # Max interval for derived dataset dump microbatches in blocks (default: 100000) -# server_microbatch_max_interval = 1000 # Max interval for streaming server microbatches in blocks (default: 1000) +# microbatch_max_interval = 100000 # Max interval for microbatches in blocks; also the streaming server default (default: 100000) # keep_alive_interval = 30 # Keep-alive interval for streaming server in seconds (default: 30, minimum: 30) # Service addresses diff --git a/docs/feat/app-ampd-server.md b/docs/feat/app-ampd-server.md index dc2179ff9..22649fc40 100644 --- a/docs/feat/app-ampd-server.md +++ b/docs/feat/app-ampd-server.md @@ -59,16 +59,16 @@ ampd server | Setting | Default | Description | |---------|---------|-------------| -| `server_microbatch_max_interval` | - | Maximum blocks per streaming microbatch | | `keep_alive_interval` | - | Seconds between keep-alive messages (min 30s) | +Clients can override `microbatch_max_interval` per query via the SQL `SETTINGS` clause (see [Streaming Queries](query-sql-streaming.md)). When not set, the server uses the configured `microbatch_max_interval` value. + ### Config File ```toml # .amp/config.toml flight_addr = "0.0.0.0:1602" jsonl_addr = "0.0.0.0:1603" -server_microbatch_max_interval = 100 keep_alive_interval = 60 ``` diff --git a/docs/feat/query-sql-streaming.md b/docs/feat/query-sql-streaming.md index fabcb5938..d0834d78f 100644 --- a/docs/feat/query-sql-streaming.md +++ b/docs/feat/query-sql-streaming.md @@ -91,10 +91,19 @@ Clients receiving `is_reorg: true` should invalidate cached data from affected b ## Configuration -| Setting | Default | Description | -| -------------------------------- | ------- | ------------------------------------------------- | -| `server_microbatch_max_interval` | - | Maximum blocks per microbatch | -| `keep_alive_interval` | - | Seconds between keep-alive messages (minimum 30s) | +| Setting | Default | Description | +| -------------------- | ------- | ------------------------------------------------- | +| `keep_alive_interval`| - | Seconds between keep-alive messages (minimum 30s) | + +### Per-Query Settings + +The `microbatch_max_interval` can be set per query via the SQL `SETTINGS` clause: + +```sql +SELECT * FROM eth.logs SETTINGS stream = true, microbatch_max_interval = 5000 +``` + +When not specified, the server falls back to the configured `microbatch_max_interval` value. ### Keep-Alive diff --git a/docs/schemas/config/ampd.spec.json b/docs/schemas/config/ampd.spec.json index 528a468c5..f19c63948 100644 --- a/docs/schemas/config/ampd.spec.json +++ b/docs/schemas/config/ampd.spec.json @@ -57,7 +57,7 @@ "minimum": 0 }, "microbatch_max_interval": { - "description": "Max interval for derived dataset dump microbatches in blocks (default: 100000)", + "description": "Max interval for microbatches in blocks (default: 100000).\nUsed for derived dataset dumps and as the default for streaming server queries\nwhen the client does not override via the SQL `SETTINGS` clause.", "type": "integer", "format": "uint64", "default": 100000, @@ -89,13 +89,6 @@ "default": 0, "minimum": 0 }, - "server_microbatch_max_interval": { - "description": "Max interval for streaming server microbatches in blocks (default: 1000)", - "type": "integer", - "format": "uint64", - "default": 1000, - "minimum": 0 - }, "spill_location": { "description": "Paths for DataFusion temporary files for spill-to-disk (default: [])", "type": "array", diff --git a/tests/src/testlib/fixtures/daemon_server.rs b/tests/src/testlib/fixtures/daemon_server.rs index e329481d4..64beaf1bb 100644 --- a/tests/src/testlib/fixtures/daemon_server.rs +++ b/tests/src/testlib/fixtures/daemon_server.rs @@ -156,7 +156,7 @@ impl Drop for DaemonServer { /// Convert config::Config to server::config::Config fn server_config_from_common(config: &_config::Config) -> Config { Config { - server_microbatch_max_interval: config.server_microbatch_max_interval, + microbatch_max_interval: config.microbatch_max_interval, keep_alive_interval: config.keep_alive_interval, max_mem_mb: config.max_mem_mb, query_max_mem_mb: config.query_max_mem_mb,