diff --git a/.github/workflows/validate_examples.yaml b/.github/workflows/validate_examples.yaml index 0f915e859..7f8a9d46c 100644 --- a/.github/workflows/validate_examples.yaml +++ b/.github/workflows/validate_examples.yaml @@ -65,14 +65,14 @@ jobs: repository: ${{ env.CHECKOUT_REPO }} ref: ${{ env.CHECKOUT_REF }} - uses: azure/setup-helm@v4 - - name: Determine latest Dapr Runtime version (including prerelease) + - name: Determine latest Dapr Runtime version run: | - RUNTIME_VERSION=$(curl -s "https://api.github.com/repos/dapr/dapr/releases" | grep '"tag_name"' | head -n 1 | cut -d ':' -f2 | tr -d '",v ') + RUNTIME_VERSION=$(curl -s "https://api.github.com/repos/dapr/dapr/releases" | grep '"tag_name"' | cut -d ':' -f2 | tr -d '",v ' | grep -v -E '\-rc|\-alpha|\-beta|\-pre' | sort -V | tail -1) echo "DAPR_RUNTIME_VER=$RUNTIME_VERSION" >> $GITHUB_ENV echo "Found $RUNTIME_VERSION" - - name: Determine latest Dapr Cli version (including prerelease) + - name: Determine latest Dapr Cli version run: | - CLI_VERSION=$(curl -s "https://api.github.com/repos/dapr/cli/releases" | grep '"tag_name"' | head -n 1 | cut -d ':' -f2 | tr -d '",v ') + CLI_VERSION=$(curl -s "https://api.github.com/repos/dapr/cli/releases" | grep '"tag_name"' | cut -d ':' -f2 | tr -d '",v ' | grep -v -E '\-rc|\-alpha|\-beta|\-pre' | sort -V | tail -1) echo "DAPR_CLI_VER=$CLI_VERSION" >> $GITHUB_ENV echo "Found $CLI_VERSION" - name: Set up Python ${{ matrix.python_ver }} diff --git a/README.md b/README.md index 1a4472c72..e212cbd43 100644 --- a/README.md +++ b/README.md @@ -79,7 +79,14 @@ git clone https://github.com/dapr/python-sdk.git cd python-sdk ``` -2. Install a project in a editable mode +2. Create and activate a virtual environment + +```bash +python3 -m venv .venv +source .venv/bin/activate # On Windows: .venv\Scripts\activate +``` + +3. Install a project in editable mode ```bash pip3 install -e . 
@@ -90,31 +97,31 @@ pip3 install -e ./ext/dapr-ext-langgraph/ pip3 install -e ./ext/dapr-ext-strands/ ``` -3. Install required packages +4. Install required packages ```bash pip3 install -r dev-requirements.txt ``` -4. Run linter and autofix +5. Run linter and autofix ```bash tox -e ruff ``` -5. Run unit-test +6. Run unit-test ```bash tox -e py311 ``` -6. Run type check +7. Run type check ```bash tox -e type ``` -7. Run examples +8. Run examples ```bash tox -e examples diff --git a/dapr/clients/grpc/client.py b/dapr/clients/grpc/client.py index ae1206c4c..fa69c359a 100644 --- a/dapr/clients/grpc/client.py +++ b/dapr/clients/grpc/client.py @@ -25,8 +25,10 @@ import grpc # type: ignore from google.protobuf.any_pb2 import Any as GrpcAny +from google.protobuf.duration_pb2 import Duration as GrpcDuration from google.protobuf.empty_pb2 import Empty as GrpcEmpty from google.protobuf.message import Message as GrpcMessage +from google.protobuf.struct_pb2 import Struct as GrpcStruct from grpc import ( # type: ignore RpcError, StatusCode, @@ -1880,6 +1882,8 @@ def converse_alpha2( temperature: Optional[float] = None, tools: Optional[List[conversation.ConversationTools]] = None, tool_choice: Optional[str] = None, + response_format: Optional[GrpcStruct] = None, + prompt_cache_retention: Optional[GrpcDuration] = None, ) -> conversation.ConversationResponseAlpha2: """Invoke an LLM using the conversation API (Alpha2) with tool calling support. 
@@ -1893,6 +1897,8 @@ def converse_alpha2( temperature: Optional temperature setting for the LLM to optimize for creativity or predictability tools: Optional list of tools available for the LLM to call tool_choice: Optional control over which tools can be called ('none', 'auto', 'required', or specific tool name) + response_format: Optional response format (google.protobuf.struct_pb2.Struct, ex: json_schema for structured output) + prompt_cache_retention: Optional retention for prompt cache (google.protobuf.duration_pb2.Duration) Returns: ConversationResponseAlpha2 containing the conversation results with choices and tool calls @@ -1949,6 +1955,10 @@ def converse_alpha2( request.temperature = temperature if tool_choice is not None: request.tool_choice = tool_choice + if response_format is not None and hasattr(request, 'response_format'): + request.response_format.CopyFrom(response_format) + if prompt_cache_retention is not None and hasattr(request, 'prompt_cache_retention'): + request.prompt_cache_retention.CopyFrom(prompt_cache_retention) try: response, call = self.retry_policy.run_rpc(self._stub.ConverseAlpha2.with_call, request) diff --git a/dapr/clients/grpc/conversation.py b/dapr/clients/grpc/conversation.py index d11c41979..8fc3db067 100644 --- a/dapr/clients/grpc/conversation.py +++ b/dapr/clients/grpc/conversation.py @@ -338,11 +338,46 @@ class ConversationResultAlpha2Choices: message: ConversationResultAlpha2Message +@dataclass +class ConversationResultAlpha2CompletionUsageCompletionTokensDetails: + """Breakdown of tokens used in the completion.""" + + accepted_prediction_tokens: int = 0 + audio_tokens: int = 0 + reasoning_tokens: int = 0 + rejected_prediction_tokens: int = 0 + + +@dataclass +class ConversationResultAlpha2CompletionUsagePromptTokensDetails: + """Breakdown of tokens used in the prompt.""" + + audio_tokens: int = 0 + cached_tokens: int = 0 + + +@dataclass +class ConversationResultAlpha2CompletionUsage: + """Token usage for one Alpha2 
conversation result.""" + + completion_tokens: int = 0 + prompt_tokens: int = 0 + total_tokens: int = 0 + completion_tokens_details: Optional[ + ConversationResultAlpha2CompletionUsageCompletionTokensDetails + ] = None + prompt_tokens_details: Optional[ConversationResultAlpha2CompletionUsagePromptTokensDetails] = ( + None + ) + + @dataclass class ConversationResultAlpha2: """One of the outputs in Alpha2 response from conversation input.""" choices: List[ConversationResultAlpha2Choices] = field(default_factory=list) + model: Optional[str] = None + usage: Optional[ConversationResultAlpha2CompletionUsage] = None @dataclass @@ -657,5 +692,38 @@ def _get_outputs_from_grpc_response( ) ) - outputs.append(ConversationResultAlpha2(choices=choices)) + model: Optional[str] = None + usage: Optional[ConversationResultAlpha2CompletionUsage] = None + if hasattr(output, 'model') and getattr(output, 'model', None): + model = output.model + if hasattr(output, 'usage') and output.usage: + u = output.usage + completion_details: Optional[ + ConversationResultAlpha2CompletionUsageCompletionTokensDetails + ] = None + prompt_details: Optional[ConversationResultAlpha2CompletionUsagePromptTokensDetails] = ( + None + ) + if hasattr(u, 'completion_tokens_details') and u.completion_tokens_details: + cd = u.completion_tokens_details + completion_details = ConversationResultAlpha2CompletionUsageCompletionTokensDetails( + accepted_prediction_tokens=getattr(cd, 'accepted_prediction_tokens', 0) or 0, + audio_tokens=getattr(cd, 'audio_tokens', 0) or 0, + reasoning_tokens=getattr(cd, 'reasoning_tokens', 0) or 0, + rejected_prediction_tokens=getattr(cd, 'rejected_prediction_tokens', 0) or 0, + ) + if hasattr(u, 'prompt_tokens_details') and u.prompt_tokens_details: + pd = u.prompt_tokens_details + prompt_details = ConversationResultAlpha2CompletionUsagePromptTokensDetails( + audio_tokens=getattr(pd, 'audio_tokens', 0) or 0, + cached_tokens=getattr(pd, 'cached_tokens', 0) or 0, + ) + usage = 
ConversationResultAlpha2CompletionUsage( + completion_tokens=getattr(u, 'completion_tokens', 0) or 0, + prompt_tokens=getattr(u, 'prompt_tokens', 0) or 0, + total_tokens=getattr(u, 'total_tokens', 0) or 0, + completion_tokens_details=completion_details, + prompt_tokens_details=prompt_details, + ) + outputs.append(ConversationResultAlpha2(choices=choices, model=model, usage=usage)) return outputs diff --git a/examples/configuration/README.md b/examples/configuration/README.md index 4dc38f5e7..ec260b531 100644 --- a/examples/configuration/README.md +++ b/examples/configuration/README.md @@ -47,10 +47,10 @@ To run this example, use the following command: name: Run get configuration example match_order: none expected_stdout_lines: - - "== APP == Got key=orderId1 value=100 version=1 metadata={}" - - "== APP == Got key=orderId2 value=200 version=1 metadata={}" - - "== APP == Subscribe key=orderId2 value=210 version=2 metadata={}" - - "== APP == Unsubscribed successfully? True" + - "Got key=orderId1 value=100 version=1 metadata={}" + - "Got key=orderId2 value=200 version=1 metadata={}" + - "Subscribe key=orderId2 value=210 version=2 metadata={}" + - "Unsubscribed successfully? 
True" background: true timeout_seconds: 30 sleep: 3 @@ -75,7 +75,7 @@ docker exec dapr_redis redis-cli SET orderId2 "210||2" You should be able to see the following output: ``` -== APP == Got key=orderId1 value=100 version=1 -== APP == Got key=orderId2 value=200 version=1 -== APP == Subscribe key=orderId2 value=210 version=2 +Got key=orderId1 value=100 version=1 +Got key=orderId2 value=200 version=1 +Subscribe key=orderId2 value=210 version=2 ``` diff --git a/examples/conversation/README.md b/examples/conversation/README.md index 1d7789b32..4f630d46d 100644 --- a/examples/conversation/README.md +++ b/examples/conversation/README.md @@ -28,8 +28,8 @@ The Conversation API supports real LLM providers including: @@ -47,8 +47,8 @@ The Conversation API supports real LLM providers including: diff --git a/examples/crypto/README.md b/examples/crypto/README.md index 1dca3fb14..e9736b160 100644 --- a/examples/crypto/README.md +++ b/examples/crypto/README.md @@ -39,14 +39,14 @@ openssl rand -out keys/symmetric-key-256 32 @@ -53,23 +53,23 @@ timeout_seconds: 60 Expected output: ``` ... - == APP == Activate DemoActor actor! 
- == APP == has_value: False - == APP == INFO: 127.0.0.1:50739 - "PUT /actors/DemoActor/1/method/GetMyData HTTP/1.1" 200 OK - == APP == has_value: False - == APP == INFO: 127.0.0.1:50739 - "PUT /actors/DemoActor/1/method/GetMyData HTTP/1.1" 200 OK - == APP == set_my_data: {'data': 'new_data'} - == APP == INFO: 127.0.0.1:50739 - "PUT /actors/DemoActor/1/method/SetMyData HTTP/1.1" 200 OK - == APP == has_value: True - == APP == INFO: 127.0.0.1:50739 - "PUT /actors/DemoActor/1/method/GetMyData HTTP/1.1" 200 OK - == APP == set reminder to True - == APP == set reminder is done - == APP == INFO: 127.0.0.1:50739 - "PUT /actors/DemoActor/1/method/SetReminder HTTP/1.1" 200 OK - == APP == set_timer to True - == APP == set_timer is done - == APP == INFO: 127.0.0.1:50739 - "PUT /actors/DemoActor/1/method/SetTimer HTTP/1.1" 200 OK - == APP == receive_reminder is called - demo_reminder reminder - b'reminder_state' - == APP == clear_my_data + Activate DemoActor actor! + has_value: False + INFO: 127.0.0.1:50739 - "PUT /actors/DemoActor/1/method/GetMyData HTTP/1.1" 200 OK + has_value: False + INFO: 127.0.0.1:50739 - "PUT /actors/DemoActor/1/method/GetMyData HTTP/1.1" 200 OK + set_my_data: {'data': 'new_data'} + INFO: 127.0.0.1:50739 - "PUT /actors/DemoActor/1/method/SetMyData HTTP/1.1" 200 OK + has_value: True + INFO: 127.0.0.1:50739 - "PUT /actors/DemoActor/1/method/GetMyData HTTP/1.1" 200 OK + set reminder to True + set reminder is done + INFO: 127.0.0.1:50739 - "PUT /actors/DemoActor/1/method/SetReminder HTTP/1.1" 200 OK + set_timer to True + set_timer is done + INFO: 127.0.0.1:50739 - "PUT /actors/DemoActor/1/method/SetTimer HTTP/1.1" 200 OK + receive_reminder is called - demo_reminder reminder - b'reminder_state' + clear_my_data ... ``` @@ -78,19 +78,19 @@ timeout_seconds: 60 2. Run Demo client in new terminal window @@ -105,20 +105,20 @@ expected_stdout_lines: Expected output: ``` ... 
- == APP == call actor method via proxy.invoke_method() - == APP == b'null' - == APP == call actor method using rpc style - == APP == None - == APP == Actor reentrancy enabled: True - == APP == call SetMyData actor method to save the state - == APP == call GetMyData actor method to get the state - == APP == {'data': 'new_data', 'ts': datetime.datetime(2020, 11, 13, 0, 38, 36, 163000, tzinfo=tzutc())} - == APP == Register reminder - == APP == Register timer - == APP == waiting for 30 seconds - == APP == stop reminder - == APP == stop timer - == APP == clear actor state + call actor method via proxy.invoke_method() + b'null' + call actor method using rpc style + None + Actor reentrancy enabled: True + call SetMyData actor method to save the state + call GetMyData actor method to get the state + {'data': 'new_data', 'ts': datetime.datetime(2020, 11, 13, 0, 38, 36, 163000, tzinfo=tzutc())} + Register reminder + Register timer + waiting for 30 seconds + stop reminder + stop timer + clear actor state ``` diff --git a/examples/demo_workflow/README.md b/examples/demo_workflow/README.md index cf75112a2..58e4d3bb5 100644 --- a/examples/demo_workflow/README.md +++ b/examples/demo_workflow/README.md @@ -25,28 +25,28 @@ pip3 install -r demo_workflow/requirements.txt @@ -47,13 +47,13 @@ dapr run --app-id=locksapp --app-protocol grpc --resources-path components/ pyth The output should be as follows: ``` -== APP == Will try to acquire a lock from lock store named [lockstore] -== APP == The lock is for a resource named [example-lock-resource] -== APP == The client identifier is [example-client-id] -== APP == The lock will will expire in 60 seconds. -== APP == Lock acquired successfully!!! -== APP == We already released the lock so unlocking will not work. 
-== APP == We tried to unlock it anyway and got back [UnlockResponseStatus.lock_does_not_exist] +Will try to acquire a lock from lock store named [lockstore] +The lock is for a resource named [example-lock-resource] +The client identifier is [example-client-id] +The lock will will expire in 60 seconds. +Lock acquired successfully!!! +We already released the lock so unlocking will not work. +We tried to unlock it anyway and got back [UnlockResponseStatus.lock_does_not_exist] ``` ## Error Handling diff --git a/examples/error_handling/README.md b/examples/error_handling/README.md index 9f09b8946..480d73635 100644 --- a/examples/error_handling/README.md +++ b/examples/error_handling/README.md @@ -25,15 +25,15 @@ To run this example, the following code can be used: @@ -45,13 +45,13 @@ dapr run --resources-path components -- python3 error_handling.py The output should be as follows: ``` -== APP == Status code: INVALID_ARGUMENT -== APP == Message: input key/keyPrefix 'key||' can't contain '||' -== APP == Error code: DAPR_STATE_ILLEGAL_KEY -== APP == Error info(reason): DAPR_STATE_ILLEGAL_KEY -== APP == Resource info (resource type): state -== APP == Resource info (resource name): statestore -== APP == Bad request (field): key|| -== APP == Bad request (description): input key/keyPrefix 'key||' can't contain '||' -== APP == JSON: {"status_code": "INVALID_ARGUMENT", "message": "input key/keyPrefix 'key||' can't contain '||'", "error_code": "DAPR_STATE_ILLEGAL_KEY", "details": {"error_info": {"@type": "type.googleapis.com/google.rpc.ErrorInfo", "reason": "DAPR_STATE_ILLEGAL_KEY", "domain": "dapr.io"}, "retry_info": null, "debug_info": null, "quota_failure": null, "precondition_failure": null, "bad_request": {"@type": "type.googleapis.com/google.rpc.BadRequest", "field_violations": [{"field": "key||", "description": "input key/keyPrefix 'key||' can't contain '||'"}]}, "request_info": null, "resource_info": {"@type": "type.googleapis.com/google.rpc.ResourceInfo", 
"resource_type": "state", "resource_name": "statestore"}, "help": null, "localized_message": null}} +Status code: INVALID_ARGUMENT +Message: input key/keyPrefix 'key||' can't contain '||' +Error code: DAPR_STATE_ILLEGAL_KEY +Error info(reason): DAPR_STATE_ILLEGAL_KEY +Resource info (resource type): state +Resource info (resource name): statestore +Bad request (field): key|| +Bad request (description): input key/keyPrefix 'key||' can't contain '||' +JSON: {"status_code": "INVALID_ARGUMENT", "message": "input key/keyPrefix 'key||' can't contain '||'", "error_code": "DAPR_STATE_ILLEGAL_KEY", "details": {"error_info": {"@type": "type.googleapis.com/google.rpc.ErrorInfo", "reason": "DAPR_STATE_ILLEGAL_KEY", "domain": "dapr.io"}, "retry_info": null, "debug_info": null, "quota_failure": null, "precondition_failure": null, "bad_request": {"@type": "type.googleapis.com/google.rpc.BadRequest", "field_violations": [{"field": "key||", "description": "input key/keyPrefix 'key||' can't contain '||'"}]}, "request_info": null, "resource_info": {"@type": "type.googleapis.com/google.rpc.ResourceInfo", "resource_type": "state", "resource_name": "statestore"}, "help": null, "localized_message": null}} ``` diff --git a/examples/grpc_proxying/README.md b/examples/grpc_proxying/README.md index 73b4fafe5..cd9aaa892 100644 --- a/examples/grpc_proxying/README.md +++ b/examples/grpc_proxying/README.md @@ -23,8 +23,9 @@ Run the following command in a terminal/command-prompt: @@ -42,7 +43,7 @@ In another terminal/command prompt run: diff --git a/examples/invoke-binding/README.md b/examples/invoke-binding/README.md index e92d9c65c..e018640b2 100644 --- a/examples/invoke-binding/README.md +++ b/examples/invoke-binding/README.md @@ -37,9 +37,9 @@ docker compose -f ./docker-compose-single-kafka.yml up -d @@ -59,9 +59,9 @@ In another terminal/command-prompt run: diff --git a/examples/invoke-custom-data/README.md b/examples/invoke-custom-data/README.md index 6c1f87f19..1a1ab3881 100644 --- 
a/examples/invoke-custom-data/README.md +++ b/examples/invoke-custom-data/README.md @@ -33,7 +33,7 @@ To run this example, the following steps should be followed: @@ -49,9 +49,9 @@ sleep: 5 @@ -50,14 +50,14 @@ Start the caller: diff --git a/examples/invoke-simple/README.md b/examples/invoke-simple/README.md index b2af5c60f..c8f452a6f 100644 --- a/examples/invoke-simple/README.md +++ b/examples/invoke-simple/README.md @@ -24,8 +24,8 @@ Run the following command in a terminal/command-prompt: @@ -42,10 +42,10 @@ In another terminal/command prompt run: diff --git a/examples/jobs/README.md b/examples/jobs/README.md index 3e2417472..3964abf68 100644 --- a/examples/jobs/README.md +++ b/examples/jobs/README.md @@ -34,23 +34,23 @@ To run this example, the following code can be utilized: @@ -74,19 +74,19 @@ Run the following command in a terminal/command-prompt: diff --git a/examples/langgraph-checkpointer/README.md b/examples/langgraph-checkpointer/README.md index 0fce355d7..15c199a3c 100644 --- a/examples/langgraph-checkpointer/README.md +++ b/examples/langgraph-checkpointer/README.md @@ -34,14 +34,14 @@ Run the following command in a terminal/command prompt: @@ -56,18 +56,18 @@ dapr run --app-id=my-metadata-app --app-protocol grpc --resources-path component The output should be as follows: ``` -== APP == First, we will assign a new custom label to Dapr sidecar -== APP == Now, we will fetch the sidecar's metadata -== APP == And this is what we got: -== APP == application_id: my-metadata-app -== APP == active_actors_count: {} -== APP == registered_components: -== APP == name=lockstore type=lock.redis version= capabilities=[] -== APP == name=pubsub type=pubsub.redis version=v1 capabilities=[] -== APP == name=statestore type=state.redis version=v1 capabilities=['ACTOR', 'ETAG', 'KEYS_LIKE', 'TRANSACTIONAL', 'TTL'] -== APP == We will update our custom label value and check it was persisted -== APP == We added a custom label named [is-this-our-metadata-example] -== APP == Its 
old value was [yes] but now it is [You bet it is!] +First, we will assign a new custom label to Dapr sidecar +Now, we will fetch the sidecar's metadata +And this is what we got: + application_id: my-metadata-app + active_actors_count: {} + registered_components: + name=lockstore type=lock.redis version= capabilities=[] + name=pubsub type=pubsub.redis version=v1 capabilities=[] + name=statestore type=state.redis version=v1 capabilities=['ACTOR', 'ETAG', 'KEYS_LIKE', 'TRANSACTIONAL', 'TTL'] +We will update our custom label value and check it was persisted +We added a custom label named [is-this-our-metadata-example] +Its old value was [yes] but now it is [You bet it is!] ``` ## Error Handling diff --git a/examples/pubsub-simple/README.md b/examples/pubsub-simple/README.md index 6ee7ad89d..1cf38c9c1 100644 --- a/examples/pubsub-simple/README.md +++ b/examples/pubsub-simple/README.md @@ -27,23 +27,23 @@ Run the following command in a terminal/command prompt: ```bash # 2. Start Publisher -dapr run --app-id python-publisher --app-protocol grpc --dapr-grpc-port=3500 --enable-app-health-check python3 publisher.py +dapr run --app-id python-publisher --app-protocol grpc --dapr-grpc-port=3500 --enable-app-health-check -- python3 publisher.py ``` diff --git a/examples/pubsub-streaming-async/README.md b/examples/pubsub-streaming-async/README.md index e626fe75a..35c399b13 100644 --- a/examples/pubsub-streaming-async/README.md +++ b/examples/pubsub-streaming-async/README.md @@ -27,12 +27,12 @@ Run the following command in a terminal/command prompt: @@ -47,12 +47,12 @@ dapr run --app-id=secretsapp --app-protocol grpc --resources-path components/ py You should be able to see the following output: ``` -== APP == Got! -== APP == {'secretKey': 'secretValue'} -== APP == Got! -== APP == [('random', {'random': 'randomValue'}), ('secretKey', {'secretKey': 'secretValue'})] -== APP == Got! -== APP == {'random': 'randomValue'} +Got! +{'secretKey': 'secretValue'} +Got! 
+[('random', {'random': 'randomValue'}), ('secretKey', {'secretKey': 'secretValue'})] +Got! +{'random': 'randomValue'} ``` In `config.yaml` you can see that the `localsecretstore` secret store has been defined with some restricted permissions. @@ -78,11 +78,11 @@ To see this run the same `example.py` app with the following command: @@ -96,11 +96,11 @@ The above command overrides the default configuration file with the `--config` f The output should be as follows: ``` -== APP == Got! -== APP == {'secretKey': 'secretValue'} -== APP == Got! -== APP == [('secretKey', {'secretKey': 'secretValue'})] -== APP == Got expected error for accessing random key +Got! +{'secretKey': 'secretValue'} +Got! +[('secretKey', {'secretKey': 'secretValue'})] +Got expected error for accessing random key ``` It can be seen that when it tried to get the random key again, it fails as by default the access is denied for any key diff --git a/examples/state_store/README.md b/examples/state_store/README.md index 3569d3603..66627c9bb 100644 --- a/examples/state_store/README.md +++ b/examples/state_store/README.md @@ -34,17 +34,17 @@ To run this example, the following code can be utilized: @@ -56,27 +56,27 @@ dapr run --resources-path components/ -- python3 state_store.py The output should be as follows: ``` -== APP == State store has successfully saved value_1 with key_1 as key +State store has successfully saved value_1 with key_1 as key -== APP == Cannot save due to bad etag. ErrorCode=StatusCode.ABORTED +Cannot save due to bad etag. ErrorCode=StatusCode.ABORTED -== APP == State store has successfully saved value_2 with key_2 as key +State store has successfully saved value_2 with key_2 as key -== APP == State store has successfully saved value_3 with key_3 as key +State store has successfully saved value_3 with key_3 as key -== APP == Cannot save bulk due to bad etags. ErrorCode=StatusCode.ABORTED +Cannot save bulk due to bad etags. 
ErrorCode=StatusCode.ABORTED -== APP == Got value=b'value_1' eTag=1 +Got value=b'value_1' eTag=1 -== APP == Got items with etags: [(b'value_1_updated', '2'), (b'value_2', '2')] +Got items with etags: [(b'value_1_updated', '2'), (b'value_2', '2')] -== APP == Transaction with outbox pattern executed successfully! +Transaction with outbox pattern executed successfully! -== APP == Got value after outbox pattern: b'val1' +Got value after outbox pattern: b'val1' -== APP == Got values after transaction delete: [b'', b''] +Got values after transaction delete: [b'', b''] -== APP == Got value after delete: b'' +Got value after delete: b'' ``` ## Error Handling diff --git a/examples/state_store_query/README.md b/examples/state_store_query/README.md index 1c257d081..cab871789 100644 --- a/examples/state_store_query/README.md +++ b/examples/state_store_query/README.md @@ -56,14 +56,14 @@ Now run the app @@ -74,14 +74,14 @@ dapr run --app-id queryexample --resources-path components/ -- python3 state_sto You should be able to see the following output: ``` -== APP == 1 {"city": "Seattle", "person": {"id": 1036.0, "org": "Dev Ops"}, "state": "WA"} -== APP == 4 {"city": "Spokane", "person": {"id": 1042.0, "org": "Dev Ops"}, "state": "WA"} -== APP == 10 {"city": "New York", "person": {"id": 1054.0, "org": "Dev Ops"}, "state": "NY"} -== APP == Token: 3 -== APP == 9 {"city": "San Diego", "person": {"id": 1002.0, "org": "Finance"}, "state": "CA"} -== APP == 7 {"city": "San Francisco", "person": {"id": 1015.0, "org": "Dev Ops"}, "state": "CA"} -== APP == 3 {"city": "Sacramento", "person": {"id": 1071.0, "org": "Finance"}, "state": "CA"} -== APP == Token: 6 +1 {"city": "Seattle", "person": {"id": 1036.0, "org": "Dev Ops"}, "state": "WA"} +4 {"city": "Spokane", "person": {"id": 1042.0, "org": "Dev Ops"}, "state": "WA"} +10 {"city": "New York", "person": {"id": 1054.0, "org": "Dev Ops"}, "state": "NY"} +Token: 3 +9 {"city": "San Diego", "person": {"id": 1002.0, "org": "Finance"}, "state": 
"CA"} +7 {"city": "San Francisco", "person": {"id": 1015.0, "org": "Dev Ops"}, "state": "CA"} +3 {"city": "Sacramento", "person": {"id": 1071.0, "org": "Finance"}, "state": "CA"} +Token: 6 ``` Cleanup diff --git a/examples/w3c-tracing/README.md b/examples/w3c-tracing/README.md index 58faa8590..0dc892ed8 100644 --- a/examples/w3c-tracing/README.md +++ b/examples/w3c-tracing/README.md @@ -227,16 +227,16 @@ name: Run caller app with tracing match_order: none expected_stdout_lines: - "✅ You're up and running! Both Dapr and your app logs will appear here." - - '== APP == application/json' - - '== APP == SAY' - - '== APP == text/plain' - - '== APP == SLEEP' - - '== APP == Trace ID matches after forwarding' - - '== APP == application/json' - - '== APP == SAY' - - '== APP == text/plain' - - '== APP == SLEEP' - - '== APP == Trace ID matches after forwarding' + - 'application/json' + - 'SAY' + - 'text/plain' + - 'SLEEP' + - 'Trace ID matches after forwarding' + - 'application/json' + - 'SAY' + - 'text/plain' + - 'SLEEP' + - 'Trace ID matches after forwarding' - "✅ Exited App successfully" background: true sleep: 10 diff --git a/examples/workflow/README.md b/examples/workflow/README.md index 22a55e868..d3c1a4474 100644 --- a/examples/workflow/README.md +++ b/examples/workflow/README.md @@ -34,25 +34,25 @@ It shows several Dapr Workflow features including: @@ -64,25 +64,25 @@ dapr run --app-id wf-simple-example -- python3 simple.py The output of this example should look like this: ``` - - "== APP == Hi Counter!" - - "== APP == New counter value is: 1!" - - "== APP == New counter value is: 11!" - - "== APP == Retry count value is: 0!" - - "== APP == Retry count value is: 1! This print statement verifies retry" - - "== APP == Appending 1 to child_orchestrator_string!" - - "== APP == Appending a to child_orchestrator_string!" - - "== APP == Appending a to child_orchestrator_string!" - - "== APP == Appending 2 to child_orchestrator_string!" 
- - "== APP == Appending b to child_orchestrator_string!" - - "== APP == Appending b to child_orchestrator_string!" - - "== APP == Appending 3 to child_orchestrator_string!" - - "== APP == Appending c to child_orchestrator_string!" - - "== APP == Appending c to child_orchestrator_string!" - - "== APP == Get response from hello_world_wf after pause call: SUSPENDED" - - "== APP == Get response from hello_world_wf after resume call: RUNNING" - - "== APP == New counter value is: 111!" - - "== APP == New counter value is: 1111!" - - "== APP == Workflow completed! Result: Completed" + - "Hi Counter!" + - "New counter value is: 1!" + - "New counter value is: 11!" + - "Retry count value is: 0!" + - "Retry count value is: 1! This print statement verifies retry" + - "Appending 1 to child_orchestrator_string!" + - "Appending a to child_orchestrator_string!" + - "Appending a to child_orchestrator_string!" + - "Appending 2 to child_orchestrator_string!" + - "Appending b to child_orchestrator_string!" + - "Appending b to child_orchestrator_string!" + - "Appending 3 to child_orchestrator_string!" + - "Appending c to child_orchestrator_string!" + - "Appending c to child_orchestrator_string!" + - "Get response from hello_world_wf after pause call: SUSPENDED" + - "Get response from hello_world_wf after resume call: RUNNING" + - "New counter value is: 111!" + - "New counter value is: 1111!" + - "Workflow completed! Result: Completed" ``` ### Simple Workflow with async workflow client @@ -100,25 +100,25 @@ It shows several Dapr Workflow features including: @@ -130,25 +130,25 @@ dapr run --app-id wf-simple-aio-example -- python3 simple_aio_client.py The output of this example should look like this: ``` - - "== APP == Hi Counter!" - - "== APP == New counter value is: 1!" - - "== APP == New counter value is: 11!" - - "== APP == Retry count value is: 0!" - - "== APP == Retry count value is: 1! This print statement verifies retry" - - "== APP == Appending 1 to child_orchestrator_string!" 
- - "== APP == Appending a to child_orchestrator_string!" - - "== APP == Appending a to child_orchestrator_string!" - - "== APP == Appending 2 to child_orchestrator_string!" - - "== APP == Appending b to child_orchestrator_string!" - - "== APP == Appending b to child_orchestrator_string!" - - "== APP == Appending 3 to child_orchestrator_string!" - - "== APP == Appending c to child_orchestrator_string!" - - "== APP == Appending c to child_orchestrator_string!" - - "== APP == Get response from hello_world_wf after pause call: SUSPENDED" - - "== APP == Get response from hello_world_wf after resume call: RUNNING" - - "== APP == New counter value is: 111!" - - "== APP == New counter value is: 1111!" - - "== APP == Workflow completed! Result: Completed" + - "Hi Counter!" + - "New counter value is: 1!" + - "New counter value is: 11!" + - "Retry count value is: 0!" + - "Retry count value is: 1! This print statement verifies retry" + - "Appending 1 to child_orchestrator_string!" + - "Appending a to child_orchestrator_string!" + - "Appending a to child_orchestrator_string!" + - "Appending 2 to child_orchestrator_string!" + - "Appending b to child_orchestrator_string!" + - "Appending b to child_orchestrator_string!" + - "Appending 3 to child_orchestrator_string!" + - "Appending c to child_orchestrator_string!" + - "Appending c to child_orchestrator_string!" + - "Get response from hello_world_wf after pause call: SUSPENDED" + - "Get response from hello_world_wf after resume call: RUNNING" + - "New counter value is: 111!" + - "New counter value is: 1111!" + - "Workflow completed! Result: Completed" ``` ### Task Chaining @@ -157,10 +157,10 @@ This example demonstrates how to chain "activity" tasks together in a workflow. @@ -172,11 +172,11 @@ dapr run --app-id wfexample -- python3 task_chaining.py The output of this example should look like this: ``` -== APP == Workflow started. Instance ID: b716208586c24829806b44b62816b598 -== APP == Step 1: Received input: 42. 
-== APP == Step 2: Received input: 43. -== APP == Step 3: Received input: 86. -== APP == Workflow completed! Status: WorkflowStatus.COMPLETED +Workflow started. Instance ID: b716208586c24829806b44b62816b598 +Step 1: Received input: 42. +Step 2: Received input: 43. +Step 3: Received input: 86. +Workflow completed! Status: WorkflowStatus.COMPLETED ``` ### Fan-out/Fan-in @@ -187,27 +187,27 @@ This example demonstrates how to fan-out a workflow into multiple parallel tasks name: Run the fan-out/fan-in example match_order: none expected_stdout_lines: - - "== APP == Processing work item: 1." - - "== APP == Processing work item: 2." - - "== APP == Processing work item: 3." - - "== APP == Processing work item: 4." - - "== APP == Processing work item: 5." - - "== APP == Processing work item: 6." - - "== APP == Processing work item: 7." - - "== APP == Processing work item: 8." - - "== APP == Processing work item: 9." - - "== APP == Processing work item: 10." - - "== APP == Work item 1 processed. Result: 2." - - "== APP == Work item 2 processed. Result: 4." - - "== APP == Work item 3 processed. Result: 6." - - "== APP == Work item 4 processed. Result: 8." - - "== APP == Work item 5 processed. Result: 10." - - "== APP == Work item 6 processed. Result: 12." - - "== APP == Work item 7 processed. Result: 14." - - "== APP == Work item 8 processed. Result: 16." - - "== APP == Work item 9 processed. Result: 18." - - "== APP == Work item 10 processed. Result: 20." - - "== APP == Final result: 110." + - "Processing work item: 1." + - "Processing work item: 2." + - "Processing work item: 3." + - "Processing work item: 4." + - "Processing work item: 5." + - "Processing work item: 6." + - "Processing work item: 7." + - "Processing work item: 8." + - "Processing work item: 9." + - "Processing work item: 10." + - "Work item 1 processed. Result: 2." + - "Work item 2 processed. Result: 4." + - "Work item 3 processed. Result: 6." + - "Work item 4 processed. Result: 8." 
+ - "Work item 5 processed. Result: 10." + - "Work item 6 processed. Result: 12." + - "Work item 7 processed. Result: 14." + - "Work item 8 processed. Result: 16." + - "Work item 9 processed. Result: 18." + - "Work item 10 processed. Result: 20." + - "Final result: 110." timeout_seconds: 30 --> @@ -219,28 +219,28 @@ dapr run --app-id wfexample -- python3 fan_out_fan_in.py The output of this sample should look like this: ``` -== APP == Workflow started. Instance ID: 2e656befbb304e758776e30642b75944 -== APP == Processing work item: 1. -== APP == Processing work item: 2. -== APP == Processing work item: 3. -== APP == Processing work item: 4. -== APP == Processing work item: 5. -== APP == Processing work item: 6. -== APP == Processing work item: 7. -== APP == Processing work item: 8. -== APP == Processing work item: 9. -== APP == Processing work item: 10. -== APP == Work item 1 processed. Result: 2. -== APP == Work item 2 processed. Result: 4. -== APP == Work item 3 processed. Result: 6. -== APP == Work item 4 processed. Result: 8. -== APP == Work item 5 processed. Result: 10. -== APP == Work item 6 processed. Result: 12. -== APP == Work item 7 processed. Result: 14. -== APP == Work item 8 processed. Result: 16. -== APP == Work item 9 processed. Result: 18. -== APP == Work item 10 processed. Result: 20. -== APP == Final result: 110. +Workflow started. Instance ID: 2e656befbb304e758776e30642b75944 +Processing work item: 1. +Processing work item: 2. +Processing work item: 3. +Processing work item: 4. +Processing work item: 5. +Processing work item: 6. +Processing work item: 7. +Processing work item: 8. +Processing work item: 9. +Processing work item: 10. +Work item 1 processed. Result: 2. +Work item 2 processed. Result: 4. +Work item 3 processed. Result: 6. +Work item 4 processed. Result: 8. +Work item 5 processed. Result: 10. +Work item 6 processed. Result: 12. +Work item 7 processed. Result: 14. +Work item 8 processed. Result: 16. +Work item 9 processed. Result: 18. 
+Work item 10 processed. Result: 20. +Final result: 110. ``` Note that the ordering of the work-items is non-deterministic since they are all running in parallel. @@ -338,32 +338,32 @@ When you run the example, you will see output like this: ``` -### Cross-app Workflow +### Multi-app Workflows This example demonstrates how to call child workflows and activities in different apps. The multiple Dapr CLI instances can be started using the following commands: ```sh -dapr run --app-id wfexample3 --dapr-http-port 3503 --dapr-grpc-port 50103 -- python3 cross-app3.py & -dapr run --app-id wfexample2 --dapr-http-port 3502 --dapr-grpc-port 50102 -- python3 cross-app2.py & -dapr run --app-id wfexample1 --dapr-http-port 3501 --dapr-grpc-port 50101 -- python3 cross-app1.py +dapr run --app-id wfexample3 --dapr-http-port 3503 --dapr-grpc-port 50103 -- python3 multi-app3.py & +dapr run --app-id wfexample2 --dapr-http-port 3502 --dapr-grpc-port 50102 -- python3 multi-app2.py & +dapr run --app-id wfexample1 --dapr-http-port 3501 --dapr-grpc-port 50101 -- python3 multi-app1.py ``` @@ -379,34 +379,34 @@ among others. This shows that the workflow calls are working as expected. #### Error handling on activity calls -This example demonstrates how the error handling works on activity calls across apps. +This example demonstrates how the error handling works on activity calls in multi-app workflows. -Error handling on activity calls across apps works as normal workflow activity calls. +Error handling on activity calls in multi-app workflows works as normal workflow activity calls. In this example we run `app3` in failing mode, which makes the activity call return error constantly. The activity call from `app2` will fail after the retry policy is exhausted. 
```sh
export ERROR_ACTIVITY_MODE=true
-dapr run --app-id wfexample3 --dapr-http-port 3503 --dapr-grpc-port 50103 -- python3 cross-app3.py &
-dapr run --app-id wfexample2 --dapr-http-port 3502 --dapr-grpc-port 50102 -- python3 cross-app2.py &
-dapr run --app-id wfexample1 --dapr-http-port 3501 --dapr-grpc-port 50101 -- python3 cross-app1.py
+dapr run --app-id wfexample3 --dapr-http-port 3503 --dapr-grpc-port 50103 -- python3 multi-app3.py &
+dapr run --app-id wfexample2 --dapr-http-port 3502 --dapr-grpc-port 50102 -- python3 multi-app2.py &
+dapr run --app-id wfexample1 --dapr-http-port 3501 --dapr-grpc-port 50101 -- python3 multi-app1.py
```

@@ -424,30 +424,30 @@ among others. This shows that the activity calls are failing as expected, and th

#### Error handling on workflow calls

-This example demonstrates how the error handling works on workflow calls across apps.
+This example demonstrates how error handling works on workflow calls in multi-app workflows.

-Error handling on workflow calls across apps works as normal workflow calls.
+Error handling on workflow calls in multi-app workflows works the same as for normal workflow calls.

In this example we run `app2` in failing mode, which makes the workflow call return error constantly. The workflow call from `app1` will fail after the retry policy is exhausted.
```sh export ERROR_WORKFLOW_MODE=true -dapr run --app-id wfexample3 --dapr-http-port 3503 --dapr-grpc-port 50103 -- python3 cross-app3.py & -dapr run --app-id wfexample2 --dapr-http-port 3502 --dapr-grpc-port 50102 -- python3 cross-app2.py & -dapr run --app-id wfexample1 --dapr-http-port 3501 --dapr-grpc-port 50101 -- python3 cross-app1.py +dapr run --app-id wfexample3 --dapr-http-port 3503 --dapr-grpc-port 50103 -- python3 multi-app3.py & +dapr run --app-id wfexample2 --dapr-http-port 3502 --dapr-grpc-port 50102 -- python3 multi-app2.py & +dapr run --app-id wfexample1 --dapr-http-port 3501 --dapr-grpc-port 50101 -- python3 multi-app1.py ``` @@ -477,29 +477,29 @@ It had to be done in two parts because the runtime needs to be restarted in orde name: Run the versioning example match_order: none expected_stdout_lines: - - "== APP == test1: triggering workflow" - - "== APP == test1: Received workflow call for version1" - - "== APP == test1: Finished workflow for version1" - - "== APP == test2: triggering workflow" - - "== APP == test2: Received workflow call for version1" - - "== APP == test2: Finished workflow for version1" - - "== APP == test3: triggering workflow" - - "== APP == test3: Received workflow call for version2" - - "== APP == test3: Finished workflow for version2" - - "== APP == test4: start" - - "== APP == test4: patch1 is patched" - - "== APP == test5: start" - - "== APP == test5: patch1 is not patched" - - "== APP == test5: patch2 is patched" - - "== APP == test6: start" - - "== APP == test6: patch1 is patched" - - "== APP == test6: patch2 is patched" - - "== APP == test7: Received workflow call for version1" - - "== APP == test7: Workflow is stalled" - - "== APP == test8: Workflow is stalled" - - "== APP == test100: part2" - - "== APP == test100: Finished stalled version1 workflow" - - "== APP == test100: Finished stalled patching workflow" + - "test1: triggering workflow" + - "test1: Received workflow call for version1" + - "test1: Finished workflow 
for version1" + - "test2: triggering workflow" + - "test2: Received workflow call for version1" + - "test2: Finished workflow for version1" + - "test3: triggering workflow" + - "test3: Received workflow call for version2" + - "test3: Finished workflow for version2" + - "test4: start" + - "test4: patch1 is patched" + - "test5: start" + - "test5: patch1 is not patched" + - "test5: patch2 is patched" + - "test6: start" + - "test6: patch1 is patched" + - "test6: patch2 is patched" + - "test7: Received workflow call for version1" + - "test7: Workflow is stalled" + - "test8: Workflow is stalled" + - "test100: part2" + - "test100: Finished stalled version1 workflow" + - "test100: Finished stalled patching workflow" timeout_seconds: 60 --> diff --git a/examples/workflow/child_workflow.py b/examples/workflow/child_workflow.py index 57ab2fc3e..20b675ea0 100644 --- a/examples/workflow/child_workflow.py +++ b/examples/workflow/child_workflow.py @@ -10,7 +10,6 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-import time import dapr.ext.workflow as wf @@ -40,12 +39,10 @@ def child_workflow(ctx: wf.DaprWorkflowContext): if __name__ == '__main__': wfr.start() - time.sleep(10) # wait for workflow runtime to start wf_client = wf.DaprWorkflowClient() instance_id = wf_client.schedule_new_workflow(workflow=main_workflow) - # Wait for the workflow to complete - time.sleep(5) + wf_client.wait_for_workflow_completion(instance_id) wfr.shutdown() diff --git a/examples/workflow/fan_out_fan_in.py b/examples/workflow/fan_out_fan_in.py index f625ea287..9cd1ff6cb 100644 --- a/examples/workflow/fan_out_fan_in.py +++ b/examples/workflow/fan_out_fan_in.py @@ -55,7 +55,6 @@ def process_results(ctx, final_result: int): if __name__ == '__main__': wfr.start() - time.sleep(10) # wait for workflow runtime to start wf_client = wf.DaprWorkflowClient() instance_id = wf_client.schedule_new_workflow(workflow=batch_processing_workflow, input=10) diff --git a/examples/workflow/cross-app1.py b/examples/workflow/multi-app1.py similarity index 93% rename from examples/workflow/cross-app1.py rename to examples/workflow/multi-app1.py index 1ef7b48da..9b968def3 100644 --- a/examples/workflow/cross-app1.py +++ b/examples/workflow/multi-app1.py @@ -10,7 +10,6 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-import time from datetime import timedelta import dapr.ext.workflow as wf @@ -46,13 +45,11 @@ def app1_workflow(ctx: wf.DaprWorkflowContext): if __name__ == '__main__': wfr.start() - time.sleep(10) # wait for workflow runtime to start wf_client = wf.DaprWorkflowClient() print('app1 - triggering app1 workflow', flush=True) instance_id = wf_client.schedule_new_workflow(workflow=app1_workflow) - # Wait for the workflow to complete - time.sleep(7) + wf_client.wait_for_workflow_completion(instance_id) wfr.shutdown() diff --git a/examples/workflow/cross-app2.py b/examples/workflow/multi-app2.py similarity index 95% rename from examples/workflow/cross-app2.py rename to examples/workflow/multi-app2.py index 2af65912c..7e97b58c0 100644 --- a/examples/workflow/cross-app2.py +++ b/examples/workflow/multi-app2.py @@ -46,5 +46,5 @@ def app2_workflow(ctx: wf.DaprWorkflowContext): if __name__ == '__main__': wfr.start() - time.sleep(15) # wait for workflow runtime to start + time.sleep(15) # Keep the workflow runtime running for a while to process workflows wfr.shutdown() diff --git a/examples/workflow/cross-app3.py b/examples/workflow/multi-app3.py similarity index 93% rename from examples/workflow/cross-app3.py rename to examples/workflow/multi-app3.py index 4bcc158a0..6b72de7e4 100644 --- a/examples/workflow/cross-app3.py +++ b/examples/workflow/multi-app3.py @@ -29,5 +29,5 @@ def app3_activity(ctx: wf.DaprWorkflowContext) -> int: if __name__ == '__main__': wfr.start() - time.sleep(15) # wait for workflow runtime to start + time.sleep(15) # Keep the workflow runtime alive for a while to process requests wfr.shutdown() diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py b/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py index cc0cfe8ba..d90c72dc2 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py @@ -68,12 +68,12 @@ def call_activity( 
retry_policy: Optional[RetryPolicy] = None, app_id: Optional[str] = None, ) -> task.Task[TOutput]: - # Handle string activity names for cross-app scenarios + # Handle string activity names for multi-app workflow scenarios if isinstance(activity, str): activity_name = activity if app_id is not None: self._logger.debug( - f'{self.instance_id}: Creating cross-app activity {activity_name} for app {app_id}' + f'{self.instance_id}: Creating multi-app workflow activity {activity_name} for app {app_id}' ) else: self._logger.debug(f'{self.instance_id}: Creating activity {activity_name}') @@ -106,7 +106,7 @@ def call_child_workflow( retry_policy: Optional[RetryPolicy] = None, app_id: Optional[str] = None, ) -> task.Task[TOutput]: - # Handle string workflow names for cross-app scenarios + # Handle string workflow names for multi-app workflow scenarios if isinstance(workflow, str): workflow_name = workflow self._logger.debug(f'{self.instance_id}: Creating child workflow {workflow_name}') diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/logger/logger.py b/ext/dapr-ext-workflow/dapr/ext/workflow/logger/logger.py index b93e7074f..dd33cab86 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/logger/logger.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/logger/logger.py @@ -32,5 +32,8 @@ def warning(self, msg, *args, **kwargs): def error(self, msg, *args, **kwargs): self._logger.error(msg, *args, **kwargs) + def exception(self, msg, *args, **kwargs): + self._logger.exception(msg, *args, **kwargs) + def critical(self, msg, *args, **kwargs): self._logger.critical(msg, *args, **kwargs) diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_context.py b/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_context.py index 8453e16ef..d41841472 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_context.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_context.py @@ -118,7 +118,7 @@ def call_activity( Parameters ---------- activity: Activity[TInput, TOutput] | 
str - A reference to the activity function to call, or a string name for cross-app activities. + A reference to the activity function to call, or a string name for multi-app workflow activities. input: TInput | None The JSON-serializable input (or None) to pass to the activity. app_id: str | None @@ -145,7 +145,7 @@ def call_child_workflow( Parameters ---------- orchestrator: Orchestrator[TInput, TOutput] | str - A reference to the orchestrator function to call, or a string name for cross-app workflows. + A reference to the orchestrator function to call, or a string name for multi-app workflows. input: TInput The optional JSON-serializable input to pass to the orchestrator function. instance_id: str diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py b/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py index 58b0912a0..9f5edb2b4 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py @@ -14,6 +14,7 @@ """ import inspect +import time from functools import wraps from typing import Optional, Sequence, TypeVar, Union @@ -54,8 +55,11 @@ def __init__( maximum_concurrent_activity_work_items: Optional[int] = None, maximum_concurrent_orchestration_work_items: Optional[int] = None, maximum_thread_pool_workers: Optional[int] = None, + worker_ready_timeout: Optional[float] = None, ): self._logger = Logger('WorkflowRuntime', logger_options) + self._worker_ready_timeout = 30.0 if worker_ready_timeout is None else worker_ready_timeout + metadata = tuple() if settings.DAPR_API_TOKEN: metadata = ((DAPR_API_TOKEN_HEADER, settings.DAPR_API_TOKEN),) @@ -86,10 +90,20 @@ def register_workflow(self, fn: Workflow, *, name: Optional[str] = None): def orchestrationWrapper(ctx: task.OrchestrationContext, inp: Optional[TInput] = None): """Responsible to call Workflow function in orchestrationWrapper""" - daprWfContext = DaprWorkflowContext(ctx, self._logger.get_options()) - if inp 
is None: - return fn(daprWfContext) - return fn(daprWfContext, inp) + instance_id = getattr(ctx, 'instance_id', 'unknown') + + try: + daprWfContext = DaprWorkflowContext(ctx, self._logger.get_options()) + if inp is None: + result = fn(daprWfContext) + else: + result = fn(daprWfContext, inp) + return result + except Exception as e: + self._logger.exception( + f'Workflow execution failed - instance_id: {instance_id}, error: {e}' + ) + raise if hasattr(fn, '_workflow_registered'): # whenever a workflow is registered, it has a _dapr_alternate_name attribute @@ -152,10 +166,20 @@ def register_activity(self, fn: Activity, *, name: Optional[str] = None): def activityWrapper(ctx: task.ActivityContext, inp: Optional[TInput] = None): """Responsible to call Activity function in activityWrapper""" - wfActivityContext = WorkflowActivityContext(ctx) - if inp is None: - return fn(wfActivityContext) - return fn(wfActivityContext, inp) + activity_id = getattr(ctx, 'task_id', 'unknown') + + try: + wfActivityContext = WorkflowActivityContext(ctx) + if inp is None: + result = fn(wfActivityContext) + else: + result = fn(wfActivityContext, inp) + return result + except Exception as e: + self._logger.exception( + f'Activity execution failed - task_id: {activity_id}, error: {e}' + ) + raise if hasattr(fn, '_activity_registered'): # whenever an activity is registered, it has a _dapr_alternate_name attribute @@ -174,13 +198,77 @@ def activityWrapper(ctx: task.ActivityContext, inp: Optional[TInput] = None): ) fn.__dict__['_activity_registered'] = True + def wait_for_worker_ready(self, timeout: float = 30.0) -> bool: + """ + Wait for the worker's gRPC stream to become ready to receive work items. + This method polls the worker's is_worker_ready() method until it returns True + or the timeout is reached. + + Args: + timeout: Maximum time in seconds to wait for the worker to be ready. + Defaults to 30 seconds. 
+ + Returns: + True if the worker's gRPC stream is ready to receive work items, False if timeout. + """ + if not hasattr(self.__worker, 'is_worker_ready'): + return False + + elapsed = 0.0 + poll_interval = 0.1 # 100ms + + while elapsed < timeout: + if self.__worker.is_worker_ready(): + return True + time.sleep(poll_interval) + elapsed += poll_interval + + self._logger.warning( + f'WorkflowRuntime worker readiness check timed out after {timeout} seconds' + ) + return False + def start(self): - """Starts the listening for work items on a background thread.""" - self.__worker.start() + """Starts the listening for work items on a background thread. + This method waits for the worker's gRPC stream to be fully initialized + before returning, ensuring that workflows can be scheduled immediately + after start() completes. + """ + try: + try: + self.__worker.start() + except Exception as start_error: + self._logger.exception(f'WorkflowRuntime worker did not start: {start_error}') + raise + + # Verify the worker and its stream reader are ready + if hasattr(self.__worker, 'is_worker_ready'): + try: + is_ready = self.wait_for_worker_ready(timeout=self._worker_ready_timeout) + if not is_ready: + raise RuntimeError('WorkflowRuntime worker and its stream are not ready') + else: + self._logger.debug( + 'WorkflowRuntime worker is ready and its stream can receive work items' + ) + except Exception as ready_error: + self._logger.exception( + f'WorkflowRuntime wait_for_worker_ready() raised exception: {ready_error}' + ) + raise ready_error + else: + self._logger.warning( + 'Unable to verify stream readiness. Workflows scheduled immediately may not be received.' 
+ ) + except Exception: + raise def shutdown(self): """Stops the listening for work items on a background thread.""" - self.__worker.stop() + try: + self.__worker.stop() + except Exception: + raise def versioned_workflow( self, diff --git a/ext/dapr-ext-workflow/tests/test_workflow_runtime.py b/ext/dapr-ext-workflow/tests/test_workflow_runtime.py index bf18cd689..16eb4946f 100644 --- a/ext/dapr-ext-workflow/tests/test_workflow_runtime.py +++ b/ext/dapr-ext-workflow/tests/test_workflow_runtime.py @@ -26,11 +26,17 @@ class FakeTaskHubGrpcWorker: + def __init__(self): + self._orchestrator_fns = {} + self._activity_fns = {} + def add_named_orchestrator(self, name: str, fn): listOrchestrators.append(name) + self._orchestrator_fns[name] = fn def add_named_activity(self, name: str, fn): listActivities.append(name) + self._activity_fns[name] = fn class WorkflowRuntimeTest(unittest.TestCase): @@ -171,3 +177,124 @@ def test_decorator_register_optinal_name(self): wanted_activity = ['test_act'] assert listActivities == wanted_activity assert client_act._dapr_alternate_name == 'test_act' + + +class WorkflowRuntimeWorkerReadyTest(unittest.TestCase): + """Tests for wait_for_worker_ready() and start() stream readiness.""" + + def setUp(self): + listActivities.clear() + listOrchestrators.clear() + mock.patch('durabletask.worker._Registry', return_value=FakeTaskHubGrpcWorker()).start() + self.runtime = WorkflowRuntime() + + def test_wait_for_worker_ready_returns_false_when_no_is_worker_ready(self): + mock_worker = mock.MagicMock(spec=['start', 'stop', '_registry']) + del mock_worker.is_worker_ready + self.runtime._WorkflowRuntime__worker = mock_worker + self.assertFalse(self.runtime.wait_for_worker_ready(timeout=0.1)) + + def test_wait_for_worker_ready_returns_true_when_ready(self): + mock_worker = mock.MagicMock() + mock_worker.is_worker_ready.return_value = True + self.runtime._WorkflowRuntime__worker = mock_worker + self.assertTrue(self.runtime.wait_for_worker_ready(timeout=1.0)) 
+ mock_worker.is_worker_ready.assert_called() + + def test_wait_for_worker_ready_returns_true_after_poll(self): + """Worker becomes ready on second poll.""" + mock_worker = mock.MagicMock() + mock_worker.is_worker_ready.side_effect = [False, True] + self.runtime._WorkflowRuntime__worker = mock_worker + self.assertTrue(self.runtime.wait_for_worker_ready(timeout=1.0)) + self.assertEqual(mock_worker.is_worker_ready.call_count, 2) + + def test_wait_for_worker_ready_returns_false_on_timeout(self): + mock_worker = mock.MagicMock() + mock_worker.is_worker_ready.return_value = False + self.runtime._WorkflowRuntime__worker = mock_worker + self.assertFalse(self.runtime.wait_for_worker_ready(timeout=0.2)) + + def test_start_succeeds_when_worker_ready(self): + mock_worker = mock.MagicMock() + mock_worker.is_worker_ready.return_value = True + self.runtime._WorkflowRuntime__worker = mock_worker + self.runtime.start() + mock_worker.start.assert_called_once() + mock_worker.is_worker_ready.assert_called() + + def test_start_logs_debug_when_worker_stream_ready(self): + """start() logs at debug when worker and stream are ready.""" + mock_worker = mock.MagicMock() + mock_worker.is_worker_ready.return_value = True + self.runtime._WorkflowRuntime__worker = mock_worker + with mock.patch.object(self.runtime._logger, 'debug') as mock_debug: + self.runtime.start() + mock_debug.assert_called_once() + call_args = mock_debug.call_args[0][0] + self.assertIn('ready', call_args) + self.assertIn('stream', call_args) + + def test_start_logs_exception_when_worker_start_fails(self): + """start() logs exception when worker.start() raises.""" + mock_worker = mock.MagicMock() + mock_worker.start.side_effect = RuntimeError('start failed') + self.runtime._WorkflowRuntime__worker = mock_worker + with mock.patch.object(self.runtime._logger, 'exception') as mock_exception: + with self.assertRaises(RuntimeError): + self.runtime.start() + mock_exception.assert_called_once() + self.assertIn('did not start', 
mock_exception.call_args[0][0]) + + def test_start_raises_when_worker_not_ready(self): + listActivities.clear() + listOrchestrators.clear() + mock.patch('durabletask.worker._Registry', return_value=FakeTaskHubGrpcWorker()).start() + runtime = WorkflowRuntime(worker_ready_timeout=0.2) + mock_worker = mock.MagicMock() + mock_worker.is_worker_ready.return_value = False + runtime._WorkflowRuntime__worker = mock_worker + with self.assertRaises(RuntimeError) as ctx: + runtime.start() + self.assertIn('not ready', str(ctx.exception)) + + def test_start_logs_warning_when_no_is_worker_ready(self): + mock_worker = mock.MagicMock(spec=['start', 'stop', '_registry']) + del mock_worker.is_worker_ready + self.runtime._WorkflowRuntime__worker = mock_worker + self.runtime.start() + mock_worker.start.assert_called_once() + + def test_worker_ready_timeout_init(self): + listActivities.clear() + listOrchestrators.clear() + mock.patch('durabletask.worker._Registry', return_value=FakeTaskHubGrpcWorker()).start() + rt = WorkflowRuntime(worker_ready_timeout=15.0) + self.assertEqual(rt._worker_ready_timeout, 15.0) + + def test_start_raises_when_worker_start_fails(self): + mock_worker = mock.MagicMock() + mock_worker.is_worker_ready.return_value = True + mock_worker.start.side_effect = RuntimeError('start failed') + self.runtime._WorkflowRuntime__worker = mock_worker + with self.assertRaises(RuntimeError) as ctx: + self.runtime.start() + self.assertIn('start failed', str(ctx.exception)) + mock_worker.start.assert_called_once() + + def test_start_raises_when_wait_for_worker_ready_raises(self): + mock_worker = mock.MagicMock() + mock_worker.start.return_value = None + mock_worker.is_worker_ready.side_effect = ValueError('ready check failed') + self.runtime._WorkflowRuntime__worker = mock_worker + with self.assertRaises(ValueError) as ctx: + self.runtime.start() + self.assertIn('ready check failed', str(ctx.exception)) + + def test_shutdown_raises_when_worker_stop_fails(self): + mock_worker = 
mock.MagicMock() + mock_worker.stop.side_effect = RuntimeError('stop failed') + self.runtime._WorkflowRuntime__worker = mock_worker + with self.assertRaises(RuntimeError) as ctx: + self.runtime.shutdown() + self.assertIn('stop failed', str(ctx.exception)) diff --git a/tests/clients/fake_dapr_server.py b/tests/clients/fake_dapr_server.py index d56cf0790..2c3d9b685 100644 --- a/tests/clients/fake_dapr_server.py +++ b/tests/clients/fake_dapr_server.py @@ -636,6 +636,20 @@ def ConverseAlpha2(self, request, context): # Create result for this input result = api_v1.ConversationResultAlpha2(choices=choices) + if hasattr(result, 'model'): + result.model = 'test-llm' + if hasattr(result, 'usage'): + try: + usage_cls = getattr(api_v1, 'ConversationResultAlpha2CompletionUsage', None) + if usage_cls is not None: + u = usage_cls( + completion_tokens=10, + prompt_tokens=5, + total_tokens=15, + ) + result.usage.CopyFrom(u) + except Exception: + pass outputs.append(result) return api_v1.ConversationResponseAlpha2( diff --git a/tests/clients/test_conversation.py b/tests/clients/test_conversation.py index 50daebc64..105c7b291 100644 --- a/tests/clients/test_conversation.py +++ b/tests/clients/test_conversation.py @@ -17,7 +17,9 @@ import json import unittest import uuid +from unittest.mock import Mock, patch +from google.protobuf.struct_pb2 import Struct from google.rpc import code_pb2, status_pb2 from dapr.aio.clients import DaprClient as AsyncDaprClient @@ -37,12 +39,16 @@ ConversationResponseAlpha2, ConversationResultAlpha2, ConversationResultAlpha2Choices, + ConversationResultAlpha2CompletionUsage, + ConversationResultAlpha2CompletionUsageCompletionTokensDetails, + ConversationResultAlpha2CompletionUsagePromptTokensDetails, ConversationResultAlpha2Message, ConversationToolCalls, ConversationToolCallsOfFunction, ConversationTools, ConversationToolsFunction, FunctionBackend, + _get_outputs_from_grpc_response, create_assistant_message, create_system_message, create_tool_message, @@ 
-248,6 +254,14 @@ def test_basic_conversation_alpha2(self): self.assertEqual(choice.finish_reason, 'stop') self.assertIn('Hello Alpha2!', choice.message.content) + out = response.outputs[0] + if out.model is not None: + self.assertEqual(out.model, 'test-llm') + if out.usage is not None: + self.assertGreaterEqual(out.usage.total_tokens, 15) + self.assertGreaterEqual(out.usage.prompt_tokens, 5) + self.assertGreaterEqual(out.usage.completion_tokens, 10) + def test_conversation_alpha2_with_system_message(self): """Test Alpha2 conversation with system message.""" system_message = create_system_message('You are a helpful assistant.') @@ -1107,6 +1121,186 @@ def test_empty_and_none_outputs(self): self.assertEqual(response_none.to_assistant_messages(), []) +class TestConversationResultAlpha2ModelAndUsage(unittest.TestCase): + """Tests for model and usage fields on ConversationResultAlpha2 and related types.""" + + def test_result_alpha2_has_model_and_usage_attributes(self): + """ConversationResultAlpha2 accepts and exposes model and usage.""" + msg = ConversationResultAlpha2Message(content='Hi', tool_calls=[]) + choice = ConversationResultAlpha2Choices(finish_reason='stop', index=0, message=msg) + usage = ConversationResultAlpha2CompletionUsage( + completion_tokens=10, + prompt_tokens=5, + total_tokens=15, + ) + result = ConversationResultAlpha2( + choices=[choice], + model='test-model-1', + usage=usage, + ) + self.assertEqual(result.model, 'test-model-1') + self.assertIsNotNone(result.usage) + self.assertEqual(result.usage.completion_tokens, 10) + self.assertEqual(result.usage.prompt_tokens, 5) + self.assertEqual(result.usage.total_tokens, 15) + + def test_result_alpha2_model_and_usage_default_none(self): + """ConversationResultAlpha2 optional fields default to None when not provided. + + When the API returns a response, model and usage are set from the conversation + component. 
This test only checks that the dataclass defaults are None when + constructing with choices only. + """ + msg = ConversationResultAlpha2Message(content='Hi', tool_calls=[]) + choice = ConversationResultAlpha2Choices(finish_reason='stop', index=0, message=msg) + result = ConversationResultAlpha2(choices=[choice]) + self.assertIsNone(result.model) + self.assertIsNone(result.usage) + + def test_usage_completion_and_prompt_details(self): + """ConversationResultAlpha2CompletionUsage supports details.""" + completion_details = ConversationResultAlpha2CompletionUsageCompletionTokensDetails( + accepted_prediction_tokens=1, + audio_tokens=2, + reasoning_tokens=3, + rejected_prediction_tokens=0, + ) + prompt_details = ConversationResultAlpha2CompletionUsagePromptTokensDetails( + audio_tokens=0, + cached_tokens=4, + ) + usage = ConversationResultAlpha2CompletionUsage( + completion_tokens=10, + prompt_tokens=5, + total_tokens=15, + completion_tokens_details=completion_details, + prompt_tokens_details=prompt_details, + ) + self.assertEqual(usage.completion_tokens_details.accepted_prediction_tokens, 1) + self.assertEqual(usage.completion_tokens_details.audio_tokens, 2) + self.assertEqual(usage.completion_tokens_details.reasoning_tokens, 3) + self.assertEqual(usage.completion_tokens_details.rejected_prediction_tokens, 0) + self.assertEqual(usage.prompt_tokens_details.audio_tokens, 0) + self.assertEqual(usage.prompt_tokens_details.cached_tokens, 4) + self.assertEqual(usage.total_tokens, 15) + self.assertEqual(usage.completion_tokens, 10) + self.assertEqual(usage.prompt_tokens, 5) + + def test_get_outputs_from_grpc_response_populates_model_and_usage(self): + """_get_outputs_from_grpc_response sets model and usage when present on proto.""" + from unittest import mock + + # Build a mock proto response with one output that has model and usage + mock_usage = mock.Mock() + mock_usage.completion_tokens = 20 + mock_usage.prompt_tokens = 8 + mock_usage.total_tokens = 28 + 
mock_usage.completion_tokens_details = None + mock_usage.prompt_tokens_details = None + + mock_choice_msg = mock.Mock() + mock_choice_msg.content = 'Hello' + mock_choice_msg.tool_calls = [] + + mock_choice = mock.Mock() + mock_choice.finish_reason = 'stop' + mock_choice.index = 0 + mock_choice.message = mock_choice_msg + + mock_output = mock.Mock() + mock_output.model = 'gpt-4o-mini' + mock_output.usage = mock_usage + mock_output.choices = [mock_choice] + + mock_response = mock.Mock() + mock_response.outputs = [mock_output] + + outputs = _get_outputs_from_grpc_response(mock_response) + self.assertEqual(len(outputs), 1) + out = outputs[0] + self.assertEqual(out.model, 'gpt-4o-mini') + self.assertIsNotNone(out.usage) + self.assertEqual(out.usage.completion_tokens, 20) + self.assertEqual(out.usage.prompt_tokens, 8) + self.assertEqual(out.usage.total_tokens, 28) + self.assertEqual(len(out.choices), 1) + self.assertEqual(out.choices[0].message.content, 'Hello') + + def test_get_outputs_from_grpc_response_without_model_usage(self): + """_get_outputs_from_grpc_response leaves model and usage None when absent.""" + from unittest import mock + + mock_choice_msg = mock.Mock() + mock_choice_msg.content = 'Echo' + mock_choice_msg.tool_calls = [] + + mock_choice = mock.Mock() + mock_choice.finish_reason = 'stop' + mock_choice.index = 0 + mock_choice.message = mock_choice_msg + + mock_output = mock.Mock(spec=['choices']) + mock_output.choices = [mock_choice] + # No model or usage attributes + + mock_response = mock.Mock() + mock_response.outputs = [mock_output] + + outputs = _get_outputs_from_grpc_response(mock_response) + self.assertEqual(len(outputs), 1) + out = outputs[0] + self.assertIsNone(out.model) + self.assertIsNone(out.usage) + self.assertEqual(out.choices[0].message.content, 'Echo') + + +class ConverseAlpha2ResponseFormatTests(unittest.TestCase): + """Unit tests for converse_alpha2 response_format parameter.""" + + def 
test_converse_alpha2_passes_response_format_on_request(self): + """converse_alpha2 sets response_format on the gRPC request when provided.""" + user_message = create_user_message('Structured output please') + input_alpha2 = ConversationInputAlpha2(messages=[user_message]) + response_format = Struct() + response_format.update( + {'type': 'json_schema', 'json_schema': {'name': 'test', 'schema': {}}} + ) + + captured_requests = [] + mock_choice_msg = Mock() + mock_choice_msg.content = 'ok' + mock_choice_msg.tool_calls = [] + mock_choice = Mock() + mock_choice.finish_reason = 'stop' + mock_choice.index = 0 + mock_choice.message = mock_choice_msg + mock_output = Mock() + mock_output.choices = [mock_choice] + mock_response = Mock() + mock_response.outputs = [mock_output] + mock_response.context_id = '' + mock_call = Mock() + + def capture_run_rpc(rpc, request, *args, **kwargs): + captured_requests.append(request) + return (mock_response, mock_call) + + with patch('dapr.clients.health.DaprHealth.wait_for_sidecar'): + client = DaprClient('localhost:50011') + with patch.object(client.retry_policy, 'run_rpc', side_effect=capture_run_rpc): + client.converse_alpha2( + name='test-llm', + inputs=[input_alpha2], + response_format=response_format, + ) + + self.assertEqual(len(captured_requests), 1) + req = captured_requests[0] + self.assertTrue(hasattr(req, 'response_format')) + self.assertEqual(req.response_format['type'], 'json_schema') + self.assertEqual(req.response_format['json_schema']['name'], 'test') + + class ExecuteRegisteredToolSyncTests(unittest.TestCase): def tearDown(self): # Cleanup all tools we may have registered by name prefix