Skip to content

Commit 8ab7639

Browse files
committed
feat: execution will now calculate total time and send it to Aquila
1 parent b4d953d commit 8ab7639

1 file changed

Lines changed: 34 additions & 4 deletions

File tree

crates/taurus/src/main.rs

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ mod client;
22
mod config;
33

44
use crate::client::runtime_status::TaurusRuntimeStatusService;
5+
use crate::client::runtime_usage::TaurusRuntimeUsageService;
56
use crate::config::Config;
67
use code0_flow::flow_service::FlowUpdateService;
78

@@ -11,16 +12,20 @@ use futures_lite::StreamExt;
1112
use log::error;
1213
use prost::Message;
1314
use std::collections::HashMap;
15+
use std::time::{SystemTime, UNIX_EPOCH};
1416
use taurus_core::context::context::Context;
1517
use taurus_core::context::executor::Executor;
1618
use taurus_core::context::registry::FunctionStore;
1719
use taurus_core::context::signal::Signal;
1820
use tokio::signal;
1921
use tonic_health::pb::health_server::HealthServer;
2022
use tucana::shared::value::Kind;
21-
use tucana::shared::{ExecutionFlow, NodeFunction, RuntimeFeature, Translation, Value};
23+
use tucana::shared::{
24+
ExecutionFlow, NodeFunction, RuntimeFeature, RuntimeUsage, Translation, Value,
25+
};
2226

23-
fn handle_message(flow: ExecutionFlow, store: &FunctionStore) -> Signal {
27+
fn handle_message(flow: ExecutionFlow, store: &FunctionStore) -> (Signal, RuntimeUsage) {
28+
let now = SystemTime::now();
2429
let mut context = Context::default();
2530

2631
let node_functions: HashMap<i64, NodeFunction> = flow
@@ -29,7 +34,23 @@ fn handle_message(flow: ExecutionFlow, store: &FunctionStore) -> Signal {
2934
.map(|node| (node.database_id, node))
3035
.collect();
3136

32-
Executor::new(store, node_functions).execute(flow.starting_node_id, &mut context, true)
37+
let signal =
38+
Executor::new(store, node_functions).execute(flow.starting_node_id, &mut context, true);
39+
let timestamp = match now.duration_since(UNIX_EPOCH) {
40+
Ok(time) => time.as_millis(),
41+
Err(err) => {
42+
log::error!("cannot get current system time: {:?}", err);
43+
0
44+
}
45+
};
46+
47+
(
48+
signal,
49+
RuntimeUsage {
50+
flow_id: flow.flow_id,
51+
duration: timestamp as i64,
52+
},
53+
)
3354
}
3455

3556
#[tokio::main]
@@ -43,6 +64,7 @@ async fn main() {
4364
let config = Config::new();
4465
let store = FunctionStore::default();
4566
let mut runtime_status_service: Option<TaurusRuntimeStatusService> = None;
67+
let mut runtime_usage_service: Option<TaurusRuntimeUsageService> = None;
4668

4769
let client = match async_nats::connect(config.nats_url.clone()).await {
4870
Ok(client) => {
@@ -92,6 +114,9 @@ async fn main() {
92114
.send()
93115
.await;
94116

117+
let usage_service = TaurusRuntimeUsageService::from_url(config.aquila_url.clone()).await;
118+
runtime_usage_service = Some(usage_service);
119+
95120
let status_service = TaurusRuntimeStatusService::from_url(
96121
config.aquila_url.clone(),
97122
"taurus".into(),
@@ -143,7 +168,8 @@ async fn main() {
143168
};
144169

145170
let flow_id = flow.flow_id;
146-
let value = match handle_message(flow, &store) {
171+
let result = handle_message(flow, &store);
172+
let value = match result.0 {
147173
Signal::Failure(error) => {
148174
log::error!(
149175
"RuntimeError occurred, execution failed because: {:?}",
@@ -173,6 +199,10 @@ async fn main() {
173199

174200
log::info!("For the flow_id {} returing the value {:?}", flow_id, value);
175201

202+
if let Some(usage_service) = &runtime_usage_service {
203+
usage_service.update_runtime_usage(result.1).await;
204+
}
205+
176206
// Send a response to the reply subject
177207
if let Some(reply) = msg.reply {
178208
match client.publish(reply, value.encode_to_vec().into()).await {

0 commit comments

Comments
 (0)