Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/build-push-to-main.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ jobs:
strategy:
fail-fast: false
matrix:
python_ver: ["3.10", "3.11", "3.12", "3.13"]
python_ver: ["3.10", "3.11", "3.12", "3.13", "3.14"]
steps:
- uses: actions/checkout@v6
- name: Set up Python ${{ matrix.python_ver }}
Expand Down
4 changes: 3 additions & 1 deletion .github/workflows/build-tag.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ on:
- grpc-v*
- flask-v*
- fastapi-v*
- langgraph-v*
- strands-v*
workflow_dispatch:

jobs:
Expand Down Expand Up @@ -41,7 +43,7 @@ jobs:
strategy:
fail-fast: false
matrix:
python_ver: ["3.10", "3.11", "3.12", "3.13"]
python_ver: ["3.10", "3.11", "3.12", "3.13", "3.14"]
steps:
- uses: actions/checkout@v6
- name: Set up Python ${{ matrix.python_ver }}
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ jobs:
strategy:
fail-fast: false
matrix:
python_ver: ["3.10", "3.11", "3.12", "3.13"]
python_ver: ["3.10", "3.11", "3.12", "3.13", "3.14"]
steps:
- uses: actions/checkout@v6
- name: Set up Python ${{ matrix.python_ver }}
Expand Down
6 changes: 4 additions & 2 deletions .github/workflows/fossa.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ on:
- grpc-v*
- flask-v*
- fastapi-v*
- langgraph-v*
- strands-v*
pull_request:
branches:
- main
Expand All @@ -43,12 +45,12 @@ jobs:
uses: actions/checkout@v6

- name: "Run FOSSA Scan"
uses: fossas/fossa-action@v1.7.0 # Use a specific version if locking is preferred
uses: fossas/fossa-action@v1.8.0 # Use a specific version if locking is preferred
with:
api-key: ${{ env.FOSSA_API_KEY }}

- name: "Run FOSSA Test"
uses: fossas/fossa-action@v1.7.0 # Use a specific version if locking is preferred
uses: fossas/fossa-action@v1.8.0 # Use a specific version if locking is preferred
with:
api-key: ${{ env.FOSSA_API_KEY }}
run-tests: true
12 changes: 7 additions & 5 deletions .github/workflows/validate_examples.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ on:
- grpc-v*
- fastapi-v*
- flask-v*
- langgraph-v*
- strands-v*
pull_request:
branches:
- main
Expand Down Expand Up @@ -46,7 +48,7 @@ jobs:
strategy:
fail-fast: false
matrix:
python_ver: ["3.10", "3.11", "3.12", "3.13"]
python_ver: ["3.10", "3.11", "3.12", "3.13", "3.14"]
steps:
- name: Parse repository_dispatch payload
if: github.event_name == 'repository_dispatch'
Expand All @@ -63,14 +65,14 @@ jobs:
repository: ${{ env.CHECKOUT_REPO }}
ref: ${{ env.CHECKOUT_REF }}
- uses: azure/setup-helm@v4
- name: Determine latest Dapr Runtime version
- name: Determine latest Dapr Runtime version (including prerelease)
run: |
helm repo add dapr https://dapr.github.io/helm-charts/ && helm repo update && export RUNTIME_VERSION=$(helm search repo dapr/dapr --devel --versions | awk '/dapr\/dapr/ {print $3; exit}' )
RUNTIME_VERSION=$(curl -s "https://api.github.com/repos/dapr/dapr/releases" | sort -r | grep '"tag_name"' | head -n 1 | cut -d ':' -f2 | tr -d '",v ')
echo "DAPR_RUNTIME_VER=$RUNTIME_VERSION" >> $GITHUB_ENV
echo "Found $RUNTIME_VERSION"
- name: Determine latest Dapr Cli version
- name: Determine latest Dapr Cli version (including prerelease)
run: |
export CLI_VERSION=$(curl "https://api.github.com/repos/dapr/cli/releases/latest" --header 'authorization: Bearer ${{ secrets.GITHUB_TOKEN }}' | jq '.tag_name'| tr -d '",v')
CLI_VERSION=$(curl -s "https://api.github.com/repos/dapr/cli/releases" | sort -r | grep '"tag_name"' | head -n 1 | cut -d ':' -f2 | tr -d '",v ')
echo "DAPR_CLI_VER=$CLI_VERSION" >> $GITHUB_ENV
echo "Found $CLI_VERSION"
- name: Set up Python ${{ matrix.python_ver }}
Expand Down
6 changes: 2 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ This includes the following packages:
### Prerequisites

