diff --git a/crates/taurus/src/client/mod.rs b/crates/taurus/src/client/mod.rs new file mode 100644 index 0000000..51d602a --- /dev/null +++ b/crates/taurus/src/client/mod.rs @@ -0,0 +1 @@ +pub mod runtime_status; diff --git a/crates/taurus/src/client/runtime_status.rs b/crates/taurus/src/client/runtime_status.rs new file mode 100644 index 0000000..b04f126 --- /dev/null +++ b/crates/taurus/src/client/runtime_status.rs @@ -0,0 +1,74 @@ +use std::time::{SystemTime, UNIX_EPOCH}; + +use code0_flow::flow_service::retry::create_channel_with_retry; +use tonic::transport::Channel; +use tucana::{ + aquila::{ + RuntimeStatusUpdateRequest, runtime_status_service_client::RuntimeStatusServiceClient, + runtime_status_update_request::Status, + }, + shared::{ExecutionRuntimeStatus, RuntimeFeature}, +}; + +pub struct TaurusRuntimeStatusService { + channel: Channel, + identifier: String, + features: Vec, +} + +impl TaurusRuntimeStatusService { + pub async fn from_url( + aquila_url: String, + identifier: String, + features: Vec, + ) -> Self { + let channel = create_channel_with_retry("Aquila", aquila_url).await; + Self::new(channel, identifier, features) + } + + pub fn new(channel: Channel, identifier: String, features: Vec) -> Self { + TaurusRuntimeStatusService { + channel, + identifier, + features, + } + } + + pub async fn update_runtime_status( + &self, + status: tucana::shared::execution_runtime_status::Status, + ) { + log::info!("Updating the current Runtime Status!"); + let mut client = RuntimeStatusServiceClient::new(self.channel.clone()); + + let now = SystemTime::now(); + let timestamp = match now.duration_since(UNIX_EPOCH) { + Ok(time) => time.as_secs(), + Err(err) => { + log::error!("cannot get current system time: {:?}", err); + 0 + } + }; + + let request = RuntimeStatusUpdateRequest { + status: Some(Status::ExecutionRuntimeStatus(ExecutionRuntimeStatus { + status: status.into(), + timestamp: timestamp as i64, + identifier: self.identifier.clone(), + features: self.features.clone(), + })), + }; + + match client.update(request).await { + Ok(response) => { + log::info!( + "Was the update of the RuntimeStatus accepted by Sagittarius? {}", + response.into_inner().success + ); + } + Err(err) => { + log::error!("Failed to update RuntimeStatus: {:?}", err); + } + } + } +} diff --git a/crates/taurus/src/main.rs b/crates/taurus/src/main.rs index 5da9b9e..1058a49 100644 --- a/crates/taurus/src/main.rs +++ b/crates/taurus/src/main.rs @@ -1,5 +1,7 @@ +mod client; mod config; +use crate::client::runtime_status::TaurusRuntimeStatusService; use crate::config::Config; use code0_flow::flow_service::FlowUpdateService; @@ -16,7 +18,7 @@ use taurus_core::context::signal::Signal; use tokio::signal; use tonic_health::pb::health_server::HealthServer; use tucana::shared::value::Kind; -use tucana::shared::{ExecutionFlow, NodeFunction, Value}; +use tucana::shared::{ExecutionFlow, NodeFunction, RuntimeFeature, Translation, Value}; fn handle_message(flow: ExecutionFlow, store: &FunctionStore) -> Signal { let mut context = Context::default(); @@ -40,6 +42,7 @@ async fn main() { let config = Config::new(); let store = FunctionStore::default(); + let mut runtime_status_service: Option = None; let client = match async_nats::connect(config.nats_url.clone()).await { Ok(client) => { @@ -88,6 +91,27 @@ async fn main() { .await .send() .await; + + let status_service = TaurusRuntimeStatusService::from_url( + config.aquila_url.clone(), + "taurus".into(), + vec![RuntimeFeature { + name: vec![Translation { + code: "en-US".to_string(), + content: "Runtime".to_string(), + }], + description: vec![Translation { + code: "en-US".to_string(), + content: "Will execute incoming flows.".to_string(), + }], + }], + ) + .await; + + status_service + .update_runtime_status(tucana::shared::execution_runtime_status::Status::Running) + .await; + runtime_status_service = Some(status_service); } let mut worker_task = tokio::spawn(async move { @@ -212,5 +236,11 @@ async fn main() { } } + if let Some(status_service) = &runtime_status_service { + status_service + .update_runtime_status(tucana::shared::execution_runtime_status::Status::Stopped) + .await; + }; + log::info!("Taurus shutdown complete"); }