Skip to content
19 changes: 13 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,14 @@ git clone https://github.com/dapr/python-sdk.git
cd python-sdk
```

2. Install a project in a editable mode
2. Create and activate a virtual environment

```bash
python3 -m venv .venv
source .venv/bin/activate # On Windows: .venv\Scripts\activate
```

3. Install the project in editable mode

```bash
pip3 install -e .
Expand All @@ -90,31 +97,31 @@ pip3 install -e ./ext/dapr-ext-langgraph/
pip3 install -e ./ext/dapr-ext-strands/
```

3. Install required packages
4. Install required packages

```bash
pip3 install -r dev-requirements.txt
```

4. Run linter and autofix
5. Run linter and autofix

```bash
tox -e ruff
```

5. Run unit-test
6. Run unit-test

```bash
tox -e py311
```

6. Run type check
7. Run type check

```bash
tox -e type
```

7. Run examples
8. Run examples

```bash
tox -e examples
Expand Down
10 changes: 10 additions & 0 deletions dapr/clients/grpc/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@

import grpc # type: ignore
from google.protobuf.any_pb2 import Any as GrpcAny
from google.protobuf.duration_pb2 import Duration as GrpcDuration
from google.protobuf.empty_pb2 import Empty as GrpcEmpty
from google.protobuf.message import Message as GrpcMessage
from google.protobuf.struct_pb2 import Struct as GrpcStruct
from grpc import ( # type: ignore
RpcError,
StatusCode,
Expand Down Expand Up @@ -1880,6 +1882,8 @@ def converse_alpha2(
temperature: Optional[float] = None,
tools: Optional[List[conversation.ConversationTools]] = None,
tool_choice: Optional[str] = None,
response_format: Optional[GrpcStruct] = None,
prompt_cache_retention: Optional[GrpcDuration] = None,
) -> conversation.ConversationResponseAlpha2:
"""Invoke an LLM using the conversation API (Alpha2) with tool calling support.

Expand All @@ -1893,6 +1897,8 @@ def converse_alpha2(
temperature: Optional temperature setting for the LLM to optimize for creativity or predictability
tools: Optional list of tools available for the LLM to call
tool_choice: Optional control over which tools can be called ('none', 'auto', 'required', or specific tool name)
response_format: Optional response format (google.protobuf.struct_pb2.Struct, ex: json_schema for structured output)
prompt_cache_retention: Optional retention for prompt cache (google.protobuf.duration_pb2.Duration)

Returns:
ConversationResponseAlpha2 containing the conversation results with choices and tool calls
Expand Down Expand Up @@ -1949,6 +1955,10 @@ def converse_alpha2(
request.temperature = temperature
if tool_choice is not None:
request.tool_choice = tool_choice
if response_format is not None and hasattr(request, 'response_format'):
request.response_format.CopyFrom(response_format)
if prompt_cache_retention is not None and hasattr(request, 'prompt_cache_retention'):
request.prompt_cache_retention.CopyFrom(prompt_cache_retention)

try:
response, call = self.retry_policy.run_rpc(self._stub.ConverseAlpha2.with_call, request)
Expand Down
70 changes: 69 additions & 1 deletion dapr/clients/grpc/conversation.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,11 +338,46 @@ class ConversationResultAlpha2Choices:
message: ConversationResultAlpha2Message


@dataclass
class ConversationResultAlpha2CompletionUsageCompletionTokensDetails:
    """Breakdown of tokens used in the completion.

    All counts default to 0 when the backend does not report them.
    """

    # Tokens from a predicted output that the model accepted.
    # NOTE(review): presumably mirrors the OpenAI-style usage breakdown — confirm against proto.
    accepted_prediction_tokens: int = 0
    # Audio tokens generated as part of the completion.
    audio_tokens: int = 0
    # Tokens spent on internal reasoning (reasoning-capable models).
    reasoning_tokens: int = 0
    # Tokens from a predicted output that the model rejected.
    rejected_prediction_tokens: int = 0


@dataclass
class ConversationResultAlpha2CompletionUsagePromptTokensDetails:
    """Breakdown of tokens used in the prompt.

    All counts default to 0 when the backend does not report them.
    """

    # Audio tokens contained in the prompt.
    audio_tokens: int = 0
    # Prompt tokens served from the provider's prompt cache.
    cached_tokens: int = 0


@dataclass
class ConversationResultAlpha2CompletionUsage:
    """Token usage for one Alpha2 conversation result.

    Aggregate counts default to 0; the per-category detail breakdowns are
    ``None`` when the backend does not include them in the response.
    """

    # Tokens generated in the completion.
    completion_tokens: int = 0
    # Tokens consumed by the prompt.
    prompt_tokens: int = 0
    # Sum of prompt and completion tokens as reported by the backend.
    total_tokens: int = 0
    # Optional per-category breakdown of completion tokens.
    completion_tokens_details: Optional[
        ConversationResultAlpha2CompletionUsageCompletionTokensDetails
    ] = None
    # Optional per-category breakdown of prompt tokens.
    prompt_tokens_details: Optional[ConversationResultAlpha2CompletionUsagePromptTokensDetails] = (
        None
    )


@dataclass
class ConversationResultAlpha2:
    """One of the outputs in an Alpha2 response from conversation input."""

    # Candidate result messages (choices) returned for this output.
    choices: List[ConversationResultAlpha2Choices] = field(default_factory=list)
    # Model identifier reported by the backend; None when not provided.
    model: Optional[str] = None
    # Token usage for this result; None when the backend did not report usage.
    usage: Optional[ConversationResultAlpha2CompletionUsage] = None


@dataclass
Expand Down Expand Up @@ -657,5 +692,38 @@ def _get_outputs_from_grpc_response(
)
)

outputs.append(ConversationResultAlpha2(choices=choices))
model: Optional[str] = None
usage: Optional[ConversationResultAlpha2CompletionUsage] = None
if hasattr(output, 'model') and getattr(output, 'model', None):
model = output.model
if hasattr(output, 'usage') and output.usage:
u = output.usage
completion_details: Optional[
ConversationResultAlpha2CompletionUsageCompletionTokensDetails
] = None
prompt_details: Optional[ConversationResultAlpha2CompletionUsagePromptTokensDetails] = (
None
)
if hasattr(u, 'completion_tokens_details') and u.completion_tokens_details:
cd = u.completion_tokens_details
completion_details = ConversationResultAlpha2CompletionUsageCompletionTokensDetails(
accepted_prediction_tokens=getattr(cd, 'accepted_prediction_tokens', 0) or 0,
audio_tokens=getattr(cd, 'audio_tokens', 0) or 0,
reasoning_tokens=getattr(cd, 'reasoning_tokens', 0) or 0,
rejected_prediction_tokens=getattr(cd, 'rejected_prediction_tokens', 0) or 0,
)
if hasattr(u, 'prompt_tokens_details') and u.prompt_tokens_details:
pd = u.prompt_tokens_details
prompt_details = ConversationResultAlpha2CompletionUsagePromptTokensDetails(
audio_tokens=getattr(pd, 'audio_tokens', 0) or 0,
cached_tokens=getattr(pd, 'cached_tokens', 0) or 0,
)
usage = ConversationResultAlpha2CompletionUsage(
completion_tokens=getattr(u, 'completion_tokens', 0) or 0,
prompt_tokens=getattr(u, 'prompt_tokens', 0) or 0,
total_tokens=getattr(u, 'total_tokens', 0) or 0,
completion_tokens_details=completion_details,
prompt_tokens_details=prompt_details,
)
outputs.append(ConversationResultAlpha2(choices=choices, model=model, usage=usage))
Comment on lines +695 to +728
Copy link

Copilot AI Mar 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

output.usage / u.completion_tokens_details / u.prompt_tokens_details are protobuf message fields; their truthiness is not a reliable indicator of presence (and may evaluate truthy even when unset). This can incorrectly populate usage (and details) with zero-valued objects when the server didn't send usage, changing API semantics from None to 'all zeros'. Prefer presence checks like HasField('usage') when available, or u.ListFields() / u.ByteSize() > 0 to detect an actually-populated submessage before constructing these dataclasses.

Copilot uses AI. Check for mistakes.
return outputs
2 changes: 1 addition & 1 deletion dev-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ Flask>=1.1
# needed for auto fix
ruff===0.14.1
# needed for dapr-ext-workflow
durabletask-dapr >= 0.2.0a19
durabletask-dapr >= 0.17.0
# needed for .env file loading in examples
python-dotenv>=1.0.0
# needed for enhanced schema generation from function features
Expand Down
28 changes: 14 additions & 14 deletions examples/workflow/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ When you run the example, you will see output like this:
```


### Cross-app Workflow
### Multi-app Workflows

This example demonstrates how to call child workflows and activities in different apps. Multiple Dapr CLI instances can be started using the following commands:

Expand All @@ -361,9 +361,9 @@ sleep: 20
-->

```sh
dapr run --app-id wfexample3 --dapr-http-port 3503 --dapr-grpc-port 50103 -- python3 cross-app3.py &
dapr run --app-id wfexample2 --dapr-http-port 3502 --dapr-grpc-port 50102 -- python3 cross-app2.py &
dapr run --app-id wfexample1 --dapr-http-port 3501 --dapr-grpc-port 50101 -- python3 cross-app1.py
dapr run --app-id wfexample3 --dapr-http-port 3503 --dapr-grpc-port 50103 -- python3 multi-app3.py &
dapr run --app-id wfexample2 --dapr-http-port 3502 --dapr-grpc-port 50102 -- python3 multi-app2.py &
dapr run --app-id wfexample1 --dapr-http-port 3501 --dapr-grpc-port 50101 -- python3 multi-app1.py
```
<!-- END_STEP -->

Expand All @@ -379,9 +379,9 @@ among others. This shows that the workflow calls are working as expected.

#### Error handling on activity calls

This example demonstrates how the error handling works on activity calls across apps.
This example demonstrates how error handling works on activity calls in multi-app workflows.

Error handling on activity calls in multi-app workflows works the same way as for normal workflow activity calls.

In this example we run `app3` in failing mode, which makes the activity call return error constantly. The activity call from `app2` will fail after the retry policy is exhausted.

Expand All @@ -404,9 +404,9 @@ sleep: 20

```sh
export ERROR_ACTIVITY_MODE=true
dapr run --app-id wfexample3 --dapr-http-port 3503 --dapr-grpc-port 50103 -- python3 cross-app3.py &
dapr run --app-id wfexample2 --dapr-http-port 3502 --dapr-grpc-port 50102 -- python3 cross-app2.py &
dapr run --app-id wfexample1 --dapr-http-port 3501 --dapr-grpc-port 50101 -- python3 cross-app1.py
dapr run --app-id wfexample3 --dapr-http-port 3503 --dapr-grpc-port 50103 -- python3 multi-app3.py &
dapr run --app-id wfexample2 --dapr-http-port 3502 --dapr-grpc-port 50102 -- python3 multi-app2.py &
dapr run --app-id wfexample1 --dapr-http-port 3501 --dapr-grpc-port 50101 -- python3 multi-app1.py
```
<!-- END_STEP -->

Expand All @@ -424,9 +424,9 @@ among others. This shows that the activity calls are failing as expected, and th

#### Error handling on workflow calls

This example demonstrates how the error handling works on workflow calls across apps.
This example demonstrates how error handling works on workflow calls in multi-app workflows.

Error handling on workflow calls in multi-app workflows works the same way as for normal workflow calls.

In this example we run `app2` in failing mode, which makes the workflow call return error constantly. The workflow call from `app1` will fail after the retry policy is exhausted.

Expand All @@ -445,9 +445,9 @@ sleep: 20

```sh
export ERROR_WORKFLOW_MODE=true
dapr run --app-id wfexample3 --dapr-http-port 3503 --dapr-grpc-port 50103 -- python3 cross-app3.py &
dapr run --app-id wfexample2 --dapr-http-port 3502 --dapr-grpc-port 50102 -- python3 cross-app2.py &
dapr run --app-id wfexample1 --dapr-http-port 3501 --dapr-grpc-port 50101 -- python3 cross-app1.py
dapr run --app-id wfexample3 --dapr-http-port 3503 --dapr-grpc-port 50103 -- python3 multi-app3.py &
dapr run --app-id wfexample2 --dapr-http-port 3502 --dapr-grpc-port 50102 -- python3 multi-app2.py &
dapr run --app-id wfexample1 --dapr-http-port 3501 --dapr-grpc-port 50101 -- python3 multi-app1.py
```
<!-- END_STEP -->

Expand Down
5 changes: 1 addition & 4 deletions examples/workflow/child_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import time

import dapr.ext.workflow as wf

Expand Down Expand Up @@ -40,12 +39,10 @@ def child_workflow(ctx: wf.DaprWorkflowContext):

if __name__ == '__main__':
wfr.start()
time.sleep(10) # wait for workflow runtime to start

wf_client = wf.DaprWorkflowClient()
instance_id = wf_client.schedule_new_workflow(workflow=main_workflow)

# Wait for the workflow to complete
time.sleep(5)
wf_client.wait_for_workflow_completion(instance_id)

wfr.shutdown()
1 change: 0 additions & 1 deletion examples/workflow/fan_out_fan_in.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ def process_results(ctx, final_result: int):

if __name__ == '__main__':
wfr.start()
time.sleep(10) # wait for workflow runtime to start

wf_client = wf.DaprWorkflowClient()
instance_id = wf_client.schedule_new_workflow(workflow=batch_processing_workflow, input=10)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import time
from datetime import timedelta

import dapr.ext.workflow as wf
Expand Down Expand Up @@ -46,13 +45,11 @@ def app1_workflow(ctx: wf.DaprWorkflowContext):

if __name__ == '__main__':
wfr.start()
time.sleep(10) # wait for workflow runtime to start

wf_client = wf.DaprWorkflowClient()
print('app1 - triggering app1 workflow', flush=True)
instance_id = wf_client.schedule_new_workflow(workflow=app1_workflow)

# Wait for the workflow to complete
time.sleep(7)
wf_client.wait_for_workflow_completion(instance_id)

wfr.shutdown()
Original file line number Diff line number Diff line change
Expand Up @@ -46,5 +46,5 @@ def app2_workflow(ctx: wf.DaprWorkflowContext):

if __name__ == '__main__':
wfr.start()
time.sleep(15) # wait for workflow runtime to start
time.sleep(15) # Keep the workflow runtime running for a while to process workflows
wfr.shutdown()
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,5 @@ def app3_activity(ctx: wf.DaprWorkflowContext) -> int:

if __name__ == '__main__':
wfr.start()
time.sleep(15) # wait for workflow runtime to start
time.sleep(15) # Keep the workflow runtime alive for a while to process requests
wfr.shutdown()
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,12 @@ def call_activity(
retry_policy: Optional[RetryPolicy] = None,
app_id: Optional[str] = None,
) -> task.Task[TOutput]:
# Handle string activity names for cross-app scenarios
# Handle string activity names for multi-app workflow scenarios
if isinstance(activity, str):
activity_name = activity
if app_id is not None:
self._logger.debug(
f'{self.instance_id}: Creating cross-app activity {activity_name} for app {app_id}'
f'{self.instance_id}: Creating multi-app workflow activity {activity_name} for app {app_id}'
)
else:
self._logger.debug(f'{self.instance_id}: Creating activity {activity_name}')
Expand Down Expand Up @@ -106,7 +106,7 @@ def call_child_workflow(
retry_policy: Optional[RetryPolicy] = None,
app_id: Optional[str] = None,
) -> task.Task[TOutput]:
# Handle string workflow names for cross-app scenarios
# Handle string workflow names for multi-app workflow scenarios
if isinstance(workflow, str):
workflow_name = workflow
self._logger.debug(f'{self.instance_id}: Creating child workflow {workflow_name}')
Expand Down
3 changes: 3 additions & 0 deletions ext/dapr-ext-workflow/dapr/ext/workflow/logger/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,8 @@ def warning(self, msg, *args, **kwargs):
    def error(self, msg, *args, **kwargs):
        """Log *msg* at ERROR level, delegating to the wrapped logger."""
        self._logger.error(msg, *args, **kwargs)

    def exception(self, msg, *args, **kwargs):
        """Log *msg* at ERROR level with exception info, delegating to the wrapped logger.

        Intended to be called from an ``except`` block so the active
        exception's traceback is included in the log record.
        """
        self._logger.exception(msg, *args, **kwargs)

    def critical(self, msg, *args, **kwargs):
        """Log *msg* at CRITICAL level, delegating to the wrapped logger."""
        self._logger.critical(msg, *args, **kwargs)
4 changes: 2 additions & 2 deletions ext/dapr-ext-workflow/dapr/ext/workflow/workflow_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ def call_activity(
Parameters
----------
activity: Activity[TInput, TOutput] | str
A reference to the activity function to call, or a string name for cross-app activities.
A reference to the activity function to call, or a string name for multi-app workflow activities.
input: TInput | None
The JSON-serializable input (or None) to pass to the activity.
app_id: str | None
Expand All @@ -145,7 +145,7 @@ def call_child_workflow(
Parameters
----------
orchestrator: Orchestrator[TInput, TOutput] | str
A reference to the orchestrator function to call, or a string name for cross-app workflows.
A reference to the orchestrator function to call, or a string name for multi-app workflows.
input: TInput
The optional JSON-serializable input to pass to the orchestrator function.
instance_id: str
Expand Down
Loading
Loading