-
Notifications
You must be signed in to change notification settings - Fork 180
Consolidate airlock storage architecture and implement metadata-based management #4853
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
57b3d70
5bbe648
1d2172e
fa39c85
138820b
8941b1b
47dcdc8
427515d
b09f990
231f434
76a3d62
cd70948
f08b384
5211f36
76a09bd
2c6235b
0df7e5c
e375cf2
72c9478
2b66bb3
1d5b8ef
7638186
d490b5a
e20e33a
4f2fe0b
aa6c32a
85ab8af
bee6cdc
ff96ee5
e025056
b98ede1
4a9b185
8421bdb
34f2636
3d99220
ad73137
105f38b
7335c65
55f3590
b0c50e8
fcead34
bd14845
8b405ef
115e778
051ef76
98764f9
816e4e8
25c194e
90fc2d7
de87d71
a9125cb
d885d40
4992cfd
3b9dbd6
d3fa795
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -214,3 +214,4 @@ validation.txt | |
|
|
||
| /index.html | ||
| .DS_Store | ||
| *_old.tf | ||
| Original file line number | Diff line number | Diff line change | ||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -9,7 +9,7 @@ | |||||||||||
|
|
||||||||||||
| from exceptions import NoFilesInRequestException, TooManyFilesInRequestException | ||||||||||||
|
|
||||||||||||
| from shared_code import blob_operations, constants | ||||||||||||
| from shared_code import blob_operations, constants, airlock_storage_helper, parsers | ||||||||||||
| from pydantic import BaseModel, parse_obj_as | ||||||||||||
|
|
||||||||||||
|
|
||||||||||||
|
|
@@ -19,6 +19,8 @@ class RequestProperties(BaseModel): | |||||||||||
| previous_status: Optional[str] | ||||||||||||
| type: str | ||||||||||||
| workspace_id: str | ||||||||||||
| review_workspace_id: Optional[str] = None | ||||||||||||
| airlock_version: int = 1 | ||||||||||||
|
|
||||||||||||
|
|
||||||||||||
| class ContainersCopyMetadata: | ||||||||||||
|
|
@@ -31,6 +33,8 @@ def __init__(self, source_account_name: str, dest_account_name: str): | |||||||||||
|
|
||||||||||||
|
|
||||||||||||
| def main(msg: func.ServiceBusMessage, stepResultEvent: func.Out[func.EventGridOutputEvent], dataDeletionEvent: func.Out[func.EventGridOutputEvent]): | ||||||||||||
| request_properties = None | ||||||||||||
| request_files = None | ||||||||||||
| try: | ||||||||||||
| request_properties = extract_properties(msg) | ||||||||||||
| request_files = get_request_files(request_properties) if request_properties.new_status == constants.STAGE_SUBMITTED else None | ||||||||||||
|
|
@@ -53,13 +57,25 @@ def handle_status_changed(request_properties: RequestProperties, stepResultEvent | |||||||||||
|
|
||||||||||||
| logging.info('Processing request with id %s. new status is "%s", type is "%s"', req_id, new_status, request_type) | ||||||||||||
|
|
||||||||||||
| # Check if using metadata-based stage management (v2) or legacy per-stage accounts (v1) | ||||||||||||
| use_metadata = request_properties.airlock_version >= 2 | ||||||||||||
|
|
||||||||||||
| if new_status == constants.STAGE_DRAFT: | ||||||||||||
| account_name = get_storage_account(status=constants.STAGE_DRAFT, request_type=request_type, short_workspace_id=ws_id) | ||||||||||||
| blob_operations.create_container(account_name, req_id) | ||||||||||||
| if use_metadata: | ||||||||||||
| from shared_code.blob_operations_metadata import create_container_with_metadata | ||||||||||||
| account_name = airlock_storage_helper.get_storage_account_name_for_request(request_type, new_status, ws_id, airlock_version=request_properties.airlock_version) | ||||||||||||
| stage = airlock_storage_helper.get_stage_from_status(request_type, new_status) | ||||||||||||
| create_container_with_metadata(account_name, req_id, stage, workspace_id=ws_id, request_type=request_type) | ||||||||||||
| else: | ||||||||||||
| account_name = get_storage_account(status=constants.STAGE_DRAFT, request_type=request_type, short_workspace_id=ws_id) | ||||||||||||
| blob_operations.create_container(account_name, req_id) | ||||||||||||
| return | ||||||||||||
|
|
||||||||||||
| if new_status == constants.STAGE_CANCELLED: | ||||||||||||
| storage_account_name = get_storage_account(previous_status, request_type, ws_id) | ||||||||||||
| if use_metadata: | ||||||||||||
| storage_account_name = airlock_storage_helper.get_storage_account_name_for_request(request_type, previous_status, ws_id, airlock_version=request_properties.airlock_version) | ||||||||||||
| else: | ||||||||||||
| storage_account_name = get_storage_account(previous_status, request_type, ws_id) | ||||||||||||
| container_to_delete_url = blob_operations.get_blob_url(account_name=storage_account_name, container_name=req_id) | ||||||||||||
| set_output_event_to_trigger_container_deletion(dataDeletionEvent, request_properties, container_url=container_to_delete_url) | ||||||||||||
| return | ||||||||||||
|
|
@@ -68,11 +84,74 @@ def handle_status_changed(request_properties: RequestProperties, stepResultEvent | |||||||||||
| set_output_event_to_report_request_files(stepResultEvent, request_properties, request_files) | ||||||||||||
|
|
||||||||||||
| if (is_require_data_copy(new_status)): | ||||||||||||
| logging.info('Request with id %s. requires data copy between storage accounts', req_id) | ||||||||||||
| containers_metadata = get_source_dest_for_copy(new_status=new_status, previous_status=previous_status, request_type=request_type, short_workspace_id=ws_id) | ||||||||||||
| blob_operations.create_container(containers_metadata.dest_account_name, req_id) | ||||||||||||
| blob_operations.copy_data(containers_metadata.source_account_name, | ||||||||||||
| containers_metadata.dest_account_name, req_id) | ||||||||||||
| if use_metadata: | ||||||||||||
| # Metadata mode: Update container stage instead of copying | ||||||||||||
| from shared_code.blob_operations_metadata import update_container_stage, create_container_with_metadata | ||||||||||||
|
|
||||||||||||
| # For import submit, use review_workspace_id so data goes to review workspace storage | ||||||||||||
| effective_ws_id = ws_id | ||||||||||||
| if new_status == constants.STAGE_SUBMITTED and request_type.lower() == constants.IMPORT_TYPE and request_properties.review_workspace_id: | ||||||||||||
| effective_ws_id = request_properties.review_workspace_id | ||||||||||||
|
|
||||||||||||
| # Get the storage account (might change from core to workspace or vice versa) | ||||||||||||
| source_account = airlock_storage_helper.get_storage_account_name_for_request(request_type, previous_status, ws_id, airlock_version=request_properties.airlock_version) | ||||||||||||
| dest_account = airlock_storage_helper.get_storage_account_name_for_request(request_type, new_status, effective_ws_id, airlock_version=request_properties.airlock_version) | ||||||||||||
| new_stage = airlock_storage_helper.get_stage_from_status(request_type, new_status) | ||||||||||||
|
|
||||||||||||
| if source_account == dest_account: | ||||||||||||
| # Same storage account - just update metadata | ||||||||||||
| logging.info(f'Request {req_id}: Updating container stage to {new_stage} (no copy needed)') | ||||||||||||
| update_container_stage(source_account, req_id, new_stage, changed_by='system') | ||||||||||||
|
|
||||||||||||
| # In v2, same-account transitions don't fire BlobCreated events. | ||||||||||||
| # For SUBMITTED, v1 relies on BlobCreatedTrigger to handle the malware scanning gate | ||||||||||||
| # (skip to in_review when scanning is disabled). We handle this inline for v2. | ||||||||||||
| if new_status == constants.STAGE_SUBMITTED: | ||||||||||||
| try: | ||||||||||||
| enable_malware_scanning = parsers.parse_bool(os.environ["ENABLE_MALWARE_SCANNING"]) | ||||||||||||
| except KeyError: | ||||||||||||
| logging.error("environment variable 'ENABLE_MALWARE_SCANNING' does not exist. Cannot continue.") | ||||||||||||
| raise | ||||||||||||
| if not enable_malware_scanning: | ||||||||||||
| logging.info(f'Request {req_id}: Malware scanning disabled, skipping to in_review') | ||||||||||||
| stepResultEvent.set( | ||||||||||||
| func.EventGridOutputEvent( | ||||||||||||
| id=str(uuid.uuid4()), | ||||||||||||
| data={"completed_step": constants.STAGE_SUBMITTED, "new_status": constants.STAGE_IN_REVIEW, "request_id": req_id}, | ||||||||||||
| subject=req_id, | ||||||||||||
| event_type="Airlock.StepResult", | ||||||||||||
| event_time=datetime.datetime.now(datetime.UTC), | ||||||||||||
| data_version=constants.STEP_RESULT_EVENT_DATA_VERSION)) | ||||||||||||
| else: | ||||||||||||
| logging.info(f'Request {req_id}: Malware scanning enabled, waiting for scan result') | ||||||||||||
| elif new_status in [constants.STAGE_REJECTION_INPROGRESS, constants.STAGE_BLOCKING_INPROGRESS]: | ||||||||||||
| # Terminal transitions: emit StepResult immediately since no BlobCreated event will fire | ||||||||||||
| final_status = constants.STAGE_REJECTED if new_status == constants.STAGE_REJECTION_INPROGRESS else constants.STAGE_BLOCKED_BY_SCAN | ||||||||||||
| logging.info(f'Request {req_id}: Emitting StepResult for terminal transition {new_status} -> {final_status}') | ||||||||||||
| stepResultEvent.set( | ||||||||||||
| func.EventGridOutputEvent( | ||||||||||||
| id=str(uuid.uuid4()), | ||||||||||||
| data={"completed_step": new_status, "new_status": final_status, "request_id": req_id}, | ||||||||||||
| subject=req_id, | ||||||||||||
| event_type="Airlock.StepResult", | ||||||||||||
| event_time=datetime.datetime.now(datetime.UTC), | ||||||||||||
| data_version=constants.STEP_RESULT_EVENT_DATA_VERSION)) | ||||||||||||
| else: | ||||||||||||
| # Different storage account (e.g., core → workspace on import approval, | ||||||||||||
| # workspace → core on export approval) - need to copy. | ||||||||||||
| # BlobCreatedTrigger will fire when the copy completes and emit the StepResult, | ||||||||||||
| # matching the v1 async pattern for large data transfers. | ||||||||||||
| logging.info(f'Request {req_id}: Copying from {source_account} to {dest_account}') | ||||||||||||
| create_container_with_metadata(dest_account, req_id, new_stage, workspace_id=effective_ws_id, request_type=request_type) | ||||||||||||
| blob_operations.copy_data(source_account, dest_account, req_id) | ||||||||||||
| else: | ||||||||||||
| # Legacy mode: Copy data between storage accounts | ||||||||||||
| logging.info('Request with id %s. requires data copy between storage accounts', req_id) | ||||||||||||
| review_ws_id = request_properties.review_workspace_id | ||||||||||||
| containers_metadata = get_source_dest_for_copy(new_status=new_status, previous_status=previous_status, request_type=request_type, short_workspace_id=ws_id, review_workspace_id=review_ws_id) | ||||||||||||
| blob_operations.create_container(containers_metadata.dest_account_name, req_id) | ||||||||||||
| blob_operations.copy_data(containers_metadata.source_account_name, | ||||||||||||
| containers_metadata.dest_account_name, req_id) | ||||||||||||
| return | ||||||||||||
|
|
||||||||||||
| # Other statuses which do not require data copy are dismissed as we don't need to do anything... | ||||||||||||
|
|
@@ -102,7 +181,7 @@ def is_require_data_copy(new_status: str): | |||||||||||
| return False | ||||||||||||
|
|
||||||||||||
|
|
||||||||||||
| def get_source_dest_for_copy(new_status: str, previous_status: str, request_type: str, short_workspace_id: str) -> ContainersCopyMetadata: | ||||||||||||
| def get_source_dest_for_copy(new_status: str, previous_status: str, request_type: str, short_workspace_id: str, review_workspace_id: str = None) -> ContainersCopyMetadata: | ||||||||||||
| # sanity | ||||||||||||
| if is_require_data_copy(new_status) is False: | ||||||||||||
| raise Exception("Given new status is not supported") | ||||||||||||
|
|
@@ -115,7 +194,7 @@ def get_source_dest_for_copy(new_status: str, previous_status: str, request_type | |||||||||||
| raise Exception(msg) | ||||||||||||
|
|
||||||||||||
| source_account_name = get_storage_account(previous_status, request_type, short_workspace_id) | ||||||||||||
| dest_account_name = get_storage_account_destination_for_copy(new_status, request_type, short_workspace_id) | ||||||||||||
| dest_account_name = get_storage_account_destination_for_copy(new_status, request_type, short_workspace_id, review_workspace_id=review_workspace_id) | ||||||||||||
| return ContainersCopyMetadata(source_account_name, dest_account_name) | ||||||||||||
|
|
||||||||||||
|
|
||||||||||||
|
|
@@ -151,12 +230,14 @@ def get_storage_account(status: str, request_type: str, short_workspace_id: str) | |||||||||||
| raise Exception(error_message) | ||||||||||||
|
|
||||||||||||
|
|
||||||||||||
| def get_storage_account_destination_for_copy(new_status: str, request_type: str, short_workspace_id: str) -> str: | ||||||||||||
| def get_storage_account_destination_for_copy(new_status: str, request_type: str, short_workspace_id: str, review_workspace_id: str = None) -> str: | ||||||||||||
| tre_id = _get_tre_id() | ||||||||||||
|
|
||||||||||||
| if request_type == constants.IMPORT_TYPE: | ||||||||||||
| if new_status == constants.STAGE_SUBMITTED: | ||||||||||||
| return constants.STORAGE_ACCOUNT_NAME_IMPORT_INPROGRESS + tre_id | ||||||||||||
| # Import submit: copy to review workspace storage, or tre_id for legacy compatibility | ||||||||||||
| dest_id = review_workspace_id if review_workspace_id else tre_id | ||||||||||||
| return constants.STORAGE_ACCOUNT_NAME_IMPORT_INPROGRESS + dest_id | ||||||||||||
|
marrobi marked this conversation as resolved.
Comment on lines
+238
to
+240
|
||||||||||||
| # Import submit: copy to review workspace storage, or tre_id for legacy compatibility | |
| dest_id = review_workspace_id if review_workspace_id else tre_id | |
| return constants.STORAGE_ACCOUNT_NAME_IMPORT_INPROGRESS + dest_id | |
| # Import submit: in legacy/v1 mode the in-progress storage account remains TRE-scoped. | |
| return constants.STORAGE_ACCOUNT_NAME_IMPORT_INPROGRESS + tre_id |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1 +1 @@ | ||
| __version__ = "0.8.9" | ||
| __version__ = "0.8.13" | ||
|
marrobi marked this conversation as resolved.
|
||
Uh oh!
There was an error while loading. Please reload this page.