Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
197 changes: 197 additions & 0 deletions crates/pulsing-actor/src/connect/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
//! Out-cluster connector for accessing actors without joining the cluster.
//!
//! `PulsingConnect` connects to a cluster gateway node via HTTP/2 and
//! transparently routes requests to any actor in the cluster. It does not
//! participate in gossip, does not register as a node, and does not run
//! fault detection — keeping the connector extremely lightweight.
//!
//! # Example
//!
//! ```rust,ignore
//! let conn = PulsingConnect::connect("10.0.1.1:8080".parse()?).await?;
//! let counter = conn.resolve("services/counter").await?;
//! let result = counter.ask("increment", vec![]).await?;
//! conn.close();
//! ```

mod reference;

pub use reference::ConnectActorRef;

use crate::error::{PulsingError, Result, RuntimeError};
use crate::transport::{Http2Client, Http2Config};
use serde::{Deserialize, Serialize};
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::sync::RwLock;

/// Resolve response from the gateway.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResolveResponse {
pub found: bool,
pub path: String,
pub gateway: String,
pub instance_count: usize,
}

/// Gateway member list response.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MembersResponse {
pub gateways: Vec<String>,
}

/// Out-cluster connector — connects to a gateway node without joining the cluster.
pub struct PulsingConnect {
active_gateway: RwLock<SocketAddr>,
gateways: RwLock<Vec<SocketAddr>>,
http_client: Arc<Http2Client>,
}

