diff --git a/crates/pulsing-actor/src/connect/mod.rs b/crates/pulsing-actor/src/connect/mod.rs new file mode 100644 index 000000000..9ee104b15 --- /dev/null +++ b/crates/pulsing-actor/src/connect/mod.rs @@ -0,0 +1,197 @@ +//! Out-cluster connector for accessing actors without joining the cluster. +//! +//! `PulsingConnect` connects to a cluster gateway node via HTTP/2 and +//! transparently routes requests to any actor in the cluster. It does not +//! participate in gossip, does not register as a node, and does not run +//! fault detection — keeping the connector extremely lightweight. +//! +//! # Example +//! +//! ```rust,ignore +//! let conn = PulsingConnect::connect("10.0.1.1:8080".parse()?).await?; +//! let counter = conn.resolve("services/counter").await?; +//! let result = counter.ask("increment", vec![]).await?; +//! conn.close(); +//! ``` + +mod reference; + +pub use reference::ConnectActorRef; + +use crate::error::{PulsingError, Result, RuntimeError}; +use crate::transport::{Http2Client, Http2Config}; +use serde::{Deserialize, Serialize}; +use std::net::SocketAddr; +use std::sync::Arc; +use tokio::sync::RwLock; + +/// Resolve response from the gateway. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ResolveResponse { + pub found: bool, + pub path: String, + pub gateway: String, + pub instance_count: usize, +} + +/// Gateway member list response. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct MembersResponse { + pub gateways: Vec, +} + +/// Out-cluster connector — connects to a gateway node without joining the cluster. +pub struct PulsingConnect { + active_gateway: RwLock, + gateways: RwLock>, + http_client: Arc, +} + +impl PulsingConnect { + /// Connect to a single gateway address. + pub async fn connect(gateway_addr: SocketAddr) -> Result> { + Self::connect_multi(vec![gateway_addr]).await + } + + /// Connect with multiple gateway addresses for failover. + pub async fn connect_multi(gateway_addrs: Vec) -> Result> { + if gateway_addrs.is_empty() { + return Err(PulsingError::from(RuntimeError::Other( + "At least one gateway address is required".into(), + ))); + } + + let http_client = Arc::new(Http2Client::new(Http2Config::default())); + http_client.start_background_tasks(); + + let active = gateway_addrs[0]; + let conn = Arc::new(Self { + active_gateway: RwLock::new(active), + gateways: RwLock::new(gateway_addrs), + http_client, + }); + + // Try to refresh gateway list from the cluster + let _ = conn.refresh_gateways().await; + + Ok(conn) + } + + /// Resolve a named actor via the gateway. + pub async fn resolve(&self, path: &str) -> Result { + let gateway = *self.active_gateway.read().await; + let body = serde_json::to_vec(&serde_json::json!({"path": path})) + .map_err(|e| PulsingError::from(RuntimeError::Other(e.to_string())))?; + + let resp_bytes = self.ask_gateway(gateway, "/client/resolve", &body).await?; + + let resp: ResolveResponse = serde_json::from_slice(&resp_bytes) + .map_err(|e| PulsingError::from(RuntimeError::Other(e.to_string())))?; + + if !resp.found { + return Err(PulsingError::from(RuntimeError::named_actor_not_found( + path, + ))); + } + + Ok(ConnectActorRef::new( + gateway, + path.to_string(), + self.http_client.clone(), + )) + } + + /// Refresh the gateway list from the current active gateway. + pub async fn refresh_gateways(&self) -> Result<()> { + let gateway = *self.active_gateway.read().await; + let resp_bytes = self.ask_gateway(gateway, "/client/members", &[]).await?; + + let resp: MembersResponse = serde_json::from_slice(&resp_bytes) + .map_err(|e| PulsingError::from(RuntimeError::Other(e.to_string())))?; + + let mut new_gateways = Vec::new(); + for addr_str in &resp.gateways { + if let Ok(addr) = addr_str.parse::() { + new_gateways.push(addr); + } + } + + if !new_gateways.is_empty() { + *self.gateways.write().await = new_gateways; + } + + Ok(()) + } + + /// Switch to the next available gateway (for failover). + pub async fn failover(&self) -> Result<()> { + let current = *self.active_gateway.read().await; + let gateways = self.gateways.read().await; + + let next = gateways + .iter() + .find(|&&addr| addr != current) + .ok_or_else(|| { + PulsingError::from(RuntimeError::Other( + "No alternative gateway available".into(), + )) + })?; + + *self.active_gateway.write().await = *next; + Ok(()) + } + + /// Get the current active gateway address. + pub async fn active_gateway(&self) -> SocketAddr { + *self.active_gateway.read().await + } + + /// Get all known gateway addresses. + pub async fn gateway_list(&self) -> Vec { + self.gateways.read().await.clone() + } + + /// Shutdown the connector. + pub fn close(&self) { + self.http_client.shutdown(); + } + + /// Internal: send a GET/POST to a gateway management endpoint. + async fn ask_gateway(&self, gateway: SocketAddr, path: &str, body: &[u8]) -> Result> { + self.http_client + .ask(gateway, path, "client", body.to_vec()) + .await + } + + /// Get a reference to the underlying HTTP/2 client. + pub fn http_client(&self) -> &Arc { + &self.http_client + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_resolve_response_deserialize() { + let json = r#"{"found":true,"path":"services/counter","gateway":"10.0.1.1:8080","instance_count":2}"#; + let resp: ResolveResponse = serde_json::from_str(json).unwrap(); + assert!(resp.found); + assert_eq!(resp.path, "services/counter"); + assert_eq!(resp.instance_count, 2); + } + + #[test] + fn test_members_response_deserialize() { + let json = r#"{"gateways":["10.0.1.1:8080","10.0.1.2:8080"]}"#; + let resp: MembersResponse = serde_json::from_str(json).unwrap(); + assert_eq!(resp.gateways.len(), 2); + } + + #[tokio::test] + async fn test_connect_empty_addrs() { + let result = PulsingConnect::connect_multi(vec![]).await; + assert!(result.is_err()); + } +} diff --git a/crates/pulsing-actor/src/connect/reference.rs b/crates/pulsing-actor/src/connect/reference.rs new file mode 100644 index 000000000..74145c306 --- /dev/null +++ b/crates/pulsing-actor/src/connect/reference.rs @@ -0,0 +1,103 @@ +//! Actor reference for out-cluster connectors. +//! +//! `ConnectActorRef` routes all requests through a gateway node using the +//! `/named/{path}` endpoint. The gateway handles internal cluster routing +//! transparently. + +use crate::actor::Message; +use crate::error::Result; +use crate::transport::{Http2Client, StreamFrame, StreamHandle}; +use std::net::SocketAddr; +use std::sync::Arc; + +/// Actor reference used by out-cluster connectors. +/// +/// All requests are routed through the gateway node. The connector never +/// communicates directly with the target actor's node. +#[derive(Clone)] +pub struct ConnectActorRef { + gateway: SocketAddr, + path: String, + http_client: Arc, +} + +impl ConnectActorRef { + pub fn new(gateway: SocketAddr, path: String, http_client: Arc) -> Self { + Self { + gateway, + path, + http_client, + } + } + + /// Send a request and wait for a response. + pub async fn ask(&self, msg_type: &str, payload: Vec) -> Result> { + let url_path = format!("/named/{}", self.path); + self.http_client + .ask(self.gateway, &url_path, msg_type, payload) + .await + } + + /// Fire-and-forget message. + pub async fn tell(&self, msg_type: &str, payload: Vec) -> Result<()> { + let url_path = format!("/named/{}", self.path); + self.http_client + .tell(self.gateway, &url_path, msg_type, payload) + .await + } + + /// Streaming request — returns a stream of frames. + pub async fn ask_stream( + &self, + msg_type: &str, + payload: Vec, + ) -> Result> { + let url_path = format!("/named/{}", self.path); + self.http_client + .ask_stream(self.gateway, &url_path, msg_type, payload) + .await + } + + /// Send a full `Message` (single or stream) and receive a `Message` response. + pub async fn send_message(&self, msg: Message) -> Result { + let url_path = format!("/named/{}", self.path); + self.http_client + .send_message_full(self.gateway, &url_path, msg) + .await + } + + /// Get the gateway address this reference routes through. + pub fn gateway(&self) -> SocketAddr { + self.gateway + } + + /// Get the actor path. + pub fn path(&self) -> &str { + &self.path + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::transport::Http2Config; + + #[test] + fn test_connect_actor_ref_accessors() { + let client = Arc::new(Http2Client::new(Http2Config::default())); + let addr: SocketAddr = "127.0.0.1:8080".parse().unwrap(); + let r = ConnectActorRef::new(addr, "services/counter".into(), client); + assert_eq!(r.gateway(), addr); + assert_eq!(r.path(), "services/counter"); + } + + #[test] + fn test_connect_actor_ref_clone() { + let client = Arc::new(Http2Client::new(Http2Config::default())); + let addr: SocketAddr = "127.0.0.1:8080".parse().unwrap(); + let r = ConnectActorRef::new(addr, "services/counter".into(), client); + let r2 = r.clone(); + assert_eq!(r.gateway(), r2.gateway()); + assert_eq!(r.path(), r2.path()); + } +} diff --git a/crates/pulsing-actor/src/lib.rs b/crates/pulsing-actor/src/lib.rs index 412265bc9..ca3cadf02 100644 --- a/crates/pulsing-actor/src/lib.rs +++ b/crates/pulsing-actor/src/lib.rs @@ -137,6 +137,7 @@ pub mod actor; pub mod behavior; pub mod circuit_breaker; pub mod cluster; +pub mod connect; pub mod error; pub mod metrics; pub mod policies; diff --git a/crates/pulsing-actor/src/system/handler.rs b/crates/pulsing-actor/src/system/handler.rs index 5520dd143..6b278cf75 100644 --- a/crates/pulsing-actor/src/system/handler.rs +++ b/crates/pulsing-actor/src/system/handler.rs @@ -347,6 +347,64 @@ impl Http2ServerHandler for SystemMessageHandler { serde_json::json!(actors) } + async fn handle_client_resolve(&self, path: &str) -> serde_json::Value { + let cluster_guard = self.cluster.read().await; + if let Some(cluster) = cluster_guard.as_ref() { + let actor_path = match ActorPath::new(path) { + Ok(p) => p, + Err(_) => { + return serde_json::json!({ + "found": false, + "path": path, + "gateway": "", + "instance_count": 0, + }); + } + }; + + let info = cluster.lookup_named_actor(&actor_path).await; + match info { + Some(info) => { + let local_addr = cluster.local_addr(); + serde_json::json!({ + "found": true, + "path": path, + "gateway": local_addr.to_string(), + "instance_count": info.instance_count(), + }) + } + None => { + serde_json::json!({ + "found": false, + "path": path, + "gateway": "", + "instance_count": 0, + }) + } + } + } else { + // Standalone mode: check local registry + let found = self.registry.get_actor_name_by_path(path).is_some(); + serde_json::json!({ + "found": found, + "path": path, + "gateway": "", + "instance_count": if found { 1 } else { 0 }, + }) + } + } + + async fn handle_client_members(&self) -> serde_json::Value { + let cluster_guard = self.cluster.read().await; + if let Some(cluster) = cluster_guard.as_ref() { + let members = cluster.alive_members().await; + let gateways: Vec = members.iter().map(|m| m.addr.to_string()).collect(); + serde_json::json!({ "gateways": gateways }) + } else { + serde_json::json!({ "gateways": [] }) + } + } + fn as_any(&self) -> &dyn std::any::Any { self } diff --git a/crates/pulsing-actor/src/transport/http2/server.rs b/crates/pulsing-actor/src/transport/http2/server.rs index 49a83ac40..51d46f878 100644 --- a/crates/pulsing-actor/src/transport/http2/server.rs +++ b/crates/pulsing-actor/src/transport/http2/server.rs @@ -81,6 +81,17 @@ pub trait Http2ServerHandler: Send + Sync + 'static { serde_json::json!([]) } + /// Handle client resolve request (out-cluster). + async fn handle_client_resolve(&self, path: &str) -> serde_json::Value { + let _ = path; + serde_json::json!({"found": false, "path": path, "gateway": "", "instance_count": 0}) + } + + /// Handle client members request (out-cluster). + async fn handle_client_members(&self) -> serde_json::Value { + serde_json::json!({"gateways": []}) + } + /// Get as Any for downcasting. fn as_any(&self) -> &dyn std::any::Any; @@ -321,6 +332,35 @@ impl Http2Server { return Ok(json_response(StatusCode::OK, body)); } + // Client resolve endpoint (out-cluster) + if path == "/client/resolve" && method == Method::POST { + let body_bytes = match req.collect().await { + Ok(collected) => collected.to_bytes().to_vec(), + Err(e) => { + return Ok(error_response( + StatusCode::BAD_REQUEST, + format!("Failed to read body: {}", e).into_bytes(), + )); + } + }; + let resolve_req: serde_json::Value = + serde_json::from_slice(&body_bytes).unwrap_or_default(); + let actor_path = resolve_req + .get("path") + .and_then(|v| v.as_str()) + .unwrap_or(""); + let result = handler.handle_client_resolve(actor_path).await; + let body = serde_json::to_vec(&result).unwrap_or_default(); + return Ok(json_response(StatusCode::OK, body)); + } + + // Client members endpoint (out-cluster) + if path == "/client/members" && method == Method::GET { + let result = handler.handle_client_members().await; + let body = serde_json::to_vec(&result).unwrap_or_default(); + return Ok(json_response(StatusCode::OK, body)); + } + // Head node API endpoints if path.starts_with("/cluster/head/") { let body_bytes = match req.collect().await { diff --git a/crates/pulsing-py/src/actor.rs b/crates/pulsing-py/src/actor.rs index f39175571..1d4039deb 100644 --- a/crates/pulsing-py/src/actor.rs +++ b/crates/pulsing-py/src/actor.rs @@ -367,7 +367,7 @@ impl PyMessage { impl PyMessage { /// Convert to Rust Message - fn to_message(&self) -> Message { + pub(crate) fn to_message(&self) -> Message { if self.stream_reader.is_some() { Message::single(&self.msg_type, Vec::new()) } else { @@ -376,7 +376,7 @@ impl PyMessage { } /// Create from Rust Message (supports both single and stream) - fn from_rust_message(msg: Message) -> Self { + pub(crate) fn from_rust_message(msg: Message) -> Self { match msg { Message::Single { msg_type, data } => Self { msg_type, @@ -741,7 +741,7 @@ async fn reassemble_zerocopy_stream( /// /// Small zerocopy payloads → `Message::Single`; large ones → `Message::Stream` /// (descriptor-first + chunked data). Non-zerocopy objects → pickle. -fn encode_python_payload(py: Python<'_>, obj: &PyObject) -> PyResult { +pub(crate) fn encode_python_payload(py: Python<'_>, obj: &PyObject) -> PyResult { match zerocopy_mode().as_str() { "off" => Ok(Message::single(SEALED_PY_MSG_TYPE, pickle_object(py, obj)?)), "force" => { @@ -775,7 +775,7 @@ fn encode_zerocopy_message( /// Unified decoder: converts any `Message` (pickle / zerocopy-single / zerocopy-stream / other) /// into a Python object. -async fn decode_message_to_pyobject(msg: Message) -> PyResult { +pub(crate) async fn decode_message_to_pyobject(msg: Message) -> PyResult { match msg { Message::Single { ref msg_type, diff --git a/crates/pulsing-py/src/connect.rs b/crates/pulsing-py/src/connect.rs new file mode 100644 index 000000000..ac3f90684 --- /dev/null +++ b/crates/pulsing-py/src/connect.rs @@ -0,0 +1,188 @@ +//! Python bindings for the out-cluster PulsingConnect. + +use pulsing_actor::connect::{ConnectActorRef, PulsingConnect}; +use pyo3::exceptions::PyRuntimeError; +use pyo3::prelude::*; +use std::net::SocketAddr; +use std::sync::Arc; + +use crate::actor::PyMessage; + +fn to_pyerr(err: pulsing_actor::error::PulsingError) -> PyErr { + crate::errors::pulsing_error_to_py_err(err) +} + +/// Out-cluster connector — connects to a gateway without joining the cluster. +#[pyclass(name = "PulsingConnect")] +pub struct PyPulsingConnect { + inner: Arc, +} + +#[pymethods] +impl PyPulsingConnect { + /// Connect to a single gateway address. + #[staticmethod] + fn connect<'py>(py: Python<'py>, addr: String) -> PyResult> { + pyo3_async_runtimes::tokio::future_into_py(py, async move { + let socket_addr: SocketAddr = addr + .parse() + .map_err(|e: std::net::AddrParseError| PyRuntimeError::new_err(e.to_string()))?; + let conn = PulsingConnect::connect(socket_addr) + .await + .map_err(to_pyerr)?; + Ok(PyPulsingConnect { inner: conn }) + }) + } + + /// Connect to multiple gateway addresses (for failover). + #[staticmethod] + fn connect_multi<'py>(py: Python<'py>, addrs: Vec) -> PyResult> { + pyo3_async_runtimes::tokio::future_into_py(py, async move { + let socket_addrs: Vec = addrs + .iter() + .map(|a| { + a.parse::() + .map_err(|e| PyRuntimeError::new_err(e.to_string())) + }) + .collect::>>()?; + let conn = PulsingConnect::connect_multi(socket_addrs) + .await + .map_err(to_pyerr)?; + Ok(PyPulsingConnect { inner: conn }) + }) + } + + /// Resolve a named actor, returning a ConnectActorRef proxy. + fn resolve<'py>(&self, py: Python<'py>, path: String) -> PyResult> { + let conn = self.inner.clone(); + pyo3_async_runtimes::tokio::future_into_py(py, async move { + let actor_ref = conn.resolve(&path).await.map_err(to_pyerr)?; + Ok(PyConnectActorRef { + inner: actor_ref, + #[allow(clippy::redundant_clone)] + _conn: conn.clone(), + }) + }) + } + + /// Create a ConnectActorRef for a known path without resolve check. + fn actor_ref<'py>(&self, py: Python<'py>, path: String) -> PyResult> { + let conn = self.inner.clone(); + pyo3_async_runtimes::tokio::future_into_py(py, async move { + let gateway = conn.active_gateway().await; + let actor_ref = ConnectActorRef::new(gateway, path, conn.http_client().clone()); + Ok(PyConnectActorRef { + inner: actor_ref, + #[allow(clippy::redundant_clone)] + _conn: conn.clone(), + }) + }) + } + + /// Refresh the gateway list from the cluster. + fn refresh_gateways<'py>(&self, py: Python<'py>) -> PyResult> { + let conn = self.inner.clone(); + pyo3_async_runtimes::tokio::future_into_py(py, async move { + conn.refresh_gateways().await.map_err(to_pyerr)?; + Ok(()) + }) + } + + /// Get the current active gateway address. + fn active_gateway<'py>(&self, py: Python<'py>) -> PyResult> { + let conn = self.inner.clone(); + pyo3_async_runtimes::tokio::future_into_py(py, async move { + Ok(conn.active_gateway().await.to_string()) + }) + } + + /// Close the connection. + fn close(&self) { + self.inner.close(); + } + + fn __repr__(&self) -> String { + "PulsingConnect(out-cluster)".to_string() + } +} + +/// Python wrapper for ConnectActorRef. +#[pyclass(name = "ConnectActorRef")] +#[derive(Clone)] +pub struct PyConnectActorRef { + inner: ConnectActorRef, + #[allow(dead_code)] + _conn: Arc, +} + +#[pymethods] +impl PyConnectActorRef { + /// Send a message and receive a response (supports pickle protocol). + fn ask<'py>(&self, py: Python<'py>, msg: PyObject) -> PyResult> { + let actor_ref = self.inner.clone(); + + let msg_bound = msg.bind(py); + let actor_msg = if msg_bound.is_instance_of::() { + let py_msg: PyMessage = msg_bound.extract()?; + py_msg.to_message() + } else { + crate::actor::encode_python_payload(py, &msg)? + }; + + pyo3_async_runtimes::tokio::future_into_py(py, async move { + let response = actor_ref.send_message(actor_msg).await.map_err(to_pyerr)?; + crate::actor::decode_message_to_pyobject(response).await + }) + } + + /// Fire-and-forget message. + fn tell<'py>(&self, py: Python<'py>, msg: PyObject) -> PyResult> { + let actor_ref = self.inner.clone(); + + let msg_bound = msg.bind(py); + let actor_msg = if msg_bound.is_instance_of::() { + let py_msg: PyMessage = msg_bound.extract()?; + py_msg.to_message() + } else { + crate::actor::encode_python_payload(py, &msg)? + }; + + pyo3_async_runtimes::tokio::future_into_py(py, async move { + match actor_msg { + pulsing_actor::actor::Message::Single { msg_type, data } => { + actor_ref.tell(&msg_type, data).await.map_err(to_pyerr)?; + } + _ => { + return Err(PyRuntimeError::new_err("Streaming not supported for tell")); + } + } + Ok(()) + }) + } + + /// Get the actor path. + #[getter] + fn path(&self) -> String { + self.inner.path().to_string() + } + + /// Get the gateway address. + #[getter] + fn gateway(&self) -> String { + self.inner.gateway().to_string() + } + + fn __repr__(&self) -> String { + format!( + "ConnectActorRef(path='{}', gateway='{}')", + self.inner.path(), + self.inner.gateway() + ) + } +} + +pub fn add_to_module(m: &Bound<'_, pyo3::types::PyModule>) -> PyResult<()> { + m.add_class::()?; + m.add_class::()?; + Ok(()) +} diff --git a/crates/pulsing-py/src/lib.rs b/crates/pulsing-py/src/lib.rs index 6eaea2298..8249c44d0 100644 --- a/crates/pulsing-py/src/lib.rs +++ b/crates/pulsing-py/src/lib.rs @@ -6,6 +6,7 @@ use pyo3::prelude::*; mod actor; +mod connect; mod errors; mod policies; mod python_error_converter; @@ -44,6 +45,9 @@ fn _core(m: &Bound<'_, PyModule>) -> PyResult<()> { // Add load balancing policies policies::add_to_module(m)?; + // Add out-cluster connect + connect::add_to_module(m)?; + // Add version m.add("__version__", env!("CARGO_PKG_VERSION"))?; diff --git a/docs/mkdocs.yml b/docs/mkdocs.yml index 5362b7ff1..c88b52a84 100644 --- a/docs/mkdocs.yml +++ b/docs/mkdocs.yml @@ -105,6 +105,7 @@ plugins: Actors: Actor 指南 Communication Patterns: 通信范式 Remote Actors: 远程 Actor + Out-Cluster Connect: 集群外连接 Operations: CLI 运维 Reliability: 可靠性 Security: 安全 @@ -115,6 +116,7 @@ plugins: Examples: 示例 Ping-Pong: Ping-Pong Distributed Counter: 分布式计数器 + Subprocess: 子进程 API Reference: API 参考 API Overview: API 概述 Python: Python API @@ -161,6 +163,7 @@ nav: - Actor Basics: guide/actors.md - Communication Patterns: guide/communication_patterns.md - Remote Actors: guide/remote_actors.md + - Out-Cluster Connect: guide/connect.md - Reliability: guide/reliability.md - User Guides: - Operations: guide/operations.md @@ -177,6 +180,7 @@ nav: - Overview: examples/index.md - Ping-Pong: examples/ping_pong.md - Distributed Counter: examples/distributed_counter.md + - Subprocess: examples/subprocess.md - API Reference: - API Overview: api/overview.md - Complete Reference: api_reference.md @@ -197,6 +201,7 @@ nav: - Implementation: - AS Actor Decorator: design/as-actor-decorator.md - Communication Evolution: design/cluster-communication-evolution.md + - Out-Cluster Connect: design/out-cluster-connect.md extra: generator: false diff --git a/docs/src/api/overview.md b/docs/src/api/overview.md index 630a2a46c..a8f722f76 100644 --- a/docs/src/api/overview.md +++ b/docs/src/api/overview.md @@ -15,6 +15,7 @@ Pulsing is built around the [Actor Model](https://en.wikipedia.org/wiki/Actor_mo - **Zero External Dependencies**: Pure Rust + Tokio implementation - **Built-in Service Discovery**: SWIM/Gossip protocol for cluster management - **Streaming Support**: Native support for streaming requests/responses +- **Subprocess-Compatible Execution**: `pulsing.subprocess` can keep stdlib semantics while optionally routing commands through Pulsing - **Multi-Language**: Python-first with Rust core, extensible to other languages ## Quick Start @@ -161,6 +162,26 @@ async def handle(msg): await reader.start() ``` +### Subprocess + +Use a stdlib-compatible synchronous API for command execution: + +```python +import pulsing.subprocess as subprocess + +result = subprocess.run(["echo", "hello"], capture_output=True, text=True) +remote = subprocess.run( + ["hostname"], + capture_output=True, + text=True, + resources={"num_cpus": 1}, +) +``` + +Without `resources`, calls behave like Python's native `subprocess`. With +`resources` and `USE_POLSING_SUBPROCESS=1`, commands are executed through the +Pulsing backend. See [Subprocess Example](../examples/subprocess.md). + ### Under the Hood #### ActorSystem (Explicit Management) diff --git a/docs/src/api/overview.zh.md b/docs/src/api/overview.zh.md index e32e8f0a0..4eeff2d06 100644 --- a/docs/src/api/overview.zh.md +++ b/docs/src/api/overview.zh.md @@ -15,6 +15,7 @@ Pulsing 基于 [Actor 模型](https://en.wikipedia.org/wiki/Actor_model)构建 - **零外部依赖**:纯 Rust + Tokio 实现 - **内置服务发现**:SWIM/Gossip 协议管理集群 - **流式支持**:原生支持流式请求/响应 +- **兼容 subprocess 的执行能力**:`pulsing.subprocess` 保持标准库语义,并可按需通过 Pulsing 执行命令 - **多语言**:Python 优先,Rust 核心,可扩展到其他语言 ## Quick Start @@ -160,6 +161,26 @@ async def handle(msg): await reader.start() ``` +### Subprocess + +可以使用与标准库兼容的同步 API 执行命令: + +```python +import pulsing.subprocess as subprocess + +result = subprocess.run(["echo", "hello"], capture_output=True, text=True) +remote = subprocess.run( + ["hostname"], + capture_output=True, + text=True, + resources={"num_cpus": 1}, +) +``` + +不传 `resources` 时,行为等同于 Python 原生 `subprocess`。 +传入 `resources` 且设置 `USE_POLSING_SUBPROCESS=1` 时,命令会通过 +Pulsing 后端执行。详见 [子进程示例](../examples/subprocess.zh.md)。 + ### Under the Hood #### ActorSystem(显式管理) diff --git a/docs/src/api/python.md b/docs/src/api/python.md index 74197c15b..82ccda48a 100644 --- a/docs/src/api/python.md +++ b/docs/src/api/python.md @@ -33,3 +33,12 @@ pip install -e . ## Queue Module ::: pulsing.streaming + +## Subprocess Module + +`pulsing.subprocess` provides a subprocess-compatible synchronous API. +Without `resources`, calls fall back to Python's native `subprocess`. +When `resources` is provided and `USE_POLSING_SUBPROCESS=1` is set, the +command is executed through Pulsing's backend. + +::: pulsing.subprocess diff --git a/docs/src/api/python.zh.md b/docs/src/api/python.zh.md index e88208be9..37a00e33e 100644 --- a/docs/src/api/python.zh.md +++ b/docs/src/api/python.zh.md @@ -33,3 +33,12 @@ pip install -e . ## 队列模块 ::: pulsing.streaming + +## Subprocess 模块 + +`pulsing.subprocess` 提供与标准库 `subprocess` 兼容的同步 API。 +不传 `resources` 时会回退到 Python 原生 `subprocess`; +传入 `resources` 且设置 `USE_POLSING_SUBPROCESS=1` 时, +命令会通过 Pulsing 后端执行。 + +::: pulsing.subprocess diff --git a/docs/src/design/out-cluster-connect.md b/docs/src/design/out-cluster-connect.md new file mode 100644 index 000000000..21e2634f4 --- /dev/null +++ b/docs/src/design/out-cluster-connect.md @@ -0,0 +1,403 @@ +# Out-Cluster Connect 设计文档 + +## 概述 + +本文档描述 Pulsing 的 **Out-Cluster 通信模式**设计。该模式允许不参与集群成员协议的外部连接器,通过连接集群中的任意节点(作为 gateway),透明地访问集群内的所有 actor。 + +### 背景 + +当前 Pulsing 只支持 **In-Cluster 通信模式**:每个 `ActorSystem` 必须作为集群成员加入(参与 Gossip / Head 模式),才能 resolve 和访问远程 actor。这意味着即使只是一个简单的调用者(如 CLI 工具、Web 后端、Notebook),也必须承担完整的集群成员开销。 + +### 动机 + +| 场景 | 为什么需要 Out-Cluster | +|------|-------------------------| +| **轻量调用者** | Notebook、CLI 工具、Web 后端只需要调用集群中的 actor,不需要自身被发现 | +| **短生命周期进程** | 请求式调用不应频繁触发集群成员抖动(join/leave) | +| **安全隔离** | 外部调用者不应获知集群内部拓扑 | +| **跨网络访问** | 客户端可能在不同的网络区域,无法被集群内节点反向探测 | + +--- + +## 设计目标 + +1. **API 一致性** — 连接器使用与 in-cluster 相同的 `resolve` + `proxy.method()` + `ask/tell/stream` 接口 +2. **零集群开销** — 连接器不参与 Gossip、不注册为节点、不运行故障检测 +3. **透明路由** — 连接器通过 gateway 节点访问集群内任意 actor,无需知道目标节点地址 +4. **高可用** — 支持多 gateway、自动故障切换 +5. **最小侵入** — 集群侧无需架构性改动,复用现有 HTTP/2 传输和路由机制 + +--- + +## 架构设计 + +### In-Cluster vs Out-Cluster 对比 + +```mermaid +graph TB + subgraph InCluster["In-Cluster 模式(现有)"] + N1["Node A
ActorSystem
+ Gossip"] + N2["Node B
ActorSystem
+ Gossip"] + N3["Node C
ActorSystem
+ Gossip"] + N1 <-->|"Gossip + RPC"| N2 + N2 <-->|"Gossip + RPC"| N3 + N1 <-->|"Gossip + RPC"| N3 + end + + subgraph OutCluster["Out-Cluster 模式(新增)"] + C1["Connect
(无 Gossip)"] + C2["Connect
(无 Gossip)"] + GW["Gateway
(集群节点)"] + NA["Node A"] + NB["Node B"] + C1 -->|"HTTP/2 RPC"| GW + C2 -->|"HTTP/2 RPC"| GW + GW <-->|"集群内路由"| NA + GW <-->|"集群内路由"| NB + end + + style InCluster fill:#e3f2fd,stroke:#1976d2 + style OutCluster fill:#e8f5e9,stroke:#388e3c +``` + +### 核心思路:集群节点即 Gateway + +**不引入独立的 gateway 组件。** 集群内每个节点的 HTTP/2 server 已经具备处理 `/named/{path}` 和 `/actors/{id}` 请求的能力。对于外部连接器,集群节点天然就是 gateway——它接收请求,在本地或通过集群内路由转发到目标 actor。 + +连接器只需要: + +1. 一个 HTTP/2 client(复用现有 `Http2Client`) +2. 一个 resolve RPC 协议(新增 `/client/resolve` 端点) +3. `ConnectProxy` 包装(与 `ActorProxy` 接口一致) + +--- + +## 协议设计 + +### 新增端点 + +在现有 HTTP/2 server 上新增以下端点,专为 out-cluster 连接器服务: + +| 端点 | 方法 | 说明 | +|------|------|------| +| `/client/resolve` | POST | 解析命名 actor,返回路由信息 | +| `/client/members` | GET | 获取可用 gateway 节点列表(用于故障切换) | + +现有端点直接复用,无需修改: + +| 端点 | 说明 | +|------|------| +| `POST /named/{path}` | 发送消息给命名 actor(gateway 自动路由) | +| `POST /actors/{id}` | 发送消息给指定 actor | + +### Resolve 协议 + +```mermaid +sequenceDiagram + participant C as Connect + participant GW as Gateway Node + participant Cluster as Cluster Registry + + C->>GW: POST /client/resolve
{"path": "services/llm/router"} + GW->>Cluster: NamingBackend.resolve(path) + Cluster-->>GW: [instances on node_a, node_b] + GW-->>C: {"found": true,
"gateway": "gw_addr",
"path": "services/llm/router",
"instance_count": 2} + + Note over C: Connect 拿到路由信息
后续调用经 gateway 转发 + + C->>GW: POST /named/services/llm/router
x-message-mode: ask
[payload] + GW->>Cluster: 路由到目标节点 + Cluster-->>GW: [response] + GW-->>C: [response] +``` + +关键设计决策:**Server-side resolve**。连接器不知道也不需要知道目标 actor 的真实位置。所有路由决策在 gateway 侧完成。 + +### Resolve 请求/响应格式 + +```rust +/// Resolve 请求 +#[derive(Serialize, Deserialize)] +struct ResolveRequest { + path: String, +} + +/// Resolve 响应 +#[derive(Serialize, Deserialize)] +pub struct ResolveResponse { + pub found: bool, + pub path: String, + pub gateway: String, + pub instance_count: usize, +} +``` + +### Gateway 列表协议 + +```mermaid +sequenceDiagram + participant C as Connect + participant GW as Gateway Node + + C->>GW: GET /client/members + GW-->>C: {"gateways": ["addr1:port", "addr2:port", "addr3:port"]} + + Note over C: Connect 缓存 gateway 列表
用于故障切换和负载均衡 +``` + +连接器初始连接到一个 gateway 后,可获取其他可用 gateway 节点地址,实现故障切换。 + +--- + +## Connect SDK 设计 + +### Python API + +```python +from pulsing.connect import Connect + +# 连接到集群任意节点 +conn = await Connect.to("10.0.1.1:8080") + +# 支持多 gateway(故障切换) +conn = await Connect.to(["10.0.1.1:8080", "10.0.1.2:8080"]) + +# resolve 命名 actor +counter = await conn.resolve("services/counter") + +# 调用方法(与 ActorProxy 一致) +result = await counter.increment() +value = await counter.get_value() + +# 远程 spawn actor(在集群节点上创建) +calc = await conn.spawn(Calculator, init_value=100, name="services/calc") +result = await calc.multiply(6, 7) + +# 流式调用 +async for token in llm.generate(prompt="Hello"): + print(token, end="") + +# 关闭连接 +await conn.close() +``` + +### 与 In-Cluster API 对比 + +```python +# === In-Cluster(现有) === +await init() # 启动完整 ActorSystem,加入集群 +counter = await Counter.spawn(init=10) # 可以 spawn +ref = await resolve("services/counter") # 可以 resolve +result = await ref.increment() # 可以 ask/tell + +# === Out-Cluster(新增) === +conn = await Connect.to("10.0.1.1:8080") # 轻量连接,不加入集群 +counter = await conn.resolve("services/counter") # 可以 resolve +result = await counter.increment() # 可以调用方法(完全一致) +counter = await conn.spawn(Counter, name="services/new_counter") # 远程 spawn +``` + +| 能力 | In-Cluster | Out-Cluster Connect | +|------|:---:|:---:| +| `resolve` 命名 actor | ✅ | ✅ | +| 同步方法调用 | ✅ | ✅ | +| 异步方法调用 | ✅ | ✅ | +| 流式调用 (streaming) | ✅ | ✅ | +| `spawn` actor(远程) | ✅ | ✅ | +| `spawn` actor(本地) | ✅ | ❌ | +| 被其他 actor 发现 | ✅ | ❌ | +| 参与 Gossip | ✅ | ❌ | +| 占用端口 | ✅ | ❌ | + +### 远程 Spawn + +`Connect.spawn()` 允许通过 gateway 在集群节点上创建新 actor。内部复用集群已有的 `PythonActorService`(每个节点自动运行的 actor 创建服务),无需新增 gateway 端点: + +```python +# 在集群上远程 spawn,返回 ConnectProxy +calc = await conn.spawn(Calculator, init_value=100, name="services/calc") +result = await calc.multiply(6, 7) # 42 + +# 也可以 spawn 后通过 resolve 获取 +await conn.spawn(Worker, name="services/worker") +worker = await conn.resolve("services/worker") +``` + +spawn 流程: + +1. `Connect.spawn()` 通过 gateway 向 `system/python_actor_service` 发送 `CreateActor` 消息 +2. 目标节点的 `PythonActorService` 在本地创建 actor 实例 +3. 返回 `ConnectProxy`,后续调用与 `resolve()` 获取的 proxy 完全一致 + +**前提条件**:目标节点必须已注册对应的 `@remote` 类(即目标节点的代码中包含该类的定义)。 + +### 内部实现 + +```mermaid +graph TB + subgraph ConnectSDK["Python Connect SDK"] + Connect["Connect
to() / close() / resolve() / spawn()"] + Proxy["ConnectProxy
method() → ask/stream"] + Ref["ConnectActorRef
gateway_addr + path"] + end + + subgraph RustCore["Rust Core"] + PC["PulsingConnect
连接管理 / 故障切换"] + CAR["ConnectActorRef
ask / tell / ask_stream"] + H2C["Http2Client
连接池 / 重试"] + end + + subgraph Gateway["Gateway Node"] + H2S["Http2Server"] + Handler["SystemMessageHandler"] + Registry["NamingBackend"] + end + + Connect --> PC + Connect --> Proxy + Proxy --> Ref + Ref --> CAR + PC --> H2C + CAR --> H2C + H2C -->|"HTTP/2"| H2S + H2S --> Handler + Handler --> Registry + + style ConnectSDK fill:#e8f5e9,stroke:#388e3c + style RustCore fill:#fff3e0,stroke:#f57c00 + style Gateway fill:#e3f2fd,stroke:#1976d2 +``` + +--- + +## Rust 层实现 + +### PulsingConnect + +```rust +/// Out-cluster 连接器(不参与集群成员协议) +pub struct PulsingConnect { + active_gateway: RwLock, + gateways: RwLock>, + http_client: Arc, +} + +impl PulsingConnect { + pub async fn connect(gateway_addr: SocketAddr) -> Result>; + pub async fn connect_multi(gateway_addrs: Vec) -> Result>; + pub async fn resolve(&self, path: &str) -> Result; + pub async fn refresh_gateways(&self) -> Result<()>; + pub async fn failover(&self) -> Result<()>; + pub async fn active_gateway(&self) -> SocketAddr; + pub fn close(&self); +} +``` + +### ConnectActorRef + +```rust +/// 面向 out-cluster 连接器的 actor 引用 +pub struct ConnectActorRef { + gateway: SocketAddr, + path: String, + http_client: Arc, +} + +impl ConnectActorRef { + pub async fn ask(&self, msg_type: &str, payload: Vec) -> Result>; + pub async fn tell(&self, msg_type: &str, payload: Vec) -> Result<()>; + pub async fn ask_stream(&self, msg_type: &str, payload: Vec) + -> Result>; + pub async fn send_message(&self, msg: Message) -> Result; +} +``` + +### Gateway 侧实现 + +Gateway 侧在 `Http2ServerHandler` trait 上新增两个默认方法,由 `SystemMessageHandler` 提供实际实现: + +```rust +// Http2ServerHandler trait 新增默认方法 +async fn handle_client_resolve(&self, path: &str) -> serde_json::Value; +async fn handle_client_members(&self) -> serde_json::Value; + +// SystemMessageHandler 实现 +impl SystemMessageHandler { + async fn handle_client_resolve(&self, path: &str) -> serde_json::Value { + // 集群模式:通过 NamingBackend 查找 + // 单机模式:通过本地 registry 查找 + } + + async fn handle_client_members(&self) -> serde_json::Value { + // 返回所有存活成员的地址列表 + } +} +``` + +--- + +## 连接管理与故障转移 + +### 多 Gateway 策略 + +```mermaid +flowchart TD + C["Connect"] --> LB["Gateway 选择"] + LB --> G1["Gateway 1 ✅"] + LB --> G2["Gateway 2 ✅"] + LB --> G3["Gateway 3 ❌"] + + G1 --> Cluster["Cluster"] + G2 --> Cluster + + style G3 fill:#ffcdd2,stroke:#c62828 +``` + +连接器维护一个 gateway 列表,采用以下策略: + +1. **初始连接**:尝试连接列表中的第一个可用 gateway +2. **定期刷新**:通过 `GET /client/members` 更新 gateway 列表 +3. **故障切换**:当前 gateway 不可达时,自动切换到下一个 +4. **健康检查**:可选的周期性 ping(复用 HTTP/2 keepalive) + +### 引用失效处理 + +当持有的 `ConnectActorRef` 指向的 actor 已迁移或停止: + +1. Gateway 路由时发现目标不可达,返回特定错误码 +2. 连接器可选择重新 resolve 或向上层报错 +3. 不自动重试(避免隐藏错误),由应用层决策 + +--- + +## 业界参考 + +| 框架 | 模式 | 与本设计的关系 | +|------|------|---------------| +| **Akka ClusterClient** | 连接 ClusterReceptionist | 类似思路,但已废弃。教训:避免紧耦合 | +| **Orleans IClusterClient** | 通过 Gateway Silo 连接 | 最接近的参考。Orleans 推荐 co-hosted 以减少网络开销 | +| **Ray Client** | gRPC 连接 head 节点 | 单 Proxy Actor 瓶颈(70 QPS)。我们通过多 gateway 避免 | +| **Dapr Sidecar** | HTTP/gRPC 代理 | sidecar 模式解耦了客户端版本。但部署复杂度更高 | + +--- + +## 设计决策记录 + +| 决策 | 选择 | 理由 | +|------|------|------| +| Gateway 实现方式 | 集群节点即 gateway | 零新增组件,复用现有 HTTP/2 server | +| Resolve 位置 | Server-side | 不暴露集群拓扑,安全性更好 | +| 传输协议 | 复用 HTTP/2 | 与 in-cluster 一致,零额外端口 | +| 连接器能力范围 | resolve + spawn + method call + streaming | spawn 复用 PythonActorService,无需新端点 | +| 引用失效策略 | 报错,不自动重试 | 避免隐藏错误,由应用层控制 | +| 命名:connect vs client | `Connect` / `PulsingConnect` | "connect" 更准确地描述了建立连接的语义,而非客户端-服务端的角色关系 | + +--- + +## 参见 + +- [Out-Cluster Connect 用户指南](../guide/connect.md) — 使用教程与最佳实践 +- [Cluster Networking](cluster-networking.md) — 集群组网设计 +- [HTTP2 Transport](http2-transport.md) — HTTP/2 传输层设计 +- [Actor Addressing](actor-addressing.md) — Actor 寻址设计 +- [Node Discovery](node-discovery.md) — 节点发现设计 diff --git a/docs/src/examples/index.md b/docs/src/examples/index.md index bd9318903..3919bfe14 100644 --- a/docs/src/examples/index.md +++ b/docs/src/examples/index.md @@ -151,6 +151,33 @@ class WorkerPool: return stats ``` +### Subprocess + +Run shell commands with a stdlib-compatible API: + +```python +import pulsing.subprocess as subprocess + +result = subprocess.run(["echo", "hello"], capture_output=True, text=True) +print(result.stdout.strip()) + +remote = subprocess.run( + ["hostname"], + capture_output=True, + text=True, + resources={"num_cpus": 1}, +) +print(remote.stdout.strip()) +``` + +Run the full example: + +```bash +python examples/python/subprocess_example.py +python examples/python/subprocess_example.py --resources +USE_POLSING_SUBPROCESS=1 python examples/python/subprocess_example.py --resources +``` + ## LLM Inference Examples ### Simple LLM Service @@ -274,6 +301,7 @@ cd examples/agent/langgraph && ./run_distributed.sh - [Ping-Pong](ping_pong.md) - Basic actor communication - [Distributed Counter](distributed_counter.md) - Shared state across nodes +- [Subprocess](subprocess.md) - Subprocess-compatible command execution - [LLM Inference](llm_inference.md) - Building inference services - [AutoGen Integration](../agent/autogen.md) - Distributed AutoGen agents - [LangGraph Integration](../agent/langgraph.md) - Distributed LangGraph workflows @@ -285,6 +313,7 @@ Most examples can be run directly: ```bash # Run a single example python examples/hello_world.py +python examples/python/subprocess_example.py # Run distributed examples (need multiple terminals) # Terminal 1: diff --git a/docs/src/examples/index.zh.md b/docs/src/examples/index.zh.md index 4917f83b9..d1c97d944 100644 --- a/docs/src/examples/index.zh.md +++ b/docs/src/examples/index.zh.md @@ -145,6 +145,33 @@ class WorkerPool: return await asyncio.gather(*futures) ``` +### Subprocess + +使用与标准库兼容的 API 执行命令: + +```python +import pulsing.subprocess as subprocess + +result = subprocess.run(["echo", "hello"], capture_output=True, text=True) +print(result.stdout.strip()) + +remote = subprocess.run( + ["hostname"], + capture_output=True, + text=True, + resources={"num_cpus": 1}, +) +print(remote.stdout.strip()) +``` + +运行完整示例: + +```bash +python examples/python/subprocess_example.py +python examples/python/subprocess_example.py --resources +USE_POLSING_SUBPROCESS=1 python examples/python/subprocess_example.py --resources +``` + ## LLM 推理示例 ### 简单 LLM 服务 @@ -237,6 +264,7 @@ cd examples/agent/langgraph && ./run_distributed.sh - [Ping-Pong](ping_pong.zh.md) - 基本 Actor 通信 - [分布式计数器](distributed_counter.zh.md) - 跨节点共享状态 +- [子进程](subprocess.zh.md) - 兼容 `subprocess` 的命令执行 - [LLM 推理](llm_inference.zh.md) - 构建推理服务 - [AutoGen 集成](../agent/autogen.zh.md) - 分布式 AutoGen Agent - [LangGraph 集成](../agent/langgraph.zh.md) - 分布式 LangGraph 工作流 @@ -248,6 +276,7 @@ cd examples/agent/langgraph && ./run_distributed.sh ```bash # 运行单个示例 python examples/hello_world.py +python examples/python/subprocess_example.py # 运行分布式示例(需要多个终端) # 终端 1: diff --git a/docs/src/examples/subprocess.md b/docs/src/examples/subprocess.md new file mode 100644 index 000000000..3903c7a9b --- /dev/null +++ b/docs/src/examples/subprocess.md @@ -0,0 +1,53 @@ +# Subprocess + +Run shell commands through a stdlib-compatible API. + +`pulsing.subprocess` mirrors Python's `subprocess` module and adds an optional +Pulsing-backed execution path when you want to attach resource requirements. + +## Run + +```bash +python examples/python/subprocess_example.py +python examples/python/subprocess_example.py --resources +USE_POLSING_SUBPROCESS=1 python examples/python/subprocess_example.py --resources +``` + +## Backend Selection + +- Without `resources`, calls go directly to Python's native `subprocess`. +- With `resources=...` but without `USE_POLSING_SUBPROCESS=1`, the example still uses the native backend. +- Only when both `resources` is provided and `USE_POLSING_SUBPROCESS=1` is set does execution switch to the Pulsing backend. + +## Code + +```python +import pulsing.subprocess as subprocess + +PIPE = subprocess.PIPE +extra = {"resources": {"num_cpus": 2}} + +result = subprocess.run( + ["echo", "hello from run()"], + capture_output=True, + text=True, + check=True, + **extra, +) + +proc = subprocess.Popen(["cat"], stdin=PIPE, stdout=PIPE, text=True, **extra) +stdout, _ = proc.communicate(input="pipe test") +``` + +The full example also covers: + +- `check_output()` for one-shot command capture +- `Popen(...).communicate()` with stdin/stdout/stderr +- `TimeoutExpired` handling +- Multi-turn shell sessions through a persistent `/bin/sh` + +## Key Points + +- Import it as `import pulsing.subprocess as subprocess` to keep the same calling style as the stdlib. +- The resource-backed path is opt-in, so existing `subprocess`-style code can migrate incrementally. +- In resource-backed mode, Pulsing is initialized lazily by the module itself. diff --git a/docs/src/examples/subprocess.zh.md b/docs/src/examples/subprocess.zh.md new file mode 100644 index 000000000..1c14f12e6 --- /dev/null +++ b/docs/src/examples/subprocess.zh.md @@ -0,0 +1,53 @@ +# 子进程 + +通过与标准库兼容的 API 执行命令。 + +`pulsing.subprocess` 对齐 Python 的 `subprocess` 接口,并在需要声明资源时, +提供一个可选的 Pulsing 后端执行路径。 + +## 运行 + +```bash +python examples/python/subprocess_example.py +python examples/python/subprocess_example.py --resources +USE_POLSING_SUBPROCESS=1 python examples/python/subprocess_example.py --resources +``` + +## 后端选择 + +- 不传 `resources` 时,调用直接走 Python 原生 `subprocess`。 +- 传了 `resources=...` 但没有设置 `USE_POLSING_SUBPROCESS=1` 时,示例仍然使用原生后端。 +- 只有同时满足“传入 `resources`”和“设置 `USE_POLSING_SUBPROCESS=1`”两个条件时,才会切到 Pulsing 后端。 + +## 代码 + +```python +import pulsing.subprocess as subprocess + +PIPE = subprocess.PIPE +extra = {"resources": {"num_cpus": 2}} + +result = subprocess.run( + ["echo", "hello from run()"], + capture_output=True, + text=True, + check=True, + **extra, +) + +proc = subprocess.Popen(["cat"], stdin=PIPE, stdout=PIPE, text=True, **extra) +stdout, _ = proc.communicate(input="pipe test") +``` + +完整示例还覆盖了: + +- `check_output()` 的单次命令输出采集 +- 带 stdin/stdout/stderr 的 `Popen(...).communicate()` +- `TimeoutExpired` 超时处理 +- 基于持久 `/bin/sh` 的多轮 shell 会话 + +## 要点 + +- 建议使用 `import pulsing.subprocess as subprocess`,保持和标准库一致的调用方式。 +- 资源调度路径是显式开启的,已有 `subprocess` 风格代码可以渐进迁移。 +- 在资源后端模式下,模块会懒初始化 Pulsing,无需调用方提前 `await pul.init()`。 diff --git a/docs/src/guide/connect.md b/docs/src/guide/connect.md new file mode 100644 index 000000000..9ee8a9fef --- /dev/null +++ b/docs/src/guide/connect.md @@ -0,0 +1,244 @@ +# Out-Cluster Connect + +Access actors in a Pulsing cluster **without joining the cluster**. + +`pulsing.connect` provides a lightweight connector that talks to any cluster node (acting as a gateway) and transparently routes requests to any actor. The connector does not participate in gossip, does not register as a node, and does not run fault detection — keeping it extremely lightweight. + +## When to Use + +| Scenario | Why Connect? | +|----------|-------------| +| **Notebook / CLI** | You only need to call actors, not host them | +| **Web backend** | Short-lived requests shouldn't trigger cluster join/leave churn | +| **Cross-network** | The caller may be in a different network zone | +| **Security isolation** | External callers should not see internal cluster topology | + +!!! tip + If your process also needs to **host** actors (be discoverable, respond to messages), use the regular `pul.init()` + cluster join instead. `Connect` is for **callers only**. + +--- + +## Quick Start + +```python +from pulsing.connect import Connect + +# Connect to any cluster node (acts as gateway) +conn = await Connect.to("10.0.1.1:8080") + +# Resolve a named actor +counter = await conn.resolve("services/counter") + +# Call methods — same syntax as in-cluster ActorProxy +value = await counter.increment(5) +print(value) # 5 + +# Close when done +await conn.close() +``` + +That's it. No `init()`, no cluster configuration, no ports to open. + +--- + +## Communication Patterns + +### Sync Methods + +```python +conn = await Connect.to("10.0.1.1:8080") +calc = await conn.resolve("services/calculator") + +result = await calc.multiply(6, 7) # 42 +await calc.add(10) # stateful call +value = await calc.get() # 10 +``` + +### Async Methods + +Async methods on the remote actor work transparently: + +```python +svc = await conn.resolve("services/ai") + +# Async method — just await as usual +result = await svc.slow_process(data) +greeting = await svc.greet("world") # "hello world" +``` + +### Streaming (Async Generators) + +For actors that yield results incrementally (e.g., LLM token generation): + +```python +llm = await conn.resolve("services/llm") + +# Stream tokens as they're generated +async for token in llm.generate(prompt="Tell me a story"): + print(token, end="", flush=True) +``` + +!!! note + Streaming uses `async for proxy.method(args)` directly — **no `await`** before `async for`. The method call returns an async iterable. + +### Remote Spawn + +Spawn new actors on the cluster from the connector — the actor runs on a cluster node, not locally: + +```python +from pulsing.connect import Connect +from myapp.actors import Calculator, Worker + +conn = await Connect.to("10.0.1.1:8080") + +# Spawn with constructor arguments +calc = await conn.spawn(Calculator, init_value=100, name="services/calc") +result = await calc.multiply(6, 7) # 42 + +# Spawn an async actor +worker = await conn.spawn(Worker, name="services/worker") +status = await worker.process("task_data") +``` + +!!! note + The `@remote`-decorated class must be registered on the target cluster node. This means the cluster process must have imported the class definition. + +After spawning, the actor is fully accessible via `resolve()` too: + +```python +# Another connector (or the same one) can resolve the spawned actor +calc = await conn.resolve("services/calc") +await calc.add(50) +``` + +### Concurrent Calls + +Multiple calls can be made concurrently: + +```python +import asyncio + +svc = await conn.resolve("services/worker") +results = await asyncio.gather( + svc.process("task_a"), + svc.process("task_b"), + svc.process("task_c"), +) +``` + +--- + +## Multiple Gateways (High Availability) + +Pass a list of addresses for automatic failover: + +```python +conn = await Connect.to([ + "10.0.1.1:8080", + "10.0.1.2:8080", + "10.0.1.3:8080", +]) + +# If the active gateway goes down, the connector fails over +# to the next available one. +``` + +Refresh the gateway list from the cluster at any time: + +```python +await conn.refresh_gateways() +``` + +--- + +## Typed Resolve + +If you have the actor class definition available, pass it to `resolve()` for method validation: + +```python +from pulsing.connect import Connect +from myapp.actors import Calculator + +conn = await Connect.to("10.0.1.1:8080") +calc = await conn.resolve("services/calc", cls=Calculator) + +# Typo raises AttributeError immediately, not a remote error +calc.mulitply(6, 7) # AttributeError: No method 'mulitply' +``` + +--- + +## Error Handling + +### Actor Errors + +Errors raised inside the remote actor propagate to the caller: + +```python +from pulsing.exceptions import PulsingActorError + +try: + await calc.will_fail() +except PulsingActorError as e: + print(f"Actor error: {e}") +``` + +### Streaming Errors + +If an actor raises mid-stream, items received before the error are still available: + +```python +items = [] +try: + async for item in svc.partial_stream(10): + items.append(item) +except PulsingActorError as e: + print(f"Stream error after {len(items)} items: {e}") +``` + +### Resolve Errors + +Resolving a non-existent actor raises an error: + +```python +try: + await conn.resolve("services/nonexistent") +except Exception as e: + print(f"Not found: {e}") +``` + +--- + +## In-Cluster vs Out-Cluster + +| Capability | In-Cluster (`pul.init()`) | Out-Cluster (`Connect.to()`) | +|------------|:---:|:---:| +| `resolve` named actors | ✅ | ✅ | +| Sync method calls | ✅ | ✅ | +| Async method calls | ✅ | ✅ | +| Streaming | ✅ | ✅ | +| `spawn` actors (remote) | ✅ | ✅ | +| `spawn` actors (local) | ✅ | ❌ | +| Be discoverable by others | ✅ | ❌ | +| Gossip membership | ✅ | ❌ | +| Open a port | ✅ | ❌ | + +**Same actor code, same call syntax.** Only how you obtain the proxy differs. + +--- + +## Best Practices + +1. **Close connections** — Always call `await conn.close()` when done to release resources. +2. **Use multiple gateways** — Pass a list of addresses for production deployments. +3. **Handle errors** — Wrap remote calls in try-except blocks for robustness. +4. **Use typed resolve** — Pass `cls=` when you have the actor class for early error detection. +5. **Don't over-stream** — Use `await` for single results, `async for` only for actual streams. + +--- + +## Next Steps + +- [Remote Actors](remote_actors.md) — In-cluster actor communication +- [Communication Patterns](communication_patterns.md) — Sync, async, and streaming patterns +- [Design Doc: Out-Cluster Connect](../design/out-cluster-connect.md) — Architecture and protocol details diff --git a/docs/src/guide/connect.zh.md b/docs/src/guide/connect.zh.md new file mode 100644 index 000000000..2970f2bab --- /dev/null +++ b/docs/src/guide/connect.zh.md @@ -0,0 +1,243 @@ +# 集群外连接 (Out-Cluster Connect) + +**不加入集群**,直接访问 Pulsing 集群中的 actor。 + +`pulsing.connect` 提供一个轻量连接器,连接到集群的任意节点(作为 gateway),透明地路由请求到目标 actor。连接器不参与 gossip、不注册为节点、不运行故障检测——保持极致轻量。 + +## 适用场景 + +| 场景 | 为什么用 Connect? | +|------|-------------------| +| **Notebook / CLI** | 只需要调用 actor,不需要托管 actor | +| **Web 后端** | 短生命周期请求不应触发集群成员变动 | +| **跨网络** | 调用方可能在不同的网络区域 | +| **安全隔离** | 外部调用方不应看到集群内部拓扑 | + +!!! tip + 如果你的进程也需要**托管** actor(被发现、响应消息),请使用常规的 `pul.init()` + 集群加入。`Connect` 仅适用于**调用方**。 + +--- + +## 快速开始 + +```python +from pulsing.connect import Connect + +# 连接到集群任意节点(作为 gateway) +conn = await Connect.to("10.0.1.1:8080") + +# 解析命名 actor +counter = await conn.resolve("services/counter") + +# 调用方法——与集群内 ActorProxy 语法完全一致 +value = await counter.increment(5) +print(value) # 5 + +# 完成后关闭连接 +await conn.close() +``` + +就这么简单。不需要 `init()`,不需要集群配置,不需要开放端口。 + +--- + +## 通信范式 + +### 同步方法 + +```python +conn = await Connect.to("10.0.1.1:8080") +calc = await conn.resolve("services/calculator") + +result = await calc.multiply(6, 7) # 42 +await calc.add(10) # 有状态调用 +value = await calc.get() # 10 +``` + +### 异步方法 + +远程 actor 的异步方法透明工作: + +```python +svc = await conn.resolve("services/ai") + +# 异步方法——正常 await 即可 +result = await svc.slow_process(data) +greeting = await svc.greet("world") # "hello world" +``` + +### 流式传输(异步生成器) + +适用于增量返回结果的 actor(如 LLM token 生成): + +```python +llm = await conn.resolve("services/llm") + +# 逐 token 流式接收 +async for token in llm.generate(prompt="讲个故事"): + print(token, end="", flush=True) +``` + +!!! note + 流式使用 `async for proxy.method(args)` 直接迭代——`async for` 前面**不需要 `await`**。方法调用直接返回异步可迭代对象。 + +### 远程 Spawn + +通过连接器在集群节点上创建新 actor——actor 运行在集群节点上,而非本地: + +```python +from pulsing.connect import Connect +from myapp.actors import Calculator, Worker + +conn = await Connect.to("10.0.1.1:8080") + +# 带构造参数 spawn +calc = await conn.spawn(Calculator, init_value=100, name="services/calc") +result = await calc.multiply(6, 7) # 42 + +# spawn 异步 actor +worker = await conn.spawn(Worker, name="services/worker") +status = await worker.process("task_data") +``` + +!!! note + `@remote` 修饰的类必须在目标集群节点上已注册。即集群进程必须已导入该类定义。 + +spawn 后的 actor 也可以通过 `resolve()` 访问: + +```python +# 其他连接器(或同一个)可以 resolve 已 spawn 的 actor +calc = await conn.resolve("services/calc") +await calc.add(50) +``` + +### 并发调用 + +支持并发发起多个调用: + +```python +import asyncio + +svc = await conn.resolve("services/worker") +results = await asyncio.gather( + svc.process("task_a"), + svc.process("task_b"), + svc.process("task_c"), +) +``` + +--- + +## 多 Gateway(高可用) + +传入地址列表实现自动故障切换: + +```python +conn = await Connect.to([ + "10.0.1.1:8080", + "10.0.1.2:8080", + "10.0.1.3:8080", +]) + +# 当前 gateway 不可用时,连接器自动切换到下一个 +``` + +随时从集群刷新 gateway 列表: + +```python +await conn.refresh_gateways() +``` + +--- + +## 类型化 Resolve + +如果有 actor 类定义,传入 `cls=` 参数可提前验证方法名: + +```python +from pulsing.connect import Connect +from myapp.actors import Calculator + +conn = await Connect.to("10.0.1.1:8080") +calc = await conn.resolve("services/calc", cls=Calculator) + +# 拼写错误会立即抛出 AttributeError,而不是远程错误 +calc.mulitply(6, 7) # AttributeError: No method 'mulitply' +``` + +--- + +## 错误处理 + +### Actor 错误 + +远程 actor 内部抛出的错误会传播到调用方: + +```python +from pulsing.exceptions import PulsingActorError + +try: + await calc.will_fail() +except PulsingActorError as e: + print(f"Actor 错误: {e}") +``` + +### 流式错误 + +如果 actor 在流中途抛出异常,已接收的数据仍然可用: + +```python +items = [] +try: + async for item in svc.partial_stream(10): + items.append(item) +except PulsingActorError as e: + print(f"流式错误,已收到 {len(items)} 项: {e}") +``` + +### Resolve 错误 + +解析不存在的 actor 会抛出异常: + +```python +try: + await conn.resolve("services/nonexistent") +except Exception as e: + print(f"未找到: {e}") +``` + +--- + +## 集群内 vs 集群外 + +| 能力 | 集群内 (`pul.init()`) | 集群外 (`Connect.to()`) | +|------|:---:|:---:| +| `resolve` 命名 actor | ✅ | ✅ | +| 同步方法调用 | ✅ | ✅ | +| 异步方法调用 | ✅ | ✅ | +| 流式传输 | ✅ | ✅ | +| `spawn` actor(远程) | ✅ | ✅ | +| `spawn` actor(本地) | ✅ | ❌ | +| 被其他 actor 发现 | ✅ | ❌ | +| Gossip 成员协议 | ✅ | ❌ | +| 占用端口 | ✅ | ❌ | + +**同样的 actor 代码,同样的调用语法。** 只有获取 proxy 的方式不同。 + +--- + +## 最佳实践 + +1. **关闭连接** — 用完后务必调用 `await conn.close()` 释放资源。 +2. **使用多 gateway** — 生产环境传入多个地址以实现高可用。 +3. **处理错误** — 在 try-except 中包裹远程调用以增强健壮性。 +4. **使用类型化 resolve** — 传入 `cls=` 参数可提前发现拼写错误。 +5. **区分流式与非流式** — 单个结果用 `await`,真正的流用 `async for`。 + +--- + +## 下一步 + +- [远程 Actor](remote_actors.md) — 集群内 actor 通信 +- [通信范式](communication_patterns.md) — 同步、异步、流式模式 +- [设计文档:Out-Cluster Connect](../design/out-cluster-connect.md) — 架构与协议细节 diff --git a/docs/src/index.md b/docs/src/index.md index 658bd4d34..2e881f666 100644 --- a/docs/src/index.md +++ b/docs/src/index.md @@ -75,6 +75,14 @@ A distributed actor runtime built in Rust, designed for Python. Connect AI agent [:octicons-arrow-right-24: Ray + Pulsing](quickstart/migrate_from_ray.md) +- :material-console-line:{ .lg .middle } **Subprocess with Resources** + + --- + + Keep the Python `subprocess` call style and optionally route commands through Pulsing when you attach `resources`. + + [:octicons-arrow-right-24: Subprocess Example](examples/subprocess.md) + --- @@ -119,6 +127,7 @@ asyncio.run(main()) | What is Pulsing / who is it for? | [Overview](overview.md) | | Understand the Actor model | [Actor Basics](guide/actors.md) | | Build a cluster | [Remote Actors](guide/remote_actors.md) | +| Run commands with `pulsing.subprocess` | [Subprocess Example](examples/subprocess.md) | | Operate your system | [CLI Operations](guide/operations.md) | | Architecture and design | [Architecture & Design](design/architecture.md) | | API details | [API Overview](api/overview.md) | diff --git a/docs/src/index.zh.md b/docs/src/index.zh.md index db78f0dd8..fc7b46ece 100644 --- a/docs/src/index.zh.md +++ b/docs/src/index.zh.md @@ -75,6 +75,14 @@ Actor 运行时。流式优先。零依赖。内置发现。 [:octicons-arrow-right-24: Ray + Pulsing](quickstart/migrate_from_ray.zh.md) +- :material-console-line:{ .lg .middle } **带资源约束的子进程** + + --- + + 保持 Python `subprocess` 的调用方式,并在附带 `resources` 时按需切到 Pulsing 执行后端。 + + [:octicons-arrow-right-24: 子进程示例](examples/subprocess.zh.md) + --- @@ -119,6 +127,7 @@ asyncio.run(main()) | Pulsing 是什么 / 适合谁? | [概述](overview.zh.md) | | 理解 Actor 模型 | [Actor 基础](guide/actors.zh.md) | | 构建集群 | [远程 Actor](guide/remote_actors.zh.md) | +| 使用 `pulsing.subprocess` 执行命令 | [子进程示例](examples/subprocess.zh.md) | | 运维系统 | [CLI 运维](guide/operations.zh.md) | | 架构与设计 | [架构与设计](design/architecture.zh.md) | | API 详情 | [API 概述](api/overview.zh.md) | diff --git a/docs/src/overview.md b/docs/src/overview.md index 8ba0896aa..223ca346d 100644 --- a/docs/src/overview.md +++ b/docs/src/overview.md @@ -17,6 +17,7 @@ In one sentence: turn any Python class into a distributed Actor with `@remote` | **LLM inference services** | Scalable backends with streaming, OpenAI-compatible API, and optional vLLM/Transformers workers. | | **Distributed agents** | Multi-agent systems with native integration for AutoGen and LangGraph; same code runs locally or across machines. | | **Enhance Ray communication** | Add streaming, actor discovery, and cross-cluster calls to Ray actors via `pul.mount()`. Use Ray for scheduling, Pulsing for communication. | +| **Resource-aware subprocess execution** | Keep a `subprocess`-compatible API while optionally routing commands through Pulsing when `resources` are provided. | | **Custom distributed apps** | Build services and workers that discover each other via built-in gossip or a head node, over a single HTTP/2 port. | --- @@ -38,6 +39,7 @@ You don't need to be a distributed systems expert to get value — the API is de - **Zero external dependencies** — Pure Rust core + Tokio; no etcd, NATS, or Redis. Cluster discovery uses built-in gossip or an optional head node. - **Location transparency** — Same API for local and remote actors: `await actor.method()` whether the actor is on this process or another machine. - **Python first** — `@pul.remote` turns a class into an Actor; `spawn()` and `resolve()` for creation and discovery; native async/await and streaming. +- **Incremental adoption** — `pulsing.subprocess` keeps the stdlib call style and lets you opt into Pulsing-backed execution only when needed. - **Single port** — Actor RPC and cluster protocol share one HTTP/2 port per node, simplifying deployment and firewalls. --- @@ -46,3 +48,4 @@ You don't need to be a distributed systems expert to get value — the API is de - **[Quick Start](quickstart/index.md)** — Run your first Actor in minutes, then go stateful and distributed. - **[Ray + Pulsing](quickstart/migrate_from_ray.md)** — Use Pulsing as Ray's communication layer, or use the standalone API. +- **[Subprocess Example](examples/subprocess.md)** — Run commands through a subprocess-compatible API with optional resource scheduling. diff --git a/docs/src/overview.zh.md b/docs/src/overview.zh.md index 0bf8eb32b..b224f4af6 100644 --- a/docs/src/overview.zh.md +++ b/docs/src/overview.zh.md @@ -17,6 +17,7 @@ Pulsing 是一个用 Rust 构建、为 Python 设计的分布式 Actor 运行时 | **LLM 推理服务** | 可扩展的推理后端、流式输出、OpenAI 兼容 API,以及可选的 vLLM/Transformers Worker。 | | **分布式 Agent** | 多智能体系统,原生集成 AutoGen 与 LangGraph;同一套代码可在本机或跨机运行。 | | **增强 Ray 通信** | 通过 `pul.mount()` 为 Ray Actor 增加流式、发现和跨集群调用能力。Ray 负责调度,Pulsing 负责通信。 | +| **带资源约束的子进程执行** | 保持 `subprocess` 兼容调用方式,并在传入 `resources` 时按需切到 Pulsing 后端。 | | **自定义分布式应用** | 通过内置 Gossip 或 Head 节点组网,单端口 HTTP/2,构建服务与 Worker。 | --- @@ -38,6 +39,7 @@ Pulsing 是一个用 Rust 构建、为 Python 设计的分布式 Actor 运行时 - **零外部依赖** — 核心纯 Rust + Tokio;不依赖 etcd、NATS、Redis。集群发现采用内置 Gossip 或可选 Head 节点。 - **位置透明** — 本地与远程 Actor 同一套 API:`await actor.method()` 无论 Actor 在本进程还是远程。 - **Python 优先** — `@pul.remote` 将类变成 Actor;`spawn()` / `resolve()` 用于创建与发现;原生 async/await 与流式。 +- **渐进式接入** — `pulsing.subprocess` 保持标准库调用风格,只在需要时显式切换到 Pulsing 后端。 - **单端口** — 每节点一个 HTTP/2 端口同时承载 Actor RPC 与集群协议,便于部署与防火墙配置。 --- @@ -46,3 +48,4 @@ Pulsing 是一个用 Rust 构建、为 Python 设计的分布式 Actor 运行时 - **[快速开始](quickstart/index.zh.md)** — 几分钟内跑起第一个 Actor,再进阶到有状态与分布式。 - **[Ray + Pulsing](quickstart/migrate_from_ray.zh.md)** — 用 Pulsing 作为 Ray 的通信层,或使用 Pulsing 独立 API。 +- **[子进程示例](examples/subprocess.zh.md)** — 通过兼容 `subprocess` 的 API 执行命令,并可选启用资源调度。 diff --git a/examples/README.md b/examples/README.md index 9f1ce3dab..b6a60cc14 100644 --- a/examples/README.md +++ b/examples/README.md @@ -65,10 +65,13 @@ cd examples/agent/langgraph && ./run_distributed.sh | `named_actors.py` | 服务发现 | | `cluster.py` | 集群通信 | | `remote_actor_example.py` | @remote 装饰器 | +| `subprocess_example.py` | `subprocess` 兼容 API | ```bash python examples/python/ping_pong.py python examples/python/cluster.py --port 8000 +python examples/python/subprocess_example.py +USE_POLSING_SUBPROCESS=1 python examples/python/subprocess_example.py --resources ``` ### ⭐⭐ CLI 工具 (`inspect/`) @@ -131,6 +134,7 @@ cargo run --example behavior_fsm -p pulsing-actor | AI 辩论/讨论 | `agent/pulsing/mbti_discussion.py` | | 并行任务竞争 | `agent/pulsing/parallel_ideas_async.py` | | 集群部署 | `python/cluster.py` | +| 子进程资源调度 | `python/subprocess_example.py` | | 学习 CLI 工具 | `inspect/demo_service.py` | | 接入 AutoGen | `agent/autogen/` | | 接入 LangGraph | `agent/langgraph/` | diff --git a/examples/python/README.md b/examples/python/README.md index 95b968ca1..2c23b5261 100644 --- a/examples/python/README.md +++ b/examples/python/README.md @@ -31,6 +31,8 @@ python examples/python/ping_pong.py # Basic communication python examples/python/message_patterns.py # RPC and streaming python examples/python/named_actors.py # Service discovery python examples/python/cluster.py # Multi-node (see --help) +python examples/python/subprocess_example.py # Native subprocess-compatible API +USE_POLSING_SUBPROCESS=1 python examples/python/subprocess_example.py --resources # Pulsing backend ``` ## API 选择 diff --git a/python/pulsing/connect/__init__.py b/python/pulsing/connect/__init__.py new file mode 100644 index 000000000..4e984a95b --- /dev/null +++ b/python/pulsing/connect/__init__.py @@ -0,0 +1,20 @@ +""" +Pulsing Connect — Out-Cluster Connector SDK + +Lightweight connector for accessing actors in a Pulsing cluster without +joining the cluster membership. Connects to any cluster node (acting as +a gateway) and transparently routes requests. + +Usage:: + + from pulsing.connect import Connect + + conn = await Connect.to("10.0.1.1:8080") + counter = await conn.resolve("services/counter") + result = await counter.increment() + await conn.close() +""" + +from .proxy import Connect, ConnectProxy + +__all__ = ["Connect", "ConnectProxy"] diff --git a/python/pulsing/connect/proxy.py b/python/pulsing/connect/proxy.py new file mode 100644 index 000000000..a23d84bc6 --- /dev/null +++ b/python/pulsing/connect/proxy.py @@ -0,0 +1,318 @@ +"""High-level Python connector for out-cluster actor access.""" + +import asyncio +import inspect +import logging +from typing import Any + +from pulsing._core import ConnectActorRef, PulsingConnect +from pulsing.core.protocol import _check_response, _wrap_call + +logger = logging.getLogger(__name__) + + +class ConnectProxy: + """Proxy for a remote actor accessed through the out-cluster connector. + + Supports the same method-call syntax as in-cluster ActorProxy:: + + counter = await conn.resolve("services/counter") + result = await counter.increment() + + async for token in await llm.generate(prompt="Hello"): + print(token, end="") + """ + + def __init__( + self, + actor_ref: ConnectActorRef, + method_names: list[str] | None = None, + async_methods: set[str] | None = None, + ): + self._ref = actor_ref + self._method_names = set(method_names) if method_names else None + self._async_methods = async_methods + + def __getattr__(self, name: str): + if name.startswith("_"): + raise AttributeError(f"Cannot access private attribute: {name}") + if self._method_names is not None and name not in self._method_names: + raise AttributeError(f"No method '{name}'") + is_async = self._async_methods is None or name in self._async_methods + return _ConnectMethodCaller(self._ref, name, is_async=is_async) + + @property + def ref(self) -> ConnectActorRef: + return self._ref + + @property + def path(self) -> str: + return self._ref.path + + @property + def gateway(self) -> str: + return self._ref.gateway + + +class _ConnectMethodCaller: + """Dispatches a method call to the remote actor via the gateway.""" + + def __init__( + self, actor_ref: ConnectActorRef, method_name: str, is_async: bool = False + ): + self._ref = actor_ref + self._method = method_name + self._is_async = is_async + + def __call__(self, *args, **kwargs): + if self._is_async: + return _ConnectAsyncMethodCall(self._ref, self._method, args, kwargs) + return self._sync_call(*args, **kwargs) + + def __await__(self): + return self().__await__() + + async def _sync_call(self, *args, **kwargs) -> Any: + call_msg = _wrap_call(self._method, args, kwargs, False) + resp = await self._ref.ask(call_msg) + return _connect_check_response(resp, self._ref) + + +class _ConnectAsyncMethodCall: + """Async method call supporting both await (final result) and async-for (streaming).""" + + def __init__( + self, actor_ref: ConnectActorRef, method_name: str, args: tuple, kwargs: dict + ): + self._ref = actor_ref + self._method = method_name + self._args = args + self._kwargs = kwargs + self._stream_reader = None + self._final_result = None + self._got_result = False + + async def _ensure_stream(self): + if self._stream_reader is not None or self._got_result: + return + + from pulsing._core import Message + + call_msg = _wrap_call(self._method, self._args, self._kwargs, True) + resp = await self._ref.ask(call_msg) + result = _connect_check_response(resp, self._ref) + + if isinstance(result, Message) and result.is_stream: + self._stream_reader = result.stream_reader() + else: + self._final_result = result + self._got_result = True + + def __aiter__(self): + return self + + async def __anext__(self): + from pulsing.exceptions import PulsingActorError + + await self._ensure_stream() + if self._got_result: + raise StopAsyncIteration + try: + item = await self._stream_reader.__anext__() + if isinstance(item, dict): + if "__error__" in item: + raise PulsingActorError( + item["__error__"], actor_name=self._ref.path + ) + if item.get("__final__"): + self._final_result = item.get("__result__") + self._got_result = True + raise StopAsyncIteration + if "__yield__" in item: + return item["__yield__"] + return item + except StopAsyncIteration: + raise + + def __await__(self): + return self._await_result().__await__() + + async def _await_result(self): + async for _ in self: + pass + if self._got_result: + return self._final_result + return None + + +def _connect_check_response(resp, ref) -> Any: + """Unwrap response, raise on errors.""" + from pulsing._core import Message + from pulsing.exceptions import PulsingActorError + + if isinstance(resp, Message): + if resp.is_stream: + return resp + try: + resp = resp.to_json() + except ValueError: + import pickle + + resp = pickle.loads(resp.payload) + if isinstance(resp, dict): + if "__error__" in resp: + raise PulsingActorError(resp["__error__"], actor_name=ref.path) + if "__result__" in resp: + return resp["__result__"] + if "error" in resp: + raise PulsingActorError(resp["error"], actor_name=ref.path) + if "result" in resp: + return resp["result"] + return resp + + +class Connect: + """High-level out-cluster connector. + + Usage:: + + # Single gateway + conn = await Connect.to("10.0.1.1:8080") + + # Multiple gateways (failover) + conn = await Connect.to(["10.0.1.1:8080", "10.0.1.2:8080"]) + + # Resolve and call actors + counter = await conn.resolve("services/counter") + result = await counter.increment() + + # Streaming + llm = await conn.resolve("services/llm") + async for token in await llm.generate(prompt="Hello"): + print(token, end="") + + await conn.close() + """ + + def __init__(self, inner: PulsingConnect): + self._inner = inner + + @classmethod + async def to(cls, addrs: "str | list[str]") -> "Connect": + """Connect to a Pulsing cluster gateway. + + Args: + addrs: Single gateway address or list of addresses for failover. + """ + if isinstance(addrs, str): + inner = await PulsingConnect.connect(addrs) + else: + inner = await PulsingConnect.connect_multi(addrs) + return cls(inner) + + async def resolve( + self, + name: str, + *, + cls: type | None = None, + ) -> ConnectProxy: + """Resolve a named actor and return a proxy. + + Args: + name: Actor name/path (e.g., "services/counter"). + cls: Optional class for typed proxy (validates method names). + """ + if "/" not in name: + name = f"actors/{name}" + + actor_ref = await self._inner.resolve(name) + + if cls is not None: + methods, async_methods = _extract_methods(cls) + return ConnectProxy(actor_ref, methods, async_methods) + return ConnectProxy(actor_ref) + + async def spawn( + self, + actor_cls, + *args, + name: str | None = None, + public: bool = True, + **kwargs, + ) -> ConnectProxy: + """Spawn a @remote actor on the cluster via the gateway. + + The actor class must already be registered (via @remote) on the + target cluster node. The actor is created on the gateway node + and can be accessed through the returned proxy. + + Args: + actor_cls: A @remote-decorated class (ActorClass) or plain class. + name: Optional actor name/path. Auto-generated if omitted. + public: Whether the actor is discoverable via resolve (default True). + *args, **kwargs: Constructor arguments for the actor. + + Returns: + ConnectProxy for the newly created actor. + """ + from pulsing._core import Message + from pulsing.core.remote import ActorClass + from pulsing.core.protocol import _normalize_actor_name + + if isinstance(actor_cls, ActorClass): + class_name = actor_cls._class_name + original_cls = actor_cls._cls + methods = list(actor_cls._methods) + async_methods = actor_cls._async_methods + else: + original_cls = actor_cls + class_name = f"{actor_cls.__module__}.{actor_cls.__name__}" + methods, async_methods = _extract_methods(actor_cls) + + actor_name = _normalize_actor_name(original_cls.__name__, name) + + service_ref = await self._inner.actor_ref("system/python_actor_service") + + msg = Message.from_json( + "CreateActor", + { + "class_name": class_name, + "actor_name": actor_name, + "args": list(args), + "kwargs": kwargs, + "public": public, + }, + ) + + resp = await service_ref.ask(msg) + + if isinstance(resp, Message): + if resp.msg_type == "Error": + data = resp.to_json() + raise RuntimeError(f"Remote spawn failed: {data.get('error')}") + elif isinstance(resp, dict) and "error" in resp: + raise RuntimeError(f"Remote spawn failed: {resp['error']}") + + actor_ref = await self._inner.actor_ref(actor_name) + return ConnectProxy(actor_ref, methods, async_methods) + + async def refresh_gateways(self): + """Refresh the gateway list from the cluster.""" + await self._inner.refresh_gateways() + + async def close(self): + """Close the connector.""" + self._inner.close() + + +def _extract_methods(cls: type) -> tuple[list[str], set[str]]: + """Extract public method names and async method set from a class.""" + methods = [] + async_methods = set() + for name, method in inspect.getmembers(cls, predicate=inspect.isfunction): + if name.startswith("_"): + continue + methods.append(name) + if inspect.iscoroutinefunction(method) or inspect.isasyncgenfunction(method): + async_methods.add(name) + return methods, async_methods diff --git a/python/pulsing/subprocess/ray_spawn.py b/python/pulsing/subprocess/ray_spawn.py index 2e8436c24..77d626be0 100644 --- a/python/pulsing/subprocess/ray_spawn.py +++ b/python/pulsing/subprocess/ray_spawn.py @@ -89,6 +89,7 @@ def _replace_addr_with_routable_ip(addr: str, *, force_local: bool = False) -> s # For single-machine Ray testing, keep localhost try: import ray + if ray.is_initialized(): # Check if Ray cluster is local (single node) nodes = ray.nodes() diff --git a/tests/python/test_connect.py b/tests/python/test_connect.py new file mode 100644 index 000000000..b456dc9c3 --- /dev/null +++ b/tests/python/test_connect.py @@ -0,0 +1,321 @@ +""" +Tests for out-cluster communication via pulsing.connect. + +Verifies that an external connector (not a cluster member) can: +1. Resolve named actors through a gateway node +2. Call sync methods (ask pattern) +3. Call async methods (ask pattern) +4. Stream results from async generators +5. Handle errors propagated from the actor +6. Make concurrent calls through the gateway +7. Spawn actors remotely on the cluster +""" + +import asyncio + +import pytest +import pulsing as pul +from pulsing.core import remote + + +# ============================================================================ +# Test Actors — deployed inside the cluster +# ============================================================================ + + +@remote +class ConnCalc: + """Stateful calculator with sync methods.""" + + def __init__(self, init_value=0): + self.value = init_value + + def add(self, n): + self.value += n + return self.value + + def get(self): + return self.value + + def multiply(self, a, b): + return a * b + + def will_fail(self): + raise ValueError("intentional error") + + +@remote +class ConnAsyncService: + """Service with async methods.""" + + async def slow_add(self, a, b): + await asyncio.sleep(0.01) + return a + b + + async def greet(self, name): + await asyncio.sleep(0.01) + return f"hello {name}" + + +@remote +class ConnStreamService: + """Service with async generators for streaming.""" + + async def count_up(self, n): + for i in range(n): + yield i + + async def echo_stream(self, items): + for item in items: + await asyncio.sleep(0.001) + yield item + + async def partial_fail(self, n): + for i in range(n): + if i == 3: + raise RuntimeError("stream error at 3") + yield i + + +# ============================================================================ +# Fixture: cluster node + out-cluster connector +# ============================================================================ + + +@pytest.fixture +async def cluster_and_connect(): + """Start an ActorSystem (gateway) with PythonActorService and create a connector.""" + from pulsing._core import ActorSystem, SystemConfig + from pulsing.core.service import PythonActorService, PYTHON_ACTOR_SERVICE_NAME + + loop = asyncio.get_running_loop() + + config = SystemConfig.with_addr("127.0.0.1:0") + system = await ActorSystem.create(config, loop) + gateway_addr = system.addr + + # Start PythonActorService so that remote spawn works + service = PythonActorService(system) + await system.spawn(service, name=PYTHON_ACTOR_SERVICE_NAME, public=True) + + await ConnCalc.spawn(system=system, name="services/calc", public=True) + await ConnAsyncService.spawn(system=system, name="services/async_svc", public=True) + await ConnStreamService.spawn( + system=system, name="services/stream_svc", public=True + ) + + await asyncio.sleep(0.1) + + from pulsing.connect import Connect + + conn = await Connect.to(gateway_addr) + + yield system, conn + + await conn.close() + await system.shutdown() + + +# ============================================================================ +# Sync Method Tests +# ============================================================================ + + +@pytest.mark.asyncio +async def test_connect_sync_multiply(cluster_and_connect): + """Calling a sync method that returns immediately.""" + _, conn = cluster_and_connect + calc = await conn.resolve("services/calc") + result = await calc.multiply(6, 7) + assert result == 42 + + +@pytest.mark.asyncio +async def test_connect_sync_stateful(cluster_and_connect): + """Stateful sync method calls preserve actor state across requests.""" + _, conn = cluster_and_connect + calc = await conn.resolve("services/calc") + + assert await calc.add(10) == 10 + assert await calc.add(5) == 15 + assert await calc.get() == 15 + + +# ============================================================================ +# Async Method Tests +# ============================================================================ + + +@pytest.mark.asyncio +async def test_connect_async_add(cluster_and_connect): + """Calling an async method through the gateway.""" + _, conn = cluster_and_connect + svc = await conn.resolve("services/async_svc") + result = await svc.slow_add(3, 4) + assert result == 7 + + +@pytest.mark.asyncio +async def test_connect_async_string_result(cluster_and_connect): + """Async method returning a string.""" + _, conn = cluster_and_connect + svc = await conn.resolve("services/async_svc") + result = await svc.greet("world") + assert result == "hello world" + + +# ============================================================================ +# Streaming Tests (async generator via async for) +# ============================================================================ + + +@pytest.mark.asyncio +async def test_connect_stream_count(cluster_and_connect): + """Streaming integers from an async generator.""" + _, conn = cluster_and_connect + svc = await conn.resolve("services/stream_svc") + + items = [] + async for item in svc.count_up(5): + items.append(item) + + assert items == [0, 1, 2, 3, 4] + + +@pytest.mark.asyncio +async def test_connect_stream_echo(cluster_and_connect): + """Streaming mixed-type items.""" + _, conn = cluster_and_connect + svc = await conn.resolve("services/stream_svc") + + items = [] + async for item in svc.echo_stream(["a", "b", "c"]): + items.append(item) + + assert items == ["a", "b", "c"] + + +@pytest.mark.asyncio +async def test_connect_stream_error_mid_stream(cluster_and_connect): + """Error mid-stream is propagated to the caller.""" + _, conn = cluster_and_connect + from pulsing.exceptions import PulsingActorError + + svc = await conn.resolve("services/stream_svc") + items = [] + with pytest.raises(PulsingActorError, match="stream error at 3"): + async for item in svc.partial_fail(5): + items.append(item) + + assert items == [0, 1, 2] + + +# ============================================================================ +# Error Handling Tests +# ============================================================================ + + +@pytest.mark.asyncio +async def test_connect_method_error(cluster_and_connect): + """Actor-side ValueError propagates to the connector caller.""" + _, conn = cluster_and_connect + from pulsing.exceptions import PulsingActorError + + calc = await conn.resolve("services/calc") + with pytest.raises(PulsingActorError, match="intentional error"): + await calc.will_fail() + + +@pytest.mark.asyncio +async def test_connect_resolve_nonexistent(cluster_and_connect): + """Resolving a non-existent actor raises an error.""" + _, conn = cluster_and_connect + + with pytest.raises(Exception): + await conn.resolve("services/does_not_exist") + + +# ============================================================================ +# Concurrent Access Tests +# ============================================================================ + + +@pytest.mark.asyncio +async def test_connect_concurrent_calls(cluster_and_connect): + """Multiple concurrent calls through the gateway all succeed.""" + _, conn = cluster_and_connect + svc = await conn.resolve("services/async_svc") + + coros = [svc.slow_add(i, i) for i in range(10)] + results = await asyncio.gather(*coros) + + expected = [i + i for i in range(10)] + assert sorted(results) == expected + + +# ============================================================================ +# Remote Spawn Tests +# ============================================================================ + + +@pytest.mark.asyncio +async def test_connect_spawn_and_call(cluster_and_connect): + """Spawn an actor remotely and call its methods.""" + _, conn = cluster_and_connect + + calc = await conn.spawn(ConnCalc, name="services/spawned_calc") + + assert await calc.multiply(3, 4) == 12 + assert await calc.add(10) == 10 + assert await calc.get() == 10 + + +@pytest.mark.asyncio +async def test_connect_spawn_with_args(cluster_and_connect): + """Spawn with constructor arguments.""" + _, conn = cluster_and_connect + + calc = await conn.spawn(ConnCalc, init_value=100, name="services/calc_with_args") + + assert await calc.get() == 100 + assert await calc.add(50) == 150 + + +@pytest.mark.asyncio +async def test_connect_spawn_async_actor(cluster_and_connect): + """Spawn an actor with async methods and call them.""" + _, conn = cluster_and_connect + + svc = await conn.spawn(ConnAsyncService, name="services/spawned_async") + + result = await svc.slow_add(10, 20) + assert result == 30 + + +@pytest.mark.asyncio +async def test_connect_spawn_stream_actor(cluster_and_connect): + """Spawn an actor with streaming methods and stream from it.""" + _, conn = cluster_and_connect + + svc = await conn.spawn(ConnStreamService, name="services/spawned_stream") + + items = [] + async for item in svc.count_up(4): + items.append(item) + + assert items == [0, 1, 2, 3] + + +@pytest.mark.asyncio +async def test_connect_spawn_then_resolve(cluster_and_connect): + """Spawn an actor, then resolve it by name from the same connector.""" + _, conn = cluster_and_connect + + await conn.spawn(ConnCalc, init_value=42, name="services/resolvable") + + resolved = await conn.resolve("services/resolvable") + assert await resolved.get() == 42 + + +if __name__ == "__main__": + pytest.main([__file__, "-v"])