Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/bin/ampd/src/server_cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ pub enum Error {
/// Convert common config to server-specific config
pub fn config_from_common(config: &CommonConfig) -> ServerConfig {
ServerConfig {
server_microbatch_max_interval: config.server_microbatch_max_interval,
microbatch_max_interval: config.microbatch_max_interval,
keep_alive_interval: config.keep_alive_interval,
max_mem_mb: config.max_mem_mb,
query_max_mem_mb: config.query_max_mem_mb,
Expand Down
15 changes: 3 additions & 12 deletions crates/config/src/config_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,6 @@ pub const DEFAULT_MANIFESTS_DIRNAME: &str = "manifests";
/// Default maximum interval for derived dataset dump microbatches (in blocks)
pub const DEFAULT_MICROBATCH_MAX_INTERVAL: u64 = 100_000;

/// Default maximum interval for streaming server microbatches (in blocks)
pub const DEFAULT_SERVER_MICROBATCH_MAX_INTERVAL: u64 = 1_000;

/// Default keep-alive interval for streaming server (in seconds)
pub const DEFAULT_KEEP_ALIVE_INTERVAL: u64 = 30;

Expand Down Expand Up @@ -176,12 +173,11 @@ pub struct ConfigFile {
/// Polling interval for new blocks during dump in seconds (default: 1.0)
#[serde(default)]
pub poll_interval_secs: ConfigDuration<1>,
/// Max interval for derived dataset dump microbatches in blocks (default: 100000)
/// Max interval for microbatches in blocks (default: 100000).
/// Used for derived dataset dumps and as the default for streaming server queries
/// when the client does not override via the SQL `SETTINGS` clause.
#[serde(default = "default_microbatch_max_interval")]
pub microbatch_max_interval: u64,
/// Max interval for streaming server microbatches in blocks (default: 1000)
#[serde(default = "default_server_microbatch_max_interval")]
pub server_microbatch_max_interval: u64,
/// Keep-alive interval for streaming server in seconds (default: 30; min: 30)
#[serde(default = "default_keep_alive_interval")]
pub keep_alive_interval: u64,
Expand Down Expand Up @@ -247,11 +243,6 @@ fn default_microbatch_max_interval() -> u64 {
DEFAULT_MICROBATCH_MAX_INTERVAL
}

/// Serde default for [`ConfigFile::server_microbatch_max_interval`]. Returns [`DEFAULT_SERVER_MICROBATCH_MAX_INTERVAL`].
fn default_server_microbatch_max_interval() -> u64 {
DEFAULT_SERVER_MICROBATCH_MAX_INTERVAL
}

/// Serde default for [`ConfigFile::keep_alive_interval`]. Returns [`DEFAULT_KEEP_ALIVE_INTERVAL`].
fn default_keep_alive_interval() -> u64 {
DEFAULT_KEEP_ALIVE_INTERVAL
Expand Down
5 changes: 1 addition & 4 deletions crates/config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub use self::{
config_file::{
ConfigDefaultsOverride, ConfigFile, DEFAULT_DATA_DIRNAME, DEFAULT_KEEP_ALIVE_INTERVAL,
DEFAULT_MANIFESTS_DIRNAME, DEFAULT_MICROBATCH_MAX_INTERVAL, DEFAULT_PROVIDERS_DIRNAME,
DEFAULT_SERVER_MICROBATCH_MAX_INTERVAL, no_defaults_override,
no_defaults_override,
},
metadb::{DEFAULT_METADB_CONN_POOL_SIZE, DEFAULT_METADB_DIRNAME, MetadataDbConfig},
redacted::Redacted,
Expand Down Expand Up @@ -103,7 +103,6 @@ fn resolve_config(
query_max_mem_mb: config_file.query_max_mem_mb,
spill_location: config_file.spill_location,
microbatch_max_interval: config_file.microbatch_max_interval,
server_microbatch_max_interval: config_file.server_microbatch_max_interval,
parquet: config_file.writer,
opentelemetry: config_file.opentelemetry,
server_addrs,
Expand Down Expand Up @@ -213,8 +212,6 @@ pub struct Config {
pub spill_location: Vec<PathBuf>,
/// Maximum interval for derived dataset dump microbatches (in blocks).
pub microbatch_max_interval: u64,
/// Maximum interval for streaming server microbatches (in blocks).
pub server_microbatch_max_interval: u64,
/// OpenTelemetry observability configuration.
pub opentelemetry: Option<OpenTelemetryConfig>,
/// Network addresses for query server endpoints.
Expand Down
5 changes: 3 additions & 2 deletions crates/services/server/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ use std::path::PathBuf;
/// for query execution and streaming.
#[derive(Debug, Clone)]
pub struct Config {
/// Maximum interval for streaming server microbatches (in blocks)
pub server_microbatch_max_interval: u64,
/// Default maximum interval for streaming microbatches (in blocks).
/// Used when the client does not specify `microbatch_max_interval` in the SQL `SETTINGS` clause.
pub microbatch_max_interval: u64,
/// Keep-alive interval for streaming server (in seconds)
pub keep_alive_interval: u64,
/// Maximum memory the server can use (in MB, 0 = unlimited)
Expand Down
10 changes: 8 additions & 2 deletions crates/services/server/src/flight.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,11 @@ impl Service {
.map_err(Error::PlanSql)?;

let is_streaming = is_streaming.unwrap_or_else(|| crate::helpers::is_streaming(&query));
let microbatch_max_interval = crate::helpers::microbatch_max_interval(&query);
let dataset_labels = catalog_dataset_labels(&catalog);
let result = self.execute_plan(catalog, plan, is_streaming, cursor).await;
let result = self
.execute_plan(catalog, plan, is_streaming, cursor, microbatch_max_interval)
.await;

// Record execution error, once per dataset
if result.is_err()
Expand All @@ -183,6 +186,7 @@ impl Service {
plan: DetachedLogicalPlan,
is_streaming: bool,
cursor: Option<Cursor>,
microbatch_max_interval: Option<u64>,
) -> Result<QueryResultStream, Error> {
let query_start_time = std::time::Instant::now();
let dataset_labels = catalog_dataset_labels(&catalog);
Expand Down Expand Up @@ -308,7 +312,9 @@ impl Service {
cursor,
&self.notification_multiplexer,
None,
self.config.server_microbatch_max_interval,
microbatch_max_interval
.unwrap_or(self.config.microbatch_max_interval)
.max(1),
self.config.keep_alive_interval,
None, // no job_id for query path
)
Expand Down
108 changes: 108 additions & 0 deletions crates/services/server/src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ use datafusion::{
/// `SETTINGS` key that enables streaming mode.
const STREAM_SETTING: &str = "stream";

/// `SETTINGS` key that overrides the microbatch max interval.
const MICROBATCH_MAX_INTERVAL_SETTING: &str = "microbatch_max_interval";

/// Check whether a parsed statement contains `SETTINGS stream = true`.
///
/// ```sql
Expand Down Expand Up @@ -37,6 +40,33 @@ pub fn is_streaming(stmt: &sql::parser::Statement) -> bool {
is_streaming
}

/// Extract `microbatch_max_interval` from `SETTINGS microbatch_max_interval = <u64>`.
///
/// Returns `None` when the statement is not a query, has no `SETTINGS` clause,
/// lacks the key, or the value is not a non-negative integer literal.
///
/// ```sql
/// SELECT * FROM eth.logs SETTINGS stream = true, microbatch_max_interval = 5000
/// ```
pub fn microbatch_max_interval(stmt: &sql::parser::Statement) -> Option<u64> {
    // Drill down to the inner query; anything else carries no SETTINGS clause.
    let query = match stmt {
        sql::parser::Statement::Statement(inner) => match inner.as_ref() {
            ast::Statement::Query(query) => query,
            _ => return None,
        },
        _ => return None,
    };
    // Key match is case-insensitive, mirroring the `stream` setting lookup.
    query
        .settings
        .as_ref()?
        .iter()
        .find(|setting| {
            setting
                .key
                .value
                .eq_ignore_ascii_case(MICROBATCH_MAX_INTERVAL_SETTING)
        })
        .and_then(|setting| match &setting.value {
            // Only plain numeric literals are accepted; `parse` rejects
            // negatives and overflow, yielding None rather than an error.
            Expr::Value(value) => match &value.value {
                ast::Value::Number(number, _) => number.parse::<u64>().ok(),
                _ => None,
            },
            _ => None,
        })
}

#[cfg(test)]
mod tests {
use common::sql_str::SqlStr;
Expand Down Expand Up @@ -112,6 +142,84 @@ mod tests {
assert!(!result, "query without SETTINGS should not be streaming");
}

#[test]
fn microbatch_max_interval_with_valid_value_returns_some() {
    //* Given
    let sql = "SELECT * FROM test SETTINGS stream = true, microbatch_max_interval = 5000";
    let stmt = parse_stmt(sql);

    //* When
    let result = microbatch_max_interval(&stmt);

    //* Then
    let expected = Some(5000);
    assert_eq!(
        result, expected,
        "should extract microbatch_max_interval from SETTINGS"
    );
}

#[test]
fn microbatch_max_interval_with_uppercase_key_returns_value() {
    //* Given
    let sql = "SELECT * FROM test SETTINGS MICROBATCH_MAX_INTERVAL = 200";
    let stmt = parse_stmt(sql);

    //* When
    let result = microbatch_max_interval(&stmt);

    //* Then
    let expected = Some(200);
    assert_eq!(result, expected, "case-insensitive key should be recognized");
}

#[test]
fn microbatch_max_interval_without_setting_returns_none() {
    //* Given
    let sql = "SELECT * FROM test SETTINGS stream = true";
    let stmt = parse_stmt(sql);

    //* When
    let result = microbatch_max_interval(&stmt);

    //* Then
    assert_eq!(
        result, None,
        "missing microbatch_max_interval setting should return None"
    );
}

#[test]
fn microbatch_max_interval_without_settings_clause_returns_none() {
    //* Given
    let sql = "SELECT * FROM test";
    let stmt = parse_stmt(sql);

    //* When
    let result = microbatch_max_interval(&stmt);

    //* Then
    assert_eq!(
        result, None,
        "query without SETTINGS clause should return None"
    );
}

#[test]
fn microbatch_max_interval_with_boolean_value_returns_none() {
    //* Given
    let sql = "SELECT * FROM test SETTINGS microbatch_max_interval = true";
    let stmt = parse_stmt(sql);

    //* When
    let result = microbatch_max_interval(&stmt);

    //* Then
    assert_eq!(
        result, None,
        "boolean value should not be parsed as microbatch_max_interval"
    );
}

fn parse_stmt(sql: &str) -> sql::parser::Statement {
let sql_str: SqlStr = sql.parse().expect("sql should be valid SqlStr");
common::sql::parse(&sql_str).expect("sql should parse successfully")
Expand Down
3 changes: 1 addition & 2 deletions docs/config.sample.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ manifests_dir = "manifests"

# Operational timing
# poll_interval_secs = 1.0 # Polling interval for new blocks during dump in seconds (default: 1.0)
# microbatch_max_interval = 100000 # Max interval for derived dataset dump microbatches in blocks (default: 100000)
# server_microbatch_max_interval = 1000 # Max interval for streaming server microbatches in blocks (default: 1000)
# microbatch_max_interval = 100000 # Max interval for microbatches in blocks; also the streaming server default (default: 100000)
# keep_alive_interval = 30 # Keep-alive interval for streaming server in seconds (default: 30, minimum: 30)

# Service addresses
Expand Down
4 changes: 2 additions & 2 deletions docs/feat/app-ampd-server.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,16 @@ ampd server

| Setting | Default | Description |
|---------|---------|-------------|
| `microbatch_max_interval` | 100000 | Default maximum blocks per streaming microbatch (overridable per query via SQL `SETTINGS`) |
| `keep_alive_interval` | - | Seconds between keep-alive messages (min 30s) |

Clients can override `microbatch_max_interval` per query via the SQL `SETTINGS` clause (see [Streaming Queries](query-sql-streaming.md)). When not set, the server uses the configured `microbatch_max_interval` value.

### Config File

```toml
# .amp/config.toml
flight_addr = "0.0.0.0:1602"
jsonl_addr = "0.0.0.0:1603"
microbatch_max_interval = 100
keep_alive_interval = 60
```

Expand Down
17 changes: 13 additions & 4 deletions docs/feat/query-sql-streaming.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,19 @@ Clients receiving `is_reorg: true` should invalidate cached data from affected b

## Configuration

| Setting | Default | Description |
| -------------------------------- | ------- | ------------------------------------------------- |
| `server_microbatch_max_interval` | - | Maximum blocks per microbatch |
| `keep_alive_interval` | - | Seconds between keep-alive messages (minimum 30s) |
| Setting | Default | Description |
| -------------------- | ------- | ------------------------------------------------- |
| `keep_alive_interval`| - | Seconds between keep-alive messages (minimum 30s) |

### Per-Query Settings

The `microbatch_max_interval` can be set per query via the SQL `SETTINGS` clause:

```sql
SELECT * FROM eth.logs SETTINGS stream = true, microbatch_max_interval = 5000
```

When not specified, the server falls back to the configured `microbatch_max_interval` value.

### Keep-Alive

Expand Down
9 changes: 1 addition & 8 deletions docs/schemas/config/ampd.spec.json
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
"minimum": 0
},
"microbatch_max_interval": {
"description": "Max interval for derived dataset dump microbatches in blocks (default: 100000)",
"description": "Max interval for microbatches in blocks (default: 100000).\nUsed for derived dataset dumps and as the default for streaming server queries\nwhen the client does not override via the SQL `SETTINGS` clause.",
"type": "integer",
"format": "uint64",
"default": 100000,
Expand Down Expand Up @@ -89,13 +89,6 @@
"default": 0,
"minimum": 0
},
"server_microbatch_max_interval": {
"description": "Max interval for streaming server microbatches in blocks (default: 1000)",
"type": "integer",
"format": "uint64",
"default": 1000,
"minimum": 0
},
"spill_location": {
"description": "Paths for DataFusion temporary files for spill-to-disk (default: [])",
"type": "array",
Expand Down
2 changes: 1 addition & 1 deletion tests/src/testlib/fixtures/daemon_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ impl Drop for DaemonServer {
/// Convert config::Config to server::config::Config
fn server_config_from_common(config: &amp_config::Config) -> Config {
Config {
server_microbatch_max_interval: config.server_microbatch_max_interval,
microbatch_max_interval: config.microbatch_max_interval,
keep_alive_interval: config.keep_alive_interval,
max_mem_mb: config.max_mem_mb,
query_max_mem_mb: config.query_max_mem_mb,
Expand Down
Loading