Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions core/cli/src/commands/binary_system/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,19 @@ impl CliCommand for GetStatsCmd {
format!("{}", stats.consumer_groups_count).as_str(),
]);

table.add_row(vec![
"Threads Count",
format!("{}", stats.threads_count).as_str(),
]);
table.add_row(vec![
"Free Disk Space",
stats.free_disk_space.as_bytes_u64().to_string().as_str(),
]);
table.add_row(vec![
"Total Disk Space",
stats.total_disk_space.as_bytes_u64().to_string().as_str(),
]);

table.add_row(vec!["OS Name", stats.os_name.as_str()]);
table.add_row(vec!["OS Version", stats.os_version.as_str()]);
table.add_row(vec!["Kernel Version", stats.kernel_version.as_str()]);
Expand Down Expand Up @@ -208,6 +221,16 @@ impl CliCommand for GetStatsCmd {
stats.consumer_groups_count
));

list.push(format!("Threads Count|{}", stats.threads_count));
list.push(format!(
"Free Disk Space|{}",
stats.free_disk_space.as_bytes_u64()
));
list.push(format!(
"Total Disk Space|{}",
stats.total_disk_space.as_bytes_u64()
));

list.push(format!("OS Name|{}", stats.os_name));
list.push(format!("OS Version|{}", stats.os_version));
list.push(format!("Kernel Version|{}", stats.kernel_version));
Expand Down
38 changes: 38 additions & 0 deletions core/common/src/traits/binary_mapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,41 @@ pub fn map_stats(payload: Bytes) -> Result<Stats, IggyError> {
}
}

let mut threads_count = 0u32;
if current_position + 4 <= payload.len() {
threads_count = u32::from_le_bytes(
payload[current_position..current_position + 4]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
);
current_position += 4;
}

let mut free_disk_space: IggyByteSize = 0.into();
if current_position + 8 <= payload.len() {
free_disk_space = u64::from_le_bytes(
payload[current_position..current_position + 8]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
)
.into();
current_position += 8;
}

let mut total_disk_space: IggyByteSize = 0.into();
if current_position + 8 <= payload.len() {
total_disk_space = u64::from_le_bytes(
payload[current_position..current_position + 8]
.try_into()
.map_err(|_| IggyError::InvalidNumberEncoding)?,
)
.into();
#[allow(unused_assignments)]
{
current_position += 8;
}
}

Ok(Stats {
process_id,
cpu_usage,
Expand All @@ -366,6 +401,9 @@ pub fn map_stats(payload: Bytes) -> Result<Stats, IggyError> {
iggy_server_version,
iggy_server_semver,
cache_metrics,
threads_count,
free_disk_space,
total_disk_space,
})
}

Expand Down
9 changes: 9 additions & 0 deletions core/common/src/types/stats/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@ pub struct Stats {
/// Cache metrics per partition
#[serde(with = "cache_metrics_serializer")]
pub cache_metrics: HashMap<CacheMetricsKey, CacheMetrics>,
/// The number of threads in the server process.
pub threads_count: u32,
/// The available (free) disk space for the data directory.
pub free_disk_space: IggyByteSize,
/// The total disk space for the data directory.
pub total_disk_space: IggyByteSize,
}

/// Key for identifying a specific partition's cache metrics
Expand Down Expand Up @@ -181,6 +187,9 @@ impl Default for Stats {
iggy_server_version: "unknown_iggy_version".to_string(),
iggy_server_semver: None,
cache_metrics: HashMap::new(),
threads_count: 0,
free_disk_space: 0.into(),
total_disk_space: 0.into(),
}
}
}
130 changes: 126 additions & 4 deletions core/integration/tests/cli/system/test_stats_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,14 @@ use async_trait::async_trait;
use iggy::prelude::Client;
use iggy::prelude::Identifier;
use iggy::prelude::IggyExpiry;
use iggy::prelude::IggyMessage;
use iggy::prelude::MaxTopicSize;
use iggy::prelude::Partitioning;
use iggy_cli::commands::binary_system::stats::GetStatsOutput;
use iggy_common::Stats;
use predicates::str::{contains, starts_with};
use serial_test::parallel;
use std::str::FromStr;

