From ec78ee5f5bfa102cf97d30e876a26ddaea1eb721 Mon Sep 17 00:00:00 2001 From: pthmas <9058370+pthmas@users.noreply.github.com> Date: Mon, 30 Mar 2026 10:45:09 +0200 Subject: [PATCH 1/4] feat: add atlas-server CLI commands and DB flow CI --- .github/workflows/ci.yml | 224 +++++++++- Justfile | 14 +- backend/Cargo.toml | 3 + backend/Dockerfile | 2 +- backend/crates/atlas-server/Cargo.toml | 1 + backend/crates/atlas-server/src/cli.rs | 409 ++++++++++++++++++ backend/crates/atlas-server/src/config.rs | 273 ++++++++++++ backend/crates/atlas-server/src/lib.rs | 1 + backend/crates/atlas-server/src/main.rs | 124 +++++- .../atlas-server/tests/integration/main.rs | 1 + .../atlas-server/tests/integration/schema.rs | 259 +++++++++++ 11 files changed, 1286 insertions(+), 25 deletions(-) create mode 100644 backend/crates/atlas-server/src/cli.rs create mode 100644 backend/crates/atlas-server/tests/integration/schema.rs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b5017e3..4af3bf3 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -73,6 +73,228 @@ jobs: - name: Test run: cargo test --workspace --all-targets + backend-cli-db-flow: + name: Backend CLI DB Flow + runs-on: ubuntu-latest + services: + postgres: + image: postgres:16-alpine + env: + POSTGRES_DB: atlas + POSTGRES_USER: atlas + POSTGRES_PASSWORD: atlas + ports: + - 5432:5432 + options: >- + --health-cmd "pg_isready -U atlas -d atlas" + --health-interval 5s + --health-timeout 5s + --health-retries 5 + defaults: + run: + working-directory: backend + env: + PG_ADMIN_URL: postgres://atlas:atlas@127.0.0.1:5432/postgres + SOURCE_DB_URL: postgres://atlas:atlas@127.0.0.1:5432/atlas + RESTORE_DB_URL: postgres://atlas:atlas@127.0.0.1:5432/atlas_restore + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Setup Rust + uses: dtolnay/rust-toolchain@stable + + - name: Cache Cargo + uses: Swatinem/rust-cache@v2 + with: + workspaces: backend + + - name: Install PostgreSQL client + run: sudo apt-get update && sudo apt-get install -y postgresql-client + + - name: Exercise migrate, dump, and restore commands + run: | + set -euo pipefail + + psql "$PG_ADMIN_URL" -c "DROP DATABASE IF EXISTS atlas_restore;" + psql "$PG_ADMIN_URL" -c "CREATE DATABASE atlas_restore;" + + cargo run --quiet --bin atlas-server -- migrate --atlas.db.url "$SOURCE_DB_URL" + + psql "$SOURCE_DB_URL" <<'SQL' + INSERT INTO blocks ( + number, hash, parent_hash, timestamp, gas_used, gas_limit, transaction_count, indexed_at + ) VALUES ( + 424242, + '0x0000000000000000000000000000000000000000000000000000000000067932', + '0x0000000000000000000000000000000000000000000000000000000000067931', + 1700000000, + 21000, + 30000000, + 1, + NOW() + ) + ON CONFLICT (number) DO NOTHING; + + INSERT INTO transactions ( + hash, block_number, block_index, from_address, to_address, value, gas_price, gas_used, + input_data, status, timestamp + ) VALUES ( + '0x0000000000000000000000000000000000000000000000000000000000067933', + 424242, + 0, + '0x4242420000000000000000000000000000000001', + '0x4242420000000000000000000000000000000002', + 1000000000000000000, + 20000000000, + 21000, + '\x', + true, + 1700000000 + ) + ON CONFLICT (hash, block_number) DO NOTHING; + SQL + + DUMP_FILE="$RUNNER_TEMP/atlas-cli-db-flow.dump" + cargo run --quiet --bin atlas-server -- db dump "$DUMP_FILE" --atlas.db.url "$SOURCE_DB_URL" + cargo run --quiet --bin atlas-server -- db restore "$DUMP_FILE" --atlas.db.url "$RESTORE_DB_URL" + cargo run --quiet --bin atlas-server -- migrate 
--atlas.db.url "$RESTORE_DB_URL" + + test "$(psql "$RESTORE_DB_URL" -Atc "SELECT COUNT(*) FROM blocks WHERE number = 424242")" = "1" + test "$(psql "$RESTORE_DB_URL" -Atc "SELECT COUNT(*) FROM transactions WHERE hash = '0x0000000000000000000000000000000000000000000000000000000000067933'")" = "1" + + backend-cli-snapshot-compat: + name: Backend CLI Snapshot Compatibility + runs-on: ubuntu-latest + services: + postgres: + image: postgres:16-alpine + env: + POSTGRES_DB: atlas + POSTGRES_USER: atlas + POSTGRES_PASSWORD: atlas + ports: + - 5432:5432 + options: >- + --health-cmd "pg_isready -U atlas -d atlas" + --health-interval 5s + --health-timeout 5s + --health-retries 5 + defaults: + run: + working-directory: backend + env: + PG_ADMIN_URL: postgres://atlas:atlas@127.0.0.1:5432/postgres + SOURCE_DB_URL: postgres://atlas:atlas@127.0.0.1:5432/atlas_compat_source + RESTORE_DB_URL: postgres://atlas:atlas@127.0.0.1:5432/atlas_compat_restore + steps: + - name: Checkout + uses: actions/checkout@v4 + with: + fetch-depth: 0 + + - name: Setup Rust + uses: dtolnay/rust-toolchain@stable + + - name: Cache Cargo + uses: Swatinem/rust-cache@v2 + with: + workspaces: backend + + - name: Install PostgreSQL client + run: sudo apt-get update && sudo apt-get install -y postgresql-client + + - name: Resolve compatibility baseline + id: baseline + run: | + if [ "${{ github.event_name }}" = "pull_request" ]; then + echo "sha=${{ github.event.pull_request.base.sha }}" >> "$GITHUB_OUTPUT" + else + echo "sha=${{ github.event.before }}" >> "$GITHUB_OUTPUT" + fi + + - name: Restore a snapshot created from the previous revision + id: compat + continue-on-error: true + run: | + set -euo pipefail + + BASE_SHA="${{ steps.baseline.outputs.sha }}" + if [ -z "$BASE_SHA" ] || [ "$BASE_SHA" = "0000000000000000000000000000000000000000" ]; then + echo "No prior revision is available for snapshot compatibility testing." 
+            exit 0
+          fi
+
+          BASE_WORKTREE="$RUNNER_TEMP/atlas-base"
+          DUMP_FILE="$RUNNER_TEMP/atlas-compat.dump"
+
+          git worktree add "$BASE_WORKTREE" "$BASE_SHA"
+          trap 'git worktree remove --force "$BASE_WORKTREE"' EXIT
+
+          psql "$PG_ADMIN_URL" -c "DROP DATABASE IF EXISTS atlas_compat_source;"
+          psql "$PG_ADMIN_URL" -c "DROP DATABASE IF EXISTS atlas_compat_restore;"
+          psql "$PG_ADMIN_URL" -c "CREATE DATABASE atlas_compat_source;"
+          psql "$PG_ADMIN_URL" -c "CREATE DATABASE atlas_compat_restore;"
+
+          HELPER_DIR="$RUNNER_TEMP/base-migrate-helper"
+          mkdir -p "$HELPER_DIR/src"
+
+          cat > "$HELPER_DIR/Cargo.toml" <<EOF
+          [package]
+          name = "base-migrate-helper"
+          version = "0.1.0"
+          edition = "2021"
+
+          [dependencies]
+          atlas-common = { path = "$BASE_WORKTREE/backend/crates/atlas-common" }
+          tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
+          EOF
+
+          cat > "$HELPER_DIR/src/main.rs" <<'EOF'
+          #[tokio::main]
+          async fn main() -> Result<(), Box<dyn std::error::Error>> {
+              let database_url = std::env::args().nth(1).expect("database url argument");
+              atlas_common::db::run_migrations(&database_url).await?;
+              Ok(())
+          }
+          EOF
+
+          cargo run --quiet --manifest-path "$HELPER_DIR/Cargo.toml" -- "$SOURCE_DB_URL"
+
+          psql "$SOURCE_DB_URL" <<'SQL'
+          INSERT INTO blocks (
+            number, hash, parent_hash, timestamp, gas_used, gas_limit, transaction_count, indexed_at
+          ) VALUES (
+            515151,
+            '0x000000000000000000000000000000000000000000000000000000000007db4f',
+            '0x000000000000000000000000000000000000000000000000000000000007db4e',
+            1700001000,
+            21000,
+            30000000,
+            1,
+            NOW()
+          )
+          ON CONFLICT (number) DO NOTHING;
+          SQL
+
+          pg_dump --dbname="$SOURCE_DB_URL" --format=custom --file "$DUMP_FILE"
+
+          cargo run --quiet --bin atlas-server -- db restore "$DUMP_FILE" --atlas.db.url "$RESTORE_DB_URL"
+          cargo run --quiet --bin atlas-server -- migrate --atlas.db.url "$RESTORE_DB_URL"
+
+          test "$(psql "$RESTORE_DB_URL" -Atc "SELECT COUNT(*) FROM blocks WHERE number = 515151")" = "1"
+
+      - name: Warn when snapshot compatibility fails
+        if: always() && steps.compat.outcome == 'failure'
+        run: |
+          echo "::warning::Snapshot compatibility check failed. Review the 'Backend CLI Snapshot Compatibility' job before release."
+          {
+            echo "## Snapshot compatibility warning"
+            echo
+            echo "The previous-revision dump/restore drill failed in CI."
+            echo "The workflow stayed green, but this should be reviewed before release."
+ } >> "$GITHUB_STEP_SUMMARY" + frontend: name: Frontend (Bun) runs-on: ubuntu-latest @@ -99,7 +321,7 @@ jobs: docker: name: Docker (GHCR) - needs: [backend-fmt, backend-clippy, backend-test, frontend] + needs: [backend-fmt, backend-clippy, backend-test, backend-cli-db-flow, frontend] if: github.event_name == 'push' && github.ref == 'refs/heads/main' uses: ./.github/workflows/docker-build-push.yml secrets: inherit diff --git a/Justfile b/Justfile index 645b075..fb60c45 100644 --- a/Justfile +++ b/Justfile @@ -49,9 +49,21 @@ backend-test: backend-build: cd backend && cargo build --workspace +# Build optimised release binary to build/atlas-server +[group('backend')] +build-release: + cd backend && cargo build --release --bin atlas-server + mkdir -p build + cp backend/target/release/atlas-server build/atlas-server + +# Install atlas-server to ~/.cargo/bin (available on PATH after cargo setup) +[group('backend')] +install: + cd backend && cargo install --path crates/atlas-server --locked + [group('backend')] backend-run: - cd backend && cargo run --bin atlas-server + cd backend && cargo run --bin atlas-server -- run [group('frontend')] frontend-install: diff --git a/backend/Cargo.toml b/backend/Cargo.toml index d6b3abb..7a2db69 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -58,5 +58,8 @@ chrono = { version = "0.4", features = ["serde"] } testcontainers = "0.27" testcontainers-modules = { version = "0.15", features = ["postgres"] } +# CLI +clap = { version = "4", features = ["derive", "env"] } + # Internal crates atlas-common = { path = "crates/atlas-common" } diff --git a/backend/Dockerfile b/backend/Dockerfile index 4d827f1..a912b69 100644 --- a/backend/Dockerfile +++ b/backend/Dockerfile @@ -19,7 +19,7 @@ RUN addgroup -S atlas && adduser -S atlas -G atlas USER atlas EXPOSE 3000 -CMD ["atlas-server"] +CMD ["atlas-server", "run"] # Backward-compatible target names for CI jobs that still build the old images. 
 FROM server AS api
diff --git a/backend/crates/atlas-server/Cargo.toml b/backend/crates/atlas-server/Cargo.toml
index 0bb864a..08c8e82 100644
--- a/backend/crates/atlas-server/Cargo.toml
+++ b/backend/crates/atlas-server/Cargo.toml
@@ -9,6 +9,7 @@ path = "src/main.rs"
 
 [dependencies]
 atlas-common = { workspace = true }
+clap = { workspace = true }
 tokio = { workspace = true }
 axum = { workspace = true }
 tower = { workspace = true }
diff --git a/backend/crates/atlas-server/src/cli.rs b/backend/crates/atlas-server/src/cli.rs
new file mode 100644
index 0000000..ad5455a
--- /dev/null
+++ b/backend/crates/atlas-server/src/cli.rs
@@ -0,0 +1,409 @@
+use clap::{Args, Parser, Subcommand};
+
+/// Atlas — EVM blockchain explorer
+#[derive(Parser)]
+#[command(name = "atlas-server", version, about = "EVM blockchain explorer — indexer + API + DA tracking")]
+pub struct Cli {
+    #[command(subcommand)]
+    pub command: Command,
+}
+
+#[derive(Subcommand)]
+pub enum Command {
+    /// Start the indexer and API server
+    Run(Box<RunArgs>),
+    /// Run database migrations and exit
+    Migrate(Box<MigrateArgs>),
+    /// Validate configuration and test DB/RPC connectivity, then exit
+    Check(Box<RunArgs>),
+    /// Database utilities
+    Db(DbCommand),
+}
+
+/// Arguments for the `run` and `check` subcommands
+#[derive(Args, Clone)]
+pub struct RunArgs {
+    #[command(flatten)]
+    pub db: DatabaseArgs,
+    #[command(flatten)]
+    pub rpc: RpcArgs,
+    #[command(flatten)]
+    pub api: ApiArgs,
+    #[command(flatten)]
+    pub indexer: IndexerArgs,
+    #[command(flatten)]
+    pub chain: ChainArgs,
+    #[command(flatten)]
+    pub da: DaArgs,
+    #[command(flatten)]
+    pub faucet: FaucetArgs,
+    #[command(flatten)]
+    pub branding: BrandingArgs,
+    #[command(flatten)]
+    pub log: LogArgs,
+}
+
+#[derive(Args, Clone)]
+pub struct MigrateArgs {
+    #[command(flatten)]
+    pub db: DatabaseArgs,
+    #[command(flatten)]
+    pub log: LogArgs,
+}
+
+// ── Sections ──────────────────────────────────────────────────────────────────
+
+#[derive(Args, Clone)]
+#[command(next_help_heading = "Database")]
+pub struct DatabaseArgs {
+    #[arg(
+        id = "db-url",
+        long = "atlas.db.url",
+        env = "DATABASE_URL",
+        value_name = "URL",
+        help = "PostgreSQL connection string"
+    )]
+    pub url: String,
+
+    #[arg(
+        long = "atlas.db.max-connections",
+        env = "DB_MAX_CONNECTIONS",
+        default_value = "20",
+        value_name = "N",
+        help = "Max connections for the indexer pool"
+    )]
+    pub max_connections: u32,
+
+    #[arg(
+        long = "atlas.db.api-max-connections",
+        env = "API_DB_MAX_CONNECTIONS",
+        default_value = "20",
+        value_name = "N",
+        help = "Max connections for the API pool"
+    )]
+    pub api_max_connections: u32,
+}
+
+#[derive(Args, Clone)]
+#[command(next_help_heading = "RPC")]
+pub struct RpcArgs {
+    #[arg(
+        id = "rpc-url",
+        long = "atlas.rpc.url",
+        env = "RPC_URL",
+        value_name = "URL",
+        help = "Ethereum JSON-RPC endpoint"
+    )]
+    pub url: String,
+
+    #[arg(
+        long = "atlas.rpc.requests-per-second",
+        env = "RPC_REQUESTS_PER_SECOND",
+        default_value = "100",
+        value_name = "N",
+        help = "Max RPC requests per second"
+    )]
+    pub requests_per_second: u32,
+
+    #[arg(
+        id = "rpc-batch-size",
+        long = "atlas.rpc.batch-size",
+        env = "RPC_BATCH_SIZE",
+        default_value = "20",
+        value_name = "N",
+        help = "Number of blocks fetched per RPC batch call"
+    )]
+    pub batch_size: u32,
+}
+
+#[derive(Args, Clone)]
+#[command(next_help_heading = "API")]
+pub struct ApiArgs {
+    #[arg(
+        long = "atlas.api.host",
+        env = "API_HOST",
+        default_value = "127.0.0.1",
+        value_name = "HOST",
+        help = "Host address for the API server"
+    )]
+    pub host: String,
+
+    #[arg(
+        long = "atlas.api.port",
+        env = "API_PORT",
+        default_value = "3000",
+        value_name = "PORT",
+        help = "Port for the API server"
+    )]
+    pub port: u16,
+
+    #[arg(
+        long = "atlas.api.cors-origin",
+        env = "CORS_ORIGIN",
+        value_name = "ORIGIN",
+        help = "Restrict CORS to this origin (unset = allow all)"
+    )]
+    pub cors_origin: Option<String>,
+
+    #[arg(
+        long = "atlas.api.sse-replay-buffer-blocks",
+        env = "SSE_REPLAY_BUFFER_BLOCKS",
+        default_value = "4096",
+        value_name = "N",
+        help = "Number of recent blocks kept in the SSE replay buffer [1–100000]"
+    )]
+    pub sse_replay_buffer_blocks: usize,
+}
+
+#[derive(Args, Clone)]
+#[command(next_help_heading = "Indexer")]
+pub struct IndexerArgs {
+    #[arg(
+        long = "atlas.indexer.start-block",
+        env = "START_BLOCK",
+        default_value = "0",
+        value_name = "N",
+        help = "Block number to start indexing from"
+    )]
+    pub start_block: u64,
+
+    #[arg(
+        id = "indexer-batch-size",
+        long = "atlas.indexer.batch-size",
+        env = "BATCH_SIZE",
+        default_value = "100",
+        value_name = "N",
+        help = "Number of blocks written per DB batch"
+    )]
+    pub batch_size: u64,
+
+    #[arg(
+        long = "atlas.indexer.fetch-workers",
+        env = "FETCH_WORKERS",
+        default_value = "10",
+        value_name = "N",
+        help = "Number of concurrent block-fetch workers"
+    )]
+    pub fetch_workers: u32,
+
+    #[arg(
+        long = "atlas.indexer.reindex",
+        env = "REINDEX",
+        default_value_t = false,
+        help = "Wipe indexed data and re-index from start-block"
+    )]
+    pub reindex: bool,
+
+    #[arg(
+        long = "atlas.indexer.ipfs-gateway",
+        env = "IPFS_GATEWAY",
+        default_value = "https://ipfs.io/ipfs/",
+        value_name = "URL",
+        help = "IPFS gateway used for token metadata fetching"
+    )]
+    pub ipfs_gateway: String,
+
+    #[arg(
+        long = "atlas.indexer.metadata-fetch-workers",
+        env = "METADATA_FETCH_WORKERS",
+        default_value = "4",
+        value_name = "N",
+        help = "Number of concurrent metadata-fetch workers"
+    )]
+    pub metadata_fetch_workers: u32,
+
+    #[arg(
+        long = "atlas.indexer.metadata-retry-attempts",
+        env = "METADATA_RETRY_ATTEMPTS",
+        default_value = "3",
+        value_name = "N",
+        help = "Max retry attempts for metadata fetches"
+    )]
+    pub metadata_retry_attempts: u32,
+}
+
+#[derive(Args, Clone)]
+#[command(next_help_heading = "Chain")]
+pub struct ChainArgs {
+    #[arg(
+        long = "atlas.chain.name",
+        env = "CHAIN_NAME",
+        default_value = "Unknown",
+        value_name = "NAME",
+        help = "Human-readable chain name shown in the UI"
+    )]
+    pub name: String,
+
+    #[arg(
+        long = "atlas.chain.logo-url",
+        env = "CHAIN_LOGO_URL",
+        value_name = "URL",
+        help = "URL to the chain logo image"
+    )]
+    pub logo_url: Option<String>,
+}
+
+#[derive(Args, Clone)]
+#[command(next_help_heading = "DA Tracking")]
+pub struct DaArgs {
+    #[arg(
+        id = "da-enabled",
+        long = "atlas.da.enabled",
+        env = "ENABLE_DA_TRACKING",
+        default_value_t = false,
+        help = "Enable Celestia DA inclusion tracking"
+    )]
+    pub enabled: bool,
+
+    #[arg(
+        long = "atlas.da.evnode-url",
+        env = "EVNODE_URL",
+        value_name = "URL",
+        help = "ev-node gRPC endpoint (required when DA tracking is enabled)"
+    )]
+    pub evnode_url: Option<String>,
+
+    #[arg(
+        long = "atlas.da.worker-concurrency",
+        env = "DA_WORKER_CONCURRENCY",
+        default_value = "50",
+        value_name = "N",
+        help = "Number of concurrent DA worker tasks"
+    )]
+    pub worker_concurrency: u32,
+
+    #[arg(
+        long = "atlas.da.rpc-requests-per-second",
+        env = "DA_RPC_REQUESTS_PER_SECOND",
+        default_value = "50",
+        value_name = "N",
+        help = "Max DA RPC requests per second"
+    )]
+    pub rpc_requests_per_second: u32,
+}
+
+#[derive(Args, Clone)]
+#[command(next_help_heading = "Faucet")]
+pub struct FaucetArgs {
+    #[arg(
+        id = "faucet-enabled",
+        long = "atlas.faucet.enabled",
+        env = "FAUCET_ENABLED",
+        default_value_t = false,
+        help = "Enable the token faucet endpoint"
+    )]
+    pub enabled: bool,
+
+    #[arg(
+        long = "atlas.faucet.amount",
+        env = "FAUCET_AMOUNT",
+        value_name = "ETH",
+        help = "Amount of ETH dispensed per faucet request (e.g. 0.1). FAUCET_PRIVATE_KEY must be set via env"
+    )]
+    pub amount: Option<String>,
+
+    #[arg(
+        long = "atlas.faucet.cooldown-minutes",
+        env = "FAUCET_COOLDOWN_MINUTES",
+        value_name = "MINS",
+        help = "Cooldown period in minutes between faucet requests per address"
+    )]
+    pub cooldown_minutes: Option<u64>,
+    // FAUCET_PRIVATE_KEY is intentionally env-only (security: never pass secrets as CLI flags)
+}
+
+#[derive(Args, Clone)]
+#[command(next_help_heading = "Branding")]
+pub struct BrandingArgs {
+    #[arg(
+        long = "atlas.branding.accent-color",
+        env = "ACCENT_COLOR",
+        value_name = "HEX",
+        help = "UI accent color (e.g. #3b82f6)"
+    )]
+    pub accent_color: Option<String>,
+
+    #[arg(
+        long = "atlas.branding.background-dark",
+        env = "BACKGROUND_COLOR_DARK",
+        value_name = "HEX",
+        help = "Dark mode background color"
+    )]
+    pub background_dark: Option<String>,
+
+    #[arg(
+        long = "atlas.branding.background-light",
+        env = "BACKGROUND_COLOR_LIGHT",
+        value_name = "HEX",
+        help = "Light mode background color"
+    )]
+    pub background_light: Option<String>,
+
+    #[arg(
+        long = "atlas.branding.success-color",
+        env = "SUCCESS_COLOR",
+        value_name = "HEX",
+        help = "Success state color"
+    )]
+    pub success_color: Option<String>,
+
+    #[arg(
+        long = "atlas.branding.error-color",
+        env = "ERROR_COLOR",
+        value_name = "HEX",
+        help = "Error state color"
+    )]
+    pub error_color: Option<String>,
+}
+
+#[derive(Args, Clone)]
+#[command(next_help_heading = "Logging")]
+pub struct LogArgs {
+    #[arg(
+        long = "atlas.log.level",
+        env = "RUST_LOG",
+        default_value = "atlas_server=info,tower_http=debug,sqlx=warn",
+        value_name = "FILTER",
+        help = "Log filter directive (e.g. info, atlas_server=debug)"
+    )]
+    pub level: String,
+}
+
+// ── db subcommand ─────────────────────────────────────────────────────────────
+
+#[derive(Args)]
+pub struct DbCommand {
+    #[command(subcommand)]
+    pub command: DbSubcommand,
+}
+
+#[derive(Subcommand)]
+pub enum DbSubcommand {
+    /// Dump the database to a file using pg_dump
+    Dump {
+        /// Destination file path (use - for stdout)
+        #[arg(value_name = "OUTPUT")]
+        output: String,
+
+        #[arg(long = "atlas.db.url", env = "DATABASE_URL", value_name = "URL")]
+        db_url: String,
+    },
+    /// Restore the database from a pg_dump file
+    Restore {
+        /// Source file path (use - for stdin)
+        #[arg(value_name = "INPUT")]
+        input: String,
+
+        #[arg(long = "atlas.db.url", env = "DATABASE_URL", value_name = "URL")]
+        db_url: String,
+    },
+    /// Drop all indexed data, keeping schema and migrations intact (requires --confirm)
+    Reset {
+        /// Required to confirm the destructive operation
+        #[arg(long)]
+        confirm: bool,
+
+        #[arg(long = "atlas.db.url", env = "DATABASE_URL", value_name = "URL")]
+        db_url: String,
+    },
+}
diff --git a/backend/crates/atlas-server/src/config.rs b/backend/crates/atlas-server/src/config.rs
index 2591c1a..eaec3f1 100644
--- a/backend/crates/atlas-server/src/config.rs
+++ b/backend/crates/atlas-server/src/config.rs
@@ -251,6 +251,125 @@ impl FaucetConfig {
     }
 }
 
+// ── CLI → Config conversion ───────────────────────────────────────────────────
+
+impl Config {
+    pub fn from_run_args(args: crate::cli::RunArgs) -> anyhow::Result<Self> {
+        let sse_replay_buffer_blocks = args.api.sse_replay_buffer_blocks;
+        if sse_replay_buffer_blocks == 0 || sse_replay_buffer_blocks > 100_000 {
+            bail!("--atlas.api.sse-replay-buffer-blocks must be between 1 and 100000");
+        }
+
+        let da_tracking_enabled = args.da.enabled;
+
+        if da_tracking_enabled && args.da.worker_concurrency == 0 {
+            bail!("--atlas.da.worker-concurrency must be greater than 0");
+        }
+        if da_tracking_enabled && args.da.rpc_requests_per_second == 0 {
+            bail!("--atlas.da.rpc-requests-per-second must be greater than 0");
+        }
+
+        let evnode_url = if da_tracking_enabled {
+            let url = args
+                .da
+                .evnode_url
+                .map(|s: String| s.trim().to_string())
+                .filter(|s: &String| !s.is_empty());
+            Some(url.ok_or_else(|| {
+                anyhow::anyhow!(
+                    "--atlas.da.evnode-url (or EVNODE_URL) must be set when DA tracking is enabled"
+                )
+            })?)
+        } else {
+            None
+        };
+
+        let chain_name = args.chain.name.trim().to_string();
+        let chain_name = if chain_name.is_empty() {
+            "Unknown".to_string()
+        } else {
+            chain_name
+        };
+
+        Ok(Self {
+            database_url: args.db.url,
+            rpc_url: args.rpc.url,
+            indexer_db_max_connections: args.db.max_connections,
+            api_db_max_connections: args.db.api_max_connections,
+            rpc_requests_per_second: args.rpc.requests_per_second,
+            start_block: args.indexer.start_block,
+            batch_size: args.indexer.batch_size,
+            reindex: args.indexer.reindex,
+            ipfs_gateway: args.indexer.ipfs_gateway,
+            metadata_fetch_workers: args.indexer.metadata_fetch_workers,
+            metadata_retry_attempts: args.indexer.metadata_retry_attempts,
+            fetch_workers: args.indexer.fetch_workers,
+            rpc_batch_size: args.rpc.batch_size,
+            da_tracking_enabled,
+            evnode_url,
+            da_worker_concurrency: args.da.worker_concurrency,
+            da_rpc_requests_per_second: args.da.rpc_requests_per_second,
+            api_host: args.api.host,
+            api_port: args.api.port,
+            cors_origin: parse_optional_env(args.api.cors_origin),
+            sse_replay_buffer_blocks,
+            chain_name,
+            chain_logo_url: parse_optional_env(args.chain.logo_url),
+            accent_color: parse_optional_env(args.branding.accent_color),
+            background_color_dark: parse_optional_env(args.branding.background_dark),
+            background_color_light: parse_optional_env(args.branding.background_light),
+            success_color: parse_optional_env(args.branding.success_color),
+            error_color: parse_optional_env(args.branding.error_color),
+        })
+    }
+}
+
+impl FaucetConfig {
+    pub fn from_faucet_args(args: &crate::cli::FaucetArgs) -> anyhow::Result<Self> {
+        if !args.enabled {
+            return Ok(Self {
+                enabled: false,
+                private_key: None,
+                amount_wei: None,
+                cooldown_minutes: None,
+            });
+        }
+
+        let private_key = env::var("FAUCET_PRIVATE_KEY")
+            .context("FAUCET_PRIVATE_KEY env var must be set when faucet is enabled")?;
+        PrivateKeySigner::from_str(&private_key).context("Invalid FAUCET_PRIVATE_KEY")?;
+
+        let amount_str = args.amount.as_deref().ok_or_else(|| {
+            anyhow::anyhow!(
+                "--atlas.faucet.amount (or FAUCET_AMOUNT) must be set when faucet is enabled"
+            )
+        })?;
+        let amount_wei = parse_faucet_amount_to_wei(amount_str)?;
+        if amount_wei == U256::ZERO {
+            bail!("faucet amount must be greater than 0");
+        }
+
+        let cooldown_minutes = args.cooldown_minutes.ok_or_else(|| {
+            anyhow::anyhow!(
+                "--atlas.faucet.cooldown-minutes (or FAUCET_COOLDOWN_MINUTES) must be set when faucet is enabled"
+            )
+        })?;
+        if cooldown_minutes == 0 {
+            bail!("faucet cooldown must be greater than 0");
+        }
+        if cooldown_minutes.checked_mul(60).is_none() {
+            bail!("faucet cooldown is too large");
+        }
+
+        Ok(Self {
+            enabled: true,
+            private_key: Some(private_key),
+            amount_wei: Some(amount_wei),
+            cooldown_minutes: Some(cooldown_minutes),
+        })
+    }
+}
+
 fn parse_optional_env(val: Option<String>) -> Option<String> {
     val.map(|s| s.trim().to_string()).filter(|s| !s.is_empty())
 }
@@ -298,6 +417,160 @@ fn parse_faucet_amount_to_wei(amount: &str) -> Result<U256> {
     Ok(whole_wei * wei_per_eth + fractional_wei)
 }
 
+#[cfg(test)]
+mod tests_from_run_args {
+    use super::*;
+    use crate::cli;
+
+    fn minimal_run_args() -> cli::RunArgs {
+        cli::RunArgs {
+            db: cli::DatabaseArgs {
+                url: "postgres://test@localhost/test".to_string(),
+                max_connections: 20,
+                api_max_connections: 20,
+            },
+            rpc: cli::RpcArgs {
+                url: "http://localhost:8545".to_string(),
+                requests_per_second: 100,
+                batch_size: 20,
+            },
+            api: cli::ApiArgs {
+                host: "127.0.0.1".to_string(),
+                port: 3000,
+                cors_origin: None,
+                sse_replay_buffer_blocks: 4096,
+            },
+            
indexer: cli::IndexerArgs { + start_block: 0, + batch_size: 100, + fetch_workers: 10, + reindex: false, + ipfs_gateway: "https://ipfs.io/ipfs/".to_string(), + metadata_fetch_workers: 4, + metadata_retry_attempts: 3, + }, + chain: cli::ChainArgs { + name: "TestChain".to_string(), + logo_url: None, + }, + da: cli::DaArgs { + enabled: false, + evnode_url: None, + worker_concurrency: 50, + rpc_requests_per_second: 50, + }, + faucet: cli::FaucetArgs { + enabled: false, + amount: None, + cooldown_minutes: None, + }, + branding: cli::BrandingArgs { + accent_color: None, + background_dark: None, + background_light: None, + success_color: None, + error_color: None, + }, + log: cli::LogArgs { + level: "info".to_string(), + }, + } + } + + #[test] + fn minimal_args_produce_valid_config() { + let config = Config::from_run_args(minimal_run_args()).unwrap(); + assert_eq!(config.database_url, "postgres://test@localhost/test"); + assert_eq!(config.rpc_url, "http://localhost:8545"); + assert_eq!(config.chain_name, "TestChain"); + assert!(!config.da_tracking_enabled); + } + + #[test] + fn chain_name_trimmed_and_defaults_to_unknown_when_blank() { + let mut args = minimal_run_args(); + args.chain.name = " ".to_string(); + assert_eq!(Config::from_run_args(args).unwrap().chain_name, "Unknown"); + } + + #[test] + fn chain_name_surrounding_whitespace_is_trimmed() { + let mut args = minimal_run_args(); + args.chain.name = " MyChain ".to_string(); + assert_eq!(Config::from_run_args(args).unwrap().chain_name, "MyChain"); + } + + #[test] + fn sse_replay_buffer_zero_is_rejected() { + let mut args = minimal_run_args(); + args.api.sse_replay_buffer_blocks = 0; + assert!(Config::from_run_args(args) + .unwrap_err() + .to_string() + .contains("must be between 1 and 100000")); + } + + #[test] + fn sse_replay_buffer_above_max_is_rejected() { + let mut args = minimal_run_args(); + args.api.sse_replay_buffer_blocks = 100_001; + assert!(Config::from_run_args(args) + .unwrap_err() + .to_string() + .contains("must be between 1 and 100000")); + } + + #[test] + fn da_tracking_requires_evnode_url() { + let mut args = minimal_run_args(); + args.da.enabled = true; + args.da.evnode_url = None; + assert!(Config::from_run_args(args) + .unwrap_err() + .to_string() + .contains("evnode-url")); + } + + #[test] + fn da_tracking_rejects_blank_evnode_url() { + let mut args = minimal_run_args(); + args.da.enabled = true; + args.da.evnode_url = Some(" ".to_string()); + assert!(Config::from_run_args(args) + .unwrap_err() + .to_string() + .contains("evnode-url")); + } + + #[test] + fn da_tracking_disabled_does_not_require_evnode_url() { + let mut args = minimal_run_args(); + args.da.enabled = false; + args.da.evnode_url = None; + let config = Config::from_run_args(args).unwrap(); + assert!(config.evnode_url.is_none()); + } + + #[test] + fn da_worker_concurrency_zero_is_rejected_when_da_enabled() { + let mut args = minimal_run_args(); + args.da.enabled = true; + args.da.evnode_url = Some("http://localhost:7331".to_string()); + args.da.worker_concurrency = 0; + assert!(Config::from_run_args(args).is_err()); + } + + #[test] + fn branding_blank_strings_become_none() { + let mut args = minimal_run_args(); + args.branding.accent_color = Some(" ".to_string()); + args.branding.success_color = Some("#00ff00".to_string()); + let config = Config::from_run_args(args).unwrap(); + assert!(config.accent_color.is_none()); + assert_eq!(config.success_color.as_deref(), Some("#00ff00")); + } +} + #[cfg(test)] mod tests { use super::*; diff --git 
a/backend/crates/atlas-server/src/lib.rs b/backend/crates/atlas-server/src/lib.rs
index 1a70e0a..f9da928 100644
--- a/backend/crates/atlas-server/src/lib.rs
+++ b/backend/crates/atlas-server/src/lib.rs
@@ -1,4 +1,5 @@
 pub mod api;
+pub mod cli;
 pub mod config;
 pub mod faucet;
 pub mod head;
diff --git a/backend/crates/atlas-server/src/main.rs b/backend/crates/atlas-server/src/main.rs
index 2d9cc76..71a786a 100644
--- a/backend/crates/atlas-server/src/main.rs
+++ b/backend/crates/atlas-server/src/main.rs
@@ -1,4 +1,5 @@
 use anyhow::Result;
+use clap::Parser;
 use std::sync::Arc;
 use std::time::Duration;
 use tokio::sync::broadcast;
@@ -8,6 +9,7 @@ use alloy::providers::ProviderBuilder;
 use alloy::signers::local::PrivateKeySigner;
 
 mod api;
+mod cli;
 mod config;
 mod faucet;
 mod head;
@@ -17,6 +19,13 @@ mod indexer;
 const RETRY_DELAYS: &[u64] = &[5, 10, 20, 30, 60];
 const MAX_RETRY_DELAY: u64 = 60;
 
+fn init_tracing(filter: &str) {
+    tracing_subscriber::registry()
+        .with(tracing_subscriber::EnvFilter::new(filter))
+        .with(tracing_subscriber::fmt::layer())
+        .init();
+}
+
 fn parse_chain_id(hex: &str) -> Option<u64> {
     u64::from_str_radix(hex.trim_start_matches("0x"), 16).ok()
 }
@@ -45,21 +54,35 @@ async fn fetch_chain_id(rpc_url: &str) -> Result<u64> {
 
 #[tokio::main]
 async fn main() -> Result<()> {
-    // Initialize tracing
-    tracing_subscriber::registry()
-        .with(
-            tracing_subscriber::EnvFilter::try_from_default_env()
-                .unwrap_or_else(|_| "atlas_server=info,tower_http=debug,sqlx=warn".into()),
-        )
-        .with(tracing_subscriber::fmt::layer())
-        .init();
+    // Load .env before clap so env vars are available for clap's `env = "..."` fallback
+    dotenvy::dotenv().ok();
+    let cli = cli::Cli::parse();
+
+    match cli.command {
+        cli::Command::Run(args) => run(*args).await,
+        cli::Command::Migrate(args) => {
+            init_tracing(&args.log.level);
+            tracing::info!("Running database migrations");
+            atlas_common::db::run_migrations(&args.db.url).await?;
+            tracing::info!("Migrations complete");
+            Ok(())
+        }
+        cli::Command::Check(args) => check(*args).await,
+        cli::Command::Db(db_cmd) => match db_cmd.command {
+            cli::DbSubcommand::Dump { output, db_url } => cmd_db_dump(&db_url, &output),
+            cli::DbSubcommand::Restore { input, db_url } => cmd_db_restore(&db_url, &input),
+            cli::DbSubcommand::Reset { confirm, db_url } => cmd_db_reset(&db_url, confirm).await,
+        },
+    }
+}
+
+async fn run(args: cli::RunArgs) -> Result<()> {
+    init_tracing(&args.log.level);
 
     tracing::info!("Starting Atlas Server");
 
-    // Load configuration
-    dotenvy::dotenv().ok();
-    let config = config::Config::from_env()?;
-    let faucet_config = config::FaucetConfig::from_env()?;
+    let config = config::Config::from_run_args(args.clone())?;
+    let faucet_config = config::FaucetConfig::from_faucet_args(&args.faucet)?;
 
     let faucet = if faucet_config.enabled {
         tracing::info!("Faucet enabled");
@@ -71,7 +94,7 @@
         let rpc_url: reqwest::Url = config
             .rpc_url
             .parse()
-            .map_err(|e| anyhow::anyhow!("Invalid RPC_URL for faucet: {e}"))?;
+            .map_err(|e| anyhow::anyhow!("Invalid RPC URL for faucet: {e}"))?;
         let provider = ProviderBuilder::new().wallet(signer).connect_http(rpc_url);
         Some(Arc::new(faucet::FaucetService::new(
             provider,
@@ -88,18 +111,15 @@
     let chain_id = fetch_chain_id(&config.rpc_url).await?;
     tracing::info!("Chain ID: {}", chain_id);
 
-    // Run migrations once (dedicated pool, no statement_timeout)
     tracing::info!("Running database migrations");
     atlas_common::db::run_migrations(&config.database_url).await?;
 
-    // 
Create separate DB pools for indexer and API let indexer_pool = atlas_common::db::create_pool(&config.database_url, config.indexer_db_max_connections) .await?; let api_pool = atlas_common::db::create_pool(&config.database_url, config.api_db_max_connections).await?; - // Shared broadcast channels for SSE notifications let (block_events_tx, _) = broadcast::channel(1024); let (da_events_tx, _) = broadcast::channel::>(256); let head_tracker = Arc::new(if config.reindex { @@ -108,7 +128,6 @@ async fn main() -> Result<()> { head::HeadTracker::bootstrap(&api_pool, config.sse_replay_buffer_blocks).await? }); - // Build AppState for API let state = Arc::new(api::AppState { pool: api_pool, block_events_tx: block_events_tx.clone(), @@ -127,7 +146,6 @@ async fn main() -> Result<()> { error_color: config.error_color.clone(), }); - // Spawn indexer task with retry logic let da_pool = indexer_pool.clone(); let indexer = indexer::Indexer::new( indexer_pool.clone(), @@ -141,7 +159,6 @@ async fn main() -> Result<()> { } }); - // Spawn DA worker when DA tracking is explicitly enabled. if config.da_tracking_enabled { let evnode_url = config .evnode_url @@ -166,7 +183,6 @@ async fn main() -> Result<()> { }); } - // Spawn metadata fetcher in background let metadata_pool = indexer_pool; let metadata_config = config.clone(); tokio::spawn(async move { @@ -181,7 +197,6 @@ async fn main() -> Result<()> { } }); - // Build and serve API let app = api::build_router(state, config.cors_origin.clone()); let addr = format!("{}:{}", config.api_host, config.api_port); tracing::info!("API listening on {}", addr); @@ -194,6 +209,72 @@ async fn main() -> Result<()> { Ok(()) } +async fn check(args: cli::RunArgs) -> Result<()> { + init_tracing(&args.log.level); + + let config = config::Config::from_run_args(args.clone())?; + config::FaucetConfig::from_faucet_args(&args.faucet)?; + + // Test DB connectivity + tracing::info!("Testing database connectivity..."); + let pool = atlas_common::db::create_pool(&config.database_url, 1).await?; + sqlx::query("SELECT 1").execute(&pool).await?; + tracing::info!("Database OK"); + + // Test RPC connectivity + tracing::info!("Testing RPC connectivity..."); + let chain_id = fetch_chain_id(&config.rpc_url).await?; + tracing::info!("RPC OK — chain_id={}", chain_id); + + tracing::info!("Configuration is valid"); + Ok(()) +} + +fn cmd_db_dump(db_url: &str, output: &str) -> Result<()> { + let status = std::process::Command::new("pg_dump") + .args(["--dbname", db_url, "--format=custom", "--file", output]) + .status() + .map_err(|e| anyhow::anyhow!("Failed to run pg_dump (is it installed?): {e}"))?; + + if !status.success() { + anyhow::bail!("pg_dump exited with status {status}"); + } + eprintln!("Dump written to {output}"); + Ok(()) +} + +fn cmd_db_restore(db_url: &str, input: &str) -> Result<()> { + let status = std::process::Command::new("pg_restore") + .args(["--dbname", db_url, "--format=custom", "--clean", "--if-exists", input]) + .status() + .map_err(|e| anyhow::anyhow!("Failed to run pg_restore (is it installed?): {e}"))?; + + if !status.success() { + anyhow::bail!("pg_restore exited with status {status}"); + } + eprintln!("Restore complete from {input}"); + Ok(()) +} + +async fn cmd_db_reset(db_url: &str, confirm: bool) -> Result<()> { + if !confirm { + eprintln!("This will DELETE all indexed data. 
Pass --confirm to proceed."); + std::process::exit(1); + } + + let pool = atlas_common::db::create_pool(db_url, 1).await?; + sqlx::query( + "TRUNCATE blocks, transactions, event_logs, addresses, nft_contracts, nft_tokens, + nft_transfers, indexer_state, erc20_contracts, erc20_transfers, erc20_balances, + event_signatures, address_labels, proxy_contracts, contract_abis, failed_blocks, + tx_hash_lookup, block_da_status CASCADE", + ) + .execute(&pool) + .await?; + eprintln!("All indexed data has been reset."); + Ok(()) +} + async fn shutdown_signal() { #[cfg(unix)] { @@ -241,7 +322,6 @@ where } } -/// Run an async function with exponential backoff retry async fn run_with_retry(f: F) -> Result<()> where F: Fn() -> Fut, diff --git a/backend/crates/atlas-server/tests/integration/main.rs b/backend/crates/atlas-server/tests/integration/main.rs index cb27ff0..92e9a0d 100644 --- a/backend/crates/atlas-server/tests/integration/main.rs +++ b/backend/crates/atlas-server/tests/integration/main.rs @@ -3,6 +3,7 @@ mod common; mod addresses; mod blocks; mod nfts; +mod schema; mod search; mod status; mod tokens; diff --git a/backend/crates/atlas-server/tests/integration/schema.rs b/backend/crates/atlas-server/tests/integration/schema.rs new file mode 100644 index 0000000..306c960 --- /dev/null +++ b/backend/crates/atlas-server/tests/integration/schema.rs @@ -0,0 +1,259 @@ +use crate::common; + +// ── Migration health ────────────────────────────────────────────────────────── + +#[test] +fn migrations_are_idempotent() { + // sqlx tracks applied migrations in _sqlx_migrations; re-running is a no-op. + // This verifies the tracking table is intact and no migration errors on repeat. + common::run(async { + sqlx::migrate!("../../migrations") + .run(common::pool()) + .await + .expect("migrations should be idempotent"); + }); +} + +// ── Table presence ──────────────────────────────────────────────────────────── + +#[test] +fn all_expected_tables_exist() { + common::run(async { + let tables: Vec = sqlx::query_scalar( + "SELECT tablename FROM pg_tables WHERE schemaname = 'public' ORDER BY tablename", + ) + .fetch_all(common::pool()) + .await + .expect("query tables"); + + for expected in [ + "addresses", + "address_labels", + "block_da_status", + "blocks", + "contract_abis", + "erc20_balances", + "erc20_contracts", + "erc20_transfers", + "event_logs", + "event_signatures", + "failed_blocks", + "indexer_state", + "nft_contracts", + "nft_tokens", + "nft_transfers", + "proxy_contracts", + "transactions", + "tx_hash_lookup", + ] { + assert!( + tables.contains(&expected.to_string()), + "missing table: {expected}" + ); + } + }); +} + +#[test] +fn partitioned_tables_have_initial_partition() { + common::run(async { + // Each partitioned table must have its _p0 partition created by the migration. + let partitions: Vec = sqlx::query_scalar( + "SELECT relname::text FROM pg_class + WHERE relname LIKE '%_p0' AND relkind = 'r' + ORDER BY relname", + ) + .fetch_all(common::pool()) + .await + .expect("query partitions"); + + for expected in [ + "blocks_p0", + "erc20_transfers_p0", + "event_logs_p0", + "nft_transfers_p0", + "transactions_p0", + ] { + assert!( + partitions.contains(&expected.to_string()), + "missing partition: {expected}" + ); + } + }); +} + +// ── Index presence ──────────────────────────────────────────────────────────── + +#[test] +fn key_indexes_exist() { + common::run(async { + // Include both regular indexes (relkind='i') and partitioned indexes (relkind='I'). 
+ let indexes: Vec = sqlx::query_scalar( + "SELECT c.relname::text FROM pg_class c + JOIN pg_namespace n ON n.oid = c.relnamespace + WHERE n.nspname = 'public' AND c.relkind IN ('i', 'I') + ORDER BY c.relname", + ) + .fetch_all(common::pool()) + .await + .expect("query indexes"); + + for expected in [ + // blocks + "idx_blocks_hash", + "idx_blocks_timestamp", + // transactions + "idx_transactions_block", + "idx_transactions_from", + "idx_transactions_to", + // event_logs + "idx_event_logs_address", + "idx_event_logs_topic0", + // addresses + "idx_addresses_contract", + // erc20 + "idx_erc20_balances_contract", + // tx hash lookup (powers O(1) search) + "tx_hash_lookup_pkey", + // da status (powers pending-DA queries) + "idx_block_da_status_pending", + ] { + assert!( + indexes.contains(&expected.to_string()), + "missing index: {expected}" + ); + } + }); +} + +#[test] +fn pg_trgm_extension_is_installed() { + // Required for fuzzy search indexes on token names / symbols. + common::run(async { + let exists: bool = sqlx::query_scalar( + "SELECT EXISTS(SELECT 1 FROM pg_extension WHERE extname = 'pg_trgm')", + ) + .fetch_one(common::pool()) + .await + .expect("query extension"); + + assert!(exists, "pg_trgm extension must be installed"); + }); +} + +// ── Constraint enforcement ──────────────────────────────────────────────────── + +#[test] +fn duplicate_block_number_is_rejected() { + common::run(async { + let pool = common::pool(); + + // Use a block number unlikely to collide with other test seeds. + let block_number: i64 = 9_000_001; + + sqlx::query( + "INSERT INTO blocks (number, hash, parent_hash, timestamp, + gas_used, gas_limit, transaction_count) + VALUES ($1, $2, $3, 0, 0, 0, 0) + ON CONFLICT DO NOTHING", + ) + .bind(block_number) + .bind(format!("0x{:064x}", block_number)) + .bind(format!("0x{:064x}", block_number - 1)) + .execute(pool) + .await + .expect("insert block"); + + // Second insert with same number must fail (PK violation). + let result = sqlx::query( + "INSERT INTO blocks (number, hash, parent_hash, timestamp, + gas_used, gas_limit, transaction_count) + VALUES ($1, $2, $3, 0, 0, 0, 0)", + ) + .bind(block_number) + .bind(format!("0x{:064x}", block_number + 9999)) + .bind(format!("0x{:064x}", block_number - 1)) + .execute(pool) + .await; + + assert!(result.is_err(), "duplicate block number should be rejected"); + }); +} + +#[test] +fn duplicate_transaction_is_rejected() { + common::run(async { + let pool = common::pool(); + + let block_number: i64 = 9_000_002; + let tx_hash = format!("0x{:064x}", block_number); + + // Ensure the parent block exists. 
+ sqlx::query( + "INSERT INTO blocks (number, hash, parent_hash, timestamp, + gas_used, gas_limit, transaction_count) + VALUES ($1, $2, $3, 0, 0, 0, 1) + ON CONFLICT DO NOTHING", + ) + .bind(block_number) + .bind(format!("0x{:064x}", block_number + 1)) + .bind(format!("0x{:064x}", block_number - 1)) + .execute(pool) + .await + .expect("insert parent block"); + + sqlx::query( + "INSERT INTO transactions + (hash, block_number, block_index, from_address, value, + gas_price, gas_used, input_data, status, timestamp) + VALUES ($1, $2, 0, '0xdead', 0, 1, 21000, '\\x', true, 0)", + ) + .bind(&tx_hash) + .bind(block_number) + .execute(pool) + .await + .expect("insert tx"); + + let result = sqlx::query( + "INSERT INTO transactions + (hash, block_number, block_index, from_address, value, + gas_price, gas_used, input_data, status, timestamp) + VALUES ($1, $2, 0, '0xdead', 0, 1, 21000, '\\x', true, 0)", + ) + .bind(&tx_hash) + .bind(block_number) + .execute(pool) + .await; + + assert!(result.is_err(), "duplicate transaction should be rejected"); + }); +} + +#[test] +fn duplicate_erc20_transfer_is_rejected() { + common::run(async { + let pool = common::pool(); + + let block_number: i64 = 9_000_003; + let tx_hash = format!("0x{:064x}", block_number + 10000); + + let insert = || { + sqlx::query( + "INSERT INTO erc20_transfers + (tx_hash, log_index, block_number, contract_address, + from_address, to_address, value, timestamp) + VALUES ($1, 0, $2, '0xtoken', '0xfrom', '0xto', '100', 0)", + ) + .bind(&tx_hash) + .bind(block_number) + }; + + insert().execute(pool).await.expect("insert erc20 transfer"); + let result = insert().execute(pool).await; + + assert!( + result.is_err(), + "duplicate erc20 transfer should be rejected by unique constraint" + ); + }); +} From 1c4da5ad9ab175e07142a27c04f129738aa8d66b Mon Sep 17 00:00:00 2001 From: pthmas <9058370+pthmas@users.noreply.github.com> Date: Mon, 30 Mar 2026 11:45:51 +0200 Subject: [PATCH 2/4] fix: secure atlas-server database CLI handling --- .github/workflows/ci.yml | 12 ++-- backend/crates/atlas-server/src/cli.rs | 24 ++++--- backend/crates/atlas-server/src/config.rs | 11 ++- backend/crates/atlas-server/src/main.rs | 83 +++++++++++++++++++++-- 4 files changed, 105 insertions(+), 25 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4af3bf3..07a31a6 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -119,7 +119,7 @@ jobs: psql "$PG_ADMIN_URL" -c "DROP DATABASE IF EXISTS atlas_restore;" psql "$PG_ADMIN_URL" -c "CREATE DATABASE atlas_restore;" - cargo run --quiet --bin atlas-server -- migrate --atlas.db.url "$SOURCE_DB_URL" + DATABASE_URL="$SOURCE_DB_URL" cargo run --quiet --bin atlas-server -- migrate psql "$SOURCE_DB_URL" <<'SQL' INSERT INTO blocks ( @@ -156,9 +156,9 @@ jobs: SQL DUMP_FILE="$RUNNER_TEMP/atlas-cli-db-flow.dump" - cargo run --quiet --bin atlas-server -- db dump "$DUMP_FILE" --atlas.db.url "$SOURCE_DB_URL" - cargo run --quiet --bin atlas-server -- db restore "$DUMP_FILE" --atlas.db.url "$RESTORE_DB_URL" - cargo run --quiet --bin atlas-server -- migrate --atlas.db.url "$RESTORE_DB_URL" + DATABASE_URL="$SOURCE_DB_URL" cargo run --quiet --bin atlas-server -- db dump "$DUMP_FILE" + DATABASE_URL="$RESTORE_DB_URL" cargo run --quiet --bin atlas-server -- db restore "$DUMP_FILE" + DATABASE_URL="$RESTORE_DB_URL" cargo run --quiet --bin atlas-server -- migrate test "$(psql "$RESTORE_DB_URL" -Atc "SELECT COUNT(*) FROM blocks WHERE number = 424242")" = "1" test "$(psql "$RESTORE_DB_URL" -Atc "SELECT 
COUNT(*) FROM transactions WHERE hash = '0x0000000000000000000000000000000000000000000000000000000000067933'")" = "1"
 
@@ -279,8 +279,8 @@ jobs:
 
           pg_dump --dbname="$SOURCE_DB_URL" --format=custom --file "$DUMP_FILE"
 
-          cargo run --quiet --bin atlas-server -- db restore "$DUMP_FILE" --atlas.db.url "$RESTORE_DB_URL"
-          cargo run --quiet --bin atlas-server -- migrate --atlas.db.url "$RESTORE_DB_URL"
+          DATABASE_URL="$RESTORE_DB_URL" cargo run --quiet --bin atlas-server -- db restore "$DUMP_FILE"
+          DATABASE_URL="$RESTORE_DB_URL" cargo run --quiet --bin atlas-server -- migrate
 
           test "$(psql "$RESTORE_DB_URL" -Atc "SELECT COUNT(*) FROM blocks WHERE number = 515151")" = "1"
 
diff --git a/backend/crates/atlas-server/src/cli.rs b/backend/crates/atlas-server/src/cli.rs
index ad5455a..24bdf52 100644
--- a/backend/crates/atlas-server/src/cli.rs
+++ b/backend/crates/atlas-server/src/cli.rs
@@ -1,8 +1,16 @@
 use clap::{Args, Parser, Subcommand};
 
+fn database_url_from_env() -> String {
+    std::env::var("DATABASE_URL").unwrap_or_default()
+}
+
 /// Atlas — EVM blockchain explorer
 #[derive(Parser)]
-#[command(name = "atlas-server", version, about = "EVM blockchain explorer — indexer + API + DA tracking")]
+#[command(
+    name = "atlas-server",
+    version,
+    about = "EVM blockchain explorer — indexer + API + DA tracking"
+)]
 pub struct Cli {
     #[command(subcommand)]
     pub command: Command,
@@ -56,13 +64,7 @@ pub struct MigrateArgs {
 #[derive(Args, Clone)]
 #[command(next_help_heading = "Database")]
 pub struct DatabaseArgs {
-    #[arg(
-        id = "db-url",
-        long = "atlas.db.url",
-        env = "DATABASE_URL",
-        value_name = "URL",
-        help = "PostgreSQL connection string"
-    )]
+    #[arg(skip = database_url_from_env())]
     pub url: String,
 
     #[arg(
         long = "atlas.db.max-connections",
@@ -385,7 +387,7 @@ pub enum DbSubcommand {
         #[arg(value_name = "OUTPUT")]
         output: String,
 
-        #[arg(long = "atlas.db.url", env = "DATABASE_URL", value_name = "URL")]
+        #[arg(skip = database_url_from_env())]
         db_url: String,
     },
     /// Restore the database from a pg_dump file
@@ -394,7 +396,7 @@
         #[arg(value_name = "INPUT")]
         input: String,
 
-        #[arg(long = "atlas.db.url", env = "DATABASE_URL", value_name = "URL")]
+        #[arg(skip = database_url_from_env())]
         db_url: String,
     },
     /// Drop all indexed data, keeping schema and migrations intact (requires --confirm)
@@ -403,7 +405,7 @@
         #[arg(long)]
         confirm: bool,
 
-        #[arg(long = "atlas.db.url", env = "DATABASE_URL", value_name = "URL")]
+        #[arg(skip = database_url_from_env())]
         db_url: String,
     },
 }
diff --git a/backend/crates/atlas-server/src/config.rs b/backend/crates/atlas-server/src/config.rs
index eaec3f1..1114654 100644
--- a/backend/crates/atlas-server/src/config.rs
+++ b/backend/crates/atlas-server/src/config.rs
@@ -3,7 +3,9 @@ use alloy::signers::local::PrivateKeySigner;
 use anyhow::{bail, Context, Result};
 use std::{env, str::FromStr};
 
+#[cfg(test)]
 const DEFAULT_DA_WORKER_CONCURRENCY: u32 = 50;
+#[cfg(test)]
 const DEFAULT_DA_RPC_REQUESTS_PER_SECOND: u32 = 50;
 
 #[derive(Debug, Clone)]
@@ -75,6 +77,7 @@ impl std::fmt::Debug for FaucetConfig {
     }
 }
 
+#[cfg(test)]
 impl Config {
     pub fn from_env() -> Result<Self> {
         let sse_replay_buffer_blocks: usize = env::var("SSE_REPLAY_BUFFER_BLOCKS")
@@ -204,6 +207,7 @@
     }
 }
 
+#[cfg(test)]
 impl FaucetConfig {
     pub fn from_env() -> Result<Self> {
         let enabled = env::var("FAUCET_ENABLED")
@@ -255,6 +259,11 @@
 
 impl Config {
     pub fn from_run_args(args: crate::cli::RunArgs) -> anyhow::Result<Self> {
+        let database_url = args.db.url.trim().to_string();
+        if database_url.is_empty() {
+            bail!("DATABASE_URL must be set");
+        }
+
         let sse_replay_buffer_blocks = args.api.sse_replay_buffer_blocks;
         if sse_replay_buffer_blocks == 0 || sse_replay_buffer_blocks > 100_000 {
             bail!("--atlas.api.sse-replay-buffer-blocks must be between 1 and 100000");
@@ -292,7 +301,7 @@ impl Config {
         };
 
         Ok(Self {
-            database_url: args.db.url,
+            database_url,
             rpc_url: args.rpc.url,
             indexer_db_max_connections: args.db.max_connections,
             api_db_max_connections: args.db.api_max_connections,
diff --git a/backend/crates/atlas-server/src/main.rs b/backend/crates/atlas-server/src/main.rs
index 71a786a..31563fa 100644
--- a/backend/crates/atlas-server/src/main.rs
+++ b/backend/crates/atlas-server/src/main.rs
@@ -1,4 +1,4 @@
-use anyhow::Result;
+use anyhow::{bail, Context, Result};
 use clap::Parser;
 use std::sync::Arc;
 use std::time::Duration;
@@ -26,6 +26,74 @@ fn init_tracing(filter: &str) {
         .init();
 }
 
+fn required_db_url(db_url: &str) -> Result<&str> {
+    let db_url = db_url.trim();
+    if db_url.is_empty() {
+        bail!("DATABASE_URL must be set");
+    }
+    Ok(db_url)
+}
+
+fn postgres_command(program: &str, db_url: &str) -> Result<std::process::Command> {
+    let url = reqwest::Url::parse(required_db_url(db_url)?).context("Invalid DATABASE_URL")?;
+    match url.scheme() {
+        "postgres" | "postgresql" => {}
+        _ => bail!("DATABASE_URL must use postgres:// or postgresql://"),
+    }
+
+    let database_name = url.path().trim_start_matches('/');
+    if database_name.is_empty() {
+        bail!("DATABASE_URL must include a database name");
+    }
+
+    let mut command = std::process::Command::new(program);
+    if let Some(host) = url.host_str() {
+        command.env("PGHOST", host);
+    }
+    if let Some(port) = url.port() {
+        command.env("PGPORT", port.to_string());
+    }
+    if !url.username().is_empty() {
+        command.env("PGUSER", url.username());
+    }
+    if let Some(password) = url.password() {
+        command.env("PGPASSWORD", password);
+    }
+    command.env("PGDATABASE", database_name);
+
+    for (key, value) in url.query_pairs() {
+        match key.as_ref() {
+            "sslmode" => {
+                command.env("PGSSLMODE", value.as_ref());
+            }
+            "sslcert" => {
+                command.env("PGSSLCERT", value.as_ref());
+            }
+            "sslkey" => {
+                command.env("PGSSLKEY", value.as_ref());
+            }
+            "sslrootcert" => {
+                command.env("PGSSLROOTCERT", value.as_ref());
+            }
+            "sslcrl" => {
+                command.env("PGSSLCRL", value.as_ref());
+            }
+            "application_name" => {
+                command.env("PGAPPNAME", value.as_ref());
+            }
+            "options" => {
+                command.env("PGOPTIONS", value.as_ref());
+            }
+            "connect_timeout" => {
+                command.env("PGCONNECT_TIMEOUT", value.as_ref());
+            }
+            _ => {}
+        }
+    }
+
+    Ok(command)
+}
+
 fn parse_chain_id(hex: &str) -> Option<u64> {
     u64::from_str_radix(hex.trim_start_matches("0x"), 16).ok()
 }
@@ -64,7 +132,8 @@ async fn main() -> Result<()> {
         cli::Command::Migrate(args) => {
             init_tracing(&args.log.level);
             tracing::info!("Running database migrations");
-            atlas_common::db::run_migrations(&args.db.url).await?;
+            let database_url = required_db_url(&args.db.url)?;
+            atlas_common::db::run_migrations(database_url).await?;
             tracing::info!("Migrations complete");
             Ok(())
         }
@@ -231,8 +300,8 @@ async fn check(args: cli::RunArgs) -> Result<()> {
 }
 
 fn cmd_db_dump(db_url: &str, output: &str) -> Result<()> {
-    let status = std::process::Command::new("pg_dump")
-        .args(["--dbname", db_url, "--format=custom", "--file", output])
+    let status = postgres_command("pg_dump", db_url)?
+        .args(["--format=custom", "--file", output])
         .status()
         .map_err(|e| anyhow::anyhow!("Failed to run pg_dump (is it installed?): {e}"))?;
 
@@ -244,8 +313,8 @@ fn cmd_db_dump(db_url: &str, output: &str) -> Result<()> {
 }
 
 fn cmd_db_restore(db_url: &str, input: &str) -> Result<()> {
-    let status = std::process::Command::new("pg_restore")
-        .args(["--dbname", db_url, "--format=custom", "--clean", "--if-exists", input])
+    let status = postgres_command("pg_restore", db_url)?
+        .args(["--format=custom", "--clean", "--if-exists", input])
         .status()
         .map_err(|e| anyhow::anyhow!("Failed to run pg_restore (is it installed?): {e}"))?;
 
@@ -262,7 +331,7 @@ async fn cmd_db_reset(db_url: &str, confirm: bool) -> Result<()> {
         std::process::exit(1);
     }
 
-    let pool = atlas_common::db::create_pool(db_url, 1).await?;
+    let pool = atlas_common::db::create_pool(required_db_url(db_url)?, 1).await?;
     sqlx::query(
         "TRUNCATE blocks, transactions, event_logs, addresses, nft_contracts, nft_tokens,
          nft_transfers, indexer_state, erc20_contracts, erc20_transfers, erc20_balances,
From cfe7347b68c9557b4191198f05f8df7700ff6e9e Mon Sep 17 00:00:00 2001
From: pthmas <9058370+pthmas@users.noreply.github.com>
Date: Mon, 30 Mar 2026 14:46:14 +0200
Subject: [PATCH 3/4] fix: isolate pg dump restore environment

---
 backend/crates/atlas-server/src/main.rs | 17 +++++++++++++++++
 1 file changed, 17 insertions(+)

diff --git a/backend/crates/atlas-server/src/main.rs b/backend/crates/atlas-server/src/main.rs
index 31563fa..dd970e6 100644
--- a/backend/crates/atlas-server/src/main.rs
+++ b/backend/crates/atlas-server/src/main.rs
@@ -47,6 +47,23 @@ fn postgres_command(program: &str, db_url: &str) -> Result<std::process::Command> {
     }
 
     let mut command = std::process::Command::new(program);
+    for env_var in [
+        "PGHOST",
+        "PGPORT",
+        "PGUSER",
+        "PGPASSWORD",
+        "PGDATABASE",
+        "PGSSLMODE",
+        "PGSSLCERT",
+        "PGSSLKEY",
+        "PGSSLROOTCERT",
+        "PGSSLCRL",
+        "PGAPPNAME",
+        "PGOPTIONS",
+        "PGCONNECT_TIMEOUT",
+    ] {
+        command.env_remove(env_var);
+    }
     if let Some(host) = url.host_str() {
         command.env("PGHOST", host);
     }

From: pthmas <9058370+pthmas@users.noreply.github.com>
Date: Mon, 30 Mar 2026 15:12:50 +0200
Subject: [PATCH 4/4] fix: support libpq query params in db tools

---
 backend/crates/atlas-server/src/main.rs | 178 ++++++++++++++++++------
 1 file changed, 138 insertions(+), 40 deletions(-)

diff --git a/backend/crates/atlas-server/src/main.rs b/backend/crates/atlas-server/src/main.rs
index dd970e6..14d932c 100644
--- a/backend/crates/atlas-server/src/main.rs
+++ b/backend/crates/atlas-server/src/main.rs
@@ -34,81 +34,136 @@ fn required_db_url(db_url: &str) -> Result<&str> {
     Ok(db_url)
 }
 
-fn postgres_command(program: &str, db_url: &str) -> Result<std::process::Command> {
+struct PostgresConnectionConfig {
+    database_name: String,
+    env_vars: Vec<(&'static str, String)>,
+}
+
+fn set_pg_env(env_vars: &mut Vec<(&'static str, String)>, key: &'static str, value: &str) {
+    if value.is_empty() {
+        return;
+    }
+    if let Some((_, existing)) = env_vars
+        .iter_mut()
+        .find(|(existing_key, _)| *existing_key == key)
+    {
+        *existing = value.to_string();
+    } else {
+        env_vars.push((key, value.to_string()));
+    }
+}
+
+fn postgres_connection_config(db_url: &str) -> Result<PostgresConnectionConfig> {
     let url = reqwest::Url::parse(required_db_url(db_url)?).context("Invalid DATABASE_URL")?;
     match url.scheme() {
         "postgres" | "postgresql" => {}
         _ => bail!("DATABASE_URL must use postgres:// or postgresql://"),
     }
 
-    let database_name = url.path().trim_start_matches('/');
-    if database_name.is_empty() {
-        bail!("DATABASE_URL must include a database name");
-    }
-
-    let mut command = std::process::Command::new(program);
-    for env_var in [
-        "PGHOST",
-        "PGPORT",
-        "PGUSER",
-        "PGPASSWORD",
-        "PGDATABASE",
-        "PGSSLMODE",
-        "PGSSLCERT",
-        "PGSSLKEY",
-        "PGSSLROOTCERT",
-        "PGSSLCRL",
-        "PGAPPNAME",
-        "PGOPTIONS",
-        "PGCONNECT_TIMEOUT",
-    ] {
-        command.env_remove(env_var);
-    }
+    let mut database_name = url.path().trim_start_matches('/').to_string();
+    let mut env_vars = Vec::new();
if let Some(host) = url.host_str() { - command.env("PGHOST", host); + set_pg_env(&mut env_vars, "PGHOST", host); } if let Some(port) = url.port() { - command.env("PGPORT", port.to_string()); + set_pg_env(&mut env_vars, "PGPORT", &port.to_string()); } if !url.username().is_empty() { - command.env("PGUSER", url.username()); + set_pg_env(&mut env_vars, "PGUSER", url.username()); } if let Some(password) = url.password() { - command.env("PGPASSWORD", password); + set_pg_env(&mut env_vars, "PGPASSWORD", password); } - command.env("PGDATABASE", database_name); for (key, value) in url.query_pairs() { match key.as_ref() { + "dbname" => { + if !value.is_empty() { + database_name = value.into_owned(); + } + } + "host" => { + set_pg_env(&mut env_vars, "PGHOST", value.as_ref()); + } + "hostaddr" => { + set_pg_env(&mut env_vars, "PGHOSTADDR", value.as_ref()); + } + "port" => { + set_pg_env(&mut env_vars, "PGPORT", value.as_ref()); + } + "user" => { + set_pg_env(&mut env_vars, "PGUSER", value.as_ref()); + } + "password" => { + set_pg_env(&mut env_vars, "PGPASSWORD", value.as_ref()); + } + "service" => { + set_pg_env(&mut env_vars, "PGSERVICE", value.as_ref()); + } "sslmode" => { - command.env("PGSSLMODE", value.as_ref()); + set_pg_env(&mut env_vars, "PGSSLMODE", value.as_ref()); } "sslcert" => { - command.env("PGSSLCERT", value.as_ref()); + set_pg_env(&mut env_vars, "PGSSLCERT", value.as_ref()); } "sslkey" => { - command.env("PGSSLKEY", value.as_ref()); + set_pg_env(&mut env_vars, "PGSSLKEY", value.as_ref()); } "sslrootcert" => { - command.env("PGSSLROOTCERT", value.as_ref()); + set_pg_env(&mut env_vars, "PGSSLROOTCERT", value.as_ref()); } "sslcrl" => { - command.env("PGSSLCRL", value.as_ref()); + set_pg_env(&mut env_vars, "PGSSLCRL", value.as_ref()); } "application_name" => { - command.env("PGAPPNAME", value.as_ref()); + set_pg_env(&mut env_vars, "PGAPPNAME", value.as_ref()); } "options" => { - command.env("PGOPTIONS", value.as_ref()); + set_pg_env(&mut env_vars, "PGOPTIONS", value.as_ref()); } "connect_timeout" => { - command.env("PGCONNECT_TIMEOUT", value.as_ref()); + set_pg_env(&mut env_vars, "PGCONNECT_TIMEOUT", value.as_ref()); } _ => {} } } - Ok(command) + if database_name.is_empty() { + bail!("DATABASE_URL must include a database name"); + } + set_pg_env(&mut env_vars, "PGDATABASE", &database_name); + + Ok(PostgresConnectionConfig { + database_name, + env_vars, + }) +} + +fn postgres_command(program: &str, config: &PostgresConnectionConfig) -> std::process::Command { + let mut command = std::process::Command::new(program); + for env_var in [ + "PGHOST", + "PGHOSTADDR", + "PGPORT", + "PGUSER", + "PGPASSWORD", + "PGDATABASE", + "PGSERVICE", + "PGSSLMODE", + "PGSSLCERT", + "PGSSLKEY", + "PGSSLROOTCERT", + "PGSSLCRL", + "PGAPPNAME", + "PGOPTIONS", + "PGCONNECT_TIMEOUT", + ] { + command.env_remove(env_var); + } + for (key, value) in &config.env_vars { + command.env(key, value); + } + command } fn parse_chain_id(hex: &str) -> Option { @@ -317,7 +372,8 @@ async fn check(args: cli::RunArgs) -> Result<()> { } fn cmd_db_dump(db_url: &str, output: &str) -> Result<()> { - let status = postgres_command("pg_dump", db_url)? 
+ let config = postgres_connection_config(db_url)?; + let status = postgres_command("pg_dump", &config) .args(["--format=custom", "--file", output]) .status() .map_err(|e| anyhow::anyhow!("Failed to run pg_dump (is it installed?): {e}"))?; @@ -330,7 +386,10 @@ fn cmd_db_dump(db_url: &str, output: &str) -> Result<()> { } fn cmd_db_restore(db_url: &str, input: &str) -> Result<()> { - let status = postgres_command("pg_restore", db_url)? + let config = postgres_connection_config(db_url)?; + let status = postgres_command("pg_restore", &config) + .arg("--dbname") + .arg(&config.database_name) .args(["--format=custom", "--clean", "--if-exists", input]) .status() .map_err(|e| anyhow::anyhow!("Failed to run pg_restore (is it installed?): {e}"))?; @@ -449,6 +508,14 @@ mod tests { sync::oneshot, }; + fn env_value<'a>(config: &'a PostgresConnectionConfig, key: &str) -> Option<&'a str> { + config + .env_vars + .iter() + .find(|(env_key, _)| *env_key == key) + .map(|(_, value)| value.as_str()) + } + async fn serve_json_once(body: &'static str) -> String { let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); let addr = listener.local_addr().unwrap(); @@ -527,4 +594,35 @@ mod tests { let url = format!("http://{}", addr); assert!(fetch_chain_id(&url).await.is_err()); } + + #[test] + fn postgres_connection_config_accepts_dbname_query_when_path_is_empty() { + let config = postgres_connection_config( + "postgres://user:secret@localhost?dbname=atlas&host=db.internal&port=6543&service=atlas-ci", + ) + .unwrap(); + + assert_eq!(config.database_name, "atlas"); + assert_eq!(env_value(&config, "PGHOST"), Some("db.internal")); + assert_eq!(env_value(&config, "PGPORT"), Some("6543")); + assert_eq!(env_value(&config, "PGUSER"), Some("user")); + assert_eq!(env_value(&config, "PGPASSWORD"), Some("secret")); + assert_eq!(env_value(&config, "PGSERVICE"), Some("atlas-ci")); + assert_eq!(env_value(&config, "PGDATABASE"), Some("atlas")); + } + + #[test] + fn postgres_connection_config_query_params_override_url_components() { + let config = postgres_connection_config( + "postgres://user:secret@localhost/base_db?dbname=query_db&host=query-host&hostaddr=127.0.0.1&user=query-user&password=query-pass", + ) + .unwrap(); + + assert_eq!(config.database_name, "query_db"); + assert_eq!(env_value(&config, "PGHOST"), Some("query-host")); + assert_eq!(env_value(&config, "PGHOSTADDR"), Some("127.0.0.1")); + assert_eq!(env_value(&config, "PGUSER"), Some("query-user")); + assert_eq!(env_value(&config, "PGPASSWORD"), Some("query-pass")); + assert_eq!(env_value(&config, "PGDATABASE"), Some("query_db")); + } }
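
Usage sketch for the CLI surface this series adds. The connection string, RPC endpoint, and dump path below are illustrative placeholders, not values taken from the patches:

    # DATABASE_URL is env-only by design (patch 2 removes the --atlas.db.url flag)
    export DATABASE_URL=postgres://atlas:atlas@127.0.0.1:5432/atlas

    atlas-server migrate                                      # apply pending migrations and exit
    atlas-server check --atlas.rpc.url http://localhost:8545  # validate config, probe DB and RPC
    atlas-server db dump /tmp/atlas.dump                      # custom-format pg_dump snapshot
    atlas-server db restore /tmp/atlas.dump                   # pg_restore with --clean --if-exists
    atlas-server db reset --confirm                           # truncate indexed data, keep schema
    atlas-server run --atlas.rpc.url http://localhost:8545    # start indexer + API server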