Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ set(ICEBERG_SOURCES
manifest/v3_metadata.cc
metadata_columns.cc
metrics_config.cc
metrics_reporters.cc
name_mapping.cc
partition_field.cc
partition_spec.cc
Expand Down
19 changes: 13 additions & 6 deletions src/iceberg/catalog/memory/in_memory_catalog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <algorithm>
#include <iterator>

#include "iceberg/metrics_reporters.h"
#include "iceberg/table.h"
#include "iceberg/table_identifier.h"
#include "iceberg/table_metadata.h"
Expand Down Expand Up @@ -351,7 +352,12 @@ InMemoryCatalog::InMemoryCatalog(
properties_(std::move(properties)),
file_io_(std::move(file_io)),
warehouse_location_(std::move(warehouse_location)),
root_namespace_(std::make_unique<InMemoryNamespace>()) {}
root_namespace_(std::make_unique<InMemoryNamespace>()) {
auto reporter_result = MetricsReporters::Load(properties_);
if (reporter_result.has_value()) {
reporter_ = std::move(reporter_result.value());
}
}

InMemoryCatalog::~InMemoryCatalog() = default;

Expand Down Expand Up @@ -427,7 +433,8 @@ Result<std::shared_ptr<Table>> InMemoryCatalog::CreateTable(
ICEBERG_RETURN_UNEXPECTED(
root_namespace_->UpdateTableMetadataLocation(identifier, metadata_file_location));
return Table::Make(identifier, std::move(table_metadata),
std::move(metadata_file_location), file_io_, shared_from_this());
std::move(metadata_file_location), file_io_, shared_from_this(),
reporter_);
}

Result<std::shared_ptr<Table>> InMemoryCatalog::UpdateTable(
Expand Down Expand Up @@ -478,7 +485,7 @@ Result<std::shared_ptr<Table>> InMemoryCatalog::UpdateTable(
TableMetadataUtil::DeleteRemovedMetadataFiles(*file_io_, base.get(), *updated);

return Table::Make(identifier, std::move(updated), std::move(new_metadata_location),
file_io_, shared_from_this());
file_io_, shared_from_this(), reporter_);
}

Result<std::shared_ptr<Transaction>> InMemoryCatalog::StageCreateTable(
Expand All @@ -499,7 +506,7 @@ Result<std::shared_ptr<Transaction>> InMemoryCatalog::StageCreateTable(
TableMetadata::Make(*schema, *spec, *order, base_location, properties));
ICEBERG_ASSIGN_OR_RAISE(
auto table, StagedTable::Make(identifier, std::move(table_metadata), "", file_io_,
shared_from_this()));
shared_from_this(), reporter_));
return Transaction::Make(std::move(table), Transaction::Kind::kCreate,
/* auto_commit */ false);
}
Expand Down Expand Up @@ -537,7 +544,7 @@ Result<std::shared_ptr<Table>> InMemoryCatalog::LoadTable(
ICEBERG_ASSIGN_OR_RAISE(auto metadata,
TableMetadataUtil::Read(*file_io_, metadata_location));
return Table::Make(identifier, std::move(metadata), std::move(metadata_location),
file_io_, shared_from_this());
file_io_, shared_from_this(), reporter_);
}

Result<std::shared_ptr<Table>> InMemoryCatalog::RegisterTable(
Expand All @@ -557,7 +564,7 @@ Result<std::shared_ptr<Table>> InMemoryCatalog::RegisterTable(
return UnknownError("The registry failed.");
}
return Table::Make(identifier, std::move(metadata), metadata_file_location, file_io_,
shared_from_this());
shared_from_this(), reporter_);
}

} // namespace iceberg
2 changes: 2 additions & 0 deletions src/iceberg/catalog/memory/in_memory_catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <shared_mutex>

#include "iceberg/catalog.h"
#include "iceberg/metrics_reporter.h"

namespace iceberg {

Expand Down Expand Up @@ -105,6 +106,7 @@ class ICEBERG_EXPORT InMemoryCatalog
std::shared_ptr<FileIO> file_io_;
std::string warehouse_location_;
std::unique_ptr<class InMemoryNamespace> root_namespace_;
std::shared_ptr<MetricsReporter> reporter_;
mutable std::shared_mutex mutex_;
};

