From ef906745bef242746faf1233a077751a0d533a56 Mon Sep 17 00:00:00 2001 From: Nate Lust Date: Fri, 15 Mar 2024 09:26:30 -0400 Subject: [PATCH 1/2] Fix issues running GatherResourceUseage twice. Fix some issues related to running the gather resource usage script twice on the same collection. --- .../lsst/analysis/tools/tasks/gatherResourceUsage.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/python/lsst/analysis/tools/tasks/gatherResourceUsage.py b/python/lsst/analysis/tools/tasks/gatherResourceUsage.py index 28d0a7b21..c27862f7c 100644 --- a/python/lsst/analysis/tools/tasks/gatherResourceUsage.py +++ b/python/lsst/analysis/tools/tasks/gatherResourceUsage.py @@ -589,9 +589,10 @@ def __init__( input_metadata_dataset_type = results.parentDatasetType refs_for_type = set(results) if refs_for_type: - gather_task_label, gather_dataset_type_name = self._add_gather_task( - pipeline_graph, input_metadata_dataset_type - ) + task_results = self._add_gather_task(pipeline_graph, input_metadata_dataset_type) + if task_results is None: + continue + gather_task_label, gather_dataset_type_name = task_results metadata_refs[gather_task_label] = refs_for_type consolidate_config.input_names.append(gather_dataset_type_name) pipeline_graph.add_task( @@ -623,7 +624,7 @@ def __init__( @classmethod def _add_gather_task( cls, pipeline_graph: PipelineGraph, input_metadata_dataset_type: DatasetType - ) -> tuple[str, str]: + ) -> tuple[str, str] | None: """Add a single configuration of `GatherResourceUsageTask` to a pipeline graph. @@ -647,6 +648,8 @@ def _add_gather_task( return elif "gatherResourceUsage" in input_metadata_dataset_type.name: return + elif "consolidateResourceUsage" in input_metadata_dataset_type.name: + return else: input_task_label = m.group(1) gather_task_label = f"{input_task_label}_gatherResourceUsage" From 7c86f4d40d1a62855f8628a2c8cfffd25e756a7b Mon Sep 17 00:00:00 2001 From: Nate Lust Date: Fri, 15 Mar 2024 09:28:18 -0400 Subject: [PATCH 2/2] Add the ability to generate measurement bundles Add infrastructure to enable creating MetricMeasurementBundles when producing the consolidated resources. --- .../tools/tasks/gatherResourceUsage.py | 166 +++++++++++++++++- 1 file changed, 165 insertions(+), 1 deletion(-) diff --git a/python/lsst/analysis/tools/tasks/gatherResourceUsage.py b/python/lsst/analysis/tools/tasks/gatherResourceUsage.py index c27862f7c..ff598ba5b 100644 --- a/python/lsst/analysis/tools/tasks/gatherResourceUsage.py +++ b/python/lsst/analysis/tools/tasks/gatherResourceUsage.py @@ -37,6 +37,7 @@ from collections.abc import Iterable, Sequence from typing import Any +import astropy.units as apu import numpy as np import pandas as pd from lsst.daf.butler import Butler, DatasetRef, DatasetType @@ -60,10 +61,66 @@ # written in task metadata were platform-dependent. Once we no longer care # about older runs, this import and the code that uses it can be removed. from lsst.utils.usage import _RUSAGE_MEMORY_MULTIPLIER +from lsst.verify import Measurement + +from ..interfaces import MetricMeasurementBundle +from ..interfaces._task import _timestampValidator _LOG = logging.getLogger(__name__) +def _resource_table_to_bundle( + table: pd.DataFrame, dataset_identifier: str, reference_package: str, timestamp_version: str +) -> MetricMeasurementBundle: + """Convert a resource usage table into a `MetricMeasurementBundle` + + See `lsst.analysis.tools.interfaces.AnalysisPipelineTask` for more + information on each of the following options. + + Parameters + ---------- + table : `DataFrame` + Resource Usge in the the form of a DataFrame. + dataset_identifier : `str` + The name of the data processing to associate with this metric bundle. + reference_package : `str` + The reference package to use if the timestamp version is set to a + package version. + timestamp_version : `str` + The type of timestamp to associate with the bundle. + """ + bundle = MetricMeasurementBundle( + dataset_identifier=dataset_identifier, + reference_package=reference_package, + timestamp_version=timestamp_version, + ) + # determine all the columns in the table these will become measurements. + column_keys = set(table.keys()) + # discard the task, as this will be like the AnalysisTools in the bundle. + column_keys.remove("task") + # Measurements need units, use this to map the column to unit type. + unit_mapping = ( + ("quanta", apu.Unit("count")), + ("_hrs", apu.Unit("hour")), + ("_GB", apu.Unit("Gbyte")), + ("_s", apu.Unit("s")), + ) + # for each row, grab the task name, and create a list of measurements. + for _, row in table.iterrows(): + task_name = f"{row['task']}_memrun" + task_data = [] + for key in column_keys: + unit = None + for stub, value in unit_mapping: + if stub in key: + unit = value + if unit is None: + raise ValueError(f"Could not determine units for task {row['task']}") + task_data.append(Measurement(key, row[key] * unit)) + bundle[task_name] = task_data + return bundle + + class ConsolidateResourceUsageConnections(PipelineTaskConnections, dimensions=()): """Connection definitions for `ConsolidateResourceUsageTask`.""" @@ -74,6 +131,16 @@ class ConsolidateResourceUsageConnections(PipelineTaskConnections, dimensions=() doc="Consolidated table of resource usage statistics. One row per task label", ) + output_metrics = cT.Output( + name="ResourceUsageSummary_metrics", + storageClass="MetricMeasurementBundle", + dimensions=(), + doc=( + "MetricMeasurementBundle with the same information as the ResourceUsageSummary in the form " + "required for Sasquatch dispatch" + ), + ) + def __init__(self, *, config): super().__init__(config=config) for name in self.config.input_names: @@ -88,6 +155,8 @@ def __init__(self, *, config): ), ) self.inputs.add(name) + if not self.config.do_make_metrics: + self.outputs.remove("output_metrics") class ConsolidateResourceUsageConfig( @@ -99,6 +168,25 @@ class ConsolidateResourceUsageConfig( doc="Input resource usage dataset type names", default=[], ) + do_make_metrics = Field[bool](doc="Make metric bundle in addition to DataFrame", default=False) + dataset_identifier = Field[str](doc="An identifier to be associated with output Metrics", optional=True) + reference_package = Field[str]( + doc="A package whos version, at the time of metric upload to a " + "time series database, will be converted to a timestamp of when " + "that version was produced", + default="lsst_distrib", + optional=True, + ) + timestamp_version = Field[str]( + doc="Which time stamp should be used as the reference timestamp for a " + "metric in a time series database, valid values are; " + "reference_package_timestamp, run_timestamp, current_timestamp, " + "dataset_timestamp and explicit_timestamp:datetime where datetime is " + "given in the form %Y%m%dT%H%M%S%z", + default="run_timestamp", + check=_timestampValidator, + optional=True, + ) class ConsolidateResourceUsageTask(PipelineTask): @@ -113,6 +201,7 @@ class ConsolidateResourceUsageTask(PipelineTask): """ ConfigClass = ConsolidateResourceUsageConfig + config: ConsolidateResourceUsageConfig _DefaultName = "consolidateResourceUsage" def run(self, **kwargs: Any) -> Struct: @@ -166,8 +255,18 @@ def run(self, **kwargs: Any) -> Struct: .sort_values("task"), memrun, ) + results = Struct(output_table=memrun) + + if self.config.do_make_metrics: + bundle = _resource_table_to_bundle( + memrun, + self.config.dataset_identifier, + self.config.reference_package, + self.config.timestamp_version, + ) + results.output_metrics = bundle - return Struct(output_table=memrun) + return results class GatherResourceUsageConnections( @@ -547,6 +646,18 @@ class ResourceUsageQuantumGraphBuilder(QuantumGraphBuilder): Whether *execution* of this quantum graph will permit clobbering. If `False` (default), existing outputs in ``output_run`` are an error unless ``skip_existing_in`` will cause those quanta to be skipped. + make_metric : `bool`, optional + Produce a metric measurement bundle when processing the output + table. + timestamp_version : `str`, optional + The type of timestamp used when creating a `MetricMeasurementBundle`, + see there for more details. + dataset_identifier: `str`, optional + A processing identifer that is associated with the processing of this + data, for instance "RC2_subset" for the nightly reprocessings. + reference_package : `str`, optional + The dataset used as an identifier when timestamp_version is set to + reference_package. Notes ----- @@ -567,6 +678,10 @@ def __init__( output_run: str | None = None, skip_existing_in: Sequence[str] = (), clobber: bool = False, + make_metric: bool = False, + timestamp_version: str | None = None, + dataset_identifier: str | None = None, + reference_package: str | None = None, ): # Start by querying for metadata datasets, since we'll need to know # which dataset types exist in the input collections in order to @@ -580,6 +695,11 @@ def __init__( pipeline_graph = PipelineGraph() metadata_refs: dict[str, set[DatasetRef]] = {} consolidate_config = ConsolidateResourceUsageConfig() + if make_metric: + consolidate_config.do_make_metrics = True + consolidate_config.dataset_identifier = dataset_identifier + consolidate_config.timestamp_version = timestamp_version + consolidate_config.reference_package = reference_package for results in butler.registry.queryDatasets( input_dataset_types, where=where, @@ -753,6 +873,37 @@ def make_argument_parser(cls) -> argparse.ArgumentParser: default=None, metavar="RUN", ) + parser.add_argument( + "--make-metric", + type=bool, + help=( + "Turn the output resource usage table into a metric measurement bundle format compatible " + "with Sasquatch." + ), + default=True, + metavar="DO_MAKE_METRIC", + ) + parser.add_argument( + "--dataset-identifier", + type=str, + help="Set the dataset these results are associated with.", + default=None, + metavar="DATASET_IDENTIFIER", + ) + parser.add_argument( + "--reference-package", + type=str, + help="Reference package to use when selecting reference timestamp", + default="lsst_distrib", + metavar="REFERENCE_PACKAGE", + ) + parser.add_argument( + "--timestamp-version", + type=str, + help="Set the dataset these results are associated with.", + default="run_timestamp", + metavar="TIMESTAMP_VERSION", + ) return parser @classmethod @@ -770,6 +921,18 @@ def main(cls) -> None: raise ValueError("At least one of --output or --output-run options is required.") args.output_run = "{}/{}".format(args.output, Instrument.makeCollectionTimestamp()) + extra_args = {} + if args.make_metric: + if args.dataset_identifier is None or args.timestamp_version is None: + raise ValueError( + "If metrics are going to be created, --dataset-identifier and --timestamp-version " + "must be specified." + ) + extra_args["make_metric"] = True + extra_args["timestamp_version"] = args.timestamp_version + extra_args["dataset_identifier"] = args.dataset_identifier + extra_args["reference_package"] = args.reference_package + butler = Butler(args.repo, collections=args.collections) builder = cls( butler, @@ -777,6 +940,7 @@ def main(cls) -> None: where=args.where, input_collections=args.collections, output_run=args.output_run, + **extra_args, ) qg: QuantumGraph = builder.build( # Metadata includes a subset of attributes defined in CmdLineFwk.