Skip to content
Draft

[WIP] #109

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 20 additions & 9 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,21 +1,28 @@
[project]
name = "streamstore"
version = "5.0.0"
name = "s2-sdk"
version = "0.1.0"
description = "Python SDK for s2.dev"
readme = "README.md"
license = "MIT"
license-files = ["LICENSE"]
requires-python = ">=3.11"
dependencies = [
"grpcio-tools>=1.69.0",
"grpcio>=1.69.0",
"types-protobuf>=5.29.1.20241207",
"grpc-stubs>=1.53.0.5",
"httpx[http2]>=0.28.0",
"protobuf>=5.29.0",
"pydantic>=2.0",
"anyio>=4.8.0",
"zstandard>=0.23.0",
]

[dependency-groups]
dev = ["mypy>=1.14.1", "poethepoet>=0.36.0", "ruff>=0.9.1"]
dev = [
"datamodel-code-generator>=0.28.0",
"grpcio-tools>=1.69.0",
"mypy>=1.14.1",
"poethepoet>=0.36.0",
"ruff>=0.9.1",
"types-protobuf>=5.29.1.20241207",
]
test = [
"pytest>=8.0.0",
"pytest-asyncio>=0.23.0",
Expand All @@ -34,8 +41,12 @@ docs = [
requires = ["hatchling"]
build-backend = "hatchling.build"

[tool.hatch.build.targets.wheel]
packages = ["src/s2_sdk"]

[tool.mypy]
files = ["src/", "tests/", "examples/"]
files = ["src/s2_sdk/", "tests/"]
exclude = ["src/s2_sdk/_generated/"]

[tool.ruff]
exclude = [
Expand All @@ -59,7 +70,7 @@ ci_linter = "uv run ruff check"
ci_formatter = "uv run ruff format --check"
checker = ["linter", "formatter", "type_checker"]
ci_checker = ["ci_linter", "ci_formatter", "type_checker"]
e2e_tests = "uv run pytest tests/ -v -s"
e2e_tests = "uv run pytest tests/ -v -s -m 'account or basin or stream'"
e2e_account_tests = "uv run pytest tests/ -v -s -m account"
e2e_basin_tests = "uv run pytest tests/ -v -s -m basin"
e2e_stream_tests = "uv run pytest tests/ -v -s -m stream"
21 changes: 21 additions & 0 deletions src/s2_sdk/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
__all__ = [
"S2",
"S2Basin",
"S2Stream",
"S2Error",
"S2ApiError",
"AppendConditionFailed",
"S2SessionError",
"Endpoints",
"s2_sdk.types",
"s2_sdk.utils",
]

from s2_sdk._endpoints import Endpoints
from s2_sdk._exceptions import (
AppendConditionFailed,
S2ApiError,
S2Error,
S2SessionError,
)
from s2_sdk._ops import S2, S2Basin, S2Stream
103 changes: 103 additions & 0 deletions src/s2_sdk/_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
from importlib.metadata import version
from typing import Any

import httpx

from s2_sdk._exceptions import AppendConditionFailed, S2ApiError

_VERSION = version("s2-sdk")
_USER_AGENT = f"s2-sdk-python/{_VERSION}"


class HttpClient:
__slots__ = ("_client",)

def __init__(
self,
base_url: str,
access_token: str,
timeout: float,
http2: bool = True,
) -> None:
self._client = httpx.AsyncClient(
base_url=base_url,
headers={
"authorization": f"Bearer {access_token}",
"user-agent": _USER_AGENT,
},
timeout=timeout,
http2=http2,
)

async def request(
self,
method: str,
path: str,
*,
json: Any = None,
params: dict[str, Any] | None = None,
headers: dict[str, str] | None = None,
content: bytes | None = None,
) -> httpx.Response:
response = await self._client.request(
method,
path,
json=json,
params=params,
headers=headers,
content=content,
)
_raise_for_status(response)
return response

def stream(
self,
method: str,
path: str,
*,
params: dict[str, Any] | None = None,
headers: dict[str, str] | None = None,
):
return self._client.stream(
method,
path,
params=params,
headers=headers,
)

async def close(self) -> None:
await self._client.aclose()


def _raise_for_status(response: httpx.Response) -> None:
status = response.status_code
if 200 <= status < 300:
return

if status == 412:
body = response.json()
if "fencing_token_mismatch" in body:
raise AppendConditionFailed(
f"Fencing token mismatch: {body['fencing_token_mismatch']}",
status_code=status,
)
elif "seq_num_mismatch" in body:
raise AppendConditionFailed(
f"Sequence number mismatch: {body['seq_num_mismatch']}",
status_code=status,
)
raise AppendConditionFailed(str(body), status_code=status)

if status == 416:
# Tail response — not an error, handled by callers
return

try:
body = response.json()
message = body.get("message", response.text)
code = body.get("code")
except Exception:
message = response.text
code = None

raise S2ApiError(message, status_code=status, code=code)
38 changes: 38 additions & 0 deletions src/s2_sdk/_endpoints.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import os

from s2_sdk._exceptions import fallible


class Endpoints:
"""S2 endpoints."""

__slots__ = ("_account_url", "_basin_base_url")

_account_url: str
_basin_base_url: str

def __init__(self, account_url: str, basin_base_url: str):
self._account_url = account_url
self._basin_base_url = basin_base_url

@classmethod
def default(cls) -> "Endpoints":
return cls(
account_url="https://aws.s2.dev/v1",
basin_base_url="https://{basin}.b.aws.s2.dev/v1",
)

@classmethod
@fallible
def from_env(cls) -> "Endpoints":
account_url = os.getenv("S2_ACCOUNT_ENDPOINT")
basin_url = os.getenv("S2_BASIN_ENDPOINT")
if account_url and basin_url and "{basin}" in basin_url:
return cls(account_url=account_url, basin_base_url=basin_url)
raise ValueError("Invalid S2_ACCOUNT_ENDPOINT and/or S2_BASIN_ENDPOINT")

def account(self) -> str:
return self._account_url

def basin(self, basin_name: str) -> str:
return self._basin_base_url.format(basin=basin_name)
30 changes: 26 additions & 4 deletions src/streamstore/_exceptions.py → src/s2_sdk/_exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,34 @@


class S2Error(Exception):
"""
Base class for all S2 related exceptions.
"""
"""Base class for all S2 related exceptions."""


S2Error.__module__ = "streamstore"
class S2ApiError(S2Error):
"""Error from the S2 API."""

def __init__(self, message: str, status_code: int, code: str | None = None):
self.status_code = status_code
self.code = code
super().__init__(message)


class AppendConditionFailed(S2ApiError):
"""Append condition (fencing token or seq num match) was not met."""


class S2SessionError(S2Error):
"""Error from an S2S session."""

def __init__(self, message: str, status_code: int):
self.status_code = status_code
super().__init__(message)


S2Error.__module__ = "s2_sdk"
S2ApiError.__module__ = "s2_sdk"
AppendConditionFailed.__module__ = "s2_sdk"
S2SessionError.__module__ = "s2_sdk"


def fallible(f):
Expand Down
File renamed without changes.
Loading
Loading