Expand Down
32 changes: 22 additions & 10 deletions src/iceberg/catalog/rest/rest_catalog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include "iceberg/catalog/rest/rest_util.h"
#include "iceberg/catalog/rest/types.h"
#include "iceberg/json_serde_internal.h"
#include "iceberg/metrics_reporters.h"
#include "iceberg/partition_spec.h"
#include "iceberg/result.h"
#include "iceberg/schema.h"
Expand Down Expand Up @@ -154,25 +155,35 @@ Result<std::shared_ptr<RestCatalog>> RestCatalog::Make(
ICEBERG_ASSIGN_OR_RAISE(auto catalog_session,
auth_manager->CatalogSession(*client, final_config.configs()));

return std::shared_ptr<RestCatalog>(new RestCatalog(
std::move(final_config), std::move(file_io), std::move(client), std::move(paths),
std::move(endpoints), std::move(auth_manager), std::move(catalog_session)));
// Load metrics reporter from catalog properties
std::shared_ptr<MetricsReporter> reporter;
auto reporter_result = MetricsReporters::Load(final_config.configs());
if (reporter_result.has_value()) {
reporter = std::move(reporter_result.value());
}

return std::shared_ptr<RestCatalog>(
new RestCatalog(std::move(final_config), std::move(file_io), std::move(client),
std::move(paths), std::move(endpoints), std::move(auth_manager),
std::move(catalog_session), std::move(reporter)));
}

RestCatalog::RestCatalog(RestCatalogProperties config, std::shared_ptr<FileIO> file_io,
std::unique_ptr<HttpClient> client,
std::unique_ptr<ResourcePaths> paths,
std::unordered_set<Endpoint> endpoints,
std::unique_ptr<auth::AuthManager> auth_manager,
std::shared_ptr<auth::AuthSession> catalog_session)
std::shared_ptr<auth::AuthSession> catalog_session,
std::shared_ptr<MetricsReporter> reporter)
: config_(std::move(config)),
file_io_(std::move(file_io)),
client_(std::move(client)),
paths_(std::move(paths)),
name_(config_.Get(RestCatalogProperties::kName)),
supported_endpoints_(std::move(endpoints)),
auth_manager_(std::move(auth_manager)),
catalog_session_(std::move(catalog_session)) {
catalog_session_(std::move(catalog_session)),
reporter_(std::move(reporter)) {
ICEBERG_DCHECK(catalog_session_ != nullptr, "catalog_session must not be null");
}

Expand Down Expand Up @@ -338,7 +349,8 @@ Result<std::shared_ptr<Table>> RestCatalog::CreateTable(
CreateTableInternal(identifier, schema, spec, order, location,
properties, /*stage_create=*/false));
return Table::Make(identifier, std::move(result.metadata),
std::move(result.metadata_location), file_io_, shared_from_this());
std::move(result.metadata_location), file_io_, shared_from_this(),
reporter_);
}

Result<std::shared_ptr<Table>> RestCatalog::UpdateTable(
Expand Down Expand Up @@ -369,7 +381,7 @@ Result<std::shared_ptr<Table>> RestCatalog::UpdateTable(

return Table::Make(identifier, std::move(commit_response.metadata),
std::move(commit_response.metadata_location), file_io_,
shared_from_this());
shared_from_this(), reporter_);
}

Result<std::shared_ptr<Transaction>> RestCatalog::StageCreateTable(
Expand All @@ -383,7 +395,7 @@ Result<std::shared_ptr<Transaction>> RestCatalog::StageCreateTable(
ICEBERG_ASSIGN_OR_RAISE(auto staged_table,
StagedTable::Make(identifier, std::move(result.metadata),
std::move(result.metadata_location), file_io_,
shared_from_this()));
shared_from_this(), reporter_));
return Transaction::Make(std::move(staged_table), Transaction::Kind::kCreate,
/*auto_commit=*/false);
}
Expand Down Expand Up @@ -446,7 +458,7 @@ Result<std::shared_ptr<Table>> RestCatalog::LoadTable(const TableIdentifier& ide

return Table::Make(identifier, std::move(load_result.metadata),
std::move(load_result.metadata_location), file_io_,
shared_from_this());
shared_from_this(), reporter_);
}

Result<std::shared_ptr<Table>> RestCatalog::RegisterTable(
Expand All @@ -469,7 +481,7 @@ Result<std::shared_ptr<Table>> RestCatalog::RegisterTable(
ICEBERG_ASSIGN_OR_RAISE(auto load_result, LoadTableResultFromJson(json));
return Table::Make(identifier, std::move(load_result.metadata),
std::move(load_result.metadata_location), file_io_,
shared_from_this());
shared_from_this(), reporter_);
}

} // namespace iceberg::rest
5 changes: 4 additions & 1 deletion src/iceberg/catalog/rest/rest_catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "iceberg/catalog/rest/endpoint.h"
#include "iceberg/catalog/rest/iceberg_rest_export.h"
#include "iceberg/catalog/rest/type_fwd.h"
#include "iceberg/metrics_reporter.h"
#include "iceberg/result.h"

/// \file iceberg/catalog/rest/rest_catalog.h
Expand Down Expand Up @@ -109,7 +110,8 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog,
std::unique_ptr<HttpClient> client, std::unique_ptr<ResourcePaths> paths,
std::unordered_set<Endpoint> endpoints,
std::unique_ptr<auth::AuthManager> auth_manager,
std::shared_ptr<auth::AuthSession> catalog_session);
std::shared_ptr<auth::AuthSession> catalog_session,
std::shared_ptr<MetricsReporter> reporter);

