
Commit 70cb81d

Initial Commit
1 parent c658a52 commit 70cb81d

17 files changed

Lines changed: 3596 additions & 150 deletions

.github/workflows/durabletask.yml

Lines changed: 1 addition & 13 deletions
```diff
@@ -57,19 +57,7 @@ jobs:
       - name: Pytest unit tests
         working-directory: tests/durabletask
         run: |
-          pytest -m "not e2e and not dts" --verbose
-      # Sidecar for running e2e tests requires Go SDK
-      - name: Install Go SDK
-        uses: actions/setup-go@v5
-        with:
-          go-version: 'stable'
-      # Install and run the durabletask-go sidecar for running e2e tests
-      - name: Pytest e2e tests
-        working-directory: tests/durabletask
-        run: |
-          go install github.com/microsoft/durabletask-go@main
-          durabletask-go --port 4001 &
-          pytest -m "e2e and not dts" --verbose
+          pytest -m "not dts" --verbose

   publish-release:
     if: startsWith(github.ref, 'refs/tags/v') # Only run if a matching tag is pushed
```

Makefile

Lines changed: 3 additions & 6 deletions
```diff
@@ -1,11 +1,8 @@
 init:
 	pip3 install -r requirements.txt

-test-unit:
-	pytest -m "not e2e" --verbose
-
-test-e2e:
-	pytest -m e2e --verbose
+test:
+	pytest --verbose

 install:
 	python3 -m pip install .
@@ -16,4 +13,4 @@ gen-proto:
 	python3 -m grpc_tools.protoc --proto_path=. --python_out=. --pyi_out=. --grpc_python_out=. ./durabletask/internal/orchestrator_service.proto
 	rm durabletask/internal/*.proto

-.PHONY: init test-unit test-e2e gen-proto install
+.PHONY: init test gen-proto install
```

docs/development.md

Lines changed: 3 additions & 18 deletions
````diff
@@ -11,25 +11,10 @@ make gen-proto

 This will download the `orchestrator_service.proto` from the `microsoft/durabletask-protobuf` repo and compile it using `grpcio-tools`. The version of the source proto file that was downloaded can be found in the file `durabletask/internal/PROTO_SOURCE_COMMIT_HASH`.

-### Running unit tests
+### Running tests

-Unit tests can be run using the following command from the project root. Unit tests _don't_ require a sidecar process to be running.
+Tests can be run using the following command from the project root.

 ```sh
-make test-unit
-```
-
-### Running E2E tests
-
-The E2E (end-to-end) tests require a sidecar process to be running. You can use the Durable Task test sidecar using the following `docker` command:
-
-```sh
-go install github.com/microsoft/durabletask-go@main
-durabletask-go --port 4001
-```
-
-To run the E2E tests, run the following command from the project root:
-
-```sh
-make test-e2e
+make test
 ```
````

durabletask/testing/README.md

Lines changed: 292 additions & 0 deletions
# Durable Task Testing Utilities

This package provides testing utilities for the Durable Task Python SDK, including an in-memory backend that eliminates the need for external dependencies during testing.

## In-Memory Backend

The `InMemoryOrchestrationBackend` is a lightweight, in-memory implementation of the Durable Task backend that runs as a gRPC server. It's designed for testing scenarios where you want to test orchestrations without requiring a sidecar process or external storage.

### Features

- **In-memory state storage**: All orchestration state is stored in memory
- **Full gRPC compatibility**: Implements the same gRPC interface as the production backend
- **Thread-safe**: Safe for concurrent access from multiple threads
- **Work item streaming**: Supports streaming work items to workers
- **Event handling**: Supports raising events, timers, and sub-orchestrations
- **Entity support**: Supports function-based and class-based entities
- **Lifecycle management**: Supports suspend, resume, and terminate operations
- **State waiting**: Built-in support for waiting on orchestration state changes
### Quick Start

```python
import pytest

from durabletask.testing import create_test_backend
from durabletask.client import TaskHubGrpcClient, OrchestrationStatus
from durabletask.worker import TaskHubGrpcWorker


@pytest.fixture
def backend():
    """Create an in-memory backend for testing."""
    backend = create_test_backend(port=50051)
    yield backend
    backend.stop()
    backend.reset()


def test_simple_orchestration(backend):
    # Create client and worker
    client = TaskHubGrpcClient(host_address="localhost:50051")
    worker = TaskHubGrpcWorker(host_address="localhost:50051")

    # Define orchestrator and activity
    def hello_orchestrator(ctx, _):
        result = yield ctx.call_activity(say_hello, input="World")
        return result

    def say_hello(ctx, name: str):
        return f"Hello, {name}!"

    # Register orchestrator and activity with the worker
    worker.add_orchestrator(hello_orchestrator)
    worker.add_activity(say_hello)

    # Start worker
    worker.start()

    try:
        # Schedule orchestration
        instance_id = client.schedule_new_orchestration(hello_orchestrator)

        # Wait for completion
        state = client.wait_for_orchestration_completion(instance_id, timeout=10)

        # Verify results
        assert state.runtime_status == OrchestrationStatus.COMPLETED
        assert state.serialized_output == '"Hello, World!"'
    finally:
        worker.stop()
```
### Advanced Usage

#### Testing with Multiple Ports

```python
import random

import pytest

from durabletask.testing import create_test_backend
from durabletask.client import TaskHubGrpcClient
from durabletask.worker import TaskHubGrpcWorker


@pytest.fixture
def backend():
    # Use a random port to avoid conflicts
    port = random.randint(50000, 60000)
    backend = create_test_backend(port=port)
    yield backend, port
    backend.stop()
    backend.reset()


def test_orchestration(backend):
    backend_instance, port = backend
    client = TaskHubGrpcClient(host_address=f"localhost:{port}")
    worker = TaskHubGrpcWorker(host_address=f"localhost:{port}")
    # ...
```
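A random pick from a fixed range can still collide when many test processes run in parallel. An alternative, sketched below with only the standard library, is to ask the OS for an unused port by binding to port 0 (`get_free_port` is a hypothetical helper, not part of the SDK; there is a small race window between probing the port and starting the backend, but collisions are far rarer than with a random pick):

```python
import socket


def get_free_port() -> int:
    """Ask the OS for a currently unused TCP port by binding to port 0."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind(("127.0.0.1", 0))
        return s.getsockname()[1]


port = get_free_port()
# Pass f"localhost:{port}" to create_test_backend, the client, and the worker.
print(f"localhost:{port}")
```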
#### Testing Event Handling

```python
def test_external_events(backend):
    client = TaskHubGrpcClient(host_address="localhost:50051")
    worker = TaskHubGrpcWorker(host_address="localhost:50051")

    def wait_for_event_orchestrator(ctx, _):
        event_data = yield ctx.wait_for_external_event("approval")
        return event_data

    worker.add_orchestrator(wait_for_event_orchestrator)
    worker.start()

    try:
        instance_id = client.schedule_new_orchestration(wait_for_event_orchestrator)

        # Wait for orchestration to start
        client.wait_for_orchestration_start(instance_id, timeout=5)

        # Raise an event
        client.raise_orchestration_event(instance_id, "approval", data="approved")

        # Wait for completion
        state = client.wait_for_orchestration_completion(instance_id, timeout=10)

        assert state.runtime_status == OrchestrationStatus.COMPLETED
        assert state.serialized_output == '"approved"'
    finally:
        worker.stop()
```
#### Testing Sub-Orchestrations

```python
def test_sub_orchestrations(backend):
    client = TaskHubGrpcClient(host_address="localhost:50051")
    worker = TaskHubGrpcWorker(host_address="localhost:50051")

    def parent_orchestrator(ctx, _):
        result1 = yield ctx.call_sub_orchestrator(child_orchestrator, input=1)
        result2 = yield ctx.call_sub_orchestrator(child_orchestrator, input=2)
        return result1 + result2

    def child_orchestrator(ctx, input: int):
        return input * 2

    worker.add_orchestrator(parent_orchestrator)
    worker.add_orchestrator(child_orchestrator)
    worker.start()

    try:
        instance_id = client.schedule_new_orchestration(parent_orchestrator)
        state = client.wait_for_orchestration_completion(instance_id, timeout=10)

        assert state.runtime_status == OrchestrationStatus.COMPLETED
        assert state.serialized_output == "6"  # (1*2) + (2*2)
    finally:
        worker.stop()
```
#### Testing Timers

```python
def test_durable_timers(backend):
    import time
    from datetime import timedelta

    client = TaskHubGrpcClient(host_address="localhost:50051")
    worker = TaskHubGrpcWorker(host_address="localhost:50051")

    def timer_orchestrator(ctx, _):
        fire_at = ctx.current_utc_datetime + timedelta(seconds=1)
        yield ctx.create_timer(fire_at)
        return "timer_fired"

    worker.add_orchestrator(timer_orchestrator)
    worker.start()

    try:
        start_time = time.time()
        instance_id = client.schedule_new_orchestration(timer_orchestrator)
        state = client.wait_for_orchestration_completion(instance_id, timeout=10)
        elapsed = time.time() - start_time

        assert state.runtime_status == OrchestrationStatus.COMPLETED
        assert elapsed >= 1.0  # Timer should have waited at least 1 second
    finally:
        worker.stop()
```
#### Testing Termination

```python
def test_orchestration_termination(backend):
    client = TaskHubGrpcClient(host_address="localhost:50051")
    worker = TaskHubGrpcWorker(host_address="localhost:50051")

    def long_running_orchestrator(ctx, _):
        yield ctx.wait_for_external_event("never_happens")
        return "completed"

    worker.add_orchestrator(long_running_orchestrator)
    worker.start()

    try:
        instance_id = client.schedule_new_orchestration(long_running_orchestrator)

        # Wait for it to start
        client.wait_for_orchestration_start(instance_id, timeout=5)

        # Terminate it
        client.terminate_orchestration(instance_id, output="terminated_by_test")

        # Verify termination
        state = client.wait_for_orchestration_completion(instance_id, timeout=10)

        assert state.runtime_status == OrchestrationStatus.TERMINATED
    finally:
        worker.stop()
```
### Configuration Options

The `InMemoryOrchestrationBackend` supports the following configuration options:

- **port** (int): Port to listen on for gRPC connections (default: 50051)
- **max_history_size** (int): Maximum number of history events per orchestration (default: 10000)

```python
backend = InMemoryOrchestrationBackend(
    port=50051,
    max_history_size=100000,  # Support larger orchestrations
)
backend.start()
```

Or use the convenience factory, which starts the server automatically:

```python
backend = create_test_backend(port=50051, max_history_size=10000)
```
### Thread Safety

The in-memory backend is thread-safe and can be safely accessed from multiple threads. All state mutations are protected by locks to ensure consistency.
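The backend's internals aren't shown here, but the locking pattern described above can be sketched with a minimal, hypothetical `InstanceStore` (for illustration only; the SDK's actual classes may differ):

```python
import threading


class InstanceStore:
    """Hypothetical sketch of a lock-protected orchestration state map."""

    def __init__(self):
        self._lock = threading.Lock()
        self._states = {}

    def set_state(self, instance_id: str, state: dict) -> None:
        # Every mutation takes the lock, so concurrent writers never interleave.
        with self._lock:
            self._states[instance_id] = state

    def count(self) -> int:
        with self._lock:
            return len(self._states)


store = InstanceStore()

# Eight threads write concurrently; the lock keeps the dict consistent.
threads = [
    threading.Thread(target=store.set_state, args=(f"instance-{i}", {"status": "RUNNING"}))
    for i in range(8)
]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(store.count())  # 8: every write survived concurrent access
```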
### Limitations

The in-memory backend is designed for testing and has some limitations compared to production backends:

1. **No persistence**: All state is lost when the backend is stopped
2. **No distributed execution**: Runs in a single process
3. **No history streaming**: StreamInstanceHistory is not implemented
4. **No rewind**: RewindInstance is not implemented
### Best Practices

1. **Use fixtures**: Create pytest fixtures to manage backend lifecycle
2. **Reset between tests**: Call `backend.reset()` to clear state between tests
3. **Use random ports**: When running tests in parallel, use random port assignments
4. **Set appropriate timeouts**: Use reasonable timeout values in wait operations
5. **Clean up workers**: Always stop workers in finally blocks to prevent resource leaks
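Practices 1, 2, and 5 combine naturally in a generator-style fixture, where pytest runs everything after `yield` as teardown. A self-contained sketch using a stand-in backend class so the call ordering is visible without a real server (substitute `create_test_backend()` in real tests; the stand-in only mirrors the `stop()`/`reset()` methods documented above):

```python
class FakeBackend:
    """Stand-in that records lifecycle calls instead of running a server."""

    def __init__(self):
        self.calls = []

    def stop(self):
        self.calls.append("stop")

    def reset(self):
        self.calls.append("reset")


def backend_fixture():
    """Generator-style pytest fixture body."""
    backend = FakeBackend()
    yield backend            # the test body runs here
    backend.stop()           # teardown: stop the gRPC server first...
    backend.reset()          # ...then clear state for the next test


# Simulate what pytest does with the fixture:
gen = backend_fixture()
backend = next(gen)          # setup: the value handed to the test
next(gen, None)              # teardown: runs after the test finishes
print(backend.calls)         # ['stop', 'reset']
```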
### Troubleshooting

#### Connection Errors

If you see connection errors:

- Ensure the backend is started before creating clients/workers
- Verify the port is not already in use
- Check that the host address matches the backend port

#### Timeouts

If tests timeout:

- Increase timeout values in `wait_for_orchestration_completion`
- Check that workers are started and processing work items
- Verify orchestrators and activities are registered correctly

#### State Not Found

If orchestration state is not found:

- Ensure you're using the correct instance ID
- Verify the orchestration was successfully scheduled
- Check that the backend wasn't reset between operations

durabletask/testing/__init__.py

Lines changed: 14 additions & 0 deletions
```python
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

"""Testing utilities for the Durable Task Python SDK."""

from durabletask.testing.in_memory_backend import (
    InMemoryOrchestrationBackend,
    create_test_backend,
)

__all__ = [
    "InMemoryOrchestrationBackend",
    "create_test_backend",
]
```