#[derive(Debug, Clone, Copy, Eq, PartialEq)]
enum TestStatsCmdOutput {
Expand Down Expand Up @@ -97,7 +101,10 @@ impl IggyCmdTestCase for TestStatsCmd {
.stdout(contains("Segments Count | 5"))
.stdout(contains("Message Count | 0"))
// Note: Client count can vary due to connection lifecycle; at least 2 expected
.stdout(contains("Consumer Groups Count | 0"));
.stdout(contains("Consumer Groups Count | 0"))
.stdout(contains("Threads Count"))
.stdout(contains("Free Disk Space"))
.stdout(contains("Total Disk Space"));
}
TestStatsCmdOutput::Set(GetStatsOutput::List) => {
command_state
Expand All @@ -107,7 +114,10 @@ impl IggyCmdTestCase for TestStatsCmd {
.stdout(contains("Partitions Count|5"))
.stdout(contains("Segments Count|5"))
.stdout(contains("Message Count|0"))
.stdout(contains("Consumer Groups Count|0"));
.stdout(contains("Consumer Groups Count|0"))
.stdout(contains("Threads Count|"))
.stdout(contains("Free Disk Space|"))
.stdout(contains("Total Disk Space|"));
}
TestStatsCmdOutput::Set(GetStatsOutput::Json) => {
command_state
Expand All @@ -117,7 +127,10 @@ impl IggyCmdTestCase for TestStatsCmd {
.stdout(contains(r#""partitions_count": 5"#))
.stdout(contains(r#""segments_count": 5"#))
.stdout(contains(r#""messages_count": 0"#))
.stdout(contains(r#""consumer_groups_count": 0"#));
.stdout(contains(r#""consumer_groups_count": 0"#))
.stdout(contains(r#""threads_count":"#))
.stdout(contains(r#""free_disk_space":"#))
.stdout(contains(r#""total_disk_space":"#));
}
TestStatsCmdOutput::Set(GetStatsOutput::Toml) => {
command_state
Expand All @@ -127,7 +140,10 @@ impl IggyCmdTestCase for TestStatsCmd {
.stdout(contains("partitions_count = 5"))
.stdout(contains("segments_count = 5"))
.stdout(contains("messages_count = 0"))
.stdout(contains("consumer_groups_count = 0"));
.stdout(contains("consumer_groups_count = 0"))
.stdout(contains("threads_count ="))
.stdout(contains("free_disk_space ="))
.stdout(contains("total_disk_space ="));
}
}
}
Expand All @@ -143,6 +159,109 @@ impl IggyCmdTestCase for TestStatsCmd {
}
}

struct TestStatsCmdWithMessages {
stream_id: u32,
topic_id: u32,
}

impl TestStatsCmdWithMessages {
fn new() -> Self {
Self {
stream_id: 0,
topic_id: 0,
}
}
}

#[async_trait]
impl IggyCmdTestCase for TestStatsCmdWithMessages {
async fn prepare_server_state(&mut self, client: &dyn Client) {
let stream = client.create_stream("size-test").await;
assert!(stream.is_ok());
let stream_details = stream.unwrap();
self.stream_id = stream_details.id;

let topic = client
.create_topic(
&self.stream_id.try_into().unwrap(),
"topic",
1,
Default::default(),
None,
IggyExpiry::NeverExpire,
MaxTopicSize::ServerDefault,
)
.await;
assert!(topic.is_ok());
let topic_details = topic.unwrap();
self.topic_id = topic_details.id;

let mut messages = (1..=10)
.filter_map(|id| IggyMessage::from_str(format!("Test message {id}").as_str()).ok())
.collect::<Vec<_>>();
let send_status = client
.send_messages(
&self.stream_id.try_into().unwrap(),
&self.topic_id.try_into().unwrap(),
&Partitioning::default(),
&mut messages,
)
.await;
assert!(send_status.is_ok());
}

fn get_command(&self) -> IggyCmdCommand {
IggyCmdCommand::new()
.arg("stats")
.arg("-o")
.arg("json")
.opt("-q")
.with_env_credentials()
}

fn verify_command(&self, command_state: Assert) {
let assert = command_state.success();
let stdout = String::from_utf8_lossy(&assert.get_output().stdout);
let stats: Stats =
serde_json::from_str(&stdout).expect("Failed to parse stats JSON output");

assert!(
stats.messages_count > 0,
"messages_count should be > 0 after sending messages"
);
assert!(
stats.messages_size_bytes.as_bytes_u64() > 0,
"messages_size_bytes should be > 0 after sending messages"
);
assert!(
stats.free_disk_space.as_bytes_u64() > 0,
"free_disk_space should be > 0"
);
assert!(
stats.total_disk_space.as_bytes_u64() > 0,
"total_disk_space should be > 0"
);
assert!(
stats.free_disk_space.as_bytes_u64() <= stats.total_disk_space.as_bytes_u64(),
"free_disk_space should be <= total_disk_space"
);
}

async fn verify_server_state(&self, client: &dyn Client) {
client
.delete_topic(
&self.stream_id.try_into().unwrap(),
&self.topic_id.try_into().unwrap(),
)
.await
.unwrap();
client
.delete_stream(&self.stream_id.try_into().unwrap())
.await
.unwrap();
}
}

#[tokio::test]
#[parallel]
pub async fn should_be_successful() {
Expand Down Expand Up @@ -172,6 +291,9 @@ pub async fn should_be_successful() {
GetStatsOutput::Toml,
)))
.await;
iggy_cmd_test
.execute_test(TestStatsCmdWithMessages::new())
.await;
}

#[tokio::test]
Expand Down
4 changes: 4 additions & 0 deletions core/server/src/binary/mapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ pub fn map_stats(stats: &Stats) -> Bytes {
bytes.put_f32_le(metrics.hit_ratio);
}

bytes.put_u32_le(stats.threads_count);
bytes.put_u64_le(stats.free_disk_space.as_bytes_u64());
bytes.put_u64_le(stats.total_disk_space.as_bytes_u64());

bytes.freeze()
}

Expand Down
21 changes: 21 additions & 0 deletions core/server/src/shard/system/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ impl IggyShard {
let disk_usage = process.disk_usage();
stats.read_bytes = disk_usage.total_read_bytes.into();
stats.written_bytes = disk_usage.total_written_bytes.into();

stats.threads_count = process.tasks().map(|t| t.len() as u32).unwrap_or(0);
}

let (streams_count, topics_count, partitions_count, consumer_groups_count, stream_ids) =
Expand Down Expand Up @@ -118,6 +120,25 @@ impl IggyShard {
}
}

match fs2::available_space(&self.config.system.path) {
Ok(space) => stats.free_disk_space = space.into(),
Err(err) => {
tracing::warn!(
"Failed to get available disk space for '{}': {err}",
self.config.system.path
);
}
}
match fs2::total_space(&self.config.system.path) {
Ok(space) => stats.total_disk_space = space.into(),
Err(err) => {
tracing::warn!(
"Failed to get total disk space for '{}': {err}",
self.config.system.path
);
}
}

Ok(stats)
})
}
Expand Down
12 changes: 11 additions & 1 deletion core/server/src/shard/tasks/periodic/sysinfo_printer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,24 +69,34 @@ async fn print_sysinfo(shard: Rc<IggyShard>) -> Result<(), IggyError> {
/ stats.total_memory.as_bytes_u64() as f64)
* 100f64;

let threads_info = if stats.threads_count > 0 {
format!(", Threads: {}", stats.threads_count)
} else {
String::new()
};

let open_files_info = if let Some(open_files) = get_open_file_descriptors() {
format!(", OpenFDs: {}", open_files)
} else {
String::new()
};

info!(
"CPU: {:.2}%/{:.2}% (IggyUsage/Total), Mem: {:.2}%/{}/{}/{} (Free/IggyUsage/TotalUsed/Total), Clients: {}, Messages: {}, Read: {}, Written: {}{}",
"CPU: {:.2}%/{:.2}% (IggyUsage/Total), Mem: {:.2}%/{}/{}/{} (Free/IggyUsage/TotalUsed/Total), Disk: {}/{} (Free/Total), IggyUsage: {}, Clients: {}, Messages: {}, Read: {}, Written: {}{}{}",
stats.cpu_usage,
stats.total_cpu_usage,
free_memory_percent,
stats.memory_usage,
stats.total_memory - stats.available_memory,
stats.total_memory,
stats.free_disk_space,
stats.total_disk_space,
stats.messages_size_bytes,
stats.clients_count.human_count_bare().to_string(),
stats.messages_count.human_count_bare().to_string(),
stats.read_bytes,
stats.written_bytes,
threads_info,
open_files_info,
);

Expand Down
Loading