From cff241ec7b7e759c1359466319b947cbf2486bcf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arkadiusz=20J=C4=99drzejewski?= Date: Fri, 13 Mar 2026 14:39:27 +0100 Subject: [PATCH] hmon: parametrizable background health thread Expose parameters to background thread. --- Cargo.lock | 32 +- Cargo.toml | 9 +- MODULE.bazel | 2 +- MODULE.bazel.lock | 3 +- src/health_monitoring_lib/BUILD | 3 + src/health_monitoring_lib/Cargo.toml | 1 + .../cpp/health_monitor.cpp | 17 + .../cpp/include/score/hm/health_monitor.h | 5 + .../cpp/include/score/hm/thread.h | 91 +++++ .../cpp/tests/health_monitor_test.cpp | 4 + src/health_monitoring_lib/cpp/thread.cpp | 123 ++++++ .../rust/deadline/ffi.rs | 7 + src/health_monitoring_lib/rust/ffi.rs | 76 +++- .../rust/health_monitor.rs | 18 +- .../rust/heartbeat/ffi.rs | 1 + src/health_monitoring_lib/rust/lib.rs | 1 + src/health_monitoring_lib/rust/logic/ffi.rs | 6 + src/health_monitoring_lib/rust/thread_ffi.rs | 373 ++++++++++++++++++ src/health_monitoring_lib/rust/worker.rs | 46 ++- 19 files changed, 784 insertions(+), 34 deletions(-) create mode 100644 src/health_monitoring_lib/cpp/include/score/hm/thread.h create mode 100644 src/health_monitoring_lib/cpp/thread.cpp create mode 100644 src/health_monitoring_lib/rust/thread_ffi.rs diff --git a/Cargo.lock b/Cargo.lock index aae4276a..8fc3b0c2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -126,7 +126,7 @@ checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75" [[package]] name = "containers" version = "0.1.0" -source = "git+https://github.com/eclipse-score/baselibs_rust.git?tag=v0.0.4#d36362e03664f65117145d6fc90e38505d54a900" +source = "git+https://github.com/eclipse-score/baselibs_rust.git?tag=v0.1.1#7227b67c45dff719901d8f3763d5a37fb55aa0ad" [[package]] name = "find-msvc-tools" @@ -159,6 +159,7 @@ dependencies = [ "score_log", "score_testing_macros", "stdout_logger", + "thread", ] [[package]] @@ -262,6 +263,16 @@ version = "1.70.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe" +[[package]] +name = "pal" +version = "0.0.1" +source = "git+https://github.com/eclipse-score/baselibs_rust.git?tag=v0.1.1#7227b67c45dff719901d8f3763d5a37fb55aa0ad" +dependencies = [ + "containers", + "libc", + "score_log", +] + [[package]] name = "pin-project-lite" version = "0.2.16" @@ -332,7 +343,7 @@ checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294" [[package]] name = "score_log" version = "0.0.1" -source = "git+https://github.com/eclipse-score/baselibs_rust.git?tag=v0.0.4#d36362e03664f65117145d6fc90e38505d54a900" +source = "git+https://github.com/eclipse-score/baselibs_rust.git?tag=v0.1.1#7227b67c45dff719901d8f3763d5a37fb55aa0ad" dependencies = [ "score_log_fmt", "score_log_fmt_macro", @@ -341,12 +352,12 @@ dependencies = [ [[package]] name = "score_log_fmt" version = "0.0.1" -source = "git+https://github.com/eclipse-score/baselibs_rust.git?tag=v0.0.4#d36362e03664f65117145d6fc90e38505d54a900" +source = "git+https://github.com/eclipse-score/baselibs_rust.git?tag=v0.1.1#7227b67c45dff719901d8f3763d5a37fb55aa0ad" [[package]] name = "score_log_fmt_macro" version = "0.0.1" -source = "git+https://github.com/eclipse-score/baselibs_rust.git?tag=v0.0.4#d36362e03664f65117145d6fc90e38505d54a900" +source = "git+https://github.com/eclipse-score/baselibs_rust.git?tag=v0.1.1#7227b67c45dff719901d8f3763d5a37fb55aa0ad" dependencies = [ "proc-macro2", "quote", @@ -357,7 +368,7 @@ dependencies = [ [[package]] name = "score_testing_macros" version = "0.0.1" -source = "git+https://github.com/eclipse-score/baselibs_rust.git?tag=v0.0.4#d36362e03664f65117145d6fc90e38505d54a900" +source = "git+https://github.com/eclipse-score/baselibs_rust.git?tag=v0.1.1#7227b67c45dff719901d8f3763d5a37fb55aa0ad" dependencies = [ "quote", "stdout_logger", @@ -450,7 +461,7 @@ checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" [[package]] name = "stdout_logger" version = "0.0.1" -source = "git+https://github.com/eclipse-score/baselibs_rust.git?tag=v0.0.4#d36362e03664f65117145d6fc90e38505d54a900" +source = "git+https://github.com/eclipse-score/baselibs_rust.git?tag=v0.1.1#7227b67c45dff719901d8f3763d5a37fb55aa0ad" dependencies = [ "score_log", ] @@ -472,6 +483,15 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "thread" +version = "0.0.1" +source = "git+https://github.com/eclipse-score/baselibs_rust.git?tag=v0.1.1#7227b67c45dff719901d8f3763d5a37fb55aa0ad" +dependencies = [ + "pal", + "score_log", +] + [[package]] name = "thread_local" version = "1.1.9" diff --git a/Cargo.toml b/Cargo.toml index e9e7c923..00e88c2c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,10 +21,11 @@ signal-hook = "0.3.18" monitor_rs = { path = "src/launch_manager_daemon/health_monitor_lib/rust_bindings" } # Temporary API health_monitoring_lib = { path = "src/health_monitoring_lib" } -score_log = { git = "https://github.com/eclipse-score/baselibs_rust.git", tag = "v0.0.4" } -score_testing_macros = { git = "https://github.com/eclipse-score/baselibs_rust.git", tag = "v0.0.4" } -stdout_logger = { git = "https://github.com/eclipse-score/baselibs_rust.git", tag = "v0.0.4" } -containers = { git = "https://github.com/eclipse-score/baselibs_rust.git", tag = "v0.0.4" } +score_log = { git = "https://github.com/eclipse-score/baselibs_rust.git", tag = "v0.1.1" } +score_testing_macros = { git = "https://github.com/eclipse-score/baselibs_rust.git", tag = "v0.1.1" } +stdout_logger = { git = "https://github.com/eclipse-score/baselibs_rust.git", tag = "v0.1.1" } +containers = { git = "https://github.com/eclipse-score/baselibs_rust.git", tag = "v0.1.1" } +thread = { git = "https://github.com/eclipse-score/baselibs_rust.git", tag = "v0.1.1" } [workspace.lints.clippy] std_instead_of_core = "warn" diff --git a/MODULE.bazel b/MODULE.bazel index 3176d7f7..92b7cb87 100644 --- a/MODULE.bazel +++ b/MODULE.bazel @@ -133,7 +133,7 @@ pip.parse( ) use_repo(pip, "score_lifecycle_pip") -bazel_dep(name = "score_baselibs_rust", version = "0.1.0") +bazel_dep(name = "score_baselibs_rust", version = "0.1.1") bazel_dep(name = "score_baselibs", version = "0.2.4") bazel_dep(name = "score_logging", version = "0.1.0") diff --git a/MODULE.bazel.lock b/MODULE.bazel.lock index 5956895c..f59651ba 100644 --- a/MODULE.bazel.lock +++ b/MODULE.bazel.lock @@ -876,7 +876,8 @@ "https://raw.githubusercontent.com/eclipse-score/bazel_registry/main/modules/score_baselibs/0.2.4/MODULE.bazel": "800f8e36675392f13a5baf0a29ed1e9813cf7fdc28645a16e9ea9571e503c5f2", "https://raw.githubusercontent.com/eclipse-score/bazel_registry/main/modules/score_baselibs/0.2.4/source.json": "11cff5bb6678024efecbe0c1ce64580cf16be37d14af8b0d2acc9037d7a4a710", "https://raw.githubusercontent.com/eclipse-score/bazel_registry/main/modules/score_baselibs_rust/0.1.0/MODULE.bazel": "e9f8781fa23b58a7c4815d662d82a9a472d8dddc306f08cba3853928f3b760fe", - "https://raw.githubusercontent.com/eclipse-score/bazel_registry/main/modules/score_baselibs_rust/0.1.0/source.json": "f569a33fda1de61ccb962e19b72af745d93ba3f543c98b6ee72f81c9b890185f", + "https://raw.githubusercontent.com/eclipse-score/bazel_registry/main/modules/score_baselibs_rust/0.1.1/MODULE.bazel": "374ced8641b32f0f36bdf017e02a82ebbc89578f38b59f5f83b94775b434773a", + "https://raw.githubusercontent.com/eclipse-score/bazel_registry/main/modules/score_baselibs_rust/0.1.1/source.json": "b556f52b7f13372b0bfc9da92a3eb0951c2bae724ec4346301855146c93884c2", "https://raw.githubusercontent.com/eclipse-score/bazel_registry/main/modules/score_bazel_cpp_toolchains/0.2.2/MODULE.bazel": "343a1892b1d5c616e0b4cbecfb5e548fa69328d22bb4fd5862bdd3cfa902142b", "https://raw.githubusercontent.com/eclipse-score/bazel_registry/main/modules/score_bazel_cpp_toolchains/0.2.2/source.json": "624c1addd22fff7fc894d0571d35c8e47cc2d3ff9e75b15b8fb1cff021391a30", "https://raw.githubusercontent.com/eclipse-score/bazel_registry/main/modules/score_bazel_platforms/0.0.2/MODULE.bazel": "32f0cbc08bb1c60279448d666aead6b5a000374a8a67f08822b258bf00a6a183", diff --git a/src/health_monitoring_lib/BUILD b/src/health_monitoring_lib/BUILD index 89943f59..002486ea 100644 --- a/src/health_monitoring_lib/BUILD +++ b/src/health_monitoring_lib/BUILD @@ -17,6 +17,7 @@ load("@score_baselibs//:bazel/unit_tests.bzl", "cc_gtest_unit_test") COMMON_DEPS = [ "@score_baselibs_rust//src/containers:containers", + "@score_baselibs_rust//src/thread:thread", "@score_baselibs_rust//src/log/score_log:score_log", "//src/launch_manager_daemon/health_monitor_lib/rust_bindings:monitor_rs", ] @@ -31,11 +32,13 @@ CC_SOURCES = [ "cpp/heartbeat_monitor.cpp", "cpp/logic_monitor.cpp", "cpp/health_monitor.cpp", + "cpp/thread.cpp", ] CC_HDRS = [ "cpp/include/score/hm/common.h", "cpp/include/score/hm/tag.h", + "cpp/include/score/hm/thread.h", "cpp/include/score/hm/deadline/deadline_monitor.h", "cpp/include/score/hm/heartbeat/heartbeat_monitor.h", "cpp/include/score/hm/logic/logic_monitor.h", diff --git a/src/health_monitoring_lib/Cargo.toml b/src/health_monitoring_lib/Cargo.toml index e38bc5b5..0c3ad00d 100644 --- a/src/health_monitoring_lib/Cargo.toml +++ b/src/health_monitoring_lib/Cargo.toml @@ -14,6 +14,7 @@ path = "rust/lib.rs" workspace = true [dependencies] +thread.workspace = true score_log.workspace = true score_testing_macros.workspace = true containers.workspace = true diff --git a/src/health_monitoring_lib/cpp/health_monitor.cpp b/src/health_monitoring_lib/cpp/health_monitor.cpp index 9984d250..3b14466d 100644 --- a/src/health_monitoring_lib/cpp/health_monitor.cpp +++ b/src/health_monitoring_lib/cpp/health_monitor.cpp @@ -28,6 +28,7 @@ FFICode health_monitor_builder_destroy(FFIHandle health_monitor_builder_handle); FFICode health_monitor_builder_build(FFIHandle health_monitor_builder_handle, const uint64_t* supervisor_cycle_ms, const uint64_t* internal_cycle_ms, + FFIHandle thread_parameters_handle, FFIHandle* health_monitor_handle_out); FFICode health_monitor_builder_add_deadline_monitor(FFIHandle health_monitor_builder_handle, const MonitorTag* monitor_tag, @@ -129,6 +130,12 @@ HealthMonitorBuilder HealthMonitorBuilder::with_supervisor_api_cycle(std::chrono return std::move(*this); } +HealthMonitorBuilder HealthMonitorBuilder::thread_parameters(score::hm::ThreadParameters&& thread_parameters) && +{ + thread_parameters_ = std::move(thread_parameters); + return std::move(*this); +} + score::cpp::expected HealthMonitorBuilder::build() && { auto health_monitor_builder_handle = health_monitor_builder_handle_.drop_by_rust(); @@ -146,10 +153,20 @@ score::cpp::expected HealthMonitorBuilder::build() && internal_processing_cycle_ms = &internal_processing_cycle_ms_.value(); } + // Handle thread parameters. + FFIHandle thread_parameters_handle{nullptr}; + if (thread_parameters_.has_value()) + { + auto rust_handle{thread_parameters_.value().drop_by_rust()}; + SCORE_LANGUAGE_FUTURECPP_ASSERT(rust_handle.has_value()); + thread_parameters_handle = rust_handle.value(); + } + FFIHandle health_monitor_handle{nullptr}; auto result{health_monitor_builder_build(health_monitor_builder_handle.value(), supervisor_api_cycle_ms, internal_processing_cycle_ms, + thread_parameters_handle, &health_monitor_handle)}; if (result != kSuccess) { diff --git a/src/health_monitoring_lib/cpp/include/score/hm/health_monitor.h b/src/health_monitoring_lib/cpp/include/score/hm/health_monitor.h index e9792888..886a62ae 100644 --- a/src/health_monitoring_lib/cpp/include/score/hm/health_monitor.h +++ b/src/health_monitoring_lib/cpp/include/score/hm/health_monitor.h @@ -18,6 +18,7 @@ #include #include #include +#include namespace score::hm { @@ -59,6 +60,9 @@ class HealthMonitorBuilder final /// This duration determines how often the health monitor checks deadlines. HealthMonitorBuilder with_internal_processing_cycle(std::chrono::milliseconds cycle_duration) &&; + /// Sets the monitoring thread parameters. + HealthMonitorBuilder thread_parameters(score::hm::ThreadParameters&& thread_parameters) &&; + /// Build a new `HealthMonitor` instance based on provided parameters. score::cpp::expected build() &&; @@ -67,6 +71,7 @@ class HealthMonitorBuilder final std::optional supervisor_api_cycle_ms_; std::optional internal_processing_cycle_ms_; + std::optional thread_parameters_; }; class HealthMonitor final diff --git a/src/health_monitoring_lib/cpp/include/score/hm/thread.h b/src/health_monitoring_lib/cpp/include/score/hm/thread.h new file mode 100644 index 00000000..748ab5d3 --- /dev/null +++ b/src/health_monitoring_lib/cpp/include/score/hm/thread.h @@ -0,0 +1,91 @@ +/******************************************************************************** + * Copyright (c) 2026 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ +#ifndef SCORE_HM_THREAD_H +#define SCORE_HM_THREAD_H + +#include "common.h" +#include +#include + +namespace score::hm +{ + +class HealthMonitorBuilder; + +/// Scheduler policy. +enum class SchedulerPolicy : int32_t +{ + Other, + Fifo, + RoundRobin, +}; + +/// Get min thread priority for given policy. +int32_t scheduler_policy_priority_min(SchedulerPolicy scheduler_policy); + +/// Get max thread priority for given policy. +int32_t scheduler_policy_priority_max(SchedulerPolicy scheduler_policy); + +class SchedulerParameters final +{ + public: + /// Create a new `SchedulerParameters`. + /// Priority must be in allowed range for the scheduler policy. + SchedulerParameters(SchedulerPolicy policy, int32_t priority); + + /// Scheduler policy. + SchedulerPolicy policy() const; + + /// Thread priority. + int32_t priority() const; + + private: + SchedulerPolicy policy_; + int32_t priority_; +}; + +/// Thread parameters. +class ThreadParameters final : public internal::RustDroppable +{ + public: + /// Create a new `ThreadParameters` containing default values. + ThreadParameters(); + + /// Scheduler parameters, including scheduler policy and thread priority. + ThreadParameters scheduler_parameters(SchedulerParameters scheduler_parameters) &&; + + /// Set thread affinity - array of CPU core IDs that the thread can run on. + ThreadParameters affinity(const std::vector& affinity) &&; + + /// Set stack size. + ThreadParameters stack_size(size_t stack_size) &&; + + protected: + std::optional _drop_by_rust_impl() + { + return thread_parameters_handle_.drop_by_rust(); + } + + private: + internal::DroppableFFIHandle thread_parameters_handle_; + + // Allow to hide `drop_by_rust` implementation. + friend class internal::RustDroppable; + + // Allow `HealthMonitorBuilder` to access `drop_by_rust` implementation. + friend class score::hm::HealthMonitorBuilder; +}; + +} // namespace score::hm + +#endif // SCORE_HM_THREAD_H diff --git a/src/health_monitoring_lib/cpp/tests/health_monitor_test.cpp b/src/health_monitoring_lib/cpp/tests/health_monitor_test.cpp index 53872260..c31c7f5f 100644 --- a/src/health_monitoring_lib/cpp/tests/health_monitor_test.cpp +++ b/src/health_monitoring_lib/cpp/tests/health_monitor_test.cpp @@ -56,12 +56,16 @@ TEST_F(HealthMonitorTest, TestName) auto logic_monitor_builder = logic::LogicMonitorBuilder{from_state}.add_state(from_state, std::vector{to_state}).add_state(to_state, {}); + // Thread parameters. + auto thread_parameters{ThreadParameters().affinity(std::vector{0})}; + auto hmon_result{HealthMonitorBuilder() .add_deadline_monitor(deadline_monitor_tag, std::move(deadline_monitor_builder)) .add_heartbeat_monitor(heartbeat_monitor_tag, std::move(heartbeat_monitor_builder)) .add_logic_monitor(logic_monitor_tag, std::move(logic_monitor_builder)) .with_internal_processing_cycle(std::chrono::milliseconds(50)) .with_supervisor_api_cycle(std::chrono::milliseconds(50)) + .thread_parameters(std::move(thread_parameters)) .build()}; EXPECT_TRUE(hmon_result.has_value()); auto hm{std::move(hmon_result.value())}; diff --git a/src/health_monitoring_lib/cpp/thread.cpp b/src/health_monitoring_lib/cpp/thread.cpp new file mode 100644 index 00000000..d720513e --- /dev/null +++ b/src/health_monitoring_lib/cpp/thread.cpp @@ -0,0 +1,123 @@ +/******************************************************************************** + * Copyright (c) 2026 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +#include "score/hm/thread.h" +#include + +namespace +{ +extern "C" { + +using namespace score::hm; +using namespace score::hm::internal; + +// Functions below must match functions defined in `crate::thread_ffi`. + +FFICode scheduler_policy_priority_min(SchedulerPolicy scheduler_policy, int32_t* priority_out); +FFICode scheduler_policy_priority_max(SchedulerPolicy scheduler_policy, int32_t* priority_out); +FFICode thread_parameters_create(FFIHandle* thread_parameters_handle_out); +FFICode thread_parameters_destroy(FFIHandle thread_parameters_handle); +FFICode thread_parameters_scheduler_parameters(FFIHandle thread_parameters_handle, + SchedulerPolicy policy, + int32_t priority); +FFICode thread_parameters_affinity(FFIHandle thread_parameters_handle, const size_t* affinity, size_t num_affinity); +FFICode thread_parameters_stack_size(FFIHandle thread_parameters_handle, size_t stack_size); +} + +FFIHandle thread_parameters_create_wrapper() +{ + FFIHandle handle{nullptr}; + auto result{thread_parameters_create(&handle)}; + SCORE_LANGUAGE_FUTURECPP_ASSERT(result == kSuccess); + return handle; +} +} // namespace + +namespace score::hm +{ + +int32_t scheduler_policy_priority_min(SchedulerPolicy scheduler_policy) +{ + int32_t priority{0}; + auto result{::scheduler_policy_priority_min(scheduler_policy, &priority)}; + SCORE_LANGUAGE_FUTURECPP_ASSERT(result == kSuccess); + return priority; +} + +int32_t scheduler_policy_priority_max(SchedulerPolicy scheduler_policy) +{ + int32_t priority{0}; + auto result{::scheduler_policy_priority_max(scheduler_policy, &priority)}; + SCORE_LANGUAGE_FUTURECPP_ASSERT(result == kSuccess); + return priority; +} + +SchedulerParameters::SchedulerParameters(SchedulerPolicy policy, int32_t priority) + : policy_{policy}, priority_{priority} +{ + auto min{scheduler_policy_priority_min(policy)}; + auto max{scheduler_policy_priority_max(policy)}; + SCORE_LANGUAGE_FUTURECPP_ASSERT(priority >= min && priority <= max); +} + +SchedulerPolicy SchedulerParameters::policy() const +{ + return policy_; +} + +int32_t SchedulerParameters::priority() const +{ + return priority_; +} + +ThreadParameters::ThreadParameters() + : thread_parameters_handle_{thread_parameters_create_wrapper(), &thread_parameters_destroy} +{ +} + +ThreadParameters ThreadParameters::scheduler_parameters(SchedulerParameters scheduler_parameters) && +{ + auto rust_handle{thread_parameters_handle_.as_rust_handle()}; + SCORE_LANGUAGE_FUTURECPP_PRECONDITION(rust_handle.has_value()); + + auto policy{scheduler_parameters.policy()}; + auto priority{scheduler_parameters.priority()}; + auto result{thread_parameters_scheduler_parameters(rust_handle.value(), policy, priority)}; + SCORE_LANGUAGE_FUTURECPP_ASSERT(result == kSuccess); + + return std::move(*this); +} + +ThreadParameters ThreadParameters::affinity(const std::vector& affinity) && +{ + auto rust_handle{thread_parameters_handle_.as_rust_handle()}; + SCORE_LANGUAGE_FUTURECPP_PRECONDITION(rust_handle.has_value()); + + auto result{thread_parameters_affinity(rust_handle.value(), affinity.data(), affinity.size())}; + SCORE_LANGUAGE_FUTURECPP_ASSERT(result == kSuccess); + + return std::move(*this); +} + +ThreadParameters ThreadParameters::stack_size(size_t stack_size) && +{ + auto rust_handle{thread_parameters_handle_.as_rust_handle()}; + SCORE_LANGUAGE_FUTURECPP_PRECONDITION(rust_handle.has_value()); + + auto result{thread_parameters_stack_size(rust_handle.value(), stack_size)}; + SCORE_LANGUAGE_FUTURECPP_ASSERT(result == kSuccess); + + return std::move(*this); +} + +} // namespace score::hm diff --git a/src/health_monitoring_lib/rust/deadline/ffi.rs b/src/health_monitoring_lib/rust/deadline/ffi.rs index 416f24e5..b36f88a1 100644 --- a/src/health_monitoring_lib/rust/deadline/ffi.rs +++ b/src/health_monitoring_lib/rust/deadline/ffi.rs @@ -336,6 +336,7 @@ mod tests { health_monitor_builder_handle, null_mut(), null_mut(), + null_mut(), &mut health_monitor_handle as *mut FFIHandle, ); let _ = health_monitor_get_deadline_monitor( @@ -385,6 +386,7 @@ mod tests { health_monitor_builder_handle, null_mut(), null_mut(), + null_mut(), &mut health_monitor_handle as *mut FFIHandle, ); let _ = health_monitor_get_deadline_monitor( @@ -441,6 +443,7 @@ mod tests { health_monitor_builder_handle, null_mut(), null_mut(), + null_mut(), &mut health_monitor_handle as *mut FFIHandle, ); let _ = health_monitor_get_deadline_monitor( @@ -487,6 +490,7 @@ mod tests { health_monitor_builder_handle, null_mut(), null_mut(), + null_mut(), &mut health_monitor_handle as *mut FFIHandle, ); let _ = health_monitor_get_deadline_monitor( @@ -537,6 +541,7 @@ mod tests { health_monitor_builder_handle, null_mut(), null_mut(), + null_mut(), &mut health_monitor_handle as *mut FFIHandle, ); let _ = health_monitor_get_deadline_monitor( @@ -586,6 +591,7 @@ mod tests { health_monitor_builder_handle, null_mut(), null_mut(), + null_mut(), &mut health_monitor_handle as *mut FFIHandle, ); let _ = health_monitor_get_deadline_monitor( @@ -642,6 +648,7 @@ mod tests { health_monitor_builder_handle, null_mut(), null_mut(), + null_mut(), &mut health_monitor_handle as *mut FFIHandle, ); let _ = health_monitor_get_deadline_monitor( diff --git a/src/health_monitoring_lib/rust/ffi.rs b/src/health_monitoring_lib/rust/ffi.rs index 27948340..87afaea4 100644 --- a/src/health_monitoring_lib/rust/ffi.rs +++ b/src/health_monitoring_lib/rust/ffi.rs @@ -16,6 +16,7 @@ use crate::health_monitor::{HealthMonitor, HealthMonitorBuilder, HealthMonitorEr use crate::heartbeat::HeartbeatMonitorBuilder; use crate::logic::LogicMonitorBuilder; use crate::tag::MonitorTag; +use crate::thread_ffi::ThreadParametersCpp; use core::mem::ManuallyDrop; use core::ops::{Deref, DerefMut}; use core::time::Duration; @@ -111,6 +112,7 @@ pub extern "C" fn health_monitor_builder_build( health_monitor_builder_handle: FFIHandle, supervisor_cycle_ms: *const u64, internal_cycle_ms: *const u64, + thread_parameters_handle: FFIHandle, health_monitor_handle_out: *mut FFIHandle, ) -> FFICode { if health_monitor_builder_handle.is_null() || health_monitor_handle_out.is_null() { @@ -136,6 +138,15 @@ pub extern "C" fn health_monitor_builder_build( health_monitor_builder.with_supervisor_api_cycle_internal(supervisor_cycle_ms); } + // SAFETY: + // Validity of the pointer is ensured. + // It is assumed that the pointer was created by a call to `thread_parameters_create`. + // It is assumed that the pointer was not consumed by a call to `thread_parameters_destroy`. + if !thread_parameters_handle.is_null() { + let thread_parameters = unsafe { Box::from_raw(thread_parameters_handle as *mut ThreadParametersCpp) }; + health_monitor_builder.thread_parameters_internal(thread_parameters.build()); + } + // Build instance. match health_monitor_builder.build() { Ok(health_monitor) => { @@ -398,6 +409,7 @@ mod tests { logic_monitor_destroy, }; use crate::tag::{MonitorTag, StateTag}; + use crate::thread_ffi::thread_parameters_create; use core::ptr::null_mut; fn def_logic_monitor_builder() -> FFIHandle { @@ -467,6 +479,7 @@ mod tests { health_monitor_builder_handle, null_mut(), null_mut(), + null_mut(), &mut health_monitor_handle as *mut FFIHandle, ); assert!(!health_monitor_handle.is_null()); @@ -499,6 +512,7 @@ mod tests { health_monitor_builder_handle, &supervisor_cycle_ms as *const _, &internal_cycle_ms as *const _, + null_mut(), &mut health_monitor_handle as *mut FFIHandle, ); assert!(!health_monitor_handle.is_null()); @@ -523,6 +537,7 @@ mod tests { health_monitor_builder_handle, &supervisor_cycle_ms as *const _, &internal_cycle_ms as *const _, + null_mut(), &mut health_monitor_handle as *mut FFIHandle, ); assert!(health_monitor_handle.is_null()); @@ -542,6 +557,7 @@ mod tests { health_monitor_builder_handle, null_mut(), null_mut(), + null_mut(), &mut health_monitor_handle as *mut FFIHandle, ); assert_eq!(health_monitor_builder_build_result, FFICode::WrongState); @@ -557,6 +573,7 @@ mod tests { null_mut(), null_mut(), null_mut(), + null_mut(), &mut health_monitor_handle as *mut FFIHandle, ); assert!(health_monitor_handle.is_null()); @@ -569,14 +586,51 @@ mod tests { let _ = health_monitor_builder_create(&mut health_monitor_builder_handle as *mut FFIHandle); - let health_monitor_builder_build_result = - health_monitor_builder_build(health_monitor_builder_handle, null_mut(), null_mut(), null_mut()); + let health_monitor_builder_build_result = health_monitor_builder_build( + health_monitor_builder_handle, + null_mut(), + null_mut(), + null_mut(), + null_mut(), + ); assert_eq!(health_monitor_builder_build_result, FFICode::NullParameter); // Clean-up. health_monitor_builder_destroy(health_monitor_builder_handle); } + #[test] + fn health_monitor_builder_build_thread_parameters() { + let mut health_monitor_builder_handle: FFIHandle = null_mut(); + let mut health_monitor_handle: FFIHandle = null_mut(); + let mut deadline_monitor_builder_handle = null_mut(); + let mut thread_parameters_handle = null_mut(); + + let _ = health_monitor_builder_create(&mut health_monitor_builder_handle as *mut FFIHandle); + let deadline_monitor_tag = MonitorTag::from("deadline_monitor"); + let _ = deadline_monitor_builder_create(&mut deadline_monitor_builder_handle as *mut FFIHandle); + let _ = health_monitor_builder_add_deadline_monitor( + health_monitor_builder_handle, + &deadline_monitor_tag as *const MonitorTag, + deadline_monitor_builder_handle, + ); + + let _ = thread_parameters_create(&mut thread_parameters_handle as *mut FFIHandle); + + let health_monitor_builder_build_result = health_monitor_builder_build( + health_monitor_builder_handle, + null_mut(), + null_mut(), + thread_parameters_handle, + &mut health_monitor_handle as *mut FFIHandle, + ); + assert!(!health_monitor_handle.is_null()); + assert_eq!(health_monitor_builder_build_result, FFICode::Success); + + // Clean-up. + health_monitor_destroy(health_monitor_handle); + } + #[test] fn health_monitor_builder_add_deadline_monitor_succeeds() { let mut health_monitor_builder_handle: FFIHandle = null_mut(); @@ -857,6 +911,7 @@ mod tests { health_monitor_builder_handle, null_mut(), null_mut(), + null_mut(), &mut health_monitor_handle as *mut FFIHandle, ); @@ -893,6 +948,7 @@ mod tests { health_monitor_builder_handle, null_mut(), null_mut(), + null_mut(), &mut health_monitor_handle as *mut FFIHandle, ); @@ -938,6 +994,7 @@ mod tests { health_monitor_builder_handle, null_mut(), null_mut(), + null_mut(), &mut health_monitor_handle as *mut FFIHandle, ); @@ -972,6 +1029,7 @@ mod tests { health_monitor_builder_handle, null_mut(), null_mut(), + null_mut(), &mut health_monitor_handle as *mut FFIHandle, ); @@ -1005,6 +1063,7 @@ mod tests { health_monitor_builder_handle, null_mut(), null_mut(), + null_mut(), &mut health_monitor_handle as *mut FFIHandle, ); @@ -1038,6 +1097,7 @@ mod tests { health_monitor_builder_handle, null_mut(), null_mut(), + null_mut(), &mut health_monitor_handle as *mut FFIHandle, ); @@ -1074,6 +1134,7 @@ mod tests { health_monitor_builder_handle, null_mut(), null_mut(), + null_mut(), &mut health_monitor_handle as *mut FFIHandle, ); @@ -1119,6 +1180,7 @@ mod tests { health_monitor_builder_handle, null_mut(), null_mut(), + null_mut(), &mut health_monitor_handle as *mut FFIHandle, ); @@ -1153,6 +1215,7 @@ mod tests { health_monitor_builder_handle, null_mut(), null_mut(), + null_mut(), &mut health_monitor_handle as *mut FFIHandle, ); @@ -1186,6 +1249,7 @@ mod tests { health_monitor_builder_handle, null_mut(), null_mut(), + null_mut(), &mut health_monitor_handle as *mut FFIHandle, ); @@ -1218,6 +1282,7 @@ mod tests { health_monitor_builder_handle, null_mut(), null_mut(), + null_mut(), &mut health_monitor_handle as *mut FFIHandle, ); @@ -1253,6 +1318,7 @@ mod tests { health_monitor_builder_handle, null_mut(), null_mut(), + null_mut(), &mut health_monitor_handle as *mut FFIHandle, ); @@ -1297,6 +1363,7 @@ mod tests { health_monitor_builder_handle, null_mut(), null_mut(), + null_mut(), &mut health_monitor_handle as *mut FFIHandle, ); @@ -1330,6 +1397,7 @@ mod tests { health_monitor_builder_handle, null_mut(), null_mut(), + null_mut(), &mut health_monitor_handle as *mut FFIHandle, ); @@ -1362,6 +1430,7 @@ mod tests { health_monitor_builder_handle, null_mut(), null_mut(), + null_mut(), &mut health_monitor_handle as *mut FFIHandle, ); @@ -1377,6 +1446,7 @@ mod tests { } #[test] + #[cfg_attr(miri, ignore)] fn health_monitor_start_succeeds() { let mut health_monitor_builder_handle: FFIHandle = null_mut(); let mut health_monitor_handle: FFIHandle = null_mut(); @@ -1395,6 +1465,7 @@ mod tests { health_monitor_builder_handle, null_mut(), null_mut(), + null_mut(), &mut health_monitor_handle as *mut FFIHandle, ); @@ -1430,6 +1501,7 @@ mod tests { health_monitor_builder_handle, null_mut(), null_mut(), + null_mut(), &mut health_monitor_handle as *mut FFIHandle, ); diff --git a/src/health_monitoring_lib/rust/health_monitor.rs b/src/health_monitoring_lib/rust/health_monitor.rs index 7e2821b0..3a40df39 100644 --- a/src/health_monitoring_lib/rust/health_monitor.rs +++ b/src/health_monitoring_lib/rust/health_monitor.rs @@ -23,6 +23,7 @@ use crate::worker::{MonitoringLogic, UniqueThreadRunner}; use containers::fixed_capacity::FixedCapacityVec; use core::time::Duration; use std::collections::HashMap; +use thread::ThreadParameters; /// Health monitor errors. #[derive(PartialEq, Eq, Debug, ScoreDebug)] @@ -43,6 +44,7 @@ pub struct HealthMonitorBuilder { logic_monitor_builders: HashMap, supervisor_api_cycle: Duration, internal_processing_cycle: Duration, + thread_parameters: ThreadParameters, } impl HealthMonitorBuilder { @@ -54,6 +56,7 @@ impl HealthMonitorBuilder { logic_monitor_builders: HashMap::new(), supervisor_api_cycle: Duration::from_millis(500), internal_processing_cycle: Duration::from_millis(100), + thread_parameters: ThreadParameters::default(), } } @@ -113,6 +116,14 @@ impl HealthMonitorBuilder { self } + /// Set the monitoring thread parameters. + /// + /// - `thread_parameters` - monitoring thread parameters. + pub fn thread_parameters(mut self, thread_parameters: ThreadParameters) -> Self { + self.thread_parameters_internal(thread_parameters); + self + } + /// Build a new [`HealthMonitor`] instance based on provided parameters. pub fn build(self) -> Result { // Check cycle values. @@ -164,7 +175,7 @@ impl HealthMonitorBuilder { deadline_monitors, heartbeat_monitors, logic_monitors, - worker: UniqueThreadRunner::new(self.internal_processing_cycle), + worker: UniqueThreadRunner::new(self.internal_processing_cycle, self.thread_parameters), supervisor_api_cycle: self.supervisor_api_cycle, }) } @@ -198,6 +209,10 @@ impl HealthMonitorBuilder { pub(crate) fn with_internal_processing_cycle_internal(&mut self, cycle_duration: Duration) { self.internal_processing_cycle = cycle_duration; } + + pub(crate) fn thread_parameters_internal(&mut self, thread_parameters: ThreadParameters) { + self.thread_parameters = thread_parameters; + } } /// Monitor ownership state in the [`HealthMonitor`]. @@ -585,6 +600,7 @@ mod tests { } #[test] + #[cfg_attr(miri, ignore)] fn health_monitor_start_succeeds() { let deadline_monitor_tag = MonitorTag::from("deadline_monitor"); let deadline_monitor_builder = DeadlineMonitorBuilder::new(); diff --git a/src/health_monitoring_lib/rust/heartbeat/ffi.rs b/src/health_monitoring_lib/rust/heartbeat/ffi.rs index 1f0324b1..cf5265ec 100644 --- a/src/health_monitoring_lib/rust/heartbeat/ffi.rs +++ b/src/health_monitoring_lib/rust/heartbeat/ffi.rs @@ -165,6 +165,7 @@ mod tests { health_monitor_builder_handle, null_mut(), null_mut(), + null_mut(), &mut health_monitor_handle as *mut FFIHandle, ); let _ = health_monitor_get_heartbeat_monitor( diff --git a/src/health_monitoring_lib/rust/lib.rs b/src/health_monitoring_lib/rust/lib.rs index 69050efe..b2b8e169 100644 --- a/src/health_monitoring_lib/rust/lib.rs +++ b/src/health_monitoring_lib/rust/lib.rs @@ -18,6 +18,7 @@ mod log; mod protected_memory; mod supervisor_api_client; mod tag; +mod thread_ffi; mod worker; pub mod deadline; diff --git a/src/health_monitoring_lib/rust/logic/ffi.rs b/src/health_monitoring_lib/rust/logic/ffi.rs index 6be6e445..b067668e 100644 --- a/src/health_monitoring_lib/rust/logic/ffi.rs +++ b/src/health_monitoring_lib/rust/logic/ffi.rs @@ -352,6 +352,7 @@ mod tests { health_monitor_builder_handle, null_mut(), null_mut(), + null_mut(), &mut health_monitor_handle as *mut FFIHandle, ); let _ = health_monitor_get_logic_monitor( @@ -400,6 +401,7 @@ mod tests { health_monitor_builder_handle, null_mut(), null_mut(), + null_mut(), &mut health_monitor_handle as *mut FFIHandle, ); let _ = health_monitor_get_logic_monitor( @@ -456,6 +458,7 @@ mod tests { health_monitor_builder_handle, null_mut(), null_mut(), + null_mut(), &mut health_monitor_handle as *mut FFIHandle, ); let _ = health_monitor_get_logic_monitor( @@ -504,6 +507,7 @@ mod tests { health_monitor_builder_handle, null_mut(), null_mut(), + null_mut(), &mut health_monitor_handle as *mut FFIHandle, ); let _ = health_monitor_get_logic_monitor( @@ -554,6 +558,7 @@ mod tests { health_monitor_builder_handle, null_mut(), null_mut(), + null_mut(), &mut health_monitor_handle as *mut FFIHandle, ); let _ = health_monitor_get_logic_monitor( @@ -613,6 +618,7 @@ mod tests { health_monitor_builder_handle, null_mut(), null_mut(), + null_mut(), &mut health_monitor_handle as *mut FFIHandle, ); let _ = health_monitor_get_logic_monitor( diff --git a/src/health_monitoring_lib/rust/thread_ffi.rs b/src/health_monitoring_lib/rust/thread_ffi.rs new file mode 100644 index 00000000..2b2e3ebe --- /dev/null +++ b/src/health_monitoring_lib/rust/thread_ffi.rs @@ -0,0 +1,373 @@ +// ******************************************************************************* +// Copyright (c) 2026 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. +// +// This program and the accompanying materials are made available under the +// terms of the Apache License Version 2.0 which is available at +// +// +// SPDX-License-Identifier: Apache-2.0 +// ******************************************************************************* + +use crate::ffi::{FFIBorrowed, FFICode, FFIHandle}; +use thread::{SchedulerParameters, SchedulerPolicy, ThreadParameters}; + +/// C++ interface proxy for [`ThreadParameters`]. +#[derive(Default)] +pub(crate) struct ThreadParametersCpp { + scheduler_parameters: Option, + affinity: Option>, + stack_size: Option, +} + +impl ThreadParametersCpp { + fn new() -> Self { + Self::default() + } + + fn scheduler_parameters(&mut self, scheduler_parameters: SchedulerParameters) { + self.scheduler_parameters = Some(scheduler_parameters); + } + + fn affinity(&mut self, affinity: &[usize]) { + self.affinity = Some(Box::from(affinity)); + } + + fn stack_size(&mut self, stack_size: usize) { + self.stack_size = Some(stack_size); + } + + pub(crate) fn build(self) -> ThreadParameters { + let mut thread_parameters = ThreadParameters::new(); + if let Some(scheduler_parameters) = self.scheduler_parameters { + thread_parameters = thread_parameters.scheduler_parameters(scheduler_parameters); + } + if let Some(affinity) = self.affinity { + thread_parameters = thread_parameters.affinity(&affinity); + } + if let Some(stack_size) = self.stack_size { + thread_parameters = thread_parameters.stack_size(stack_size); + } + + thread_parameters + } +} + +#[unsafe(no_mangle)] +pub extern "C" fn scheduler_policy_priority_min(scheduler_policy: SchedulerPolicy, priority_out: *mut i32) -> FFICode { + if priority_out.is_null() { + return FFICode::NullParameter; + } + + // SAFETY: validity of the pointer is ensured. + unsafe { + *priority_out = scheduler_policy.priority_min(); + } + FFICode::Success +} + +#[unsafe(no_mangle)] +pub extern "C" fn scheduler_policy_priority_max(scheduler_policy: SchedulerPolicy, priority_out: *mut i32) -> FFICode { + if priority_out.is_null() { + return FFICode::NullParameter; + } + + // SAFETY: validity of the pointer is ensured. + unsafe { + *priority_out = scheduler_policy.priority_max(); + } + FFICode::Success +} + +#[unsafe(no_mangle)] +pub extern "C" fn thread_parameters_create(thread_parameters_handle_out: *mut FFIHandle) -> FFICode { + if thread_parameters_handle_out.is_null() { + return FFICode::NullParameter; + } + + let thread_parameters = ThreadParametersCpp::new(); + unsafe { + *thread_parameters_handle_out = Box::into_raw(Box::new(thread_parameters)).cast(); + } + + FFICode::Success +} + +#[unsafe(no_mangle)] +pub extern "C" fn thread_parameters_destroy(thread_parameters_handle: FFIHandle) -> FFICode { + if thread_parameters_handle.is_null() { + return FFICode::NullParameter; + } + + // SAFETY: + // Validity of the pointer is ensured. + // It is assumed that the pointer was created by a call to `thread_parameters_create`. + // It is assumed that the pointer was not consumed by a call to `health_monitor_builder_build`. + unsafe { + let _ = Box::from_raw(thread_parameters_handle as *mut ThreadParametersCpp); + } + + FFICode::Success +} + +#[unsafe(no_mangle)] +pub extern "C" fn thread_parameters_scheduler_parameters( + thread_parameters_handle: FFIHandle, + policy: SchedulerPolicy, + priority: i32, +) -> FFICode { + if thread_parameters_handle.is_null() { + return FFICode::NullParameter; + } + + // Make sure priority is in allowed range. + let allowed_priority_range = policy.priority_min()..=policy.priority_max(); + if !allowed_priority_range.contains(&priority) { + return FFICode::InvalidArgument; + } + + // SAFETY: + // Validity of the pointer is ensured. + // It is assumed that the pointer was created by a call to `thread_parameters_create`. + // It is assumed that the pointer was not consumed by a call to `thread_parameters_destroy` or `health_monitor_builder_build`. + let mut thread_parameters = + FFIBorrowed::new(unsafe { Box::from_raw(thread_parameters_handle as *mut ThreadParametersCpp) }); + + let scheduler_parameters = SchedulerParameters::new(policy, priority); + thread_parameters.scheduler_parameters(scheduler_parameters); + + FFICode::Success +} + +#[unsafe(no_mangle)] +pub extern "C" fn thread_parameters_affinity( + thread_parameters_handle: FFIHandle, + affinity: *const usize, + num_affinity: usize, +) -> FFICode { + if thread_parameters_handle.is_null() { + return FFICode::NullParameter; + } + // Null is only allowed when `num_affinity` equals 0! + if affinity.is_null() && num_affinity > 0 { + return FFICode::NullParameter; + } + + // SAFETY: + // `affinity` must contain a valid continuous array. + // Number of elements must match `num_affinity`. + // Null is allowed when `num_affinity` equals 0. + let affinity = if num_affinity > 0 { + unsafe { core::slice::from_raw_parts(affinity, num_affinity) } + } else { + &[] + }; + + // SAFETY: + // Validity of the pointer is ensured. + // It is assumed that the pointer was created by a call to `thread_parameters_create`. + // It is assumed that the pointer was not consumed by a call to `thread_parameters_destroy` or `health_monitor_builder_build`. + let mut thread_parameters = + FFIBorrowed::new(unsafe { Box::from_raw(thread_parameters_handle as *mut ThreadParametersCpp) }); + + thread_parameters.affinity(affinity); + + FFICode::Success +} + +#[unsafe(no_mangle)] +pub extern "C" fn thread_parameters_stack_size(thread_parameters_handle: FFIHandle, stack_size: usize) -> FFICode { + if thread_parameters_handle.is_null() { + return FFICode::NullParameter; + } + + // SAFETY: + // Validity of the pointer is ensured. + // It is assumed that the pointer was created by a call to `thread_parameters_create`. + // It is assumed that the pointer was not consumed by a call to `thread_parameters_destroy` or `health_monitor_builder_build`. + let mut thread_parameters = + FFIBorrowed::new(unsafe { Box::from_raw(thread_parameters_handle as *mut ThreadParametersCpp) }); + + thread_parameters.stack_size(stack_size); + + FFICode::Success +} + +#[score_testing_macros::test_mod_with_log] +#[cfg(all(test, not(loom)))] +mod tests { + use crate::ffi::{FFICode, FFIHandle}; + use crate::thread_ffi::{ + scheduler_policy_priority_max, scheduler_policy_priority_min, thread_parameters_affinity, + thread_parameters_create, thread_parameters_destroy, thread_parameters_scheduler_parameters, + thread_parameters_stack_size, + }; + use core::mem::MaybeUninit; + use core::ptr::null_mut; + use thread::SchedulerPolicy; + + #[test] + #[cfg_attr(miri, ignore)] + fn scheduler_policy_priority_min_succeeds() { + let policy = SchedulerPolicy::Fifo; + let mut priority = MaybeUninit::uninit(); + let scheduler_policy_priority_min_result = scheduler_policy_priority_min(policy, priority.as_mut_ptr()); + assert_eq!(scheduler_policy_priority_min_result, FFICode::Success); + assert_eq!(unsafe { priority.assume_init() }, 1); + } + + #[test] + fn scheduler_policy_priority_min_null_priority() { + let policy = SchedulerPolicy::Fifo; + let priority = null_mut(); + let scheduler_policy_priority_min_result = scheduler_policy_priority_min(policy, priority); + assert_eq!(scheduler_policy_priority_min_result, FFICode::NullParameter); + } + + #[test] + #[cfg_attr(miri, ignore)] + fn scheduler_policy_priority_max_succeeds() { + let policy = SchedulerPolicy::Fifo; + let mut priority = MaybeUninit::uninit(); + let scheduler_policy_priority_max_result = scheduler_policy_priority_max(policy, priority.as_mut_ptr()); + assert_eq!(scheduler_policy_priority_max_result, FFICode::Success); + assert_eq!(unsafe { priority.assume_init() }, 99); + } + + #[test] + fn scheduler_policy_priority_max_null_priority() { + let policy = SchedulerPolicy::Fifo; + let priority = null_mut(); + let scheduler_policy_priority_max_result = scheduler_policy_priority_max(policy, priority); + assert_eq!(scheduler_policy_priority_max_result, FFICode::NullParameter); + } + + #[test] + fn thread_parameters_create_succeeds() { + let mut thread_parameters_handle: FFIHandle = null_mut(); + + let thread_parameters_create_result = thread_parameters_create(&mut thread_parameters_handle as *mut FFIHandle); + assert!(!thread_parameters_handle.is_null()); + assert_eq!(thread_parameters_create_result, FFICode::Success); + + // Clean-up. + // NOTE: `thread_parameters_destroy` positive path is already tested here. + let thread_parameters_destroy_result = thread_parameters_destroy(thread_parameters_handle); + assert_eq!(thread_parameters_destroy_result, FFICode::Success); + } + + #[test] + fn thread_parameters_destroy_null_handle() { + let thread_parameters_destroy_result = thread_parameters_destroy(null_mut()); + assert_eq!(thread_parameters_destroy_result, FFICode::NullParameter); + } + + #[test] + #[cfg_attr(miri, ignore)] + fn thread_parameters_scheduler_parameters_succeeds() { + let mut thread_parameters_handle: FFIHandle = null_mut(); + + let _ = thread_parameters_create(&mut thread_parameters_handle as *mut FFIHandle); + + let thread_parameters_scheduler_parameters_result = + thread_parameters_scheduler_parameters(thread_parameters_handle, SchedulerPolicy::Fifo, 50); + assert_eq!(thread_parameters_scheduler_parameters_result, FFICode::Success); + + // Clean-up. + thread_parameters_destroy(thread_parameters_handle); + } + + #[test] + fn thread_parameters_scheduler_parameters_null_handle() { + let thread_parameters_scheduler_parameters_result = + thread_parameters_scheduler_parameters(null_mut(), SchedulerPolicy::Fifo, 50); + assert_eq!(thread_parameters_scheduler_parameters_result, FFICode::NullParameter); + } + + #[test] + #[cfg_attr(miri, ignore)] + fn thread_parameters_scheduler_parameters_invalid_priority() { + let mut thread_parameters_handle: FFIHandle = null_mut(); + + let _ = thread_parameters_create(&mut thread_parameters_handle as *mut FFIHandle); + + let thread_parameters_scheduler_parameters_result = + thread_parameters_scheduler_parameters(thread_parameters_handle, SchedulerPolicy::Other, 50); + assert_eq!(thread_parameters_scheduler_parameters_result, FFICode::InvalidArgument); + + // Clean-up. + thread_parameters_destroy(thread_parameters_handle); + } + + #[test] + fn thread_parameters_affinity_succeeds() { + let mut thread_parameters_handle: FFIHandle = null_mut(); + + let _ = thread_parameters_create(&mut thread_parameters_handle as *mut FFIHandle); + + let affinity = [1, 2, 3]; + let thread_parameters_affinity_result = + thread_parameters_affinity(thread_parameters_handle, affinity.as_ptr(), affinity.len()); + assert_eq!(thread_parameters_affinity_result, FFICode::Success); + + // Clean-up. + thread_parameters_destroy(thread_parameters_handle); + } + + #[test] + fn thread_parameters_affinity_null_handle() { + let affinity = [1, 2, 3]; + let thread_parameters_affinity_result = + thread_parameters_affinity(null_mut(), affinity.as_ptr(), affinity.len()); + assert_eq!(thread_parameters_affinity_result, FFICode::NullParameter); + } + + #[test] + fn thread_parameters_affinity_null_affinity_zero_elements() { + let mut thread_parameters_handle: FFIHandle = null_mut(); + + let _ = thread_parameters_create(&mut thread_parameters_handle as *mut FFIHandle); + + let thread_parameters_affinity_result = thread_parameters_affinity(thread_parameters_handle, null_mut(), 0); + assert_eq!(thread_parameters_affinity_result, FFICode::Success); + + // Clean-up. + thread_parameters_destroy(thread_parameters_handle); + } + + #[test] + fn thread_parameters_affinity_null_affinity_many_elements() { + let mut thread_parameters_handle: FFIHandle = null_mut(); + + let _ = thread_parameters_create(&mut thread_parameters_handle as *mut FFIHandle); + + let affinity = [1, 2, 3]; + let thread_parameters_affinity_result = + thread_parameters_affinity(thread_parameters_handle, null_mut(), affinity.len()); + assert_eq!(thread_parameters_affinity_result, FFICode::NullParameter); + + // Clean-up. + thread_parameters_destroy(thread_parameters_handle); + } + + #[test] + fn thread_parameters_stack_size_succeeds() { + let mut thread_parameters_handle: FFIHandle = null_mut(); + + let _ = thread_parameters_create(&mut thread_parameters_handle as *mut FFIHandle); + + let thread_parameters_stack_size_result = thread_parameters_stack_size(thread_parameters_handle, 1024 * 1024); + assert_eq!(thread_parameters_stack_size_result, FFICode::Success); + + // Clean-up. + thread_parameters_destroy(thread_parameters_handle); + } + + #[test] + fn thread_parameters_stack_size_null_handle() { + let thread_parameters_stack_size_result = thread_parameters_stack_size(null_mut(), 1024 * 1024); + assert_eq!(thread_parameters_stack_size_result, FFICode::NullParameter); + } +} diff --git a/src/health_monitoring_lib/rust/worker.rs b/src/health_monitoring_lib/rust/worker.rs index afeda537..e8acbbec 100644 --- a/src/health_monitoring_lib/rust/worker.rs +++ b/src/health_monitoring_lib/rust/worker.rs @@ -18,6 +18,7 @@ use core::sync::atomic::{AtomicBool, Ordering}; use core::time::Duration; use std::sync::Arc; use std::time::Instant; +use thread::{spawn, JoinHandle, ThreadParameters}; pub(super) struct MonitoringLogic { monitors: FixedCapacityVec, @@ -91,17 +92,19 @@ impl MonitoringLogic { /// A struct that manages a unique thread for running monitoring logic periodically. pub struct UniqueThreadRunner { - handle: Option>, + handle: Option>, should_stop: Arc, internal_duration_cycle: Duration, + thread_parameters: ThreadParameters, } impl UniqueThreadRunner { - pub(super) fn new(internal_duration_cycle: Duration) -> Self { + pub(super) fn new(internal_duration_cycle: Duration, thread_parameters: ThreadParameters) -> Self { Self { handle: None, should_stop: Arc::new(AtomicBool::new(false)), internal_duration_cycle, + thread_parameters, } } @@ -113,27 +116,30 @@ impl UniqueThreadRunner { let should_stop = self.should_stop.clone(); let interval = self.internal_duration_cycle; - std::thread::spawn(move || { - info!("Monitoring thread started."); - let hmon_starting_point = Instant::now(); - let mut next_sleep_time = interval; + spawn( + move || { + info!("Monitoring thread started."); + let hmon_starting_point = Instant::now(); + let mut next_sleep_time = interval; - // TODO Add some checks and log if cyclicly here is not met. - while !should_stop.load(Ordering::Relaxed) { - std::thread::sleep(next_sleep_time); + // TODO Add some checks and log if cyclicly here is not met. + while !should_stop.load(Ordering::Relaxed) { + std::thread::sleep(next_sleep_time); - let now = Instant::now(); + let now = Instant::now(); - if !monitoring_logic.run(hmon_starting_point) { - info!("Monitoring logic failed, stopping thread."); - break; - } + if !monitoring_logic.run(hmon_starting_point) { + info!("Monitoring logic failed, stopping thread."); + break; + } - next_sleep_time = interval - now.elapsed(); - } + next_sleep_time = interval - now.elapsed(); + } - info!("Monitoring thread exiting."); - }) + info!("Monitoring thread exiting."); + }, + self.thread_parameters.clone(), + ) }); } @@ -179,6 +185,7 @@ mod tests { use core::time::Duration; use std::sync::Arc; use std::time::Instant; + use thread::ThreadParameters; #[derive(Clone)] struct MockSupervisorAPIClient { @@ -332,7 +339,8 @@ mod tests { alive_mock.clone(), ); - let mut worker = UniqueThreadRunner::new(Duration::from_millis(10)); + let thread_parameters = ThreadParameters::default(); + let mut worker = UniqueThreadRunner::new(Duration::from_millis(10), thread_parameters); worker.start(logic); let mut deadline = deadline_monitor