Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 23 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ members = [
"crates/extractors/solana-storage-proto",
"crates/extractors/tempo",
"crates/extractors/tempo/gen",
"crates/controller-admin-datasets",
"crates/controller-admin-system",
"crates/controller-admin-tables",
"crates/services/admin-api",
Expand Down
27 changes: 27 additions & 0 deletions crates/controller-admin-datasets/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
[package]
name = "amp-controller-admin-datasets"
# Package metadata is inherited from the workspace root.
edition.workspace = true
version.workspace = true
license-file.workspace = true

[features]
# OpenAPI schema generation via utoipa
utoipa = ["dep:utoipa"]

[dependencies]
# Workspace-local crates (path dependencies).
amp-data-store = { path = "../core/data-store" }
amp-datasets-raw = { path = "../core/datasets-raw", package = "datasets-raw" }
amp-datasets-registry = { path = "../core/datasets-registry" }
axum.workspace = true
common = { path = "../core/common" }
datafusion.workspace = true
datasets-common = { path = "../core/datasets-common" }
datasets-derived = { path = "../core/datasets-derived" }
futures.workspace = true
metadata-db = { path = "../core/metadata-db" }
monitoring = { path = "../core/monitoring" }
serde.workspace = true
serde_json = { workspace = true, features = ["raw_value"] }
thiserror.workspace = true
# Optional: only pulled in when the `utoipa` feature is enabled (see [features]).
utoipa = { version = "5.4.0", features = ["axum_extras"], optional = true }
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
//! Common utilities for HTTP handlers
//! Common utilities for dataset and manifest handlers

use std::{
collections::{BTreeMap, BTreeSet},
sync::Arc,
};

use amp_data_store::{DataStore, PhyTableRevision};
use amp_datasets_raw::dataset_kind::{
EvmRpcDatasetKind, FirehoseDatasetKind, SolanaDatasetKind, TempoDatasetKind,
};
use amp_datasets_registry::error::ResolveRevisionError;
use amp_parquet::footer::{AmpMetadataFromParquetError, amp_metadata_from_parquet_file};
use common::{
amp_catalog_provider::{AMP_CATALOG_NAME, AmpCatalogProvider, AsyncSchemaProvider},
context::plan::PlanContextBuilder,
Expand All @@ -34,7 +32,6 @@ use datasets_derived::{
manifest::{TableInput, View},
sorting::{self, CyclicDepError},
};
use futures::{StreamExt as _, stream};

/// Map of table names to their SQL references (table refs and function refs) using dependency aliases or self-references.
type TableReferencesMap = BTreeMap<
Expand Down Expand Up @@ -688,120 +685,3 @@ pub enum ManifestValidationError {
#[error("failed to create session config")]
SessionConfig(#[source] datafusion::error::DataFusionError),
}

/// Registers all files in a revision with their Amp-specific metadata.
///
/// Lists every object under the revision's directory in object storage,
/// reads each Parquet footer to extract the Amp-specific metadata
/// (including block-range information), and records each file in the
/// metadata database.
///
/// Metadata reads run concurrently (up to 16 in flight); database
/// registration happens sequentially as results become available.
#[tracing::instrument(skip_all, err)]
pub async fn register_revision_files(
    store: &DataStore,
    revision: &PhyTableRevision,
) -> Result<i32, RegisterRevisionFilesError> {
    // Upper bound on Parquet footers fetched from object storage at once.
    const CONCURRENT_METADATA_FETCHES: usize = 16;

    let listed = store
        .list_revision_files_in_object_store(revision)
        .await
        .map_err(RegisterRevisionFilesError::ListFiles)?;
    let file_count = listed.len();

    // Fetch and parse each object's Parquet metadata, a bounded number at a time.
    let fetch_store = store.clone();
    let mut results = stream::iter(listed)
        .map(|meta| {
            let fetch_store = fetch_store.clone();
            async move {
                let (name, amp_meta, footer) =
                    amp_metadata_from_parquet_file(&fetch_store, &meta)
                        .await
                        .map_err(RegisterRevisionFilesError::ReadParquetMetadata)?;

                let meta_json = serde_json::to_value(amp_meta)
                    .map_err(RegisterRevisionFilesError::SerializeMetadata)?;

                Ok((name, meta.size, meta.e_tag, meta.version, meta_json, footer))
            }
        })
        .buffered(CONCURRENT_METADATA_FETCHES);

    // Persist each file's metadata in the database as soon as its fetch completes.
    while let Some(item) = results.next().await {
        let (name, size, e_tag, version, meta_json, footer) = item?;
        store
            .register_revision_file(revision, &name, size, e_tag, version, meta_json, &footer)
            .await
            .map_err(RegisterRevisionFilesError::RegisterFile)?;
    }

    Ok(file_count as i32)
}

/// Errors that occur when registering revision files
///
/// This error type is used by [`register_revision_files`]. Each variant
/// wraps the underlying error as its `#[source]` for error-chain reporting.
#[derive(Debug, thiserror::Error)]
pub enum RegisterRevisionFilesError {
    /// Failed to list files in the revision directory
    ///
    /// This occurs when:
    /// - The object storage path for the revision is inaccessible
    /// - Network or permission errors when listing objects
    /// - The revision directory does not exist in object storage
    #[error("Failed to list files in revision")]
    ListFiles(#[source] amp_data_store::ListRevisionFilesInObjectStoreError),

    /// Failed to read Amp metadata from parquet file
    ///
    /// This occurs when extracting Amp-specific metadata from a Parquet file fails.
    /// Common causes include:
    /// - Corrupted or invalid Parquet file structure
    /// - Missing required metadata keys in the file
    /// - Incompatible metadata schema version
    /// - I/O errors reading from object store
    /// - JSON parsing failures in metadata values
    ///
    /// See `AmpMetadataFromParquetError` for specific error details.
    #[error("Failed to read Amp metadata from parquet file")]
    ReadParquetMetadata(#[source] AmpMetadataFromParquetError),

    /// Failed to serialize parquet metadata to JSON
    ///
    /// This occurs when:
    /// - The extracted Amp metadata cannot be represented as valid JSON
    /// - Serialization encounters unsupported types or values
    #[error("Failed to serialize parquet metadata to JSON")]
    SerializeMetadata(#[source] serde_json::Error),

    /// Failed to register file in metadata database
    ///
    /// This occurs when:
    /// - Database connection or transaction errors during file registration
    /// - Constraint violations when inserting file metadata
    /// - Concurrent modification conflicts
    #[error("Failed to register file in metadata database")]
    RegisterFile(#[source] amp_data_store::RegisterFileError),
}
18 changes: 18 additions & 0 deletions crates/controller-admin-datasets/src/ctx.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
use amp_data_store::DataStore;
use amp_datasets_registry::DatasetsRegistry;
use common::{datasets_cache::DatasetsCache, udfs::eth_call::EthCallUdfsCache};
use metadata_db::MetadataDb;

/// The controller-admin-datasets context
///
/// Shared state handed to the dataset admin handlers. All fields are
/// cheaply cloneable handles, so `Ctx` itself derives [`Clone`].
#[derive(Clone)]
pub struct Ctx {
    /// Metadata database handle.
    pub metadata_db: MetadataDb,
    /// Datasets registry for manifest and version tag operations.
    pub datasets_registry: DatasetsRegistry,
    /// Datasets cache for loading datasets.
    pub datasets_cache: DatasetsCache,
    /// EthCall UDFs cache for eth_call UDF creation.
    pub ethcall_udfs_cache: EthCallUdfsCache,
    /// Object store for output data.
    pub data_store: DataStore,
}
1 change: 1 addition & 0 deletions crates/controller-admin-datasets/src/datasets.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
//! Dataset admin endpoints; see [`handlers`] for the individual routes.
pub mod handlers;
8 changes: 8 additions & 0 deletions crates/controller-admin-datasets/src/datasets/handlers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
/// `DELETE /datasets/{namespace}/{name}`
pub mod delete;
/// `DELETE /datasets/{namespace}/{name}/versions/{version}`
pub mod delete_version;
/// `GET /datasets/{namespace}/{name}/versions/{revision}`
pub mod get;
/// `GET /datasets` (single dataset collection fetch)
pub mod get_all;
/// `GET /datasets/{namespace}/{name}/versions/{revision}/manifest`
pub mod get_manifest;
/// `GET /datasets` (listing)
pub mod list_all;
/// `GET /datasets/{namespace}/{name}/versions`
pub mod list_versions;
/// `POST /datasets`
pub mod register;
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use monitoring::logging;

use crate::{
ctx::Ctx,
handlers::error::{ErrorResponse, IntoErrorResponse},
error::{ErrorResponse, IntoErrorResponse},
};

/// Handler for the `DELETE /datasets/{namespace}/{name}` endpoint
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use monitoring::logging;

use crate::{
ctx::Ctx,
handlers::error::{ErrorResponse, IntoErrorResponse},
error::{ErrorResponse, IntoErrorResponse},
};

/// Handler for the `DELETE /datasets/{namespace}/{name}/versions/{version}` endpoint
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use monitoring::logging;

use crate::{
ctx::Ctx,
handlers::error::{ErrorResponse, IntoErrorResponse},
error::{ErrorResponse, IntoErrorResponse},
};

/// Handler for the `GET /datasets/{namespace}/{name}/versions/{revision}` endpoint
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use monitoring::logging;

use crate::{
ctx::Ctx,
handlers::error::{ErrorResponse, IntoErrorResponse},
error::{ErrorResponse, IntoErrorResponse},
};

/// Handler for the `GET /datasets` endpoint
Expand Down Expand Up @@ -38,7 +38,7 @@ use crate::{
operation_id = "datasets_list",
responses(
(status = 200, description = "Returns all datasets", body = DatasetsResponse),
(status = 500, description = "Internal server error", body = crate::handlers::error::ErrorResponse)
(status = 500, description = "Internal server error", body = ErrorResponse)
)
)
)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use serde_json::Value as JsonValue;

use crate::{
ctx::Ctx,
handlers::error::{ErrorResponse, IntoErrorResponse},
error::{ErrorResponse, IntoErrorResponse},
};

/// Handler for the `GET /datasets/{namespace}/{name}/versions/{revision}/manifest` endpoint
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use datasets_common::{name::Name, namespace::Namespace, version::Version};

use crate::{
ctx::Ctx,
handlers::error::{ErrorResponse, IntoErrorResponse},
error::{ErrorResponse, IntoErrorResponse},
};

/// Handler for the `GET /datasets` endpoint
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use monitoring::logging;

use crate::{
ctx::Ctx,
handlers::error::{ErrorResponse, IntoErrorResponse},
error::{ErrorResponse, IntoErrorResponse},
};

/// Handler for the `GET /datasets/{namespace}/{name}/versions` endpoint
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,13 @@ use monitoring::logging;
use serde_json::value::RawValue;

use crate::{
ctx::Ctx,
handlers::{
common::{
DatasetKind, ManifestHeader, ManifestValidationError, ParseDerivedManifestError,
ParseRawManifestError, parse_and_canonicalize_derived_dataset_manifest,
parse_and_canonicalize_raw_dataset_manifest,
},
error::{ErrorResponse, IntoErrorResponse},
common::{
DatasetKind, ManifestHeader, ManifestValidationError, ParseDerivedManifestError,
ParseRawManifestError, parse_and_canonicalize_derived_dataset_manifest,
parse_and_canonicalize_raw_dataset_manifest,
},
ctx::Ctx,
error::{ErrorResponse, IntoErrorResponse},
};

/// Handler for the `POST /datasets` endpoint
Expand Down Expand Up @@ -113,8 +111,8 @@ use crate::{
request_body = RegisterRequest,
responses(
(status = 201, description = "Dataset successfully registered or updated", body = RegisterResponse),
(status = 400, description = "Invalid request format or manifest", body = crate::handlers::error::ErrorResponse),
(status = 500, description = "Internal server error", body = crate::handlers::error::ErrorResponse)
(status = 400, description = "Invalid request format or manifest", body = ErrorResponse),
(status = 500, description = "Internal server error", body = ErrorResponse)
)
)
)]
Expand Down
Loading
Loading