Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 79 additions & 1 deletion cirro/sdk/process.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
from typing import List, Union

from cirro_api_client.v1.models import Process, Executor, ProcessDetail, CustomPipelineSettings, PipelineCode
from cirro_api_client.v1.models import Process, Executor, ProcessDetail, CustomPipelineSettings, PipelineCode, \
RunAnalysisRequest, RunAnalysisRequestParams

from cirro.cirro_client import CirroApi
from cirro.models.form_specification import ParameterSpecification
from cirro.sdk.asset import DataPortalAssets, DataPortalAsset
from cirro.sdk.exceptions import DataPortalInputError


class DataPortalProcess(DataPortalAsset):
Expand Down Expand Up @@ -95,6 +97,82 @@ def get_parameter_spec(self) -> ParameterSpecification:
"""
return self._client.processes.get_parameter_spec(self.id)

def run_analysis(
        self,
        name: str = None,
        project_id: str = None,
        datasets: List[Union['DataPortalDataset', str]] = None,
        description: str = "",
        params: dict = None,
        notifications_emails: List[str] = None,
        compute_environment: str = None,
        resume_dataset_id: str = None
) -> str:
    """
    Runs this process on one or more input datasets, returns the ID of the newly created dataset.

    Args:
        name (str): Name of newly created dataset
        project_id (str): ID of the project to run the analysis in
        datasets (List[DataPortalDataset or str]): One or more input datasets
            (as DataPortalDataset objects or dataset ID strings)
        description (str): Description of newly created dataset
        params (dict): Analysis parameters
        notifications_emails (List[str]): Notification email address(es)
        compute_environment (str): Name or ID of compute environment to use,
            if blank it will run in AWS
        resume_dataset_id (str): ID of dataset to resume from, used for caching task execution.
            It will attempt to re-use the previous output to minimize duplicate work.
            Note that Nextflow does not require this parameter, as it will automatically resume
            from any previous attempts using a global cache.

    Returns:
        dataset_id (str): ID of newly created dataset

    Raises:
        DataPortalInputError: If a required argument is missing, or if the
            requested compute environment does not exist in the project.
    """
    if name is None:
        raise DataPortalInputError("Must specify 'name' for run_analysis")
    if project_id is None:
        raise DataPortalInputError("Must specify 'project_id' for run_analysis")
    if not datasets:
        raise DataPortalInputError("Must specify 'datasets' for run_analysis")
    # Never use mutable defaults in the signature; normalize None here instead
    if notifications_emails is None:
        notifications_emails = []
    if params is None:
        params = {}

    # Accept DataPortalDataset objects or raw ID strings
    source_dataset_ids = [
        ds if isinstance(ds, str) else ds.id
        for ds in datasets
    ]

    # Resolve the compute environment name/ID into the environment object.
    # Kept in a separate variable so the user's original input is still
    # available for the error message if no match is found.
    resolved_environment = None
    if compute_environment:
        compute_environments = self._client.compute_environments.list_environments_for_project(
            project_id=project_id
        )
        resolved_environment = next(
            (env for env in compute_environments
             if env.name == compute_environment or env.id == compute_environment),
            None
        )
        if resolved_environment is None:
            raise DataPortalInputError(f"Compute environment '{compute_environment}' not found")

    resp = self._client.execution.run_analysis(
        project_id=project_id,
        request=RunAnalysisRequest(
            name=name,
            description=description,
            process_id=self.id,
            source_dataset_ids=source_dataset_ids,
            params=RunAnalysisRequestParams.from_dict(params),
            notification_emails=notifications_emails,
            resume_dataset_id=resume_dataset_id,
            compute_environment_id=resolved_environment.id if resolved_environment else None
        )
    )
    return resp.id


class DataPortalProcesses(DataPortalAssets[DataPortalProcess]):
"""Collection of DataPortalProcess objects."""
Expand Down
Loading