From b9aeac4c2fd9841b24b8787a2cb8a6aa0f0801a8 Mon Sep 17 00:00:00 2001 From: Raphael Date: Tue, 17 Mar 2026 09:38:22 +0100 Subject: [PATCH 1/4] feat: added runtime status client --- crates/taurus/src/client/mod.rs | 1 + crates/taurus/src/client/runtime_status.rs | 77 ++++++++++++++++++++++ crates/taurus/src/main.rs | 1 + 3 files changed, 79 insertions(+) create mode 100644 crates/taurus/src/client/mod.rs create mode 100644 crates/taurus/src/client/runtime_status.rs diff --git a/crates/taurus/src/client/mod.rs b/crates/taurus/src/client/mod.rs new file mode 100644 index 0000000..51d602a --- /dev/null +++ b/crates/taurus/src/client/mod.rs @@ -0,0 +1 @@ +pub mod runtime_status; diff --git a/crates/taurus/src/client/runtime_status.rs b/crates/taurus/src/client/runtime_status.rs new file mode 100644 index 0000000..81c84f7 --- /dev/null +++ b/crates/taurus/src/client/runtime_status.rs @@ -0,0 +1,77 @@ +use std::time::{SystemTime, UNIX_EPOCH}; + +use tonic::transport::Channel; +use tucana::{ + aquila::{ + RuntimeStatusUpdateRequest, runtime_status_service_client::RuntimeStatusServiceClient, + runtime_status_update_request::Status, + }, + shared::{ExecutionRuntimeStatus, RuntimeFeature}, +}; + +pub struct TaurusRuntimeStatusService { + channel: Channel, + identifier: String, + features: Vec, +} + +impl TaurusRuntimeStatusService { + pub async fn from_url( + aquila_url: String, + identifier: String, + features: Vec, + ) -> Self { + let channel = create_channel_with_retry("Aquila", aquila_url).await; + Self::new(channel, identifier, features) + } + + pub fn new( + channel: Channel, + identifier: String, + features: Vec, + ) -> Self { + TaurusRuntimeStatusService { + channel, + identifier, + features, + } + } + + pub async fn update_runtime_status( + &self, + status: tucana::shared::execution_runtime_status::Status, + ) { + log::info!("Updating the current Runtime Status!"); + let mut client = RuntimeStatusServiceClient::new(self.channel.clone()); + + let now = SystemTime::now(); + let timestamp = match now.duration_since(UNIX_EPOCH) { + Ok(time) => time.as_secs(), + Err(err) => { + log::error!("cannot get current system time: {:?}", err); + 0 + } + }; + + let request = RuntimeStatusUpdateRequest { + status: Some(Status::ExecutionRuntimeStatus(ExecutionRuntimeStatus { + status: status.into(), + timestamp: timestamp as i64, + identifier: self.identifier.clone(), + features: self.features.clone(), + })), + }; + + match client.update(request).await { + Ok(response) => { + log::info!( + "Was the update of the RuntimeStatus accepted by Sagittarius? {}", + response.into_inner().success + ); + } + Err(err) => { + log::error!("Failed to update RuntimeStatus: {:?}", err); + } + } + } +} diff --git a/crates/taurus/src/main.rs b/crates/taurus/src/main.rs index 5da9b9e..12504b4 100644 --- a/crates/taurus/src/main.rs +++ b/crates/taurus/src/main.rs @@ -1,4 +1,5 @@ mod config; +mod client; use crate::config::Config; use code0_flow::flow_service::FlowUpdateService; From 1b579e572241a0aaf1f6db1d8b915583bc33e5d9 Mon Sep 17 00:00:00 2001 From: Raphael Date: Tue, 17 Mar 2026 09:55:47 +0100 Subject: [PATCH 2/4] feat: taruus will send status updates to aquila only if hes in dynamic mode --- crates/taurus/src/client/runtime_status.rs | 7 ++--- crates/taurus/src/main.rs | 32 ++++++++++++++++++++-- 2 files changed, 32 insertions(+), 7 deletions(-) diff --git a/crates/taurus/src/client/runtime_status.rs b/crates/taurus/src/client/runtime_status.rs index 81c84f7..b04f126 100644 --- a/crates/taurus/src/client/runtime_status.rs +++ b/crates/taurus/src/client/runtime_status.rs @@ -1,5 +1,6 @@ use std::time::{SystemTime, UNIX_EPOCH}; +use code0_flow::flow_service::retry::create_channel_with_retry; use tonic::transport::Channel; use tucana::{ aquila::{ @@ -25,11 +26,7 @@ impl TaurusRuntimeStatusService { Self::new(channel, identifier, features) } - pub fn new( - channel: Channel, - identifier: String, - features: Vec, - ) -> Self { + pub fn new(channel: Channel, identifier: String, features: Vec) -> Self { TaurusRuntimeStatusService { channel, identifier, diff --git a/crates/taurus/src/main.rs b/crates/taurus/src/main.rs index 12504b4..c215738 100644 --- a/crates/taurus/src/main.rs +++ b/crates/taurus/src/main.rs @@ -1,6 +1,7 @@ -mod config; mod client; +mod config; +use crate::client::runtime_status::TaurusRuntimeStatusService; use crate::config::Config; use code0_flow::flow_service::FlowUpdateService; @@ -17,7 +18,7 @@ use taurus_core::context::signal::Signal; use tokio::signal; use tonic_health::pb::health_server::HealthServer; use tucana::shared::value::Kind; -use tucana::shared::{ExecutionFlow, NodeFunction, Value}; +use tucana::shared::{ExecutionFlow, NodeFunction, RuntimeFeature, Translation, Value}; fn handle_message(flow: ExecutionFlow, store: &FunctionStore) -> Signal { let mut context = Context::default(); @@ -41,6 +42,7 @@ async fn main() { let config = Config::new(); let store = FunctionStore::default(); + let mut runtime_status_service: Option = None; let client = match async_nats::connect(config.nats_url.clone()).await { Ok(client) => { @@ -89,6 +91,27 @@ async fn main() { .await .send() .await; + + let client = TaurusRuntimeStatusService::from_url( + config.aquila_url.clone(), + "taurus".into(), + vec![RuntimeFeature { + name: vec![Translation { + code: "en-US".to_string(), + content: "Runtime".to_string(), + }], + description: vec![Translation { + code: "en-US".to_string(), + content: "Will execute incoming flows.".to_string(), + }], + }], + ) + .await; + + client + .update_runtime_status(tucana::shared::execution_runtime_status::Status::Running) + .await; + runtime_status_service = Some(client); } let mut worker_task = tokio::spawn(async move { @@ -213,5 +236,10 @@ async fn main() { } } + if let Some(ser) = &runtime_status_service { + ser.update_runtime_status(tucana::shared::execution_runtime_status::Status::Stopped) + .await; + }; + log::info!("Taurus shutdown complete"); } From 5fe5608aac64da1fba73db75c40f0d62a2f3fe60 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Raphael=20G=C3=B6tz?= <52959657+raphael-goetz@users.noreply.github.com> Date: Tue, 17 Mar 2026 10:03:33 +0100 Subject: [PATCH 3/4] Potential fix for pull request finding MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> Signed-off-by: Raphael Götz <52959657+raphael-goetz@users.noreply.github.com> --- crates/taurus/src/main.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/taurus/src/main.rs b/crates/taurus/src/main.rs index c215738..24fb7d4 100644 --- a/crates/taurus/src/main.rs +++ b/crates/taurus/src/main.rs @@ -92,7 +92,7 @@ async fn main() { .send() .await; - let client = TaurusRuntimeStatusService::from_url( + let status_service = TaurusRuntimeStatusService::from_url( config.aquila_url.clone(), "taurus".into(), vec![RuntimeFeature { @@ -108,10 +108,10 @@ async fn main() { ) .await; - client + status_service .update_runtime_status(tucana::shared::execution_runtime_status::Status::Running) .await; - runtime_status_service = Some(client); + runtime_status_service = Some(status_service); } let mut worker_task = tokio::spawn(async move { From 39498d1e923b4f25dc5e4fd0b07bfe3a4bc77319 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Raphael=20G=C3=B6tz?= <52959657+raphael-goetz@users.noreply.github.com> Date: Tue, 17 Mar 2026 10:04:04 +0100 Subject: [PATCH 4/4] Potential fix for pull request finding MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> Signed-off-by: Raphael Götz <52959657+raphael-goetz@users.noreply.github.com> --- crates/taurus/src/main.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/crates/taurus/src/main.rs b/crates/taurus/src/main.rs index 24fb7d4..1058a49 100644 --- a/crates/taurus/src/main.rs +++ b/crates/taurus/src/main.rs @@ -236,8 +236,9 @@ async fn main() { } } - if let Some(ser) = &runtime_status_service { - ser.update_runtime_status(tucana::shared::execution_runtime_status::Status::Stopped) + if let Some(status_service) = &runtime_status_service { + status_service + .update_runtime_status(tucana::shared::execution_runtime_status::Status::Stopped) .await; };