diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 21e87bee4..89797a38d 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -58,6 +58,7 @@ set(ICEBERG_SOURCES manifest/v3_metadata.cc metadata_columns.cc metrics_config.cc + metrics_reporters.cc name_mapping.cc partition_field.cc partition_spec.cc diff --git a/src/iceberg/catalog/memory/in_memory_catalog.cc b/src/iceberg/catalog/memory/in_memory_catalog.cc index 8a082aad1..032f79828 100644 --- a/src/iceberg/catalog/memory/in_memory_catalog.cc +++ b/src/iceberg/catalog/memory/in_memory_catalog.cc @@ -22,6 +22,7 @@ #include #include +#include "iceberg/metrics_reporters.h" #include "iceberg/table.h" #include "iceberg/table_identifier.h" #include "iceberg/table_metadata.h" @@ -351,7 +352,12 @@ InMemoryCatalog::InMemoryCatalog( properties_(std::move(properties)), file_io_(std::move(file_io)), warehouse_location_(std::move(warehouse_location)), - root_namespace_(std::make_unique()) {} + root_namespace_(std::make_unique()) { + auto reporter_result = MetricsReporters::Load(properties_); + if (reporter_result.has_value()) { + reporter_ = std::move(reporter_result.value()); + } +} InMemoryCatalog::~InMemoryCatalog() = default; @@ -427,7 +433,8 @@ Result> InMemoryCatalog::CreateTable( ICEBERG_RETURN_UNEXPECTED( root_namespace_->UpdateTableMetadataLocation(identifier, metadata_file_location)); return Table::Make(identifier, std::move(table_metadata), - std::move(metadata_file_location), file_io_, shared_from_this()); + std::move(metadata_file_location), file_io_, shared_from_this(), + reporter_); } Result> InMemoryCatalog::UpdateTable( @@ -478,7 +485,7 @@ Result> InMemoryCatalog::UpdateTable( TableMetadataUtil::DeleteRemovedMetadataFiles(*file_io_, base.get(), *updated); return Table::Make(identifier, std::move(updated), std::move(new_metadata_location), - file_io_, shared_from_this()); + file_io_, shared_from_this(), reporter_); } Result> InMemoryCatalog::StageCreateTable( @@ 
-499,7 +506,7 @@ Result> InMemoryCatalog::StageCreateTable( TableMetadata::Make(*schema, *spec, *order, base_location, properties)); ICEBERG_ASSIGN_OR_RAISE( auto table, StagedTable::Make(identifier, std::move(table_metadata), "", file_io_, - shared_from_this())); + shared_from_this(), reporter_)); return Transaction::Make(std::move(table), Transaction::Kind::kCreate, /* auto_commit */ false); } @@ -537,7 +544,7 @@ Result> InMemoryCatalog::LoadTable( ICEBERG_ASSIGN_OR_RAISE(auto metadata, TableMetadataUtil::Read(*file_io_, metadata_location)); return Table::Make(identifier, std::move(metadata), std::move(metadata_location), - file_io_, shared_from_this()); + file_io_, shared_from_this(), reporter_); } Result> InMemoryCatalog::RegisterTable( @@ -557,7 +564,7 @@ Result> InMemoryCatalog::RegisterTable( return UnknownError("The registry failed."); } return Table::Make(identifier, std::move(metadata), metadata_file_location, file_io_, - shared_from_this()); + shared_from_this(), reporter_); } } // namespace iceberg diff --git a/src/iceberg/catalog/memory/in_memory_catalog.h b/src/iceberg/catalog/memory/in_memory_catalog.h index 22a596c10..81b4671eb 100644 --- a/src/iceberg/catalog/memory/in_memory_catalog.h +++ b/src/iceberg/catalog/memory/in_memory_catalog.h @@ -22,6 +22,7 @@ #include #include "iceberg/catalog.h" +#include "iceberg/metrics_reporter.h" namespace iceberg { @@ -105,6 +106,7 @@ class ICEBERG_EXPORT InMemoryCatalog std::shared_ptr file_io_; std::string warehouse_location_; std::unique_ptr root_namespace_; + std::shared_ptr reporter_; mutable std::shared_mutex mutex_; }; diff --git a/src/iceberg/catalog/rest/rest_catalog.cc b/src/iceberg/catalog/rest/rest_catalog.cc index 94c6b1e4e..237144bd0 100644 --- a/src/iceberg/catalog/rest/rest_catalog.cc +++ b/src/iceberg/catalog/rest/rest_catalog.cc @@ -37,6 +37,7 @@ #include "iceberg/catalog/rest/rest_util.h" #include "iceberg/catalog/rest/types.h" #include "iceberg/json_serde_internal.h" +#include 
"iceberg/metrics_reporters.h" #include "iceberg/partition_spec.h" #include "iceberg/result.h" #include "iceberg/schema.h" @@ -154,9 +155,17 @@ Result> RestCatalog::Make( ICEBERG_ASSIGN_OR_RAISE(auto catalog_session, auth_manager->CatalogSession(*client, final_config.configs())); - return std::shared_ptr(new RestCatalog( - std::move(final_config), std::move(file_io), std::move(client), std::move(paths), - std::move(endpoints), std::move(auth_manager), std::move(catalog_session))); + // Load metrics reporter from catalog properties + std::shared_ptr reporter; + auto reporter_result = MetricsReporters::Load(final_config.configs()); + if (reporter_result.has_value()) { + reporter = std::move(reporter_result.value()); + } + + return std::shared_ptr( + new RestCatalog(std::move(final_config), std::move(file_io), std::move(client), + std::move(paths), std::move(endpoints), std::move(auth_manager), + std::move(catalog_session), std::move(reporter))); } RestCatalog::RestCatalog(RestCatalogProperties config, std::shared_ptr file_io, @@ -164,7 +173,8 @@ RestCatalog::RestCatalog(RestCatalogProperties config, std::shared_ptr f std::unique_ptr paths, std::unordered_set endpoints, std::unique_ptr auth_manager, - std::shared_ptr catalog_session) + std::shared_ptr catalog_session, + std::shared_ptr reporter) : config_(std::move(config)), file_io_(std::move(file_io)), client_(std::move(client)), @@ -172,7 +182,8 @@ RestCatalog::RestCatalog(RestCatalogProperties config, std::shared_ptr f name_(config_.Get(RestCatalogProperties::kName)), supported_endpoints_(std::move(endpoints)), auth_manager_(std::move(auth_manager)), - catalog_session_(std::move(catalog_session)) { + catalog_session_(std::move(catalog_session)), + reporter_(std::move(reporter)) { ICEBERG_DCHECK(catalog_session_ != nullptr, "catalog_session must not be null"); } @@ -338,7 +349,8 @@ Result> RestCatalog::CreateTable( CreateTableInternal(identifier, schema, spec, order, location, properties, /*stage_create=*/false)); 
return Table::Make(identifier, std::move(result.metadata), - std::move(result.metadata_location), file_io_, shared_from_this()); + std::move(result.metadata_location), file_io_, shared_from_this(), + reporter_); } Result> RestCatalog::UpdateTable( @@ -369,7 +381,7 @@ Result> RestCatalog::UpdateTable( return Table::Make(identifier, std::move(commit_response.metadata), std::move(commit_response.metadata_location), file_io_, - shared_from_this()); + shared_from_this(), reporter_); } Result> RestCatalog::StageCreateTable( @@ -383,7 +395,7 @@ Result> RestCatalog::StageCreateTable( ICEBERG_ASSIGN_OR_RAISE(auto staged_table, StagedTable::Make(identifier, std::move(result.metadata), std::move(result.metadata_location), file_io_, - shared_from_this())); + shared_from_this(), reporter_)); return Transaction::Make(std::move(staged_table), Transaction::Kind::kCreate, /*auto_commit=*/false); } @@ -446,7 +458,7 @@ Result> RestCatalog::LoadTable(const TableIdentifier& ide return Table::Make(identifier, std::move(load_result.metadata), std::move(load_result.metadata_location), file_io_, - shared_from_this()); + shared_from_this(), reporter_); } Result> RestCatalog::RegisterTable( @@ -469,7 +481,7 @@ Result> RestCatalog::RegisterTable( ICEBERG_ASSIGN_OR_RAISE(auto load_result, LoadTableResultFromJson(json)); return Table::Make(identifier, std::move(load_result.metadata), std::move(load_result.metadata_location), file_io_, - shared_from_this()); + shared_from_this(), reporter_); } } // namespace iceberg::rest diff --git a/src/iceberg/catalog/rest/rest_catalog.h b/src/iceberg/catalog/rest/rest_catalog.h index 5cc61eae2..e3cbfa93e 100644 --- a/src/iceberg/catalog/rest/rest_catalog.h +++ b/src/iceberg/catalog/rest/rest_catalog.h @@ -28,6 +28,7 @@ #include "iceberg/catalog/rest/endpoint.h" #include "iceberg/catalog/rest/iceberg_rest_export.h" #include "iceberg/catalog/rest/type_fwd.h" +#include "iceberg/metrics_reporter.h" #include "iceberg/result.h" /// \file 
iceberg/catalog/rest/rest_catalog.h @@ -109,7 +110,8 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog, std::unique_ptr client, std::unique_ptr paths, std::unordered_set endpoints, std::unique_ptr auth_manager, - std::shared_ptr catalog_session); + std::shared_ptr catalog_session, + std::shared_ptr reporter); Result LoadTableInternal(const TableIdentifier& identifier) const; @@ -127,6 +129,7 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog, std::unordered_set supported_endpoints_; std::unique_ptr auth_manager_; std::shared_ptr catalog_session_; + std::shared_ptr reporter_; }; } // namespace iceberg::rest diff --git a/src/iceberg/manifest/manifest_group.cc b/src/iceberg/manifest/manifest_group.cc index 220b8585c..24f48fa8a 100644 --- a/src/iceberg/manifest/manifest_group.cc +++ b/src/iceberg/manifest/manifest_group.cc @@ -321,12 +321,14 @@ ManifestGroup::ReadEntries() { ICEBERG_ASSIGN_OR_RAISE(bool should_match, manifest_evaluator->Evaluate(manifest)); if (!should_match) { // Skip this manifest because it doesn't match partition filter + scan_counters_.skipped_data_manifests++; continue; } if (ignore_deleted_) { // only scan manifests that have entries other than deletes if (!manifest.has_added_files() && !manifest.has_existing_files()) { + scan_counters_.skipped_data_manifests++; continue; } } @@ -334,10 +336,13 @@ ManifestGroup::ReadEntries() { if (ignore_existing_) { // only scan manifests that have entries other than existing if (!manifest.has_added_files() && !manifest.has_deleted_files()) { + scan_counters_.skipped_data_manifests++; continue; } } + scan_counters_.scanned_data_manifests++; + // Read manifest entries ICEBERG_ASSIGN_OR_RAISE(auto reader, MakeReader(manifest)); ICEBERG_ASSIGN_OR_RAISE(auto entries, @@ -345,6 +350,7 @@ ManifestGroup::ReadEntries() { for (auto& entry : entries) { if (ignore_existing_ && entry.status == ManifestStatus::kExisting) { + scan_counters_.skipped_data_files++; continue; } @@ -354,6 +360,7 @@ 
ManifestGroup::ReadEntries() { } if (!manifest_entry_predicate_(entry)) { + scan_counters_.skipped_data_files++; continue; } diff --git a/src/iceberg/manifest/manifest_group.h b/src/iceberg/manifest/manifest_group.h index 10b552786..de7127da2 100644 --- a/src/iceberg/manifest/manifest_group.h +++ b/src/iceberg/manifest/manifest_group.h @@ -39,6 +39,16 @@ namespace iceberg { +/// \brief Counters for tracking scan metrics during manifest processing. +struct ICEBERG_EXPORT ScanMetricsCounters { + int64_t scanned_data_manifests = 0; + int64_t skipped_data_manifests = 0; + int64_t scanned_delete_manifests = 0; + int64_t skipped_delete_manifests = 0; + int64_t skipped_data_files = 0; + int64_t skipped_delete_files = 0; +}; + /// \brief Context passed to task creation functions. struct ICEBERG_EXPORT TaskContext { public: @@ -120,6 +130,9 @@ class ICEBERG_EXPORT ManifestGroup : public ErrorCollector { /// \param column_ids Field IDs of columns whose statistics should be preserved. ManifestGroup& ColumnsToKeepStats(std::unordered_set column_ids); + /// \brief Returns the scan metrics counters accumulated during plan operations. + const ScanMetricsCounters& scan_counters() const { return scan_counters_; } + /// \brief Plan scan tasks for all matching data files. 
Result>> PlanFiles(); @@ -162,6 +175,7 @@ class ICEBERG_EXPORT ManifestGroup : public ErrorCollector { bool ignore_deleted_ = false; bool ignore_existing_ = false; bool ignore_residuals_ = false; + ScanMetricsCounters scan_counters_; }; } // namespace iceberg diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build index bfc502fd8..b0edaf650 100644 --- a/src/iceberg/meson.build +++ b/src/iceberg/meson.build @@ -76,6 +76,7 @@ iceberg_sources = files( 'manifest/v3_metadata.cc', 'metadata_columns.cc', 'metrics_config.cc', + 'metrics_reporters.cc', 'name_mapping.cc', 'partition_field.cc', 'partition_spec.cc', diff --git a/src/iceberg/metrics_reporter.h b/src/iceberg/metrics_reporter.h new file mode 100644 index 000000000..e819c13c0 --- /dev/null +++ b/src/iceberg/metrics_reporter.h @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "iceberg/iceberg_export.h" + +namespace iceberg { + +/// \brief Duration type for metrics reporting in milliseconds. +using DurationMs = std::chrono::milliseconds; + +/// \brief Report generated after a table scan operation. 
+/// +/// Contains metrics about the planning and execution of a table scan, +/// including information about manifests and data files processed. +struct ICEBERG_EXPORT ScanReport { + /// \brief The fully qualified name of the table that was scanned. + std::string table_name; + + /// \brief Snapshot ID that was scanned, if available. + int64_t snapshot_id = -1; + + /// \brief Filter expression used in the scan, if any. + std::string filter; + + /// \brief Schema ID. + int32_t schema_id = -1; + + /// \brief Total duration of the entire scan operation. + DurationMs total_duration{0}; + + /// \brief Duration spent planning the scan. + DurationMs total_planning_duration{0}; + + /// \brief Number of data files in the scan result. + int64_t result_data_files = 0; + + /// \brief Number of delete files in the scan result. + int64_t result_delete_files = 0; + + /// \brief Total number of data manifests. + int64_t total_data_manifests = 0; + + /// \brief Number of data files that were skipped. + int64_t skipped_data_files = 0; + + /// \brief Number of delete files that were skipped. + int64_t skipped_delete_files = 0; + + /// \brief Number of data manifests that were scanned. + int64_t scanned_data_manifests = 0; + + /// \brief Number of data manifests that were skipped due to filtering. + int64_t skipped_data_manifests = 0; + + /// \brief Total number of delete manifests. + int64_t total_delete_manifests = 0; + + /// \brief Number of delete manifests that were scanned. + int64_t scanned_delete_manifests = 0; + + /// \brief Number of delete manifests that were skipped. + int64_t skipped_delete_manifests = 0; + + /// \brief Projected field IDs from the scan schema. + std::vector projected_field_ids; + /// \brief Projected field names from the scan schema. + std::vector projected_field_names; + /// \brief Total size in bytes of all result data files. + int64_t total_file_size_in_bytes = 0; + /// \brief Total size in bytes of all result delete files. 
+ int64_t total_delete_file_size_in_bytes = 0; + /// \brief Number of indexed delete files. + int64_t indexed_delete_files = 0; + /// \brief Number of equality delete files in the scan result. + int64_t equality_delete_files = 0; + /// \brief Number of positional delete files in the scan result. + int64_t positional_delete_files = 0; + /// \brief Number of deletion vectors in the scan result. + int64_t dvs = 0; + /// \brief Additional key-value metadata. + std::unordered_map metadata; +}; + +/// \brief Report generated after a commit operation. +/// +/// Contains metrics about the changes made in a commit, including +/// files added/removed and retry information. +struct ICEBERG_EXPORT CommitReport { + /// \brief The fully qualified name of the table that was modified. + std::string table_name; + + /// \brief The snapshot ID created by this commit. + int64_t snapshot_id = -1; + + /// \brief The sequence number assigned to this commit. + int64_t sequence_number = -1; + + /// \brief The operation that was performed (append, overwrite, delete, etc.). + std::string operation; + + /// \brief Number of commit attempts (1 = success on first try). + int32_t attempts = 1; + + /// \brief Number of data files added in this commit. + int64_t added_data_files = 0; + + /// \brief Number of data files removed in this commit. + int64_t removed_data_files = 0; + + /// \brief Total number of data files after this commit. + int64_t total_data_files = 0; + + /// \brief Number of delete files added in this commit. + int64_t added_delete_files = 0; + + /// \brief Number of delete files removed in this commit. + int64_t removed_delete_files = 0; + + /// \brief Total number of delete files after this commit. + int64_t total_delete_files = 0; + + /// \brief Number of records added in this commit. + int64_t added_records = 0; + + /// \brief Number of records removed in this commit. + int64_t removed_records = 0; + + /// \brief Size in bytes of files added. 
+ int64_t added_files_size = 0; + + /// \brief Size in bytes of files removed. + int64_t removed_files_size = 0; + + /// \brief Total duration of the commit operation. + DurationMs total_duration{0}; + /// \brief Total records after this commit. + int64_t total_records = 0; + /// \brief Total file size in bytes after this commit. + int64_t total_files_size = 0; + /// \brief Equality delete files added. + int64_t added_equality_delete_files = 0; + /// \brief Equality delete files removed. + int64_t removed_equality_delete_files = 0; + /// \brief Positional delete files added. + int64_t added_positional_delete_files = 0; + /// \brief Positional delete files removed. + int64_t removed_positional_delete_files = 0; + /// \brief Position delete records added. + int64_t added_positional_deletes = 0; + /// \brief Position delete records removed. + int64_t removed_positional_deletes = 0; + /// \brief Total position delete records. + int64_t total_positional_deletes = 0; + /// \brief Equality delete records added. + int64_t added_equality_deletes = 0; + /// \brief Equality delete records removed. + int64_t removed_equality_deletes = 0; + /// \brief Total equality delete records. + int64_t total_equality_deletes = 0; + /// \brief Deletion vectors added. + int64_t added_dvs = 0; + /// \brief Deletion vectors removed. + int64_t removed_dvs = 0; + /// \brief Manifests created in this commit. + int64_t manifests_created = 0; + /// \brief Manifests replaced in this commit. + int64_t manifests_replaced = 0; + /// \brief Manifests kept in this commit. + int64_t manifests_kept = 0; + /// \brief Manifest entries processed. + int64_t manifest_entries_processed = 0; + /// \brief Additional key-value metadata. + std::unordered_map metadata; +}; + +/// \brief The type of a metrics report. +enum class MetricsReportType { + kScanReport, + kCommitReport, +}; + +/// \brief Get the string representation of a metrics report type. 
+ICEBERG_EXPORT constexpr std::string_view ToString(MetricsReportType type) noexcept { + switch (type) { + case MetricsReportType::kScanReport: + return "scan"; + case MetricsReportType::kCommitReport: + return "commit"; + } + std::unreachable(); +} + +/// \brief A metrics report, which can be either a ScanReport or CommitReport. +/// +/// This variant type allows handling both report types uniformly through +/// the MetricsReporter interface. +using MetricsReport = std::variant; + +/// \brief Get the type of a metrics report. +/// +/// \param report The metrics report to get the type of. +/// \return The type of the metrics report. +ICEBERG_EXPORT inline MetricsReportType GetReportType(const MetricsReport& report) { + return std::visit( + [](const auto& r) -> MetricsReportType { + using T = std::decay_t; + if constexpr (std::is_same_v) { + return MetricsReportType::kScanReport; + } else { + return MetricsReportType::kCommitReport; + } + }, + report); +} + +/// \brief Interface for reporting metrics from Iceberg operations. +/// +/// Implementations of this interface can be used to collect and report +/// metrics about scan and commit operations. Common implementations include +/// logging reporters, metrics collectors, and the noop reporter for testing. +class ICEBERG_EXPORT MetricsReporter { + public: + virtual ~MetricsReporter() = default; + + /// \brief Report a metrics report. + /// + /// Implementations should handle the report according to their purpose + /// (e.g., logging, sending to a metrics service, etc.). + /// + /// \param report The metrics report to process. + virtual void Report(const MetricsReport& report) = 0; +}; + +} // namespace iceberg diff --git a/src/iceberg/metrics_reporters.cc b/src/iceberg/metrics_reporters.cc new file mode 100644 index 000000000..02bca93ed --- /dev/null +++ b/src/iceberg/metrics_reporters.cc @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/metrics_reporters.h" + +#include + +#include "iceberg/util/string_util.h" + +namespace iceberg { + +namespace { + +/// \brief Registry type for MetricsReporter factories with heterogeneous lookup support. +using MetricsReporterRegistry = std::unordered_map; + +/// \brief Get the set of known metrics reporter types. +const std::unordered_set& DefaultReporterTypes() { + static const std::unordered_set kReporterTypes = { + std::string(kMetricsReporterTypeNoop), + }; + return kReporterTypes; +} + +/// \brief Infer the reporter type from properties. +std::string InferReporterType( + const std::unordered_map& properties) { + auto it = properties.find(std::string(kMetricsReporterImpl)); + if (it != properties.end() && !it->second.empty()) { + return StringUtils::ToLower(it->second); + } + // Default to noop reporter + return std::string(kMetricsReporterTypeNoop); +} + +/// \brief Metrics reporter that does nothing. +/// +/// This is the default reporter used when no reporter is configured. +/// It silently discards all reports. 
+class NoopMetricsReporter : public MetricsReporter { + public: + static Result> Make( + [[maybe_unused]] const std::unordered_map& properties) { + return std::make_unique(); + } + + void Report([[maybe_unused]] const MetricsReport& report) override { + // Intentionally empty - noop implementation discards all reports + } +}; + +/// \brief Template helper to create factory functions for reporter types. +template +MetricsReporterFactory MakeReporterFactory() { + return [](const std::unordered_map& props) + -> Result> { return T::Make(props); }; +} + +/// \brief Create the default registry with built-in reporters. +MetricsReporterRegistry CreateDefaultRegistry() { + return { + {std::string(kMetricsReporterTypeNoop), MakeReporterFactory()}, + }; +} + +/// \brief Get the global registry of metrics reporter factories. +MetricsReporterRegistry& GetRegistry() { + static MetricsReporterRegistry registry = CreateDefaultRegistry(); + return registry; +} + +} // namespace + +void MetricsReporters::Register(std::string_view reporter_type, + MetricsReporterFactory factory) { + GetRegistry()[StringUtils::ToLower(reporter_type)] = std::move(factory); +} + +Result> MetricsReporters::Load( + const std::unordered_map& properties) { + std::string reporter_type = InferReporterType(properties); + + auto& registry = GetRegistry(); + auto it = registry.find(reporter_type); + if (it == registry.end()) { + if (DefaultReporterTypes().contains(reporter_type)) { + return NotImplemented("Metrics reporter type '{}' is not yet supported", + reporter_type); + } + return InvalidArgument("Unknown metrics reporter type: '{}'", reporter_type); + } + + return it->second(properties); +} + +} // namespace iceberg diff --git a/src/iceberg/metrics_reporters.h b/src/iceberg/metrics_reporters.h new file mode 100644 index 000000000..82566d017 --- /dev/null +++ b/src/iceberg/metrics_reporters.h @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/metrics_reporters.h +/// \brief Factory for creating MetricsReporter instances. + +#include +#include +#include +#include +#include + +#include "iceberg/iceberg_export.h" +#include "iceberg/metrics_reporter.h" +#include "iceberg/result.h" + +namespace iceberg { + +/// \brief Property key for configuring the metrics reporter implementation. +/// +/// Set this property in catalog properties to specify which metrics reporter +/// implementation to use. The value should match a registered reporter type. +inline constexpr std::string_view kMetricsReporterImpl = "metrics-reporter-impl"; + +/// \brief Property value for the noop metrics reporter. +inline constexpr std::string_view kMetricsReporterTypeNoop = "noop"; + +/// \brief Function type for creating MetricsReporter instances. +/// +/// \param properties Configuration properties for the reporter. +/// \return A new MetricsReporter instance or an error. +using MetricsReporterFactory = std::function>( + const std::unordered_map& properties)>; + +/// \brief Factory class for creating and managing MetricsReporter instances. +/// +/// This class provides a registry-based factory for creating MetricsReporter +/// implementations. 
Custom reporter implementations can be registered using +/// the Register() method. +class ICEBERG_EXPORT MetricsReporters { + public: + /// \brief Load a metrics reporter based on properties. + /// + /// This method looks up the "metrics-reporter-impl" property to determine + /// which reporter implementation to create. If not specified, returns a + /// NoopMetricsReporter. + /// + /// \param properties Configuration properties containing reporter type. + /// \return A new MetricsReporter instance or an error. + static Result> Load( + const std::unordered_map& properties); + + /// \brief Register a factory for a metrics reporter type. + /// + /// This method is not thread-safe. All registrations should be done during + /// application startup before any concurrent access to Load(). + /// + /// \param reporter_type Case-insensitive type identifier (e.g., "noop"). + /// \param factory Factory function that produces the reporter. + static void Register(std::string_view reporter_type, MetricsReporterFactory factory); +}; + +} // namespace iceberg diff --git a/src/iceberg/table.cc b/src/iceberg/table.cc index 0550f61d5..1c470b1a3 100644 --- a/src/iceberg/table.cc +++ b/src/iceberg/table.cc @@ -23,6 +23,7 @@ #include "iceberg/catalog.h" #include "iceberg/location_provider.h" +#include "iceberg/metrics_reporters.h" #include "iceberg/partition_spec.h" #include "iceberg/result.h" #include "iceberg/schema.h" @@ -46,7 +47,8 @@ Result> Table::Make(TableIdentifier identifier, std::shared_ptr metadata, std::string metadata_location, std::shared_ptr io, - std::shared_ptr catalog) { + std::shared_ptr catalog, + std::shared_ptr reporter) { if (metadata == nullptr) [[unlikely]] { return InvalidArgument("Metadata cannot be null"); } @@ -61,20 +63,29 @@ Result> Table::Make(TableIdentifier identifier, } return std::shared_ptr(new Table(std::move(identifier), std::move(metadata), std::move(metadata_location), std::move(io), - std::move(catalog))); + std::move(catalog), 
std::move(reporter))); } Table::~Table() = default; Table::Table(TableIdentifier identifier, std::shared_ptr metadata, std::string metadata_location, std::shared_ptr io, - std::shared_ptr catalog) + std::shared_ptr catalog, std::shared_ptr reporter) : identifier_(std::move(identifier)), metadata_(std::move(metadata)), metadata_location_(std::move(metadata_location)), io_(std::move(io)), catalog_(std::move(catalog)), - metadata_cache_(std::make_unique(metadata_.get())) {} + reporter_(std::move(reporter)), + metadata_cache_(std::make_unique(metadata_.get())) { + if (!reporter_) { + // Fall back to noop reporter if none provided + auto noop = MetricsReporters::Load({}); + if (noop.has_value()) { + reporter_ = std::move(noop.value()); + } + } +} const std::string& Table::uuid() const { return metadata_->table_uuid; } @@ -145,12 +156,14 @@ const std::shared_ptr& Table::metadata() const { return metadata_ const std::shared_ptr& Table::catalog() const { return catalog_; } +const std::shared_ptr& Table::reporter() const { return reporter_; } + Result> Table::location_provider() const { return LocationProvider::Make(metadata_->location, metadata_->properties); } Result> Table::NewScan() const { - return TableScanBuilder::Make(metadata_, io_); + return TableScanBuilder::Make(metadata_, io_, reporter_, identifier_.ToString()); } Result> Table::NewTransaction() { @@ -230,7 +243,7 @@ Result> Table::NewSnapshotManager() { Result> StagedTable::Make( TableIdentifier identifier, std::shared_ptr metadata, std::string metadata_location, std::shared_ptr io, - std::shared_ptr catalog) { + std::shared_ptr catalog, std::shared_ptr reporter) { if (metadata == nullptr) [[unlikely]] { return InvalidArgument("Metadata cannot be null"); } @@ -240,9 +253,9 @@ Result> StagedTable::Make( if (catalog == nullptr) [[unlikely]] { return InvalidArgument("Catalog cannot be null"); } - return std::shared_ptr( - new StagedTable(std::move(identifier), std::move(metadata), - std::move(metadata_location), 
std::move(io), std::move(catalog))); + return std::shared_ptr(new StagedTable( + std::move(identifier), std::move(metadata), std::move(metadata_location), + std::move(io), std::move(catalog), std::move(reporter))); } StagedTable::~StagedTable() = default; @@ -260,9 +273,9 @@ Result> StaticTable::Make( if (io == nullptr) [[unlikely]] { return InvalidArgument("FileIO cannot be null"); } - return std::shared_ptr( - new StaticTable(std::move(identifier), std::move(metadata), - std::move(metadata_location), std::move(io), /*catalog=*/nullptr)); + return std::shared_ptr(new StaticTable( + std::move(identifier), std::move(metadata), std::move(metadata_location), + std::move(io), /*catalog=*/nullptr, /*reporter=*/nullptr)); } StaticTable::~StaticTable() = default; diff --git a/src/iceberg/table.h b/src/iceberg/table.h index 423911c21..a4286710f 100644 --- a/src/iceberg/table.h +++ b/src/iceberg/table.h @@ -26,6 +26,7 @@ #include #include "iceberg/iceberg_export.h" +#include "iceberg/metrics_reporter.h" #include "iceberg/snapshot.h" #include "iceberg/table_identifier.h" #include "iceberg/type_fwd.h" @@ -43,11 +44,11 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this
{ /// \param[in] metadata_location The location of the table metadata file. /// \param[in] io The FileIO to read and write table data and metadata files. /// \param[in] catalog The catalog that this table belongs to. - static Result> Make(TableIdentifier identifier, - std::shared_ptr metadata, - std::string metadata_location, - std::shared_ptr io, - std::shared_ptr catalog); + static Result> Make( + TableIdentifier identifier, std::shared_ptr metadata, + std::string metadata_location, std::shared_ptr io, + std::shared_ptr catalog, + std::shared_ptr reporter = nullptr); virtual ~Table(); @@ -117,6 +118,9 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this
{ /// \brief Returns the catalog that this table belongs to const std::shared_ptr& catalog() const; + /// \brief Returns the metrics reporter for this table + const std::shared_ptr& reporter() const; + /// \brief Returns a LocationProvider for this table Result> location_provider() const; @@ -174,13 +178,14 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this
{ protected: Table(TableIdentifier identifier, std::shared_ptr metadata, std::string metadata_location, std::shared_ptr io, - std::shared_ptr catalog); + std::shared_ptr catalog, std::shared_ptr reporter); const TableIdentifier identifier_; std::shared_ptr metadata_; std::string metadata_location_; std::shared_ptr io_; std::shared_ptr catalog_; + std::shared_ptr reporter_; std::unique_ptr metadata_cache_; }; @@ -190,7 +195,8 @@ class ICEBERG_EXPORT StagedTable final : public Table { static Result> Make( TableIdentifier identifier, std::shared_ptr metadata, std::string metadata_location, std::shared_ptr io, - std::shared_ptr catalog); + std::shared_ptr catalog, + std::shared_ptr reporter = nullptr); ~StagedTable() override; diff --git a/src/iceberg/table_scan.cc b/src/iceberg/table_scan.cc index eeec262e9..8db157c68 100644 --- a/src/iceberg/table_scan.cc +++ b/src/iceberg/table_scan.cc @@ -19,6 +19,7 @@ #include "iceberg/table_scan.h" +#include #include #include @@ -211,16 +212,22 @@ Result FileScanTask::ToArrow( } Result> TableScanBuilder::Make( - std::shared_ptr metadata, std::shared_ptr io) { + std::shared_ptr metadata, std::shared_ptr io, + std::shared_ptr reporter, const std::string& table_name) { ICEBERG_PRECHECK(metadata != nullptr, "Table metadata cannot be null"); ICEBERG_PRECHECK(io != nullptr, "FileIO cannot be null"); - return std::unique_ptr( - new TableScanBuilder(std::move(metadata), std::move(io))); + return std::unique_ptr(new TableScanBuilder( + std::move(metadata), std::move(io), std::move(reporter), table_name)); } TableScanBuilder::TableScanBuilder(std::shared_ptr table_metadata, - std::shared_ptr file_io) - : metadata_(std::move(table_metadata)), io_(std::move(file_io)) {} + std::shared_ptr file_io, + std::shared_ptr reporter, + const std::string& table_name) + : metadata_(std::move(table_metadata)), io_(std::move(file_io)) { + context_.reporter = std::move(reporter); + context_.table_name = table_name; +} TableScanBuilder& 
TableScanBuilder::Option(std::string key, std::string value) { context_.options[std::move(key)] = std::move(value); @@ -473,6 +480,8 @@ DataTableScan::DataTableScan(std::shared_ptr metadata, std::move(context)) {} Result>> DataTableScan::PlanFiles() const { + auto scan_start = std::chrono::steady_clock::now(); + ICEBERG_ASSIGN_OR_RAISE(auto snapshot, this->snapshot()); if (!snapshot) { return std::vector>{}; @@ -498,7 +507,70 @@ Result>> DataTableScan::PlanFiles() co if (context_.ignore_residuals) { manifest_group->IgnoreResiduals(); } - return manifest_group->PlanFiles(); + + ICEBERG_ASSIGN_OR_RAISE(auto tasks, manifest_group->PlanFiles()); + + // Report scan metrics if a reporter is configured + if (context_.reporter) { + auto scan_end = std::chrono::steady_clock::now(); + auto duration = std::chrono::duration_cast(scan_end - scan_start); + + ScanReport report; + report.table_name = context_.table_name; + report.snapshot_id = snapshot->snapshot_id; + report.schema_id = schema_ ? schema_->schema_id() : -1; + if (context_.filter) { + report.filter = context_.filter->ToString(); + } + report.total_planning_duration = duration; + + // Manifest counts from ManifestGroup counters + const auto& counters = manifest_group->scan_counters(); + report.total_data_manifests = static_cast(data_manifests.size()); + report.total_delete_manifests = static_cast(delete_manifests.size()); + report.scanned_data_manifests = counters.scanned_data_manifests; + report.skipped_data_manifests = counters.skipped_data_manifests; + report.scanned_delete_manifests = counters.scanned_delete_manifests; + report.skipped_delete_manifests = counters.skipped_delete_manifests; + report.skipped_data_files = counters.skipped_data_files; + report.skipped_delete_files = counters.skipped_delete_files; + + // Result counts and file sizes from tasks + report.result_data_files = static_cast(tasks.size()); + for (const auto& task : tasks) { + report.total_file_size_in_bytes += 
task->data_file()->file_size_in_bytes; + for (const auto& del_file : task->delete_files()) { + report.total_delete_file_size_in_bytes += del_file->file_size_in_bytes; + switch (del_file->content) { + case DataFile::Content::kEqualityDeletes: + report.equality_delete_files++; + break; + case DataFile::Content::kPositionDeletes: + report.positional_delete_files++; + break; + default: + break; + } + } + report.result_delete_files += static_cast(task->delete_files().size()); + } + + // Projected fields from resolved schema + auto proj_schema_result = ResolveProjectedSchema(); + if (proj_schema_result.has_value()) { + const auto& proj_schema = proj_schema_result.value().get(); + if (proj_schema) { + for (const auto& field : proj_schema->fields()) { + report.projected_field_ids.push_back(field.field_id()); + report.projected_field_names.emplace_back(field.name()); + } + } + } + + context_.reporter->Report(report); + } + + return tasks; } } // namespace iceberg diff --git a/src/iceberg/table_scan.h b/src/iceberg/table_scan.h index aa225ff81..1aeca58a0 100644 --- a/src/iceberg/table_scan.h +++ b/src/iceberg/table_scan.h @@ -28,6 +28,7 @@ #include #include "iceberg/arrow_c_data.h" +#include "iceberg/metrics_reporter.h" #include "iceberg/result.h" #include "iceberg/type_fwd.h" #include "iceberg/util/error_collector.h" @@ -118,6 +119,8 @@ struct TableScanContext { std::optional to_snapshot_id; std::string branch{}; std::optional min_rows_requested; + std::shared_ptr reporter; + std::string table_name; // Validate the context parameters to see if they have conflicts. [[nodiscard]] Status Validate() const; @@ -131,8 +134,12 @@ class ICEBERG_EXPORT TableScanBuilder : public ErrorCollector { /// \brief Constructs a TableScanBuilder for the given table. /// \param metadata Current table metadata. /// \param io FileIO instance for reading manifests files. + /// \param reporter Optional metrics reporter for scan metrics. 
+ /// \param table_name Optional table name for metrics reporting. static Result> Make( - std::shared_ptr metadata, std::shared_ptr io); + std::shared_ptr metadata, std::shared_ptr io, + std::shared_ptr reporter = nullptr, + const std::string& table_name = {}); /// \brief Update property that will override the table's behavior /// based on the incoming pair. Unknown properties will be ignored. @@ -253,7 +260,9 @@ class ICEBERG_EXPORT TableScanBuilder : public ErrorCollector { Result> Build(); private: - TableScanBuilder(std::shared_ptr metadata, std::shared_ptr io); + TableScanBuilder(std::shared_ptr metadata, std::shared_ptr io, + std::shared_ptr reporter, + const std::string& table_name); // Return the schema bound to the specified snapshot. Result>> ResolveSnapshotSchema(); diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index fdd88888e..873827080 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -76,6 +76,7 @@ add_iceberg_test(table_test SOURCES location_provider_test.cc metrics_config_test.cc + metrics_reporter_test.cc snapshot_summary_builder_test.cc snapshot_test.cc snapshot_util_test.cc diff --git a/src/iceberg/test/meson.build b/src/iceberg/test/meson.build index 71ab6942e..cbfd79a46 100644 --- a/src/iceberg/test/meson.build +++ b/src/iceberg/test/meson.build @@ -49,6 +49,7 @@ iceberg_tests = { 'sources': files( 'location_provider_test.cc', 'metrics_config_test.cc', + 'metrics_reporter_test.cc', 'snapshot_test.cc', 'snapshot_util_test.cc', 'table_metadata_builder_test.cc', diff --git a/src/iceberg/test/metrics_reporter_test.cc b/src/iceberg/test/metrics_reporter_test.cc new file mode 100644 index 000000000..f9793be06 --- /dev/null +++ b/src/iceberg/test/metrics_reporter_test.cc @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/metrics_reporter.h" + +#include +#include +#include +#include +#include +#include + +#include + +#include "iceberg/metrics_reporters.h" + +namespace iceberg { + +class CollectingMetricsReporter : public MetricsReporter { + public: + static Result> Make( + [[maybe_unused]] const std::unordered_map& properties) { + return std::make_unique(); + } + + void Report(const MetricsReport& report) override { reports_.push_back(report); } + + const std::vector& reports() const { return reports_; } + + private: + std::vector reports_; +}; + +TEST(CustomMetricsReporterTest, RegisterAndLoad) { + // Register custom reporter + MetricsReporters::Register("collecting", + [](const std::unordered_map& props) + -> Result> { + return CollectingMetricsReporter::Make(props); + }); + + // Load the custom reporter + std::unordered_map properties = { + {std::string(kMetricsReporterImpl), "collecting"}}; + auto result = MetricsReporters::Load(properties); + + ASSERT_TRUE(result.has_value()); + ASSERT_NE(result.value(), nullptr); + + // Report and verify + auto* reporter = dynamic_cast(result.value().get()); + ASSERT_NE(reporter, nullptr); + + ScanReport scan_report{.table_name = "test.table"}; + reporter->Report(scan_report); + + EXPECT_EQ(reporter->reports().size(), 1); + 
EXPECT_EQ(GetReportType(reporter->reports()[0]), MetricsReportType::kScanReport); +} + +struct ReporterRegistrationParam { + std::string test_name; + std::string register_name; + std::string load_name; + bool expect_success; +}; + +class ReporterRegistrationTest + : public ::testing::TestWithParam {}; + +TEST_P(ReporterRegistrationTest, LoadsRegisteredReporter) { + const auto& param = GetParam(); + MetricsReporters::Register(param.register_name, + [](const std::unordered_map&) + -> Result> { + return std::make_unique(); + }); + + std::unordered_map props = { + {std::string(kMetricsReporterImpl), param.load_name}}; + auto result = MetricsReporters::Load(props); + EXPECT_EQ(result.has_value(), param.expect_success); +} + +INSTANTIATE_TEST_SUITE_P( + MetricsReporterRegistration, ReporterRegistrationTest, + ::testing::Values(ReporterRegistrationParam{.test_name = "ExactMatch", + .register_name = "custom1", + .load_name = "custom1", + .expect_success = true}, + ReporterRegistrationParam{.test_name = "UpperToLower", + .register_name = "UPPER1", + .load_name = "upper1", + .expect_success = true}, + ReporterRegistrationParam{.test_name = "UnregisteredType", + .register_name = "registered1", + .load_name = "nonexistent1", + .expect_success = false}), + [](const auto& info) { return info.param.test_name; }); + +struct VariantDispatchParam { + std::string test_name; + MetricsReport report; + MetricsReportType expected_type; +}; + +class VariantDispatchTest : public ::testing::TestWithParam {}; + +TEST_P(VariantDispatchTest, CorrectTypeDispatch) { + const auto& param = GetParam(); + EXPECT_EQ(GetReportType(param.report), param.expected_type); +} + +INSTANTIATE_TEST_SUITE_P( + MetricsReportVariant, VariantDispatchTest, + ::testing::Values( + VariantDispatchParam{.test_name = "ScanReportDefault", + .report = ScanReport{}, + .expected_type = MetricsReportType::kScanReport}, + VariantDispatchParam{.test_name = "CommitReportDefault", + .report = CommitReport{}, + .expected_type = 
MetricsReportType::kCommitReport}), + [](const auto& info) { return info.param.test_name; }); + +struct CollectorParam { + std::string test_name; + MetricsReport report; + MetricsReportType expected_type; + std::string expected_table_name; +}; + +class CollectorTest : public ::testing::TestWithParam {}; + +TEST_P(CollectorTest, CollectsAndPreservesReport) { + const auto& param = GetParam(); + CollectingMetricsReporter reporter; + reporter.Report(param.report); + + ASSERT_EQ(reporter.reports().size(), 1); + EXPECT_EQ(GetReportType(reporter.reports()[0]), param.expected_type); + + std::visit([&](const auto& r) { EXPECT_EQ(r.table_name, param.expected_table_name); }, + reporter.reports()[0]); +} + +INSTANTIATE_TEST_SUITE_P( + MetricsCollector, CollectorTest, + ::testing::Values( + CollectorParam{.test_name = "ScanWithFields", + .report = ScanReport{.table_name = "db.t1", + .snapshot_id = 1, + .total_file_size_in_bytes = 99999}, + .expected_type = MetricsReportType::kScanReport, + .expected_table_name = "db.t1"}, + CollectorParam{.test_name = "CommitWithFields", + .report = CommitReport{.table_name = "db.t2", + .snapshot_id = 2, + .operation = "append"}, + .expected_type = MetricsReportType::kCommitReport, + .expected_table_name = "db.t2"}), + [](const auto& info) { return info.param.test_name; }); + +} // namespace iceberg diff --git a/src/iceberg/transaction.cc b/src/iceberg/transaction.cc index 6df45b30c..7af2d56db 100644 --- a/src/iceberg/transaction.cc +++ b/src/iceberg/transaction.cc @@ -19,6 +19,7 @@ */ #include "iceberg/transaction.h" +#include #include #include @@ -49,6 +50,7 @@ #include "iceberg/util/checked_cast.h" #include "iceberg/util/location_util.h" #include "iceberg/util/macros.h" +#include "iceberg/util/string_util.h" namespace iceberg { @@ -304,6 +306,7 @@ Result> Transaction::Commit() { ICEBERG_CHECK(last_update_committed_, "Cannot commit transaction when previous update is not committed"); + auto commit_start = std::chrono::steady_clock::now(); 
const auto& updates = metadata_builder_->changes(); if (updates.empty()) { committed_ = true; @@ -340,6 +343,10 @@ Result> Transaction::Commit() { committed_ = true; table_ = std::move(commit_result.value()); + auto duration = std::chrono::duration_cast( + std::chrono::steady_clock::now() - commit_start); + ReportCommitMetrics(duration); + return table_; } @@ -428,4 +435,73 @@ Result> Transaction::NewSnapshotManager() { return SnapshotManager::Make(shared_from_this()); } +void Transaction::ReportCommitMetrics(DurationMs duration) const { + const auto& reporter = table_->reporter(); + if (!reporter) return; + + auto snapshot_result = table_->current_snapshot(); + if (!snapshot_result.has_value() || !snapshot_result.value()) return; + + const auto& snapshot = snapshot_result.value(); + const auto& summary = snapshot->summary; + + auto parse_int64 = [&summary](const std::string& key) -> int64_t { + auto it = summary.find(key); + if (it != summary.end()) { + auto res = StringUtils::ParseNumber(it->second); + return res.has_value() ? 
res.value() : 0; + } + return 0; + }; + + CommitReport report; + report.table_name = table_->name().ToString(); + report.snapshot_id = snapshot->snapshot_id; + report.sequence_number = snapshot->sequence_number; + report.total_duration = duration; + + // Operation from summary + if (auto it = summary.find(SnapshotSummaryFields::kOperation); it != summary.end()) { + report.operation = it->second; + } + report.added_data_files = parse_int64(SnapshotSummaryFields::kAddedDataFiles); + report.removed_data_files = parse_int64(SnapshotSummaryFields::kDeletedDataFiles); + report.total_data_files = parse_int64(SnapshotSummaryFields::kTotalDataFiles); + report.added_delete_files = parse_int64(SnapshotSummaryFields::kAddedDeleteFiles); + report.removed_delete_files = parse_int64(SnapshotSummaryFields::kRemovedDeleteFiles); + report.total_delete_files = parse_int64(SnapshotSummaryFields::kTotalDeleteFiles); + report.added_records = parse_int64(SnapshotSummaryFields::kAddedRecords); + report.removed_records = parse_int64(SnapshotSummaryFields::kDeletedRecords); + report.added_files_size = parse_int64(SnapshotSummaryFields::kAddedFileSize); + report.removed_files_size = parse_int64(SnapshotSummaryFields::kRemovedFileSize); + + // New fields parsed from snapshot summary + report.total_records = parse_int64(SnapshotSummaryFields::kTotalRecords); + report.total_files_size = parse_int64(SnapshotSummaryFields::kTotalFileSize); + report.added_equality_delete_files = + parse_int64(SnapshotSummaryFields::kAddedEqDeleteFiles); + report.removed_equality_delete_files = + parse_int64(SnapshotSummaryFields::kRemovedEqDeleteFiles); + report.added_positional_delete_files = + parse_int64(SnapshotSummaryFields::kAddedPosDeleteFiles); + report.removed_positional_delete_files = + parse_int64(SnapshotSummaryFields::kRemovedPosDeleteFiles); + report.added_positional_deletes = parse_int64(SnapshotSummaryFields::kAddedPosDeletes); + report.removed_positional_deletes = + 
parse_int64(SnapshotSummaryFields::kRemovedPosDeletes); + report.total_positional_deletes = parse_int64(SnapshotSummaryFields::kTotalPosDeletes); + report.added_equality_deletes = parse_int64(SnapshotSummaryFields::kAddedEqDeletes); + report.removed_equality_deletes = parse_int64(SnapshotSummaryFields::kRemovedEqDeletes); + report.total_equality_deletes = parse_int64(SnapshotSummaryFields::kTotalEqDeletes); + report.added_dvs = parse_int64(SnapshotSummaryFields::kAddedDVs); + report.removed_dvs = parse_int64(SnapshotSummaryFields::kRemovedDVs); + report.manifests_created = parse_int64(SnapshotSummaryFields::kManifestsCreated); + report.manifests_replaced = parse_int64(SnapshotSummaryFields::kManifestsReplaced); + report.manifests_kept = parse_int64(SnapshotSummaryFields::kManifestsKept); + report.manifest_entries_processed = + parse_int64(SnapshotSummaryFields::kEntriesProcessed); + + reporter->Report(report); +} + } // namespace iceberg diff --git a/src/iceberg/transaction.h b/src/iceberg/transaction.h index 438054b51..6b4191ff6 100644 --- a/src/iceberg/transaction.h +++ b/src/iceberg/transaction.h @@ -20,10 +20,12 @@ #pragma once +#include #include #include #include "iceberg/iceberg_export.h" +#include "iceberg/metrics_reporter.h" #include "iceberg/result.h" #include "iceberg/type_fwd.h" @@ -130,6 +132,8 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this