1- import simplejson as json
2- import traceback
1+ # Copyright (c) Microsoft Corporation.
2+ # Licensed under the MIT License.
33
4+ import traceback
45from datetime import datetime
56from typing import Any
7+
8+ import simplejson as json
69from google.protobuf import timestamp_pb2, wrappers_pb2
710
8- from durabletask.protos.orchestrator_service_pb2 import *
11+ import durabletask.protos.orchestrator_service_pb2 as pb
912
1013# TODO: The new_xxx_event methods are only used by test code and should be moved elsewhere
1114
1215
13- def new_orchestrator_started_event(timestamp: datetime | None = None) -> HistoryEvent:
16+ def new_orchestrator_started_event(timestamp: datetime | None = None) -> pb. HistoryEvent:
1417 ts = timestamp_pb2.Timestamp()
1518 if timestamp is not None:
1619 ts.FromDatetime(timestamp)
17- return HistoryEvent(eventId=-1, timestamp=ts, orchestratorStarted=OrchestratorStartedEvent())
20+ return pb. HistoryEvent(eventId=-1, timestamp=ts, orchestratorStarted=pb. OrchestratorStartedEvent())
1821
1922
20- def new_execution_started_event(name: str, instance_id: str, encoded_input: str | None = None) -> HistoryEvent:
21- return HistoryEvent(
23+ def new_execution_started_event(name: str, instance_id: str, encoded_input: str | None = None) -> pb. HistoryEvent:
24+ return pb. HistoryEvent(
2225 eventId=-1,
2326 timestamp=timestamp_pb2.Timestamp(),
24- executionStarted=ExecutionStartedEvent(
25- name=name, input=get_string_value(encoded_input), orchestrationInstance=OrchestrationInstance(instanceId=instance_id)))
27+ executionStarted=pb.ExecutionStartedEvent(
28+ name=name,
29+ input=get_string_value(encoded_input),
30+ orchestrationInstance=pb.OrchestrationInstance(instanceId=instance_id)))
2631
2732
28- def new_timer_created_event(timer_id: int, fire_at: datetime) -> HistoryEvent:
33+ def new_timer_created_event(timer_id: int, fire_at: datetime) -> pb. HistoryEvent:
2934 ts = timestamp_pb2.Timestamp()
3035 ts.FromDatetime(fire_at)
31- return HistoryEvent(
36+ return pb. HistoryEvent(
3237 eventId=timer_id,
3338 timestamp=timestamp_pb2.Timestamp(),
34- timerCreated=TimerCreatedEvent(fireAt=ts)
39+ timerCreated=pb. TimerCreatedEvent(fireAt=ts)
3540 )
3641
3742
38- def new_timer_fired_event(timer_id: int, fire_at: datetime) -> HistoryEvent:
43+ def new_timer_fired_event(timer_id: int, fire_at: datetime) -> pb. HistoryEvent:
3944 ts = timestamp_pb2.Timestamp()
4045 ts.FromDatetime(fire_at)
41- return HistoryEvent(
46+ return pb. HistoryEvent(
4247 eventId=-1,
4348 timestamp=timestamp_pb2.Timestamp(),
44- timerFired=TimerFiredEvent(fireAt=ts, timerId=timer_id)
49+ timerFired=pb. TimerFiredEvent(fireAt=ts, timerId=timer_id)
4550 )
4651
4752
48- def new_task_scheduled_event(event_id: int, name: str, encoded_input: str | None = None) -> HistoryEvent:
49- return HistoryEvent(
53+ def new_task_scheduled_event(event_id: int, name: str, encoded_input: str | None = None) -> pb. HistoryEvent:
54+ return pb. HistoryEvent(
5055 eventId=event_id,
5156 timestamp=timestamp_pb2.Timestamp(),
52- taskScheduled=TaskScheduledEvent(name=name, input=get_string_value(encoded_input))
57+ taskScheduled=pb. TaskScheduledEvent(name=name, input=get_string_value(encoded_input))
5358 )
5459
5560
56- def new_task_completed_event(event_id: int, encoded_output: str | None = None) -> HistoryEvent:
57- return HistoryEvent(
61+ def new_task_completed_event(event_id: int, encoded_output: str | None = None) -> pb. HistoryEvent:
62+ return pb. HistoryEvent(
5863 eventId=-1,
5964 timestamp=timestamp_pb2.Timestamp(),
60- taskCompleted=TaskCompletedEvent(taskScheduledId=event_id, result=get_string_value(encoded_output))
65+ taskCompleted=pb. TaskCompletedEvent(taskScheduledId=event_id, result=get_string_value(encoded_output))
6166 )
6267
6368
64- def new_task_failed_event(event_id: int, ex: Exception) -> HistoryEvent:
65- return HistoryEvent(
69+ def new_task_failed_event(event_id: int, ex: Exception) -> pb. HistoryEvent:
70+ return pb. HistoryEvent(
6671 eventId=-1,
6772 timestamp=timestamp_pb2.Timestamp(),
68- taskFailed=TaskFailedEvent(taskScheduledId=event_id, failureDetails=new_failure_details(ex))
73+ taskFailed=pb. TaskFailedEvent(taskScheduledId=event_id, failureDetails=new_failure_details(ex))
6974 )
7075
7176
72- def new_sub_orchestration_created_event(event_id: int, name: str, instance_id: str, encoded_input: str | None = None) -> HistoryEvent:
73- return HistoryEvent(
77+ def new_sub_orchestration_created_event(
78+ event_id: int,
79+ name: str,
80+ instance_id: str,
81+ encoded_input: str | None = None) -> pb.HistoryEvent:
82+ return pb.HistoryEvent(
7483 eventId=event_id,
7584 timestamp=timestamp_pb2.Timestamp(),
76- subOrchestrationInstanceCreated=SubOrchestrationInstanceCreatedEvent(
85+ subOrchestrationInstanceCreated=pb. SubOrchestrationInstanceCreatedEvent(
7786 name=name,
7887 input=get_string_value(encoded_input),
7988 instanceId=instance_id)
8089 )
8190
8291
83- def new_sub_orchestration_completed_event(event_id: int, encoded_output: str | None = None) -> HistoryEvent:
84- return HistoryEvent(
92+ def new_sub_orchestration_completed_event(event_id: int, encoded_output: str | None = None) -> pb. HistoryEvent:
93+ return pb. HistoryEvent(
8594 eventId=-1,
8695 timestamp=timestamp_pb2.Timestamp(),
87- subOrchestrationInstanceCompleted=SubOrchestrationInstanceCompletedEvent(
96+ subOrchestrationInstanceCompleted=pb. SubOrchestrationInstanceCompletedEvent(
8897 result=get_string_value(encoded_output),
8998 taskScheduledId=event_id)
9099 )
91100
92101
93- def new_sub_orchestration_failed_event(event_id: int, ex: Exception) -> HistoryEvent:
94- return HistoryEvent(
102+ def new_sub_orchestration_failed_event(event_id: int, ex: Exception) -> pb. HistoryEvent:
103+ return pb. HistoryEvent(
95104 eventId=-1,
96105 timestamp=timestamp_pb2.Timestamp(),
97- subOrchestrationInstanceFailed=SubOrchestrationInstanceFailedEvent(
106+ subOrchestrationInstanceFailed=pb. SubOrchestrationInstanceFailedEvent(
98107 failureDetails=new_failure_details(ex),
99108 taskScheduledId=event_id)
100109 )
101110
102111
103- def new_failure_details(ex: Exception) -> TaskFailureDetails:
104- return TaskFailureDetails(
112+ def new_failure_details(ex: Exception) -> pb. TaskFailureDetails:
113+ return pb. TaskFailureDetails(
105114 errorType=type(ex).__name__,
106115 errorMessage=str(ex),
107116 stackTrace=wrappers_pb2.StringValue(value=''.join(traceback.format_tb(ex.__traceback__)))
@@ -117,29 +126,29 @@ def get_string_value(val: str | None) -> wrappers_pb2.StringValue | None:
117126
118127def new_complete_orchestration_action(
119128 id: int,
120- status: OrchestrationStatus,
129+ status: pb. OrchestrationStatus,
121130 result: str | None = None,
122- failure_details: TaskFailureDetails | None = None) -> OrchestratorAction:
131+ failure_details: pb. TaskFailureDetails | None = None) -> pb. OrchestratorAction:
123132
124- completeOrchestrationAction = CompleteOrchestrationAction(
133+ completeOrchestrationAction = pb. CompleteOrchestrationAction(
125134 orchestrationStatus=status,
126135 result=get_string_value(result),
127136 failureDetails=failure_details)
128137
129138 # TODO: CarryoverEvents
130139
131- return OrchestratorAction(id=id, completeOrchestration=completeOrchestrationAction)
140+ return pb. OrchestratorAction(id=id, completeOrchestration=completeOrchestrationAction)
132141
133142
134- def new_create_timer_action(id: int, fire_at: datetime) -> OrchestratorAction:
143+ def new_create_timer_action(id: int, fire_at: datetime) -> pb. OrchestratorAction:
135144 timestamp = timestamp_pb2.Timestamp()
136145 timestamp.FromDatetime(fire_at)
137- return OrchestratorAction(id=id, createTimer=CreateTimerAction(fireAt=timestamp))
146+ return pb. OrchestratorAction(id=id, createTimer=pb. CreateTimerAction(fireAt=timestamp))
138147
139148
140- def new_schedule_task_action(id: int, name: str, input: Any) -> OrchestratorAction:
149+ def new_schedule_task_action(id: int, name: str, input: Any) -> pb. OrchestratorAction:
141150 encoded_input = json.dumps(input) if input is not None else None
142- return OrchestratorAction(id=id, scheduleTask=ScheduleTaskAction(
151+ return pb. OrchestratorAction(id=id, scheduleTask=pb. ScheduleTaskAction(
143152 name=name,
144153 input=get_string_value(encoded_input)
145154 ))
@@ -151,9 +160,13 @@ def new_timestamp(dt: datetime) -> timestamp_pb2.Timestamp:
151160 return ts
152161
153162
154- def new_create_sub_orchestration_action(id: int, name: str, instance_id: str | None, input: Any) -> OrchestratorAction:
163+ def new_create_sub_orchestration_action(
164+ id: int,
165+ name: str,
166+ instance_id: str | None,
167+ input: Any) -> pb.OrchestratorAction:
155168 encoded_input = json.dumps(input) if input is not None else None
156- return OrchestratorAction(id=id, createSubOrchestration=CreateSubOrchestrationAction(
169+ return pb. OrchestratorAction(id=id, createSubOrchestration=pb. CreateSubOrchestrationAction(
157170 name=name,
158171 instanceId=instance_id,
159172 input=get_string_value(encoded_input)
0 commit comments