From b4d953db9c7fa15f998c6bbde8bb6d71b246cffa Mon Sep 17 00:00:00 2001 From: Raphael Date: Tue, 17 Mar 2026 10:22:57 +0100 Subject: [PATCH 1/4] feat: added runtime usage client --- crates/taurus/src/client/mod.rs | 1 + crates/taurus/src/client/runtime_usage.rs | 39 +++++++++++++++++++++++ 2 files changed, 40 insertions(+) create mode 100644 crates/taurus/src/client/runtime_usage.rs diff --git a/crates/taurus/src/client/mod.rs b/crates/taurus/src/client/mod.rs index 51d602a..940ca04 100644 --- a/crates/taurus/src/client/mod.rs +++ b/crates/taurus/src/client/mod.rs @@ -1 +1,2 @@ pub mod runtime_status; +pub mod runtime_usage; diff --git a/crates/taurus/src/client/runtime_usage.rs b/crates/taurus/src/client/runtime_usage.rs new file mode 100644 index 0000000..a638c66 --- /dev/null +++ b/crates/taurus/src/client/runtime_usage.rs @@ -0,0 +1,39 @@ +use code0_flow::flow_service::retry::create_channel_with_retry; +use tonic::transport::Channel; +use tucana::{ + aquila::{RuntimeUsageRequest, runtime_usage_service_client::RuntimeUsageServiceClient}, + shared::RuntimeUsage, +}; + +pub struct TaurusRuntimeUsageService { + channel: Channel, +} + +impl TaurusRuntimeUsageService { + pub async fn from_url(aquila_url: String) -> Self { + let channel = create_channel_with_retry("Aquila", aquila_url).await; + TaurusRuntimeUsageService { channel } + } + + pub async fn update_runtime_usage(&self, runtime_usage: RuntimeUsage) { + log::info!("Updating the current Runtime Status!"); + let mut client = RuntimeUsageServiceClient::new(self.channel.clone()); + + let request = RuntimeUsageRequest { + runtime_usage: vec![runtime_usage], + }; + + match client.update(request).await { + Ok(response) => { + log::info!( + "Was the update of the RuntimeStatus accepted by Sagittarius? {}", + response.into_inner().success + ); + } + Err(err) => { + log::error!("Failed to update RuntimeStatus: {:?}", err); + } + } + } +} + From 8ab7639d1ea77ab248a9abc79adb6fc05a42e826 Mon Sep 17 00:00:00 2001 From: Raphael Date: Tue, 17 Mar 2026 10:23:12 +0100 Subject: [PATCH 2/4] feat: execution will now calculate total time and send it to Aquila --- crates/taurus/src/main.rs | 38 ++++++++++++++++++++++++++++++++++---- 1 file changed, 34 insertions(+), 4 deletions(-) diff --git a/crates/taurus/src/main.rs b/crates/taurus/src/main.rs index 1058a49..7f9b70d 100644 --- a/crates/taurus/src/main.rs +++ b/crates/taurus/src/main.rs @@ -2,6 +2,7 @@ mod client; mod config; use crate::client::runtime_status::TaurusRuntimeStatusService; +use crate::client::runtime_usage::TaurusRuntimeUsageService; use crate::config::Config; use code0_flow::flow_service::FlowUpdateService; @@ -11,6 +12,7 @@ use futures_lite::StreamExt; use log::error; use prost::Message; use std::collections::HashMap; +use std::time::{SystemTime, UNIX_EPOCH}; use taurus_core::context::context::Context; use taurus_core::context::executor::Executor; use taurus_core::context::registry::FunctionStore; @@ -18,9 +20,12 @@ use taurus_core::context::signal::Signal; use tokio::signal; use tonic_health::pb::health_server::HealthServer; use tucana::shared::value::Kind; -use tucana::shared::{ExecutionFlow, NodeFunction, RuntimeFeature, Translation, Value}; +use tucana::shared::{ + ExecutionFlow, NodeFunction, RuntimeFeature, RuntimeUsage, Translation, Value, +}; -fn handle_message(flow: ExecutionFlow, store: &FunctionStore) -> Signal { +fn handle_message(flow: ExecutionFlow, store: &FunctionStore) -> (Signal, RuntimeUsage) { + let now = SystemTime::now(); let mut context = Context::default(); let node_functions: HashMap = flow @@ -29,7 +34,23 @@ fn handle_message(flow: ExecutionFlow, store: &FunctionStore) -> Signal { .map(|node| (node.database_id, node)) .collect(); - Executor::new(store, node_functions).execute(flow.starting_node_id, &mut context, true) + let signal = + Executor::new(store, node_functions).execute(flow.starting_node_id, &mut context, true); + let timestamp = match now.duration_since(UNIX_EPOCH) { + Ok(time) => time.as_millis(), + Err(err) => { + log::error!("cannot get current system time: {:?}", err); + 0 + } + }; + + ( + signal, + RuntimeUsage { + flow_id: flow.flow_id, + duration: timestamp as i64, + }, + ) } #[tokio::main] @@ -43,6 +64,7 @@ async fn main() { let config = Config::new(); let store = FunctionStore::default(); let mut runtime_status_service: Option = None; + let mut runtime_usage_service: Option = None; let client = match async_nats::connect(config.nats_url.clone()).await { Ok(client) => { @@ -92,6 +114,9 @@ async fn main() { .send() .await; + let usage_service = TaurusRuntimeUsageService::from_url(config.aquila_url.clone()).await; + runtime_usage_service = Some(usage_service); + let status_service = TaurusRuntimeStatusService::from_url( config.aquila_url.clone(), "taurus".into(), @@ -143,7 +168,8 @@ async fn main() { }; let flow_id = flow.flow_id; - let value = match handle_message(flow, &store) { + let result = handle_message(flow, &store); + let value = match result.0 { Signal::Failure(error) => { log::error!( "RuntimeError occurred, execution failed because: {:?}", @@ -173,6 +199,10 @@ async fn main() { log::info!("For the flow_id {} returing the value {:?}", flow_id, value); + if let Some(usage_service) = &runtime_usage_service { + usage_service.update_runtime_usage(result.1).await; + } + // Send a response to the reply subject if let Some(reply) = msg.reply { match client.publish(reply, value.encode_to_vec().into()).await { From 75fc231e50157ab9bded662411f75f181bd22b24 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Raphael=20G=C3=B6tz?= <52959657+raphael-goetz@users.noreply.github.com> Date: Tue, 17 Mar 2026 10:29:14 +0100 Subject: [PATCH 3/4] Potential fix for pull request finding MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> Signed-off-by: Raphael Götz <52959657+raphael-goetz@users.noreply.github.com> --- crates/taurus/src/main.rs | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/crates/taurus/src/main.rs b/crates/taurus/src/main.rs index 7f9b70d..6897f83 100644 --- a/crates/taurus/src/main.rs +++ b/crates/taurus/src/main.rs @@ -12,7 +12,7 @@ use futures_lite::StreamExt; use log::error; use prost::Message; use std::collections::HashMap; -use std::time::{SystemTime, UNIX_EPOCH}; +use std::time::{Instant, SystemTime, UNIX_EPOCH}; use taurus_core::context::context::Context; use taurus_core::context::executor::Executor; use taurus_core::context::registry::FunctionStore; @@ -25,7 +25,7 @@ use tucana::shared::{ }; fn handle_message(flow: ExecutionFlow, store: &FunctionStore) -> (Signal, RuntimeUsage) { - let now = SystemTime::now(); + let start = Instant::now(); let mut context = Context::default(); let node_functions: HashMap = flow @@ -36,19 +36,13 @@ fn handle_message(flow: ExecutionFlow, store: &FunctionStore) -> (Signal, Runtim let signal = Executor::new(store, node_functions).execute(flow.starting_node_id, &mut context, true); - let timestamp = match now.duration_since(UNIX_EPOCH) { - Ok(time) => time.as_millis(), - Err(err) => { - log::error!("cannot get current system time: {:?}", err); - 0 - } - }; + let duration_millis = start.elapsed().as_millis() as i64; ( signal, RuntimeUsage { flow_id: flow.flow_id, - duration: timestamp as i64, + duration: duration_millis, }, ) } From b2731fe04796c8f0c5e760188f4856ba8c47be9c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Raphael=20G=C3=B6tz?= <52959657+raphael-goetz@users.noreply.github.com> Date: Tue, 17 Mar 2026 10:29:34 +0100 Subject: [PATCH 4/4] Potential fix for pull request finding MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> Signed-off-by: Raphael Götz <52959657+raphael-goetz@users.noreply.github.com> --- crates/taurus/src/main.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/taurus/src/main.rs b/crates/taurus/src/main.rs index 6897f83..2d26e87 100644 --- a/crates/taurus/src/main.rs +++ b/crates/taurus/src/main.rs @@ -193,10 +193,6 @@ async fn main() { log::info!("For the flow_id {} returing the value {:?}", flow_id, value); - if let Some(usage_service) = &runtime_usage_service { - usage_service.update_runtime_usage(result.1).await; - } - // Send a response to the reply subject if let Some(reply) = msg.reply { match client.publish(reply, value.encode_to_vec().into()).await { @@ -204,6 +200,10 @@ async fn main() { Err(err) => log::error!("Failed to send response: {:?}", err), } } + + if let Some(usage_service) = &runtime_usage_service { + usage_service.update_runtime_usage(result.1).await; + } } log::info!("NATS worker loop ended");