diff --git a/.env.example b/.env.example index d554b30..646764e 100644 --- a/.env.example +++ b/.env.example @@ -55,3 +55,44 @@ ENABLE_DA_TRACKING=false # FAUCET_PRIVATE_KEY=0x... # FAUCET_AMOUNT=0.01 # FAUCET_COOLDOWN_MINUTES=30 + +# Optional: archive indexed block bundles to an S3-compatible object store. +# +# When enabled, atlas-server writes a compressed copy of every indexed block +# (block header + transaction receipts) to an S3 bucket as it indexes. +# Objects are stored under: {S3_PREFIX}/v1/blocks/{bucket_start}/{block_number}.json.zst +# A manifest file at: {S3_PREFIX}/v1/manifest.json tracks the latest +# contiguous uploaded block so consumers know how far the archive has progressed. +# +# Works with AWS S3 and any S3-compatible store (MinIO, Cloudflare R2, etc.). +# Credentials are read from AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY below. +# This only adds the write path; a restore command will be added later. +# +# S3_ENABLED=false +# +# -- Where to store files -- +# S3_BUCKET=atlas-archive # Name of the bucket (must exist before starting) +# S3_REGION=us-east-1 # AWS region, or any non-empty string for non-AWS stores +# S3_PREFIX= # Optional path prefix inside the bucket, e.g. "mainnet". +# # Useful when sharing one bucket across multiple chains. +# # Leave empty to write directly at the bucket root. +# S3_ENDPOINT= # Override the S3 server URL. Leave empty for AWS S3. +# # Set to your MinIO / R2 / etc. URL, e.g. http://minio:9000 +# S3_FORCE_PATH_STYLE=false # Set to true for MinIO and most self-hosted stores. +# # AWS S3 uses subdomain-style (false); MinIO requires path-style (true). +# +# -- Performance & reliability -- +# S3_UPLOAD_CONCURRENCY=4 # Number of blocks uploaded in parallel. Higher = faster +# # catch-up after downtime, at the cost of more memory/network. +# S3_RETRY_BASE_SECONDS=30 # Base delay before retrying a failed upload. Uses exponential +# # backoff: 30s → 60s → 120s … capped at 1 hour. 
+# +# -- AWS credentials (also used for MinIO) -- +# AWS_ACCESS_KEY_ID= # Access key ID (IAM user key for AWS, root user for MinIO) +# AWS_SECRET_ACCESS_KEY= # Secret access key +# AWS_SESSION_TOKEN= # Only needed for temporary IAM credentials (e.g. assumed roles) + +# Optional MinIO admin credentials — only used by docker-compose to initialise the +# MinIO container itself (create buckets, etc.). Not read by atlas-server. +# MINIO_ROOT_USER=minioadmin +# MINIO_ROOT_PASSWORD=minioadmin diff --git a/CLAUDE.md b/CLAUDE.md index cebbaf7..d559684 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -91,6 +91,16 @@ pub struct AppState { ### DA tracking (optional) When `ENABLE_DA_TRACKING=true`, a background DA worker queries ev-node for Celestia inclusion heights per block. `EVNODE_URL` is required only in that mode. Updates are pushed to SSE clients via an in-process `broadcast::Sender>`. The SSE handler streams `da_batch` events for incremental updates and emits `da_resync` when a client falls behind and should refetch visible DA state. +### S3 archive (optional) +When `S3_ENABLED=true`, a background `ArchiveUploader` task uploads every indexed block to an S3-compatible object store as a zstd-compressed JSON bundle (block header + receipts). The write path uses a transactional outbox pattern: +1. The indexer writes archive entries to `archive_blocks` (with compressed payload) inside the same DB transaction as the block data. +2. The uploader claims rows via `SELECT … FOR UPDATE SKIP LOCKED`, uploads to S3, then clears the payload column. +3. `archive_state` tracks the latest *contiguous* uploaded block and triggers a manifest refresh at `{prefix}/v1/manifest.json` so consumers know how far the archive has progressed. + +Object key layout: `{prefix}/v1/blocks/{bucket_start:012}/{block_number:012}.json.zst` where `bucket_start = (block_number / 10_000) * 10_000`. + +Works with AWS S3 and any S3-compatible store (MinIO, Cloudflare R2, etc.). 
The URL is constructed automatically from `S3_BUCKET` + `S3_REGION` + optional `S3_ENDPOINT` — there is no single URL config. Set `S3_FORCE_PATH_STYLE=true` for MinIO and other self-hosted stores; leave it `false` for AWS. + ### Frontend API client - Base URL: `/api` (proxied by nginx to `atlas-server:3000`) - Fast polling endpoint: `GET /api/height` → `{ block_height, indexed_at, features: { da_tracking } }` — serves from `head_tracker` first and falls back to `indexer_state` when the in-memory head is empty. Used by the navbar as a polling fallback when SSE is disconnected and by feature-flag consumers. @@ -127,6 +137,16 @@ Key vars (see `.env.example` for full list): | `EVNODE_URL` | server | none | | `DA_RPC_REQUESTS_PER_SECOND` | DA worker | `50` | | `DA_WORKER_CONCURRENCY` | DA worker | `50` | +| `S3_ENABLED` | archive | `false` | +| `S3_BUCKET` | archive | required if enabled | +| `S3_REGION` | archive | required if enabled | +| `S3_PREFIX` | archive | `""` (bucket root) | +| `S3_ENDPOINT` | archive | none (AWS S3) | +| `S3_FORCE_PATH_STYLE` | archive | `false` | +| `S3_UPLOAD_CONCURRENCY` | archive | `4` | +| `S3_RETRY_BASE_SECONDS` | archive | `30` | +| `AWS_ACCESS_KEY_ID` | archive | env / IAM role | +| `AWS_SECRET_ACCESS_KEY` | archive | env / IAM role | ## Running Locally diff --git a/backend/Cargo.toml b/backend/Cargo.toml index 7a2db69..2e75eff 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -32,10 +32,13 @@ serde_json = "1.0" # HTTP client reqwest = { version = "0.13", features = ["json", "rustls"], default-features = false } +aws-config = "1" +aws-sdk-s3 = "1" # Error handling thiserror = "2.0" anyhow = "1.0" +async-trait = "0.1" # Logging tracing = "0.1" @@ -53,6 +56,7 @@ async-stream = "0.3" bigdecimal = { version = "0.4", features = ["serde"] } hex = "0.4" chrono = { version = "0.4", features = ["serde"] } +zstd = "0.13" # Testing testcontainers = "0.27" diff --git a/backend/crates/atlas-server/Cargo.toml 
b/backend/crates/atlas-server/Cargo.toml index 08c8e82..d3bac21 100644 --- a/backend/crates/atlas-server/Cargo.toml +++ b/backend/crates/atlas-server/Cargo.toml @@ -19,14 +19,18 @@ alloy = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } reqwest = { workspace = true } +aws-config = { workspace = true } +aws-sdk-s3 = { workspace = true } thiserror = { workspace = true } anyhow = { workspace = true } +async-trait = { workspace = true } tracing = { workspace = true } tracing-subscriber = { workspace = true } dotenvy = { workspace = true } bigdecimal = { workspace = true } hex = { workspace = true } chrono = { workspace = true } +zstd = { workspace = true } tokio-stream = { workspace = true } futures = { workspace = true } async-stream = { workspace = true } diff --git a/backend/crates/atlas-server/src/archive/mod.rs b/backend/crates/atlas-server/src/archive/mod.rs new file mode 100644 index 0000000..0c614fb --- /dev/null +++ b/backend/crates/atlas-server/src/archive/mod.rs @@ -0,0 +1,628 @@ +use std::sync::Arc; +use std::time::Duration; + +use anyhow::{anyhow, Context, Result}; +use async_trait::async_trait; +use aws_config::{BehaviorVersion, Region}; +use aws_sdk_s3::primitives::ByteStream; +use aws_sdk_s3::Client as S3Client; +use chrono::Utc; +use serde::{Deserialize, Serialize}; +use sqlx::{FromRow, PgPool}; +use tokio_postgres::{types::ToSql, Transaction}; + +use crate::config::ArchiveConfig; +use crate::indexer::fetcher::FetchedBlock; + +pub const ARCHIVE_SCHEMA_VERSION: i16 = 1; +const CLAIM_VISIBILITY_TIMEOUT_SECS: i64 = 300; +const IDLE_SLEEP: Duration = Duration::from_millis(500); +const MAX_RETRY_DELAY_SECS: u64 = 3600; +const ARCHIVE_STREAM_BLOCKS: &str = "blocks"; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ArchiveBundleV1 { + pub schema_version: i16, + pub chain_id: u64, + pub block_number: u64, + pub block: alloy::rpc::types::Block, + pub receipts: Vec, +} + +#[derive(Debug, Clone, Serialize, 
Deserialize, PartialEq, Eq)] +pub struct ArchiveManifestV1 { + pub schema_version: i16, + pub chain_id: u64, + pub archive_start_block: i64, + pub latest_contiguous_uploaded_block: Option, + pub updated_at: String, +} + +#[derive(Debug, Clone)] +pub struct ArchiveEntry { + pub block_number: i64, + pub object_key: String, + pub payload: Vec, + pub schema_version: i16, +} + +#[derive(Debug, Clone, FromRow)] +struct PendingArchiveRow { + block_number: i64, + object_key: String, + payload: Option>, + retry_count: i32, +} + +#[derive(Debug, Clone, FromRow)] +struct ArchiveStateRow { + archive_start_block: i64, + latest_contiguous_uploaded_block: Option, + schema_version: i16, + manifest_dirty: bool, +} + +#[async_trait] +pub trait ArchiveObjectStore: Send + Sync { + async fn ensure_bucket_access(&self) -> Result<()>; + async fn put_archive_object(&self, key: &str, payload: Vec) -> Result>; + async fn put_manifest_object(&self, key: &str, payload: Vec) -> Result>; +} + +#[derive(Clone)] +pub struct S3ArchiveStore { + client: S3Client, + bucket: String, +} + +impl S3ArchiveStore { + pub fn new(client: S3Client, bucket: impl Into) -> Self { + Self { + client, + bucket: bucket.into(), + } + } + + pub async fn from_config(config: &ArchiveConfig) -> Result { + let shared_config = aws_config::defaults(BehaviorVersion::latest()) + .region(Region::new(config.region.clone())) + .load() + .await; + let mut builder = aws_sdk_s3::config::Builder::from(&shared_config) + .force_path_style(config.force_path_style); + if let Some(endpoint) = &config.endpoint { + builder = builder.endpoint_url(endpoint); + } + + Ok(Self { + client: S3Client::from_conf(builder.build()), + bucket: config.bucket.clone(), + }) + } +} + +#[async_trait] +impl ArchiveObjectStore for S3ArchiveStore { + async fn ensure_bucket_access(&self) -> Result<()> { + self.client + .head_bucket() + .bucket(&self.bucket) + .send() + .await + .context("failed to access archive bucket")?; + Ok(()) + } + + async fn 
put_archive_object(&self, key: &str, payload: Vec) -> Result> { + let response = self + .client + .put_object() + .bucket(&self.bucket) + .key(key) + .content_type("application/zstd") + .body(ByteStream::from(payload)) + .send() + .await + .with_context(|| format!("failed to upload archive object {key}"))?; + Ok(response.e_tag().map(ToOwned::to_owned)) + } + + async fn put_manifest_object(&self, key: &str, payload: Vec) -> Result> { + let response = self + .client + .put_object() + .bucket(&self.bucket) + .key(key) + .content_type("application/json") + .body(ByteStream::from(payload)) + .send() + .await + .with_context(|| format!("failed to upload archive manifest {key}"))?; + Ok(response.e_tag().map(ToOwned::to_owned)) + } +} + +#[derive(Clone)] +pub struct ArchiveUploader { + pool: PgPool, + store: Arc, + config: ArchiveConfig, + chain_id: u64, +} + +impl ArchiveUploader { + pub fn new( + pool: PgPool, + store: Arc, + config: ArchiveConfig, + chain_id: u64, + ) -> Self { + Self { + pool, + store, + config, + chain_id, + } + } + + pub async fn run(&self) -> Result<()> { + let mut join_set = tokio::task::JoinSet::new(); + let workers = self.config.upload_concurrency.max(1); + + for _ in 0..workers { + let uploader = self.clone(); + join_set.spawn(async move { uploader.worker_loop().await }); + } + + while let Some(result) = join_set.join_next().await { + match result { + Ok(Ok(())) => {} + Ok(Err(err)) => return Err(err), + Err(err) => return Err(anyhow!("archive worker task failed: {err}")), + } + } + + Ok(()) + } + + async fn worker_loop(&self) -> Result<()> { + loop { + let mut did_work = false; + + if let Some(row) = self.claim_pending_row().await? { + did_work = true; + self.process_row(row).await?; + } + + if self.try_upload_manifest().await? 
{ + did_work = true; + } + + if !did_work { + tokio::time::sleep(IDLE_SLEEP).await; + } + } + } + + async fn claim_pending_row(&self) -> Result> { + let mut tx = self.pool.begin().await?; + let row = sqlx::query_as::<_, PendingArchiveRow>( + "WITH next AS ( + SELECT block_number + FROM archive_blocks + WHERE uploaded_at IS NULL + AND payload IS NOT NULL + AND next_attempt_at <= NOW() + ORDER BY next_attempt_at ASC, block_number ASC + LIMIT 1 + FOR UPDATE SKIP LOCKED + ) + UPDATE archive_blocks AS blocks + SET next_attempt_at = NOW() + make_interval(secs => $1), + updated_at = NOW() + FROM next + WHERE blocks.block_number = next.block_number + RETURNING blocks.block_number, blocks.object_key, blocks.payload, blocks.retry_count", + ) + .bind(CLAIM_VISIBILITY_TIMEOUT_SECS) + .fetch_optional(&mut *tx) + .await?; + tx.commit().await?; + Ok(row) + } + + async fn process_row(&self, row: PendingArchiveRow) -> Result<()> { + let payload = row + .payload + .ok_or_else(|| anyhow!("archive row {} is missing payload", row.block_number))?; + + match self + .store + .put_archive_object(&row.object_key, payload) + .await + { + Ok(etag) => { + let mut tx = self.pool.begin().await?; + sqlx::query( + "UPDATE archive_blocks + SET uploaded_at = NOW(), + etag = $2, + payload = NULL, + last_error = NULL, + updated_at = NOW() + WHERE block_number = $1", + ) + .bind(row.block_number) + .bind(etag) + .execute(&mut *tx) + .await?; + + if self.advance_contiguous_head(&mut tx).await? 
{ + sqlx::query( + "UPDATE archive_state + SET manifest_dirty = TRUE, + updated_at = NOW() + WHERE stream = $1", + ) + .bind(ARCHIVE_STREAM_BLOCKS) + .execute(&mut *tx) + .await?; + } + + tx.commit().await?; + Ok(()) + } + Err(err) => { + let next_retry_count = row.retry_count + 1; + let delay = retry_delay_seconds(self.config.retry_base_seconds, row.retry_count); + if delay >= MAX_RETRY_DELAY_SECS { + tracing::warn!( + block_number = row.block_number, + retry_count = next_retry_count, + "archive upload retry delay has hit the 1-hour cap; block is stuck" + ); + } + let next_attempt_at = Utc::now() + chrono::Duration::seconds(delay as i64); + sqlx::query( + "UPDATE archive_blocks + SET retry_count = retry_count + 1, + last_error = $2, + next_attempt_at = $3, + updated_at = NOW() + WHERE block_number = $1", + ) + .bind(row.block_number) + .bind(err.to_string()) + .bind(next_attempt_at) + .execute(&self.pool) + .await?; + Ok(()) + } + } + } + + async fn advance_contiguous_head( + &self, + tx: &mut sqlx::Transaction<'_, sqlx::Postgres>, + ) -> Result { + let state = sqlx::query_as::<_, ArchiveStateRow>( + "SELECT archive_start_block, latest_contiguous_uploaded_block, schema_version, manifest_dirty + FROM archive_state + WHERE stream = $1 + FOR UPDATE", + ) + .bind(ARCHIVE_STREAM_BLOCKS) + .fetch_optional(&mut **tx) + .await?; + + let Some(state) = state else { + return Ok(false); + }; + + let mut expected = state + .latest_contiguous_uploaded_block + .map(|n| n + 1) + .unwrap_or(state.archive_start_block); + let mut latest = state.latest_contiguous_uploaded_block; + + loop { + let rows: Vec<(i64,)> = sqlx::query_as( + "SELECT block_number + FROM archive_blocks + WHERE uploaded_at IS NOT NULL + AND block_number >= $1 + ORDER BY block_number ASC + LIMIT 512", + ) + .bind(expected) + .fetch_all(&mut **tx) + .await?; + + if rows.is_empty() { + break; + } + + let mut advanced = false; + for (block_number,) in rows { + if block_number != expected { + break; + } + latest = 
Some(block_number); + expected += 1; + advanced = true; + } + + if !advanced { + break; + } + } + + if latest != state.latest_contiguous_uploaded_block { + sqlx::query( + "UPDATE archive_state + SET latest_contiguous_uploaded_block = $2, + updated_at = NOW() + WHERE stream = $1", + ) + .bind(ARCHIVE_STREAM_BLOCKS) + .bind(latest) + .execute(&mut **tx) + .await?; + return Ok(true); + } + + Ok(false) + } + + async fn try_upload_manifest(&self) -> Result { + let mut tx = self.pool.begin().await?; + let state = sqlx::query_as::<_, ArchiveStateRow>( + "SELECT archive_start_block, latest_contiguous_uploaded_block, schema_version, manifest_dirty + FROM archive_state + WHERE stream = $1 + FOR UPDATE", + ) + .bind(ARCHIVE_STREAM_BLOCKS) + .fetch_optional(&mut *tx) + .await?; + + let Some(state) = state else { + tx.commit().await?; + return Ok(false); + }; + + if !state.manifest_dirty { + tx.commit().await?; + return Ok(false); + } + + let now = Utc::now(); + let manifest = ArchiveManifestV1 { + schema_version: state.schema_version, + chain_id: self.chain_id, + archive_start_block: state.archive_start_block, + latest_contiguous_uploaded_block: state.latest_contiguous_uploaded_block, + updated_at: now.to_rfc3339(), + }; + let payload = serde_json::to_vec(&manifest)?; + + match self + .store + .put_manifest_object(&manifest_object_key(&self.config.prefix), payload) + .await + { + Ok(_) => { + sqlx::query( + "UPDATE archive_state + SET manifest_dirty = FALSE, + manifest_updated_at = $2, + last_manifest_error = NULL, + updated_at = $2 + WHERE stream = $1", + ) + .bind(ARCHIVE_STREAM_BLOCKS) + .bind(now) + .execute(&mut *tx) + .await?; + tx.commit().await?; + Ok(true) + } + Err(err) => { + sqlx::query( + "UPDATE archive_state + SET last_manifest_error = $2, + updated_at = NOW() + WHERE stream = $1", + ) + .bind(ARCHIVE_STREAM_BLOCKS) + .bind(err.to_string()) + .execute(&mut *tx) + .await?; + tx.commit().await?; + Ok(false) + } + } + } +} + +impl ArchiveEntry { + pub(crate) fn 
from_fetched_block( + chain_id: u64, + prefix: &str, + fetched: &FetchedBlock, + ) -> Result { + let bundle = ArchiveBundleV1 { + schema_version: ARCHIVE_SCHEMA_VERSION, + chain_id, + block_number: fetched.number, + block: fetched.block.clone(), + receipts: fetched.receipts.clone(), + }; + let payload = zstd::stream::encode_all(serde_json::to_vec(&bundle)?.as_slice(), 3)?; + Ok(Self { + block_number: fetched.number as i64, + object_key: block_object_key(prefix, fetched.number), + payload, + schema_version: ARCHIVE_SCHEMA_VERSION, + }) + } +} + +pub async fn insert_archive_entries( + tx: &mut Transaction<'_>, + entries: Vec, +) -> Result<()> { + if entries.is_empty() { + return Ok(()); + } + + let mut block_numbers = Vec::with_capacity(entries.len()); + let mut object_keys = Vec::with_capacity(entries.len()); + let mut payloads = Vec::with_capacity(entries.len()); + let mut schema_versions = Vec::with_capacity(entries.len()); + + for entry in entries { + block_numbers.push(entry.block_number); + object_keys.push(entry.object_key); + payloads.push(entry.payload); + schema_versions.push(entry.schema_version); + } + + let min_block = *block_numbers + .iter() + .min() + .ok_or_else(|| anyhow!("archive entries unexpectedly empty"))?; + + let init_params: [&(dyn ToSql + Sync); 3] = + [&ARCHIVE_STREAM_BLOCKS, &min_block, &ARCHIVE_SCHEMA_VERSION]; + tx.execute( + "INSERT INTO archive_state + (stream, archive_start_block, latest_contiguous_uploaded_block, schema_version, manifest_dirty) + VALUES ($1, $2, NULL, $3, FALSE) + ON CONFLICT (stream) DO UPDATE SET + archive_start_block = LEAST(archive_state.archive_start_block, EXCLUDED.archive_start_block), + latest_contiguous_uploaded_block = CASE + WHEN EXCLUDED.archive_start_block < archive_state.archive_start_block THEN NULL + ELSE archive_state.latest_contiguous_uploaded_block + END, + schema_version = GREATEST(archive_state.schema_version, EXCLUDED.schema_version), + manifest_dirty = archive_state.manifest_dirty, + 
updated_at = NOW()", + &init_params, + ) + .await?; + + let params: [&(dyn ToSql + Sync); 4] = + [&block_numbers, &object_keys, &payloads, &schema_versions]; + tx.execute( + "INSERT INTO archive_blocks + (block_number, object_key, payload, schema_version, retry_count, next_attempt_at, created_at, updated_at) + SELECT block_number, object_key, payload, schema_version, 0, NOW(), NOW(), NOW() + FROM unnest($1::bigint[], $2::text[], $3::bytea[], $4::smallint[]) + AS t(block_number, object_key, payload, schema_version) + ON CONFLICT (block_number) DO UPDATE SET + object_key = EXCLUDED.object_key, + payload = EXCLUDED.payload, + schema_version = EXCLUDED.schema_version, + retry_count = 0, + next_attempt_at = NOW(), + last_error = NULL, + uploaded_at = NULL, + etag = NULL, + updated_at = NOW()", + ¶ms, + ) + .await?; + + Ok(()) +} + +pub fn block_object_key(prefix: &str, block_number: u64) -> String { + let bucket_start = (block_number / 10_000) * 10_000; + if prefix.is_empty() { + format!("v1/blocks/{bucket_start:012}/{block_number:012}.json.zst") + } else { + format!("{prefix}/v1/blocks/{bucket_start:012}/{block_number:012}.json.zst") + } +} + +pub fn manifest_object_key(prefix: &str) -> String { + if prefix.is_empty() { + "v1/manifest.json".to_string() + } else { + format!("{prefix}/v1/manifest.json") + } +} + +pub fn retry_delay_seconds(base: u64, retry_count: i32) -> u64 { + let exponent = retry_count.max(0).min(16) as u32; + base.saturating_mul(2u64.saturating_pow(exponent)) + .min(MAX_RETRY_DELAY_SECS) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn block_object_key_uses_expected_layout() { + assert_eq!( + block_object_key("atlas/dev", 10_237), + "atlas/dev/v1/blocks/000000010000/000000010237.json.zst" + ); + } + + #[test] + fn manifest_key_respects_prefix() { + assert_eq!(manifest_object_key(""), "v1/manifest.json"); + assert_eq!( + manifest_object_key("atlas/dev"), + "atlas/dev/v1/manifest.json" + ); + } + + #[test] + fn 
retry_delay_is_exponential_and_capped() { + assert_eq!(retry_delay_seconds(30, 0), 30); + assert_eq!(retry_delay_seconds(30, 1), 60); + assert_eq!(retry_delay_seconds(30, 2), 120); + assert_eq!(retry_delay_seconds(30, 10), 3600); + } + + #[test] + fn archive_manifest_round_trip() { + let manifest = ArchiveManifestV1 { + schema_version: ARCHIVE_SCHEMA_VERSION, + chain_id: 42, + archive_start_block: 100, + latest_contiguous_uploaded_block: Some(150), + updated_at: Utc::now().to_rfc3339(), + }; + let bytes = serde_json::to_vec(&manifest).unwrap(); + let decoded: ArchiveManifestV1 = serde_json::from_slice(&bytes).unwrap(); + assert_eq!(decoded.schema_version, ARCHIVE_SCHEMA_VERSION); + assert_eq!(decoded.chain_id, 42); + assert_eq!(decoded.archive_start_block, 100); + assert_eq!(decoded.latest_contiguous_uploaded_block, Some(150)); + } + + #[test] + fn archive_bundle_round_trip_through_zstd() { + let bundle = ArchiveBundleV1 { + schema_version: ARCHIVE_SCHEMA_VERSION, + chain_id: 42, + block_number: 7, + block: alloy::rpc::types::Block::default(), + receipts: Vec::new(), + }; + let compressed = + zstd::stream::encode_all(serde_json::to_vec(&bundle).unwrap().as_slice(), 3).unwrap(); + let decoded = zstd::stream::decode_all(compressed.as_slice()).unwrap(); + let round_trip: ArchiveBundleV1 = serde_json::from_slice(&decoded).unwrap(); + + assert_eq!(round_trip.schema_version, ARCHIVE_SCHEMA_VERSION); + assert_eq!(round_trip.chain_id, 42); + assert_eq!(round_trip.block_number, 7); + assert!(round_trip.receipts.is_empty()); + } +} diff --git a/backend/crates/atlas-server/src/cli.rs b/backend/crates/atlas-server/src/cli.rs index 24bdf52..d3f5323 100644 --- a/backend/crates/atlas-server/src/cli.rs +++ b/backend/crates/atlas-server/src/cli.rs @@ -314,6 +314,7 @@ pub struct FaucetArgs { // FAUCET_PRIVATE_KEY is intentionally env-only (security: never pass secrets as CLI flags) } + #[derive(Args, Clone)] #[command(next_help_heading = "Branding")] pub struct BrandingArgs { diff 
--git a/backend/crates/atlas-server/src/config.rs b/backend/crates/atlas-server/src/config.rs index 1114654..7e57651 100644 --- a/backend/crates/atlas-server/src/config.rs +++ b/backend/crates/atlas-server/src/config.rs @@ -13,6 +13,7 @@ pub struct Config { // Shared pub database_url: String, pub rpc_url: String, + pub archive: Option, // Indexer pool pub indexer_db_max_connections: u32, @@ -55,6 +56,17 @@ pub struct Config { pub error_color: Option, } +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ArchiveConfig { + pub bucket: String, + pub region: String, + pub prefix: String, + pub endpoint: Option, + pub force_path_style: bool, + pub upload_concurrency: u32, + pub retry_base_seconds: u64, +} + #[derive(Clone)] pub struct FaucetConfig { pub enabled: bool, @@ -135,6 +147,7 @@ impl Config { Ok(Self { database_url: env::var("DATABASE_URL").context("DATABASE_URL must be set")?, rpc_url: env::var("RPC_URL").context("RPC_URL must be set")?, + archive: ArchiveConfig::from_env()?, indexer_db_max_connections: env::var("DB_MAX_CONNECTIONS") .unwrap_or_else(|_| "20".to_string()) @@ -303,6 +316,7 @@ impl Config { Ok(Self { database_url, rpc_url: args.rpc.url, + archive: ArchiveConfig::from_env()?, indexer_db_max_connections: args.db.max_connections, api_db_max_connections: args.db.api_max_connections, rpc_requests_per_second: args.rpc.requests_per_second, @@ -379,10 +393,74 @@ impl FaucetConfig { } } +impl ArchiveConfig { + pub fn from_env() -> Result> { + let enabled = env::var("S3_ENABLED") + .unwrap_or_else(|_| "false".to_string()) + .parse::() + .context("Invalid S3_ENABLED")?; + if !enabled { + return Ok(None); + } + + let bucket = + required_archive_value(env::var("S3_BUCKET").ok(), "S3_BUCKET")?; + let region = + required_archive_value(env::var("S3_REGION").ok(), "S3_REGION")?; + let prefix = normalize_prefix(env::var("S3_PREFIX").ok().as_deref()); + let endpoint = parse_optional_env(env::var("S3_ENDPOINT").ok()); + let force_path_style = 
env::var("S3_FORCE_PATH_STYLE") + .unwrap_or_else(|_| "false".to_string()) + .parse::() + .context("Invalid S3_FORCE_PATH_STYLE")?; + let upload_concurrency: u32 = env::var("S3_UPLOAD_CONCURRENCY") + .unwrap_or_else(|_| "4".to_string()) + .parse() + .context("Invalid S3_UPLOAD_CONCURRENCY")?; + let retry_base_seconds: u64 = env::var("S3_RETRY_BASE_SECONDS") + .unwrap_or_else(|_| "30".to_string()) + .parse() + .context("Invalid S3_RETRY_BASE_SECONDS")?; + + if upload_concurrency == 0 { + bail!("S3_UPLOAD_CONCURRENCY must be greater than 0"); + } + if retry_base_seconds == 0 { + bail!("S3_RETRY_BASE_SECONDS must be greater than 0"); + } + + Ok(Some(Self { + bucket, + region, + prefix, + endpoint, + force_path_style, + upload_concurrency, + retry_base_seconds, + })) + } + +} + fn parse_optional_env(val: Option) -> Option { val.map(|s| s.trim().to_string()).filter(|s| !s.is_empty()) } +fn normalize_prefix(prefix: Option<&str>) -> String { + prefix + .map(str::trim) + .map(|s| s.trim_matches('/')) + .filter(|s| !s.is_empty()) + .unwrap_or_default() + .to_string() +} + +fn required_archive_value(value: Option, flag: &str) -> Result { + parse_optional_env(value).ok_or_else(|| { + anyhow::anyhow!("{flag} (or matching env var) must be set when archive is enabled") + }) +} + fn parse_faucet_amount_to_wei(amount: &str) -> Result { let trimmed = amount.trim(); if trimmed.is_empty() { @@ -430,6 +508,20 @@ fn parse_faucet_amount_to_wei(amount: &str) -> Result { mod tests_from_run_args { use super::*; use crate::cli; + use std::sync::Mutex; + + static ENV_LOCK: Mutex<()> = Mutex::new(()); + + fn clear_archive_env() { + env::remove_var("S3_ENABLED"); + env::remove_var("S3_BUCKET"); + env::remove_var("S3_REGION"); + env::remove_var("S3_PREFIX"); + env::remove_var("S3_ENDPOINT"); + env::remove_var("S3_FORCE_PATH_STYLE"); + env::remove_var("S3_UPLOAD_CONCURRENCY"); + env::remove_var("S3_RETRY_BASE_SECONDS"); + } fn minimal_run_args() -> cli::RunArgs { cli::RunArgs { @@ -493,6 +585,7 
@@ mod tests_from_run_args { assert_eq!(config.rpc_url, "http://localhost:8545"); assert_eq!(config.chain_name, "TestChain"); assert!(!config.da_tracking_enabled); + assert!(config.archive.is_none()); } #[test] @@ -569,6 +662,54 @@ mod tests_from_run_args { assert!(Config::from_run_args(args).is_err()); } + #[test] + fn archive_enabled_requires_bucket_and_region() { + let _lock = ENV_LOCK.lock().unwrap(); + clear_archive_env(); + env::set_var("S3_ENABLED", "true"); + let err = Config::from_run_args(minimal_run_args()).unwrap_err(); + assert!(err.to_string().contains("S3_BUCKET")); + clear_archive_env(); + } + + #[test] + fn archive_env_vars_populate_archive_config() { + let _lock = ENV_LOCK.lock().unwrap(); + clear_archive_env(); + env::set_var("S3_ENABLED", "true"); + env::set_var("S3_BUCKET", "atlas-archive"); + env::set_var("S3_REGION", "eu-west-1"); + env::set_var("S3_PREFIX", "/atlas/dev/"); + env::set_var("S3_ENDPOINT", "http://localhost:9000"); + env::set_var("S3_FORCE_PATH_STYLE", "true"); + env::set_var("S3_UPLOAD_CONCURRENCY", "7"); + env::set_var("S3_RETRY_BASE_SECONDS", "45"); + + let config = Config::from_run_args(minimal_run_args()).unwrap(); + let archive = config.archive.expect("archive config"); + assert_eq!(archive.bucket, "atlas-archive"); + assert_eq!(archive.region, "eu-west-1"); + assert_eq!(archive.prefix, "atlas/dev"); + assert_eq!(archive.endpoint.as_deref(), Some("http://localhost:9000")); + assert!(archive.force_path_style); + assert_eq!(archive.upload_concurrency, 7); + assert_eq!(archive.retry_base_seconds, 45); + clear_archive_env(); + } + + #[test] + fn archive_upload_concurrency_must_be_positive() { + let _lock = ENV_LOCK.lock().unwrap(); + clear_archive_env(); + env::set_var("S3_ENABLED", "true"); + env::set_var("S3_BUCKET", "atlas-archive"); + env::set_var("S3_REGION", "eu-west-1"); + env::set_var("S3_UPLOAD_CONCURRENCY", "0"); + let err = Config::from_run_args(minimal_run_args()).unwrap_err(); + 
assert!(err.to_string().contains("S3_UPLOAD_CONCURRENCY")); + clear_archive_env(); + } + #[test] fn branding_blank_strings_become_none() { let mut args = minimal_run_args(); diff --git a/backend/crates/atlas-server/src/indexer/fetcher.rs b/backend/crates/atlas-server/src/indexer/fetcher.rs index b7b8f26..08adab6 100644 --- a/backend/crates/atlas-server/src/indexer/fetcher.rs +++ b/backend/crates/atlas-server/src/indexer/fetcher.rs @@ -34,6 +34,7 @@ pub(crate) enum FetchResult { } /// Data fetched from RPC for a single block +#[derive(Clone)] pub(crate) struct FetchedBlock { pub(crate) number: u64, pub(crate) block: Block, diff --git a/backend/crates/atlas-server/src/indexer/indexer.rs b/backend/crates/atlas-server/src/indexer/indexer.rs index deccc07..ad8f979 100644 --- a/backend/crates/atlas-server/src/indexer/indexer.rs +++ b/backend/crates/atlas-server/src/indexer/indexer.rs @@ -23,6 +23,7 @@ use super::fetcher::{ fetch_blocks_batch, get_block_number_with_retry, FetchResult, FetchedBlock, SharedRateLimiter, WorkItem, }; +use crate::archive::{insert_archive_entries, ArchiveEntry}; use crate::config::Config; use crate::head::HeadTracker; @@ -37,6 +38,7 @@ const ZERO_ADDRESS: &str = "0x0000000000000000000000000000000000000000"; pub struct Indexer { pool: PgPool, config: Config, + chain_id: u64, /// Tracks the maximum partition number that has been created /// Used to avoid checking pg_class on every batch current_max_partition: std::sync::atomic::AtomicU64, @@ -50,12 +52,14 @@ impl Indexer { pub fn new( pool: PgPool, config: Config, + chain_id: u64, block_events_tx: broadcast::Sender<()>, head_tracker: Arc, ) -> Self { Self { pool, config, + chain_id, // Will be initialized on first run based on start block current_max_partition: std::sync::atomic::AtomicU64::new(0), block_events_tx, @@ -245,6 +249,7 @@ impl Indexer { let mut blocks_received = 0; let mut failed_blocks: Vec<(u64, String)> = Vec::new(); let mut batch = BlockBatch::new(); + let mut 
fetched_blocks_for_archive: Vec = Vec::new(); // Receive all blocks for this batch while blocks_received < batch_size { @@ -255,6 +260,9 @@ impl Indexer { // Collect consecutive blocks in order (sync, no await) while let Some(data) = buffer.remove(&next_to_process) { + if self.config.archive.is_some() { + fetched_blocks_for_archive.push(data.clone()); + } Self::collect_block(&mut batch, &known_erc20, &known_nft, data); next_to_process += 1; } @@ -293,8 +301,25 @@ impl Indexer { .await; let _ = self.block_events_tx.send(()); + // Build archive entries outside the hot loop (serialization + zstd compression) + let archive_entries = if let Some(archive) = &self.config.archive { + let chain_id = self.chain_id; + let prefix = archive.prefix.clone(); + let blocks = fetched_blocks_for_archive; + tokio::task::spawn_blocking(move || { + blocks + .iter() + .map(|b| ArchiveEntry::from_fetched_block(chain_id, &prefix, b)) + .collect::>>() + }) + .await?? + } else { + Vec::new() + }; + // One DB transaction for the entire batch - self.write_batch(&mut copy_client, batch, true).await?; + self.write_batch(&mut copy_client, batch, archive_entries, true) + .await?; // Write succeeded — now safe to update the persistent in-memory sets known_erc20.extend(new_erc20); @@ -338,6 +363,18 @@ impl Indexer { Some(FetchResult::Success(fetched)) => { // Write retried block immediately let mut mini_batch = BlockBatch::new(); + let archive_entries = if let Some(archive) = &self.config.archive { + let chain_id = self.chain_id; + let prefix = archive.prefix.clone(); + let f = fetched.clone(); + tokio::task::spawn_blocking(move || { + ArchiveEntry::from_fetched_block(chain_id, &prefix, &f) + .map(|e| vec![e]) + }) + .await?? 
+ } else { + Vec::new() + }; Self::collect_block( &mut mini_batch, &known_erc20, @@ -349,8 +386,13 @@ impl Indexer { // Don't update the watermark — the main batch already wrote // a higher last_indexed_block; overwriting it with this // block's lower number would cause a regression on restart. - self.write_batch(&mut copy_client, mini_batch, false) - .await?; + self.write_batch( + &mut copy_client, + mini_batch, + archive_entries, + false, + ) + .await?; known_erc20.extend(new_erc20); known_nft.extend(new_nft); tracing::info!("Block {} retry succeeded", block_num); @@ -648,6 +690,7 @@ impl Indexer { &self, copy_client: &mut Client, batch: BlockBatch, + archive_entries: Vec, update_watermark: bool, ) -> Result<()> { if batch.b_numbers.is_empty() { @@ -662,6 +705,7 @@ impl Indexer { copy_event_logs(&mut pg_tx, &batch).await?; copy_nft_transfers(&mut pg_tx, &batch).await?; copy_erc20_transfers(&mut pg_tx, &batch).await?; + insert_archive_entries(&mut pg_tx, archive_entries).await?; let BlockBatch { tl_hashes, @@ -926,11 +970,13 @@ impl Indexer { async fn truncate_tables(&self) -> Result<()> { sqlx::query( - "TRUNCATE blocks, transactions, addresses, nft_contracts, nft_tokens, nft_transfers, - erc20_contracts, erc20_transfers, erc20_balances, event_logs, proxy_contracts, indexer_state CASCADE" + "TRUNCATE blocks, transactions, event_logs, addresses, nft_contracts, nft_tokens, + nft_transfers, indexer_state, erc20_contracts, erc20_transfers, erc20_balances, + event_signatures, address_labels, proxy_contracts, contract_abis, failed_blocks, + tx_hash_lookup, block_da_status, archive_blocks, archive_state CASCADE", ) - .execute(&self.pool) - .await?; + .execute(&self.pool) + .await?; Ok(()) } } diff --git a/backend/crates/atlas-server/src/lib.rs b/backend/crates/atlas-server/src/lib.rs index f9da928..5562018 100644 --- a/backend/crates/atlas-server/src/lib.rs +++ b/backend/crates/atlas-server/src/lib.rs @@ -1,4 +1,5 @@ pub mod api; +pub mod archive; pub mod cli; pub mod 
config; pub mod faucet; diff --git a/backend/crates/atlas-server/src/main.rs b/backend/crates/atlas-server/src/main.rs index 14d932c..c761055 100644 --- a/backend/crates/atlas-server/src/main.rs +++ b/backend/crates/atlas-server/src/main.rs @@ -9,6 +9,7 @@ use alloy::providers::ProviderBuilder; use alloy::signers::local::PrivateKeySigner; mod api; +mod archive; mod cli; mod config; mod faucet; @@ -18,6 +19,11 @@ mod indexer; /// Retry delays for exponential backoff (in seconds) const RETRY_DELAYS: &[u64] = &[5, 10, 20, 30, 60]; const MAX_RETRY_DELAY: u64 = 60; +const DB_RESET_TRUNCATE_SQL: &str = + "TRUNCATE blocks, transactions, event_logs, addresses, nft_contracts, nft_tokens, + nft_transfers, indexer_state, erc20_contracts, erc20_transfers, erc20_balances, + event_signatures, address_labels, proxy_contracts, contract_abis, failed_blocks, + tx_hash_lookup, block_da_status, archive_blocks, archive_state CASCADE"; fn init_tracing(filter: &str) { tracing_subscriber::registry() @@ -224,6 +230,14 @@ async fn run(args: cli::RunArgs) -> Result<()> { let config = config::Config::from_run_args(args.clone())?; let faucet_config = config::FaucetConfig::from_faucet_args(&args.faucet)?; + let archive_store = if let Some(archive_config) = config.archive.clone() { + tracing::info!("Archive upload enabled; validating bucket access"); + let store = Arc::new(archive::S3ArchiveStore::from_config(&archive_config).await?); + archive::ArchiveObjectStore::ensure_bucket_access(store.as_ref()).await?; + Some(store as Arc) + } else { + None + }; let faucet = if faucet_config.enabled { tracing::info!("Faucet enabled"); @@ -291,6 +305,7 @@ async fn run(args: cli::RunArgs) -> Result<()> { let indexer = indexer::Indexer::new( indexer_pool.clone(), config.clone(), + chain_id, block_events_tx, head_tracker, ); @@ -324,7 +339,7 @@ async fn run(args: cli::RunArgs) -> Result<()> { }); } - let metadata_pool = indexer_pool; + let metadata_pool = indexer_pool.clone(); let metadata_config = 
config.clone(); tokio::spawn(async move { if let Err(e) = run_with_retry(|| async { @@ -338,6 +353,16 @@ async fn run(args: cli::RunArgs) -> Result<()> { } }); + if let (Some(archive_config), Some(store)) = (config.archive.clone(), archive_store.clone()) { + let uploader = + archive::ArchiveUploader::new(indexer_pool.clone(), store, archive_config, chain_id); + tokio::spawn(async move { + if let Err(e) = run_with_retry(|| uploader.run()).await { + tracing::error!("Archive uploader terminated with error: {}", e); + } + }); + } + let app = api::build_router(state, config.cors_origin.clone()); let addr = format!("{}:{}", config.api_host, config.api_port); tracing::info!("API listening on {}", addr); @@ -367,6 +392,13 @@ async fn check(args: cli::RunArgs) -> Result<()> { let chain_id = fetch_chain_id(&config.rpc_url).await?; tracing::info!("RPC OK — chain_id={}", chain_id); + if let Some(archive_config) = config.archive.clone() { + tracing::info!("Testing archive bucket connectivity..."); + let store = archive::S3ArchiveStore::from_config(&archive_config).await?; + archive::ArchiveObjectStore::ensure_bucket_access(&store).await?; + tracing::info!("Archive bucket OK"); + } + tracing::info!("Configuration is valid"); Ok(()) } @@ -408,14 +440,7 @@ async fn cmd_db_reset(db_url: &str, confirm: bool) -> Result<()> { } let pool = atlas_common::db::create_pool(required_db_url(db_url)?, 1).await?; - sqlx::query( - "TRUNCATE blocks, transactions, event_logs, addresses, nft_contracts, nft_tokens, - nft_transfers, indexer_state, erc20_contracts, erc20_transfers, erc20_balances, - event_signatures, address_labels, proxy_contracts, contract_abis, failed_blocks, - tx_hash_lookup, block_da_status CASCADE", - ) - .execute(&pool) - .await?; + sqlx::query(DB_RESET_TRUNCATE_SQL).execute(&pool).await?; eprintln!("All indexed data has been reset."); Ok(()) } @@ -625,4 +650,10 @@ mod tests { assert_eq!(env_value(&config, "PGPASSWORD"), Some("query-pass")); assert_eq!(env_value(&config, 
"PGDATABASE"), Some("query_db")); } + + #[test] + fn db_reset_truncates_archive_tables() { + assert!(DB_RESET_TRUNCATE_SQL.contains("archive_blocks")); + assert!(DB_RESET_TRUNCATE_SQL.contains("archive_state")); + } } diff --git a/backend/crates/atlas-server/tests/integration/archive.rs b/backend/crates/atlas-server/tests/integration/archive.rs new file mode 100644 index 0000000..975e41e --- /dev/null +++ b/backend/crates/atlas-server/tests/integration/archive.rs @@ -0,0 +1,507 @@ +use std::collections::HashMap; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, LazyLock, Mutex}; +use std::time::{Duration, Instant}; + +use async_trait::async_trait; +use chrono::{DateTime, Utc}; +use sqlx::PgPool; + +use atlas_server::archive::{ + block_object_key, insert_archive_entries, manifest_object_key, ArchiveBundleV1, ArchiveEntry, + ArchiveManifestV1, ArchiveObjectStore, ArchiveUploader, ARCHIVE_SCHEMA_VERSION, +}; +use atlas_server::config::ArchiveConfig; + +use crate::common; + +static ARCHIVE_TEST_LOCK: LazyLock> = LazyLock::new(|| Mutex::new(())); +static BUCKET_COUNTER: AtomicUsize = AtomicUsize::new(0); + +fn next_bucket_name() -> String { + let id = BUCKET_COUNTER.fetch_add(1, Ordering::Relaxed); + format!("atlas-archive-test-{id}") +} + +async fn seed_archive_block( + pool: &PgPool, + prefix: &str, + chain_id: u64, + block_number: i64, + archive_start_block: i64, +) { + sqlx::query( + "INSERT INTO archive_state + (stream, archive_start_block, latest_contiguous_uploaded_block, schema_version, manifest_dirty, updated_at) + VALUES ('blocks', $1, NULL, $2, FALSE, NOW()) + ON CONFLICT (stream) DO NOTHING", + ) + .bind(archive_start_block) + .bind(ARCHIVE_SCHEMA_VERSION) + .execute(pool) + .await + .expect("insert archive state"); + + let bundle = ArchiveBundleV1 { + schema_version: ARCHIVE_SCHEMA_VERSION, + chain_id, + block_number: block_number as u64, + block: alloy::rpc::types::Block::default(), + receipts: Vec::new(), + }; + let payload = 
zstd::stream::encode_all( + serde_json::to_vec(&bundle) + .expect("serialize bundle") + .as_slice(), + 3, + ) + .expect("compress bundle"); + + sqlx::query( + "INSERT INTO archive_blocks + (block_number, object_key, payload, schema_version, retry_count, next_attempt_at, created_at, updated_at) + VALUES ($1, $2, $3, $4, 0, NOW(), NOW(), NOW())", + ) + .bind(block_number) + .bind(block_object_key(prefix, block_number as u64)) + .bind(payload) + .bind(ARCHIVE_SCHEMA_VERSION) + .execute(pool) + .await + .expect("insert archive block"); +} + +fn archive_entry(prefix: &str, chain_id: u64, block_number: i64) -> ArchiveEntry { + let bundle = ArchiveBundleV1 { + schema_version: ARCHIVE_SCHEMA_VERSION, + chain_id, + block_number: block_number as u64, + block: alloy::rpc::types::Block::default(), + receipts: Vec::new(), + }; + + ArchiveEntry { + block_number, + object_key: block_object_key(prefix, block_number as u64), + payload: zstd::stream::encode_all( + serde_json::to_vec(&bundle) + .expect("serialize bundle") + .as_slice(), + 3, + ) + .expect("compress bundle"), + schema_version: ARCHIVE_SCHEMA_VERSION, + } +} + +async fn wait_until( + timeout: Duration, + mut predicate: impl FnMut() -> std::pin::Pin + Send>>, +) { + let deadline = Instant::now() + timeout; + loop { + if predicate().await { + return; + } + assert!( + Instant::now() < deadline, + "condition timed out after {timeout:?}" + ); + tokio::time::sleep(Duration::from_millis(100)).await; + } +} + +#[derive(Default)] +struct FailingStore; + +#[async_trait] +impl ArchiveObjectStore for FailingStore { + async fn ensure_bucket_access(&self) -> anyhow::Result<()> { + Ok(()) + } + + async fn put_archive_object( + &self, + _key: &str, + _payload: Vec, + ) -> anyhow::Result> { + anyhow::bail!("simulated archive upload failure"); + } + + async fn put_manifest_object( + &self, + _key: &str, + _payload: Vec, + ) -> anyhow::Result> { + Ok(Some("\"manifest\"".to_string())) + } +} + +#[derive(Default)] +struct FlakyStore { + 
attempts: Mutex>, + manifests: Mutex>>, +} + +#[async_trait] +impl ArchiveObjectStore for FlakyStore { + async fn ensure_bucket_access(&self) -> anyhow::Result<()> { + Ok(()) + } + + async fn put_archive_object( + &self, + key: &str, + _payload: Vec, + ) -> anyhow::Result> { + let mut attempts = self.attempts.lock().expect("lock attempts"); + let attempt = attempts.entry(key.to_string()).or_insert(0); + *attempt += 1; + if key.ends_with("/000000000031.json.zst") && *attempt == 1 { + anyhow::bail!("simulated transient gap") + } + Ok(Some(format!("etag-{key}"))) + } + + async fn put_manifest_object( + &self, + _key: &str, + payload: Vec, + ) -> anyhow::Result> { + self.manifests.lock().expect("lock manifests").push(payload); + Ok(Some("\"manifest\"".to_string())) + } +} + +#[derive(Default)] +struct RecordingStore { + manifests: Mutex>>, +} + +#[async_trait] +impl ArchiveObjectStore for RecordingStore { + async fn ensure_bucket_access(&self) -> anyhow::Result<()> { + Ok(()) + } + + async fn put_archive_object( + &self, + key: &str, + _payload: Vec, + ) -> anyhow::Result> { + Ok(Some(format!("etag-{key}"))) + } + + async fn put_manifest_object( + &self, + _key: &str, + payload: Vec, + ) -> anyhow::Result> { + self.manifests.lock().expect("lock manifests").push(payload); + Ok(Some("\"manifest\"".to_string())) + } +} + +#[test] +fn successful_upload_to_minio_marks_uploaded_and_writes_manifest() { + let _guard = ARCHIVE_TEST_LOCK.lock().expect("archive test lock"); + common::run(async { + common::reset_archive_tables().await; + + let bucket = next_bucket_name(); + common::create_bucket(&bucket).await; + let config = common::archive_config(&bucket); + let prefix = config.prefix.clone(); + let store = Arc::new(common::archive_store(&bucket).await); + + seed_archive_block(common::pool(), &prefix, 42, 11, 11).await; + + let uploader = ArchiveUploader::new( + common::pool().clone(), + store.clone() as Arc, + config, + 42, + ); + let handle = tokio::spawn(async move { 
uploader.run().await }); + + wait_until(Duration::from_secs(10), || { + Box::pin(async { + let row: (Option>, Option>, Option) = sqlx::query_as( + "SELECT uploaded_at, payload, etag FROM archive_blocks WHERE block_number = 11", + ) + .fetch_one(common::pool()) + .await + .expect("fetch archive row"); + let state: (Option, bool, Option>) = sqlx::query_as( + "SELECT latest_contiguous_uploaded_block, manifest_dirty, manifest_updated_at + FROM archive_state WHERE stream = 'blocks'", + ) + .fetch_one(common::pool()) + .await + .expect("fetch archive state"); + row.0.is_some() + && row.1.is_none() + && row.2.is_some() + && state.0 == Some(11) + && !state.1 + && state.2.is_some() + }) + }) + .await; + + let object_key = block_object_key(&prefix, 11); + let object_bytes = common::get_object_bytes(&bucket, &object_key).await; + let decoded = + zstd::stream::decode_all(object_bytes.as_slice()).expect("decode archive object"); + let bundle: ArchiveBundleV1 = + serde_json::from_slice(&decoded).expect("decode archive bundle"); + assert_eq!(bundle.chain_id, 42); + assert_eq!(bundle.block_number, 11); + + let manifest_bytes = common::get_object_bytes(&bucket, &manifest_object_key(&prefix)).await; + let manifest: ArchiveManifestV1 = + serde_json::from_slice(&manifest_bytes).expect("decode manifest"); + assert_eq!(manifest.chain_id, 42); + assert_eq!(manifest.archive_start_block, 11); + assert_eq!(manifest.latest_contiguous_uploaded_block, Some(11)); + + handle.abort(); + let _ = handle.await; + }); +} + +#[test] +fn failed_upload_preserves_row_and_updates_retry_metadata() { + let _guard = ARCHIVE_TEST_LOCK.lock().expect("archive test lock"); + common::run(async { + common::reset_archive_tables().await; + + let mut config = common::archive_config("unused-bucket"); + config.retry_base_seconds = 10; + let uploader = ArchiveUploader::new( + common::pool().clone(), + Arc::new(FailingStore) as Arc, + config, + 42, + ); + + seed_archive_block(common::pool(), "integration", 42, 21, 
21).await; + + let started_at = Utc::now(); + let handle = tokio::spawn(async move { uploader.run().await }); + + wait_until(Duration::from_secs(5), || { + Box::pin(async { + let row: ( + Option>, + Option>, + i32, + Option, + Option>, + ) = sqlx::query_as( + "SELECT uploaded_at, payload, retry_count, last_error, next_attempt_at + FROM archive_blocks WHERE block_number = 21", + ) + .fetch_one(common::pool()) + .await + .expect("fetch failed archive row"); + row.0.is_none() + && row.1.is_some() + && row.2 == 1 + && row + .3 + .as_deref() + .is_some_and(|err| err.contains("simulated archive upload failure")) + && row.4.is_some() + }) + }) + .await; + + let row: (i32, DateTime) = sqlx::query_as( + "SELECT retry_count, next_attempt_at + FROM archive_blocks WHERE block_number = 21", + ) + .fetch_one(common::pool()) + .await + .expect("fetch retry timing"); + assert_eq!(row.0, 1); + assert!( + row.1 >= started_at + chrono::Duration::seconds(9), + "next retry should use the configured base delay" + ); + assert!( + row.1 <= started_at + chrono::Duration::seconds(15), + "next retry should not skip directly to the doubled delay" + ); + + handle.abort(); + let _ = handle.await; + }); +} + +#[test] +fn late_lower_block_rebases_archive_start_and_recovers_contiguous_head() { + let _guard = ARCHIVE_TEST_LOCK.lock().expect("archive test lock"); + common::run(async { + common::reset_archive_tables().await; + + let config: ArchiveConfig = common::archive_config("unused-bucket"); + let prefix = config.prefix.clone(); + let store = Arc::new(RecordingStore::default()); + + let (mut client, connection) = tokio_postgres::connect(common::database_url(), tokio_postgres::NoTls) + .await + .expect("connect tokio-postgres"); + tokio::spawn(async move { + let _ = connection.await; + }); + + let mut tx = client.transaction().await.expect("begin tx for block 11"); + insert_archive_entries(&mut tx, vec![archive_entry(&prefix, 7, 11)]) + .await + .expect("insert block 11 archive entry"); + 
tx.commit().await.expect("commit block 11 archive entry"); + + sqlx::query( + "UPDATE archive_blocks + SET uploaded_at = NOW(), + payload = NULL, + etag = 'etag-11', + updated_at = NOW() + WHERE block_number = 11", + ) + .execute(common::pool()) + .await + .expect("mark block 11 uploaded"); + sqlx::query( + "UPDATE archive_state + SET latest_contiguous_uploaded_block = 11, + manifest_dirty = FALSE, + manifest_updated_at = NOW(), + updated_at = NOW() + WHERE stream = 'blocks'", + ) + .execute(common::pool()) + .await + .expect("seed archive state"); + + let mut tx = client.transaction().await.expect("begin tx for block 10"); + insert_archive_entries(&mut tx, vec![archive_entry(&prefix, 7, 10)]) + .await + .expect("insert lower block archive entry"); + tx.commit().await.expect("commit block 10 archive entry"); + + let state_after_insert: (i64, Option, bool) = sqlx::query_as( + "SELECT archive_start_block, latest_contiguous_uploaded_block, manifest_dirty + FROM archive_state WHERE stream = 'blocks'", + ) + .fetch_one(common::pool()) + .await + .expect("fetch archive state after rebasing"); + assert_eq!(state_after_insert.0, 10); + assert_eq!(state_after_insert.1, None); + assert!(!state_after_insert.2); + + let uploader = ArchiveUploader::new( + common::pool().clone(), + store.clone() as Arc, + config, + 7, + ); + let handle = tokio::spawn(async move { uploader.run().await }); + + wait_until(Duration::from_secs(5), || { + Box::pin(async { + let state: (i64, Option, bool, Option>) = sqlx::query_as( + "SELECT archive_start_block, latest_contiguous_uploaded_block, manifest_dirty, manifest_updated_at + FROM archive_state WHERE stream = 'blocks'", + ) + .fetch_one(common::pool()) + .await + .expect("fetch archive state"); + state.0 == 10 && state.1 == Some(11) && !state.2 && state.3.is_some() + }) + }) + .await; + + let manifests = store.manifests.lock().expect("lock manifests"); + let last: ArchiveManifestV1 = + serde_json::from_slice(manifests.last().expect("final 
manifest")) + .expect("decode final manifest"); + assert_eq!(last.archive_start_block, 10); + assert_eq!(last.latest_contiguous_uploaded_block, Some(11)); + + handle.abort(); + let _ = handle.await; + }); +} + +#[test] +fn out_of_order_uploads_do_not_advance_manifest_past_gap() { + let _guard = ARCHIVE_TEST_LOCK.lock().expect("archive test lock"); + common::run(async { + common::reset_archive_tables().await; + + let mut config: ArchiveConfig = common::archive_config("unused-bucket"); + config.upload_concurrency = 3; + config.retry_base_seconds = 1; + let store = Arc::new(FlakyStore::default()); + let uploader = ArchiveUploader::new( + common::pool().clone(), + store.clone() as Arc, + config, + 7, + ); + + for block_number in [30_i64, 31, 32] { + seed_archive_block(common::pool(), "integration", 7, block_number, 30).await; + } + + let handle = tokio::spawn(async move { uploader.run().await }); + + wait_until(Duration::from_secs(5), || { + Box::pin(async { + let head: Option = sqlx::query_scalar( + "SELECT latest_contiguous_uploaded_block + FROM archive_state WHERE stream = 'blocks'", + ) + .fetch_one(common::pool()) + .await + .expect("fetch contiguous head"); + head == Some(30) + }) + }) + .await; + + wait_until(Duration::from_secs(10), || { + Box::pin(async { + let state: (Option, bool, Option>) = sqlx::query_as( + "SELECT latest_contiguous_uploaded_block, manifest_dirty, manifest_updated_at + FROM archive_state WHERE stream = 'blocks'", + ) + .fetch_one(common::pool()) + .await + .expect("fetch archive state"); + state.0 == Some(32) && !state.1 && state.2.is_some() + }) + }) + .await; + + let manifests = store.manifests.lock().expect("lock manifests"); + assert!( + manifests.len() >= 2, + "expected at least two manifest uploads, got {}", + manifests.len() + ); + let first: ArchiveManifestV1 = + serde_json::from_slice(&manifests[0]).expect("decode first manifest"); + let last: ArchiveManifestV1 = + serde_json::from_slice(manifests.last().expect("final manifest")) 
+ .expect("decode final manifest"); + assert_eq!(first.latest_contiguous_uploaded_block, Some(30)); + assert_eq!(last.latest_contiguous_uploaded_block, Some(32)); + + handle.abort(); + let _ = handle.await; + }); +} diff --git a/backend/crates/atlas-server/tests/integration/common.rs b/backend/crates/atlas-server/tests/integration/common.rs index da128f6..2899b92 100644 --- a/backend/crates/atlas-server/tests/integration/common.rs +++ b/backend/crates/atlas-server/tests/integration/common.rs @@ -1,39 +1,54 @@ +use aws_config::{BehaviorVersion, Region}; +use aws_sdk_s3::config::Credentials; +use aws_sdk_s3::Client as S3Client; use axum::Router; use sqlx::postgres::PgPoolOptions; use sqlx::PgPool; use std::sync::{Arc, LazyLock}; +use std::time::Duration; +use testcontainers::core::{IntoContainerPort, WaitFor}; use testcontainers::runners::AsyncRunner; -use testcontainers::ContainerAsync; +use testcontainers::{ContainerAsync, GenericImage, ImageExt}; use testcontainers_modules::postgres::Postgres; use tokio::sync::broadcast; use atlas_server::api::{build_router, AppState}; +use atlas_server::archive::S3ArchiveStore; +use atlas_server::config::ArchiveConfig; use atlas_server::head::HeadTracker; +const MINIO_IMAGE_TAG: &str = "RELEASE.2024-01-16T16-07-38Z"; +const MINIO_ROOT_USER: &str = "minioadmin"; +const MINIO_ROOT_PASSWORD: &str = "minioadmin"; +const MINIO_REGION: &str = "us-east-1"; + struct TestEnv { runtime: tokio::runtime::Runtime, pool: PgPool, - _container: ContainerAsync, + database_url: String, + minio_endpoint: String, + _postgres: ContainerAsync, + _minio: ContainerAsync, } // Single LazyLock: runtime + container + pool, all initialized together. 
static ENV: LazyLock<TestEnv> = LazyLock::new(|| { let runtime = tokio::runtime::Runtime::new().expect("create test runtime"); - let (pool, container) = runtime.block_on(async { - let container = Postgres::default() + let (pool, postgres, minio, minio_endpoint, pg_database_url) = runtime.block_on(async { + let postgres = Postgres::default() .start() .await .expect("Failed to start Postgres container"); - let host = container.get_host().await.expect("get host"); - let port = container.get_host_port_ipv4(5432).await.expect("get port"); + let host = postgres.get_host().await.expect("get host"); + let port = postgres.get_host_port_ipv4(5432).await.expect("get port"); - let database_url = format!("postgres://postgres:postgres@{}:{}/postgres", host, port); + let pg_database_url = format!("postgres://postgres:postgres@{}:{}/postgres", host, port); let pool = PgPoolOptions::new() .max_connections(10) - .connect(&database_url) + .connect(&pg_database_url) .await .expect("Failed to create pool"); @@ -42,20 +57,142 @@ static ENV: LazyLock<TestEnv> = LazyLock::new(|| { .await .expect("Failed to run migrations"); - (pool, container) + let minio = GenericImage::new("minio/minio", MINIO_IMAGE_TAG) + .with_exposed_port(9000.tcp()) + .with_exposed_port(9001.tcp()) + .with_wait_for(WaitFor::seconds(2)) + .with_env_var("MINIO_ROOT_USER", MINIO_ROOT_USER) + .with_env_var("MINIO_ROOT_PASSWORD", MINIO_ROOT_PASSWORD) + .with_cmd([ + "server", + "/data", + "--address", + ":9000", + "--console-address", + ":9001", + ]) + .start() + .await + .expect("Failed to start MinIO container"); + + let minio_host = minio.get_host().await.expect("get minio host"); + let minio_port = minio + .get_host_port_ipv4(9000.tcp()) + .await + .expect("get minio port"); + let minio_endpoint = format!("http://{}:{}", minio_host, minio_port); + + wait_for_minio(&minio_endpoint).await; + + (pool, postgres, minio, minio_endpoint, pg_database_url) }); TestEnv { runtime, pool, - _container: container, + database_url: pg_database_url, + 
minio_endpoint, + _postgres: postgres, + _minio: minio, } }); +async fn wait_for_minio(endpoint: &str) { + let client = reqwest::Client::new(); + + for _ in 0..30 { + let response = client + .get(format!("{endpoint}/minio/health/live")) + .timeout(Duration::from_secs(1)) + .send() + .await; + if let Ok(response) = response { + if response.status().is_success() { + return; + } + } + tokio::time::sleep(Duration::from_millis(250)).await; + } + + panic!("MinIO did not become ready at {endpoint}"); +} + pub fn pool() -> &'static PgPool { &ENV.pool } +pub fn database_url() -> &'static str { + &ENV.database_url +} + +pub fn minio_endpoint() -> &'static str { + &ENV.minio_endpoint +} + +pub fn archive_config(bucket: impl Into<String>) -> ArchiveConfig { + ArchiveConfig { + bucket: bucket.into(), + region: MINIO_REGION.to_string(), + prefix: "integration".to_string(), + endpoint: Some(minio_endpoint().to_string()), + force_path_style: true, + upload_concurrency: 1, + retry_base_seconds: 1, + } +} + +pub async fn archive_store(bucket: impl Into<String>) -> S3ArchiveStore { + let bucket = bucket.into(); + let client = s3_client().await; + let store = S3ArchiveStore::new(client, &bucket); + atlas_server::archive::ArchiveObjectStore::ensure_bucket_access(&store) + .await + .expect("bucket access"); + store +} + +pub async fn s3_client() -> S3Client { + let creds = Credentials::new(MINIO_ROOT_USER, MINIO_ROOT_PASSWORD, None, None, "test"); + let s3_config = aws_sdk_s3::config::Builder::new() + .region(Region::new(MINIO_REGION.to_string())) + .endpoint_url(minio_endpoint()) + .credentials_provider(creds) + .force_path_style(true) + .behavior_version(BehaviorVersion::latest()) + .build(); + S3Client::from_conf(s3_config) +} + +pub async fn create_bucket(bucket: &str) { + let client = s3_client().await; + let _ = client.create_bucket().bucket(bucket).send().await; +} + +pub async fn get_object_bytes(bucket: &str, key: &str) -> Vec<u8> { + let client = s3_client().await; + let response = client + 
.get_object() + .bucket(bucket) + .key(key) + .send() + .await + .expect("get object"); + response + .body + .collect() + .await + .expect("collect object body") + .into_bytes() + .to_vec() +} + +pub async fn reset_archive_tables() { + sqlx::query("TRUNCATE archive_blocks, archive_state CASCADE") + .execute(pool()) + .await + .expect("truncate archive tables"); +} + pub fn test_router() -> Router { let pool = pool().clone(); let head_tracker = Arc::new(HeadTracker::empty(10)); diff --git a/backend/crates/atlas-server/tests/integration/main.rs b/backend/crates/atlas-server/tests/integration/main.rs index 92e9a0d..4121b5d 100644 --- a/backend/crates/atlas-server/tests/integration/main.rs +++ b/backend/crates/atlas-server/tests/integration/main.rs @@ -1,6 +1,7 @@ mod common; mod addresses; +mod archive; mod blocks; mod nfts; mod schema; diff --git a/backend/crates/atlas-server/tests/integration/schema.rs b/backend/crates/atlas-server/tests/integration/schema.rs index 306c960..3d28b92 100644 --- a/backend/crates/atlas-server/tests/integration/schema.rs +++ b/backend/crates/atlas-server/tests/integration/schema.rs @@ -29,6 +29,8 @@ fn all_expected_tables_exist() { for expected in [ "addresses", "address_labels", + "archive_blocks", + "archive_state", "block_da_status", "blocks", "contract_abis", @@ -117,6 +119,9 @@ fn key_indexes_exist() { "tx_hash_lookup_pkey", // da status (powers pending-DA queries) "idx_block_da_status_pending", + // archive + "idx_archive_blocks_pending", + "idx_archive_blocks_uploaded", ] { assert!( indexes.contains(&expected.to_string()), diff --git a/backend/migrations/20240109000001_s3_archive.sql b/backend/migrations/20240109000001_s3_archive.sql new file mode 100644 index 0000000..99078d8 --- /dev/null +++ b/backend/migrations/20240109000001_s3_archive.sql @@ -0,0 +1,34 @@ +-- Archive tracking for canonical block bundles uploaded to S3-compatible storage. 
+ +CREATE TABLE IF NOT EXISTS archive_blocks ( + block_number BIGINT PRIMARY KEY, + object_key TEXT NOT NULL, + payload BYTEA, + schema_version SMALLINT NOT NULL, + retry_count INT NOT NULL DEFAULT 0, + next_attempt_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + last_error TEXT, + uploaded_at TIMESTAMPTZ, + etag TEXT, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE INDEX IF NOT EXISTS idx_archive_blocks_pending + ON archive_blocks (next_attempt_at, block_number) + WHERE uploaded_at IS NULL; + +CREATE INDEX IF NOT EXISTS idx_archive_blocks_uploaded + ON archive_blocks (uploaded_at) + WHERE uploaded_at IS NOT NULL; + +CREATE TABLE IF NOT EXISTS archive_state ( + stream TEXT PRIMARY KEY, + archive_start_block BIGINT NOT NULL, + latest_contiguous_uploaded_block BIGINT, + schema_version SMALLINT NOT NULL, + manifest_dirty BOOLEAN NOT NULL DEFAULT TRUE, + last_manifest_error TEXT, + manifest_updated_at TIMESTAMPTZ, + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); diff --git a/docker-compose.yml b/docker-compose.yml index 4ce3d37..96169c6 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -13,6 +13,30 @@ services: timeout: 5s retries: 5 + minio: + image: minio/minio:RELEASE.2024-01-16T16-07-38Z + command: server /data --address :9000 --console-address :9001 + environment: + MINIO_ROOT_USER: ${MINIO_ROOT_USER:-minioadmin} + MINIO_ROOT_PASSWORD: ${MINIO_ROOT_PASSWORD:-minioadmin} + ports: + - "9000:9000" + - "9001:9001" + volumes: + - minio-data:/data + + minio-init: + image: minio/mc:RELEASE.2024-01-16T16-06-34Z + depends_on: + minio: + condition: service_started + entrypoint: > + /bin/sh -c " + until mc alias set atlas http://minio:9000 ${MINIO_ROOT_USER:-minioadmin} ${MINIO_ROOT_PASSWORD:-minioadmin}; do sleep 1; done && + mc mb --ignore-existing atlas/${S3_BUCKET:-atlas-archive} + " + restart: "no" + atlas-server: build: context: ./backend @@ -35,7 +59,7 @@ services: FAUCET_ENABLED: 
${FAUCET_ENABLED:-false} FAUCET_PRIVATE_KEY: ${FAUCET_PRIVATE_KEY:-} FAUCET_AMOUNT: ${FAUCET_AMOUNT:-} - FAUCET_COOLDOWN_MINUTES: ${FAUCET_COOLDOWN_MINUTES:-} + FAUCET_COOLDOWN_MINUTES: ${FAUCET_COOLDOWN_MINUTES:-30} CHAIN_NAME: ${CHAIN_NAME:-Unknown} CHAIN_LOGO_URL: ${CHAIN_LOGO_URL:-} ACCENT_COLOR: ${ACCENT_COLOR:-} @@ -43,6 +67,16 @@ services: BACKGROUND_COLOR_LIGHT: ${BACKGROUND_COLOR_LIGHT:-} SUCCESS_COLOR: ${SUCCESS_COLOR:-} ERROR_COLOR: ${ERROR_COLOR:-} + S3_ENABLED: ${S3_ENABLED:-false} + S3_BUCKET: ${S3_BUCKET:-atlas-archive} + S3_REGION: ${S3_REGION:-us-east-1} + S3_PREFIX: ${S3_PREFIX:-} + S3_ENDPOINT: ${S3_ENDPOINT:-http://minio:9000} + S3_FORCE_PATH_STYLE: ${S3_FORCE_PATH_STYLE:-true} + S3_UPLOAD_CONCURRENCY: ${S3_UPLOAD_CONCURRENCY:-4} + S3_RETRY_BASE_SECONDS: ${S3_RETRY_BASE_SECONDS:-30} + AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID:-minioadmin} + AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY:-minioadmin} API_HOST: 0.0.0.0 API_PORT: 3000 RUST_LOG: atlas_server=info,tower_http=info @@ -66,3 +100,5 @@ services: volumes: pgdata: name: atlas_pgdata + minio-data: + name: atlas_minio_data