Result<std::string> LoadTableInternal(const TableIdentifier& identifier) const;

Expand All @@ -127,6 +129,7 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog,
std::unordered_set<Endpoint> supported_endpoints_;
std::unique_ptr<auth::AuthManager> auth_manager_;
std::shared_ptr<auth::AuthSession> catalog_session_;
std::shared_ptr<MetricsReporter> reporter_;
};

} // namespace iceberg::rest
7 changes: 7 additions & 0 deletions src/iceberg/manifest/manifest_group.cc
Original file line number Diff line number Diff line change
Expand Up @@ -321,30 +321,36 @@ ManifestGroup::ReadEntries() {
ICEBERG_ASSIGN_OR_RAISE(bool should_match, manifest_evaluator->Evaluate(manifest));
if (!should_match) {
// Skip this manifest because it doesn't match partition filter
scan_counters_.skipped_data_manifests++;
continue;
}

if (ignore_deleted_) {
// only scan manifests that have entries other than deletes
if (!manifest.has_added_files() && !manifest.has_existing_files()) {
scan_counters_.skipped_data_manifests++;
continue;
}
}

if (ignore_existing_) {
// only scan manifests that have entries other than existing
if (!manifest.has_added_files() && !manifest.has_deleted_files()) {
scan_counters_.skipped_data_manifests++;
continue;
}
}

scan_counters_.scanned_data_manifests++;

// Read manifest entries
ICEBERG_ASSIGN_OR_RAISE(auto reader, MakeReader(manifest));
ICEBERG_ASSIGN_OR_RAISE(auto entries,
ignore_deleted_ ? reader->LiveEntries() : reader->Entries());

for (auto& entry : entries) {
if (ignore_existing_ && entry.status == ManifestStatus::kExisting) {
scan_counters_.skipped_data_files++;
continue;
}

Expand All @@ -354,6 +360,7 @@ ManifestGroup::ReadEntries() {
}

if (!manifest_entry_predicate_(entry)) {
scan_counters_.skipped_data_files++;
continue;
}

Expand Down
14 changes: 14 additions & 0 deletions src/iceberg/manifest/manifest_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,16 @@

namespace iceberg {

/// \brief Plain tallies describing how much work a manifest scan did or avoided.
///
/// All tallies start at zero. A ManifestGroup keeps one instance and hands it
/// out read-only through ManifestGroup::scan_counters().
struct ICEBERG_EXPORT ScanMetricsCounters {
  /// Data manifests that passed the manifest-level checks and were opened for reading.
  int64_t scanned_data_manifests{0};
  /// Data manifests passed over without being read (partition filter mismatch, or
  /// nothing but deleted / existing entries while those are being ignored).
  int64_t skipped_data_manifests{0};
  /// NOTE(review): presumably the delete-manifest counterpart of
  /// scanned_data_manifests; no increment site is visible here -- confirm.
  int64_t scanned_delete_manifests{0};
  /// NOTE(review): presumably the delete-manifest counterpart of
  /// skipped_data_manifests; no increment site is visible here -- confirm.
  int64_t skipped_delete_manifests{0};
  /// Manifest entries dropped after their manifest was read (existing entries while
  /// existing files are ignored, or entries rejected by the entry predicate).
  int64_t skipped_data_files{0};
  /// NOTE(review): presumably the delete-file counterpart of skipped_data_files;
  /// no increment site is visible here -- confirm.
  int64_t skipped_delete_files{0};
};

/// \brief Context passed to task creation functions.
struct ICEBERG_EXPORT TaskContext {
public:
Expand Down Expand Up @@ -120,6 +130,9 @@ class ICEBERG_EXPORT ManifestGroup : public ErrorCollector {
/// \param column_ids Field IDs of columns whose statistics should be preserved.
ManifestGroup& ColumnsToKeepStats(std::unordered_set<int32_t> column_ids);

/// \brief Read-only view of the scan metrics counters gathered by this group.
///
/// The reference refers to a member of this ManifestGroup, so it must not be
/// used after the group has been destroyed.
const ScanMetricsCounters& scan_counters() const {
  return scan_counters_;
}

/// \brief Plan scan tasks for all matching data files.
Result<std::vector<std::shared_ptr<FileScanTask>>> PlanFiles();

Expand Down Expand Up @@ -162,6 +175,7 @@ class ICEBERG_EXPORT ManifestGroup : public ErrorCollector {
bool ignore_deleted_ = false;
bool ignore_existing_ = false;
bool ignore_residuals_ = false;
ScanMetricsCounters scan_counters_;
};

} // namespace iceberg
1 change: 1 addition & 0 deletions src/iceberg/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ iceberg_sources = files(
'manifest/v3_metadata.cc',
'metadata_columns.cc',
'metrics_config.cc',
'metrics_reporters.cc',
'name_mapping.cc',
'partition_field.cc',
'partition_spec.cc',
Expand Down
Loading