impl PulsingConnect {
/// Connect to a single gateway address.
pub async fn connect(gateway_addr: SocketAddr) -> Result<Arc<Self>> {
Self::connect_multi(vec![gateway_addr]).await
}

/// Connect with multiple gateway addresses for failover.
pub async fn connect_multi(gateway_addrs: Vec<SocketAddr>) -> Result<Arc<Self>> {
if gateway_addrs.is_empty() {
return Err(PulsingError::from(RuntimeError::Other(
"At least one gateway address is required".into(),
)));
}

let http_client = Arc::new(Http2Client::new(Http2Config::default()));
http_client.start_background_tasks();

let active = gateway_addrs[0];
let conn = Arc::new(Self {
active_gateway: RwLock::new(active),
gateways: RwLock::new(gateway_addrs),
http_client,
});

// Try to refresh gateway list from the cluster
let _ = conn.refresh_gateways().await;

Ok(conn)
}

/// Resolve a named actor via the gateway.
pub async fn resolve(&self, path: &str) -> Result<ConnectActorRef> {
let gateway = *self.active_gateway.read().await;
let body = serde_json::to_vec(&serde_json::json!({"path": path}))
.map_err(|e| PulsingError::from(RuntimeError::Other(e.to_string())))?;

let resp_bytes = self.ask_gateway(gateway, "/client/resolve", &body).await?;

let resp: ResolveResponse = serde_json::from_slice(&resp_bytes)
.map_err(|e| PulsingError::from(RuntimeError::Other(e.to_string())))?;

if !resp.found {
return Err(PulsingError::from(RuntimeError::named_actor_not_found(
path,
)));
}

Ok(ConnectActorRef::new(
gateway,
path.to_string(),
self.http_client.clone(),
))
}

/// Refresh the gateway list from the current active gateway.
pub async fn refresh_gateways(&self) -> Result<()> {
let gateway = *self.active_gateway.read().await;
let resp_bytes = self.ask_gateway(gateway, "/client/members", &[]).await?;

let resp: MembersResponse = serde_json::from_slice(&resp_bytes)
.map_err(|e| PulsingError::from(RuntimeError::Other(e.to_string())))?;

let mut new_gateways = Vec::new();
for addr_str in &resp.gateways {
if let Ok(addr) = addr_str.parse::<SocketAddr>() {
new_gateways.push(addr);
}
}

if !new_gateways.is_empty() {
*self.gateways.write().await = new_gateways;
}

Ok(())
}

/// Switch to the next available gateway (for failover).
pub async fn failover(&self) -> Result<()> {
let current = *self.active_gateway.read().await;
let gateways = self.gateways.read().await;

let next = gateways
.iter()
.find(|&&addr| addr != current)
.ok_or_else(|| {
PulsingError::from(RuntimeError::Other(
"No alternative gateway available".into(),
))
})?;

*self.active_gateway.write().await = *next;
Ok(())
}

/// Get the current active gateway address.
pub async fn active_gateway(&self) -> SocketAddr {
*self.active_gateway.read().await
}

/// Get all known gateway addresses.
pub async fn gateway_list(&self) -> Vec<SocketAddr> {
self.gateways.read().await.clone()
}

/// Shutdown the connector.
pub fn close(&self) {
self.http_client.shutdown();
}

/// Internal: send a GET/POST to a gateway management endpoint.
async fn ask_gateway(&self, gateway: SocketAddr, path: &str, body: &[u8]) -> Result<Vec<u8>> {
self.http_client
.ask(gateway, path, "client", body.to_vec())
.await
}

/// Get a reference to the underlying HTTP/2 client.
pub fn http_client(&self) -> &Arc<Http2Client> {
&self.http_client
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_resolve_response_deserialize() {
let json = r#"{"found":true,"path":"services/counter","gateway":"10.0.1.1:8080","instance_count":2}"#;
let resp: ResolveResponse = serde_json::from_str(json).unwrap();
assert!(resp.found);
assert_eq!(resp.path, "services/counter");
assert_eq!(resp.instance_count, 2);
}

#[test]
fn test_members_response_deserialize() {
let json = r#"{"gateways":["10.0.1.1:8080","10.0.1.2:8080"]}"#;
let resp: MembersResponse = serde_json::from_str(json).unwrap();
assert_eq!(resp.gateways.len(), 2);
}

#[tokio::test]
async fn test_connect_empty_addrs() {
let result = PulsingConnect::connect_multi(vec![]).await;
assert!(result.is_err());
}
}
103 changes: 103 additions & 0 deletions crates/pulsing-actor/src/connect/reference.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
//! Actor reference for out-cluster connectors.
//!
//! `ConnectActorRef` routes all requests through a gateway node using the
//! `/named/{path}` endpoint. The gateway handles internal cluster routing
//! transparently.

use crate::actor::Message;
use crate::error::Result;
use crate::transport::{Http2Client, StreamFrame, StreamHandle};
use std::net::SocketAddr;
use std::sync::Arc;

/// Actor reference used by out-cluster connectors.
///
/// All requests are routed through the gateway node. The connector never
/// communicates directly with the target actor's node.
#[derive(Clone)]
pub struct ConnectActorRef {
gateway: SocketAddr,
path: String,
http_client: Arc<Http2Client>,
}

impl ConnectActorRef {
pub fn new(gateway: SocketAddr, path: String, http_client: Arc<Http2Client>) -> Self {
Self {
gateway,
path,
http_client,
}
}

/// Send a request and wait for a response.
pub async fn ask(&self, msg_type: &str, payload: Vec<u8>) -> Result<Vec<u8>> {
let url_path = format!("/named/{}", self.path);
self.http_client
.ask(self.gateway, &url_path, msg_type, payload)
.await
}

/// Fire-and-forget message.
pub async fn tell(&self, msg_type: &str, payload: Vec<u8>) -> Result<()> {
let url_path = format!("/named/{}", self.path);
self.http_client
.tell(self.gateway, &url_path, msg_type, payload)
.await
}

/// Streaming request — returns a stream of frames.
pub async fn ask_stream(
&self,
msg_type: &str,
payload: Vec<u8>,
) -> Result<StreamHandle<StreamFrame>> {
let url_path = format!("/named/{}", self.path);
self.http_client
.ask_stream(self.gateway, &url_path, msg_type, payload)
.await
}

/// Send a full `Message` (single or stream) and receive a `Message` response.
pub async fn send_message(&self, msg: Message) -> Result<Message> {
let url_path = format!("/named/{}", self.path);
self.http_client
.send_message_full(self.gateway, &url_path, msg)
.await
}

/// Get the gateway address this reference routes through.
pub fn gateway(&self) -> SocketAddr {
self.gateway
}

/// Get the actor path.
pub fn path(&self) -> &str {
&self.path
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::transport::Http2Config;

#[test]
fn test_connect_actor_ref_accessors() {
let client = Arc::new(Http2Client::new(Http2Config::default()));
let addr: SocketAddr = "127.0.0.1:8080".parse().unwrap();
let r = ConnectActorRef::new(addr, "services/counter".into(), client);
assert_eq!(r.gateway(), addr);
assert_eq!(r.path(), "services/counter");
}

#[test]
fn test_connect_actor_ref_clone() {
let client = Arc::new(Http2Client::new(Http2Config::default()));
let addr: SocketAddr = "127.0.0.1:8080".parse().unwrap();
let r = ConnectActorRef::new(addr, "services/counter".into(), client);
let r2 = r.clone();
assert_eq!(r.gateway(), r2.gateway());
assert_eq!(r.path(), r2.path());
}
}
1 change: 1 addition & 0 deletions crates/pulsing-actor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ pub mod actor;
pub mod behavior;
pub mod circuit_breaker;
pub mod cluster;
pub mod connect;
pub mod error;
pub mod metrics;
pub mod policies;
Expand Down
58 changes: 58 additions & 0 deletions crates/pulsing-actor/src/system/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,64 @@ impl Http2ServerHandler for SystemMessageHandler {
serde_json::json!(actors)
}

async fn handle_client_resolve(&self, path: &str) -> serde_json::Value {
let cluster_guard = self.cluster.read().await;
if let Some(cluster) = cluster_guard.as_ref() {
let actor_path = match ActorPath::new(path) {
Ok(p) => p,
Err(_) => {
return serde_json::json!({
"found": false,
"path": path,
"gateway": "",
"instance_count": 0,
});
}
};

let info = cluster.lookup_named_actor(&actor_path).await;
match info {
Some(info) => {
let local_addr = cluster.local_addr();
serde_json::json!({
"found": true,
"path": path,
"gateway": local_addr.to_string(),
"instance_count": info.instance_count(),
})
}
None => {
serde_json::json!({
"found": false,
"path": path,
"gateway": "",
"instance_count": 0,
})
}
}
} else {
// Standalone mode: check local registry
let found = self.registry.get_actor_name_by_path(path).is_some();
serde_json::json!({
"found": found,
"path": path,
"gateway": "",
"instance_count": if found { 1 } else { 0 },
})
}
}

async fn handle_client_members(&self) -> serde_json::Value {
let cluster_guard = self.cluster.read().await;
if let Some(cluster) = cluster_guard.as_ref() {
let members = cluster.alive_members().await;
let gateways: Vec<String> = members.iter().map(|m| m.addr.to_string()).collect();
serde_json::json!({ "gateways": gateways })
} else {
serde_json::json!({ "gateways": [] })
}
}

fn as_any(&self) -> &dyn std::any::Any {
self
}
Expand Down
Loading
Loading