From 8a3cf2e164abd663a3f64eb196f12ed7eccdac5b Mon Sep 17 00:00:00 2001
From: Sam Minot
Date: Thu, 19 Mar 2026 09:40:19 -0700
Subject: [PATCH 1/3] WIP

---
 cirro/sdk/process.py | 79 +++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 78 insertions(+), 1 deletion(-)

diff --git a/cirro/sdk/process.py b/cirro/sdk/process.py
index 9777fd15..445ed551 100644
--- a/cirro/sdk/process.py
+++ b/cirro/sdk/process.py
@@ -1,10 +1,13 @@
 from typing import List, Union
 
-from cirro_api_client.v1.models import Process, Executor, ProcessDetail, CustomPipelineSettings, PipelineCode
+from cirro_api_client.v1.models import Process, Executor, ProcessDetail, CustomPipelineSettings, PipelineCode, \
+    RunAnalysisRequest, RunAnalysisRequestParams
 
 from cirro.cirro_client import CirroApi
 from cirro.models.form_specification import ParameterSpecification
 from cirro.sdk.asset import DataPortalAssets, DataPortalAsset
+from cirro.sdk.exceptions import DataPortalInputError
+from cirro.sdk.dataset import DataPortalDataset
 
 
 class DataPortalProcess(DataPortalAsset):
@@ -95,6 +98,80 @@ def get_parameter_spec(self) -> ParameterSpecification:
         """
         return self._client.processes.get_parameter_spec(self.id)
 
+    def run_analysis(
+        self,
+        name: str = None,
+        project_id: str = None,
+        datasets: List[DataPortalDataset] = None,
+        description: str = "",
+        params=None,
+        notifications_emails: List[str] = None,
+        compute_environment: str = None,
+        resume_dataset_id: str = None
+    ) -> str:
+        """
+        Runs this process on one or more input datasets and returns the ID of the newly created dataset.
+
+        Args:
+            name (str): Name of newly created dataset
+            project_id (str): ID of the project to run the analysis in
+            datasets (List[DataPortalDataset or str]): One or more input datasets
+                (as DataPortalDataset objects or dataset ID strings)
+            description (str): Description of newly created dataset
+            params (dict): Analysis parameters
+            notifications_emails (List[str]): Notification email address(es)
+            compute_environment (str): Name or ID of compute environment to use,
+                if blank, it will run in AWS
+            resume_dataset_id (str): ID of dataset to resume from, used for caching task execution.
+                It will attempt to re-use the previous output to minimize duplicate work
+
+        Returns:
+            dataset_id (str): ID of newly created dataset
+        """
+        if name is None:
+            raise DataPortalInputError("Must specify 'name' for run_analysis")
+        if project_id is None:
+            raise DataPortalInputError("Must specify 'project_id' for run_analysis")
+        if not datasets:
+            raise DataPortalInputError("Must specify 'datasets' for run_analysis")
+        if notifications_emails is None:
+            notifications_emails = []
+        if params is None:
+            params = {}
+
+        # Accept DataPortalDataset objects or raw ID strings
+        source_dataset_ids = [
+            ds if isinstance(ds, str) else ds.id
+            for ds in datasets
+        ]
+
+        if compute_environment:
+            compute_environments = self._client.compute_environments.list_environments_for_project(
+                project_id=project_id
+            )
+            matched_env = next(
+                (env for env in compute_environments
+                 if compute_environment in (env.name, env.id)), None
+            )
+            if matched_env is None:
+                raise DataPortalInputError(f"Compute environment '{compute_environment}' not found")
+            compute_environment = matched_env
+
+        resp = self._client.execution.run_analysis(
+            project_id=project_id,
+            request=RunAnalysisRequest(
+                name=name,
+                description=description,
+                process_id=self.id,
+                source_dataset_ids=source_dataset_ids,
+                params=RunAnalysisRequestParams.from_dict(params),
+                notification_emails=notifications_emails,
+                resume_dataset_id=resume_dataset_id,
+                compute_environment_id=compute_environment.id if compute_environment else None
+            )
+        )
+        return resp.id
+
 
 class DataPortalProcesses(DataPortalAssets[DataPortalProcess]):
     """Collection of DataPortalProcess objects."""

From 63457f0db39d2c88c06652300cfcad32d1182b02 Mon Sep 17 00:00:00 2001
From: Sam Minot
Date: Thu, 19 Mar 2026 10:34:40 -0700
Subject: [PATCH 2/3] Remove circular import

---
 cirro/sdk/process.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/cirro/sdk/process.py b/cirro/sdk/process.py
index 445ed551..c9e9f3d4 100644
--- a/cirro/sdk/process.py
+++ b/cirro/sdk/process.py
@@ -7,7 +7,6 @@
 from cirro.models.form_specification import ParameterSpecification
 from cirro.sdk.asset import DataPortalAssets, DataPortalAsset
 from cirro.sdk.exceptions import DataPortalInputError
-from cirro.sdk.dataset import DataPortalDataset
 
 
 class DataPortalProcess(DataPortalAsset):
@@ -102,7 +101,7 @@ def run_analysis(
         self,
         name: str = None,
         project_id: str = None,
-        datasets: List[DataPortalDataset] = None,
+        datasets: list = None,
         description: str = "",
         params=None,
         notifications_emails: List[str] = None,

From c560a3119b888e4d361b7620ee784dee9212f510 Mon Sep 17 00:00:00 2001
From: Sam Minot
Date: Thu, 19 Mar 2026 13:00:37 -0700
Subject: [PATCH 3/3] Add note about global cache

---
 cirro/sdk/process.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/cirro/sdk/process.py b/cirro/sdk/process.py
index c9e9f3d4..282924fa 100644
--- a/cirro/sdk/process.py
+++ b/cirro/sdk/process.py
@@ -122,7 +122,9 @@ def run_analysis(
             compute_environment (str): Name or ID of compute environment to use,
                 if blank, it will run in AWS
             resume_dataset_id (str): ID of dataset to resume from, used for caching task execution.
-                It will attempt to re-use the previous output to minimize duplicate work
+                It will attempt to re-use the previous output to minimize duplicate work.
+                Note that Nextflow-based processes do not require this parameter, as Nextflow
+                automatically resumes from any previous attempts using a global cache.
 
         Returns:
             dataset_id (str): ID of newly created dataset
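
A minimal usage sketch of the new method, assuming the DataPortal entry point
from the published cirro SDK; the project, process, dataset, and parameter
names below are placeholders:

    from cirro import DataPortal

    # Connect to Cirro and look up the process and project by name
    portal = DataPortal()
    process = portal.get_process_by_name("nf-core/rnaseq")
    project = portal.get_project_by_name("Example Project")

    # Inputs may be DataPortalDataset objects or raw dataset ID strings
    dataset = project.get_dataset_by_name("Example Dataset")

    # Launch the analysis; the return value is the ID of the new dataset
    new_dataset_id = process.run_analysis(
        name="Analysis of Example Dataset",
        project_id=project.id,
        datasets=[dataset],
        description="Launched from the SDK",
        params={"param_name": "value"},  # keys defined by the process form spec
        notifications_emails=["user@example.com"],
    )
    print(new_dataset_id)

Leaving compute_environment blank submits the run to AWS, and passing
resume_dataset_id lets a non-Nextflow process re-use outputs from an earlier
run, as described in the docstring above.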