* [Install Dapr standalone mode](https://github.com/dapr/cli#install-dapr-on-your-local-machine-self-hosted)
* [Install Python 3.9+](https://www.python.org/downloads/)
* [Install Python 3.10+](https://www.python.org/downloads/)

### Install Dapr python sdk

Expand Down Expand Up @@ -145,12 +145,10 @@ The generated files will be found in `docs/_build`.

```sh
pip3 install -r tools/requirements.txt
export DAPR_BRANCH=release-1.16 # Optional, defaults to master
export DAPR_BRANCH=release-1.17 # Optional, defaults to master
./tools/regen_grpcclient.sh
```

> Note: The `grpcio-tools` version we're using doesn't support Python 3.13.

## Help & Feedback

Need help or have feedback on the SDK? Please open a GitHub issue or come chat with us in the `#python-sdk` channel of our Discord server ([click here to join](https://discord.gg/MySdVxrH)).
Expand Down
93 changes: 93 additions & 0 deletions dapr/aio/clients/grpc/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from google.protobuf.any_pb2 import Any as GrpcAny
from google.protobuf.empty_pb2 import Empty as GrpcEmpty
from google.protobuf.message import Message as GrpcMessage
from grpc import StatusCode # type: ignore
from grpc.aio import ( # type: ignore
AioRpcError,
StreamStreamClientInterceptor,
Expand Down Expand Up @@ -69,6 +70,8 @@
)
from dapr.clients.grpc._response import (
BindingResponse,
BulkPublishResponse,
BulkPublishResponseFailedEntry,
BulkStateItem,
BulkStatesResponse,
ConfigurationResponse,
Expand Down Expand Up @@ -484,6 +487,96 @@ async def publish_event(

return DaprResponse(await call.initial_metadata())

async def publish_events(
self,
pubsub_name: str,
topic_name: str,
data: Sequence[Union[bytes, str]],
publish_metadata: Dict[str, str] = {},
data_content_type: Optional[str] = None,
) -> BulkPublishResponse:
"""Bulk publish multiple events to a given topic.
This publishes multiple events to a specified topic and pubsub component.
Each event can be bytes or str. The str data is encoded into bytes with
default charset of utf-8.

The example publishes multiple string events to a topic:

from dapr.aio.clients import DaprClient
async with DaprClient() as d:
resp = await d.publish_events(
pubsub_name='pubsub_1',
topic_name='TOPIC_A',
data=['message1', 'message2', 'message3'],
data_content_type='text/plain',
)
# resp.failed_entries includes any entries that failed to publish.

Args:
pubsub_name (str): the name of the pubsub component
topic_name (str): the topic name to publish to
data (Sequence[Union[bytes, str]]): sequence of events to publish;
each event must be bytes or str
publish_metadata (Dict[str, str], optional): Dapr metadata for the
bulk publish request
data_content_type (str, optional): content type of the event data

Returns:
:class:`BulkPublishResponse` with any failed entries
"""
entries = []
for event in data:
entry_id = str(uuid.uuid4())
if isinstance(event, bytes):
event_data = event
content_type = data_content_type or 'application/octet-stream'
elif isinstance(event, str):
event_data = event.encode('utf-8')
content_type = data_content_type or 'text/plain'
else:
raise ValueError(f'invalid type for event {type(event)}')

entries.append(
api_v1.BulkPublishRequestEntry(
entry_id=entry_id,
event=event_data,
content_type=content_type,
)
)

req = api_v1.BulkPublishRequest(
pubsub_name=pubsub_name,
topic=topic_name,
entries=entries,
metadata=publish_metadata,
)

try:
call = self._stub.BulkPublishEvent(req)
response = await call
except AioRpcError as err:
if err.code() == StatusCode.UNIMPLEMENTED:
try:
call = self._stub.BulkPublishEventAlpha1(req)
response = await call
except AioRpcError as err2:
raise DaprGrpcError(err2) from err2
else:
raise DaprGrpcError(err) from err

failed_entries = [
BulkPublishResponseFailedEntry(
entry_id=entry.entry_id,
error=entry.error,
)
for entry in response.failedEntries
]

return BulkPublishResponse(
failed_entries=failed_entries,
headers=await call.initial_metadata(),
)

async def subscribe(
self,
pubsub_name: str,
Expand Down
10 changes: 2 additions & 8 deletions dapr/clients/grpc/_conversation_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
import inspect
import random
import string
import types
from dataclasses import fields, is_dataclass
from enum import Enum
from types import UnionType
from typing import (
Any,
Callable,
Expand All @@ -37,10 +37,6 @@

from dapr.conf import settings

# Make mypy happy. Runtime handle: real class on 3.10+, else None.
# TODO: Python 3.9 is about to be end-of-life, so we can drop this at some point next year (2026)
UnionType: Any = getattr(types, 'UnionType', None)

# duplicated from conversation to avoid circular import
Params = Union[Mapping[str, Any], Sequence[Any], None]

Expand Down Expand Up @@ -857,9 +853,7 @@ def _coerce_literal(value: Any, lit_args: List[Any]) -> Any:

def _is_union(t) -> bool:
origin = get_origin(t)
if origin is Union:
return True
return UnionType is not None and origin is UnionType
return origin is Union or origin is UnionType


def _coerce_and_validate(value: Any, expected_type: Any) -> Any:
Expand Down
58 changes: 58 additions & 0 deletions dapr/clients/grpc/_response.py
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,64 @@ def _read_subscribe_config(
pass


class BulkPublishResponseFailedEntry:
"""A failed entry from the bulk publish response.

Attributes:
entry_id (str): the entry ID that failed.
error (str): the error message for the failure.
"""

def __init__(self, entry_id: str, error: str):
"""Initializes BulkPublishResponseFailedEntry.

Args:
entry_id (str): the entry ID that failed.
error (str): the error message for the failure.
"""
self._entry_id = entry_id
self._error = error

@property
def entry_id(self) -> str:
"""Gets the entry ID."""
return self._entry_id

@property
def error(self) -> str:
"""Gets the error message."""
return self._error


class BulkPublishResponse(DaprResponse):
"""The response of publish_events (bulk publish) API.

This inherits from DaprResponse

Attributes:
failed_entries (List[BulkPublishResponseFailedEntry]): the entries that failed to publish.
"""

def __init__(
self,
failed_entries: List[BulkPublishResponseFailedEntry] = [],
headers: MetadataTuple = (),
):
"""Initializes BulkPublishResponse from :obj:`runtime_v1.BulkPublishResponse`.

Args:
failed_entries (List[BulkPublishResponseFailedEntry]): the entries that failed.
headers (Tuple, optional): the headers from Dapr gRPC response.
"""
super(BulkPublishResponse, self).__init__(headers)
self._failed_entries = failed_entries

@property
def failed_entries(self) -> List[BulkPublishResponseFailedEntry]:
"""Gets the failed entries."""
return self._failed_entries


class TopicEventResponseStatus(Enum):
# success is the default behavior: message is acknowledged and not retried
success = appcallback_v1.TopicEventResponse.TopicEventResponseStatus.SUCCESS
Expand Down
Loading