def run_analysis(
        self,
        name: str = None,
        project_id: str = None,
        datasets: list = None,
        description: str = "",
        params: dict = None,
        notifications_emails: List[str] = None,
        compute_environment: str = None,
        resume_dataset_id: str = None
) -> str:
    """
    Runs this process on one or more input datasets, returns the ID of the newly created dataset.

    Args:
        name (str): Name of newly created dataset
        project_id (str): ID of the project to run the analysis in
        datasets (List[DataPortalDataset or str]): One or more input datasets
            (as DataPortalDataset objects or dataset ID strings)
        description (str): Description of newly created dataset
        params (dict): Analysis parameters
        notifications_emails (List[str]): Notification email address(es)
        compute_environment (str): Name or ID of compute environment to use,
            if blank it will run in AWS
        resume_dataset_id (str): ID of dataset to resume from, used for caching task execution.
            It will attempt to re-use the previous output to minimize duplicate work.
            Note that Nextflow does not require this parameter, as it will automatically resume
            from any previous attempts using a global cache.

    Returns:
        dataset_id (str): ID of newly created dataset

    Raises:
        DataPortalInputError: if a required argument is missing, or the
            requested compute environment cannot be found in the project.
    """
    # Validate required arguments up front with explicit errors rather than
    # letting the API call fail with a less helpful message.
    if name is None:
        raise DataPortalInputError("Must specify 'name' for run_analysis")
    if project_id is None:
        raise DataPortalInputError("Must specify 'project_id' for run_analysis")
    if not datasets:
        raise DataPortalInputError("Must specify 'datasets' for run_analysis")
    if notifications_emails is None:
        notifications_emails = []
    if params is None:
        params = {}

    # Accept DataPortalDataset objects or raw ID strings
    source_dataset_ids = [
        ds if isinstance(ds, str) else ds.id
        for ds in datasets
    ]

    # Resolve the caller-supplied environment name/ID to the environment's ID.
    # BUGFIX: resolve into a separate variable instead of overwriting
    # `compute_environment` — previously the requested identifier was clobbered
    # by next(..., None), so the not-found error always reported 'None'
    # instead of the name/ID the user asked for.
    compute_environment_id = None
    if compute_environment:
        available_environments = self._client.compute_environments.list_environments_for_project(
            project_id=project_id
        )
        matched_environment = next(
            (env for env in available_environments
             if env.name == compute_environment or env.id == compute_environment),
            None
        )
        if matched_environment is None:
            raise DataPortalInputError(
                f"Compute environment '{compute_environment}' not found"
            )
        compute_environment_id = matched_environment.id

    # Submit the analysis request; the API returns the newly created dataset.
    resp = self._client.execution.run_analysis(
        project_id=project_id,
        request=RunAnalysisRequest(
            name=name,
            description=description,
            process_id=self.id,
            source_dataset_ids=source_dataset_ids,
            params=RunAnalysisRequestParams.from_dict(params),
            notification_emails=notifications_emails,
            resume_dataset_id=resume_dataset_id,
            compute_environment_id=compute_environment_id
        )
    )
    return resp.id