From 7cc5504410c2b3a9934bee9467a5005cd97a7979 Mon Sep 17 00:00:00 2001 From: grainier Date: Sat, 14 Mar 2026 07:23:41 +0530 Subject: [PATCH 1/3] refactor(repo): wrap credentials in SecretString to prevent leaks --- Cargo.lock | 14 ++++ Cargo.toml | 1 + core/cli/Cargo.toml | 1 + .../src/commands/binary_users/create_user.rs | 18 ++-- core/cli/src/credentials.rs | 43 ++++++---- core/common/Cargo.toml | 1 + .../login_with_personal_access_token.rs | 44 +++++++--- .../src/commands/users/change_password.rs | 82 +++++++++++++------ core/common/src/commands/users/create_user.rs | 59 ++++++++++--- core/common/src/commands/users/login_user.rs | 65 +++++++++++---- core/common/src/lib.rs | 1 + .../binary_impls/personal_access_tokens.rs | 3 +- core/common/src/traits/binary_impls/users.rs | 9 +- .../configuration/auth_config/auto_login.rs | 2 +- .../auth_config/connection_string.rs | 44 +++++----- .../configuration/auth_config/credentials.rs | 25 +++++- .../permissions/personal_access_token.rs | 11 ++- .../src/types/user/user_identity_info.rs | 12 ++- core/common/src/utils/mod.rs | 1 + core/common/src/utils/serde_secret.rs | 53 ++++++++++++ core/connectors/runtime/Cargo.toml | 1 + core/connectors/runtime/src/api/auth.rs | 5 +- core/connectors/runtime/src/api/config.rs | 19 ++++- core/connectors/runtime/src/context.rs | 5 +- .../sinks/elasticsearch_sink/Cargo.toml | 1 + .../sinks/elasticsearch_sink/src/lib.rs | 7 +- core/connectors/sinks/mongodb_sink/Cargo.toml | 2 + core/connectors/sinks/mongodb_sink/src/lib.rs | 10 ++- .../connectors/sinks/postgres_sink/Cargo.toml | 1 + .../connectors/sinks/postgres_sink/src/lib.rs | 10 ++- .../sources/elasticsearch_source/Cargo.toml | 1 + .../sources/elasticsearch_source/src/lib.rs | 7 +- .../sources/postgres_source/Cargo.toml | 1 + .../sources/postgres_source/src/lib.rs | 10 ++- core/integration/Cargo.toml | 1 + .../cli/user/test_user_create_command.rs | 8 +- .../server/scenarios/purge_delete_scenario.rs | 3 +- .../stale_client_consumer_group_scenario.rs | 3 +- core/integration/tests/state/file.rs | 7 +- core/integration/tests/state/system.rs | 10 ++- core/metadata/Cargo.toml | 1 + core/metadata/src/stm/user.rs | 5 +- core/sdk/Cargo.toml | 1 + core/sdk/src/client_provider.rs | 7 +- core/sdk/src/http/personal_access_tokens.rs | 3 +- core/sdk/src/http/users.rs | 9 +- core/sdk/src/quic/quic_client.rs | 44 +++++----- core/sdk/src/tcp/tcp_client.rs | 44 +++++----- core/sdk/src/websocket/websocket_client.rs | 6 +- core/server/Cargo.toml | 1 + core/server/src/binary/command.rs | 8 +- ...ogin_with_personal_access_token_handler.rs | 11 +-- .../handlers/users/login_user_handler.rs | 3 +- core/server/src/binary/macros.rs | 2 +- .../server/src/http/personal_access_tokens.rs | 3 +- core/server/src/http/users.rs | 3 +- core/server/src/shard/execution.rs | 17 ++-- core/server/src/state/command.rs | 2 +- core/server/src/state/models.rs | 2 +- core/server/src/state/system.rs | 7 +- 60 files changed, 545 insertions(+), 235 deletions(-) create mode 100644 core/common/src/utils/serde_secret.rs diff --git a/Cargo.lock b/Cargo.lock index 3afb4cadd3..faa885be49 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5143,6 +5143,7 @@ dependencies = [ "reqwest-retry", "reqwest-tracing", "rustls", + "secrecy", "serde", "tokio", "tokio-rustls", @@ -5227,6 +5228,7 @@ dependencies = [ "iggy_common", "keyring", "passterm", + "secrecy", "serde", "serde_json", "thiserror 2.0.18", @@ -5272,6 +5274,7 @@ dependencies = [ "reqwest-middleware", "reqwest-retry", "reqwest-tracing", + "secrecy", "serde", "serde_json", "serde_with", @@ -5356,6 +5359,7 @@ dependencies = [ "rcgen", "ring", "rustls", + "secrecy", "serde", "serde_json", "serde_with", @@ -5381,6 +5385,7 @@ dependencies = [ "iggy_common", "iggy_connector_sdk", "once_cell", + "secrecy", "serde", "serde_json", "simd-json", @@ -5399,6 +5404,7 @@ dependencies = [ "iggy_common", "iggy_connector_sdk", "once_cell", + "secrecy", "serde", "serde_json", "simd-json", @@ -5431,8 +5437,10 @@ version = "0.3.0" dependencies = [ "async-trait", "humantime", + "iggy_common", "iggy_connector_sdk", "mongodb", + "secrecy", "serde", "serde_json", "tokio", @@ -5450,6 +5458,7 @@ dependencies = [ "iggy_common", "iggy_connector_sdk", "once_cell", + "secrecy", "serde", "serde_json", "simd-json", @@ -5470,6 +5479,7 @@ dependencies = [ "iggy_common", "iggy_connector_sdk", "once_cell", + "secrecy", "serde", "serde_json", "simd-json", @@ -5767,6 +5777,7 @@ dependencies = [ "reqwest-middleware", "reqwest-retry", "rmcp", + "secrecy", "serde", "serde_json", "serial_test", @@ -6572,6 +6583,7 @@ dependencies = [ "message_bus", "paste", "rmp-serde", + "secrecy", "serde", "slab", "tracing", @@ -9331,6 +9343,7 @@ version = "0.10.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e891af845473308773346dc847b2c23ee78fe442e0472ac50e22a18a93d3ae5a" dependencies = [ + "serde", "zeroize", ] @@ -9656,6 +9669,7 @@ dependencies = [ "rust-embed", "rustls", "rustls-pemfile", + "secrecy", "send_wrapper", "serde", "slab", diff --git a/Cargo.toml b/Cargo.toml index b3ea5a2bf6..5392514369 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -237,6 +237,7 @@ rust-embed = "8.11.0" rust-s3 = { version = "0.37.1", default-features = false, features = ["tokio-rustls-tls", "tags"] } rustls = { version = "0.23.37", features = ["ring"] } rustls-pemfile = "2.2.0" +secrecy = { version = "0.10", features = ["serde"] } send_wrapper = "0.6.0" serde = { version = "1.0.228", features = ["derive", "rc"] } serde_json = "1.0.149" diff --git a/core/cli/Cargo.toml b/core/cli/Cargo.toml index d2bbca3581..cbd35a6251 100644 --- a/core/cli/Cargo.toml +++ b/core/cli/Cargo.toml @@ -58,6 +58,7 @@ iggy = { workspace = true } iggy_common = { workspace = true } keyring = { workspace = true, optional = true } passterm = { workspace = true } +secrecy = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } thiserror = { workspace = true } diff --git a/core/cli/src/commands/binary_users/create_user.rs b/core/cli/src/commands/binary_users/create_user.rs index 1a25459b77..a70d6b9deb 100644 --- a/core/cli/src/commands/binary_users/create_user.rs +++ b/core/cli/src/commands/binary_users/create_user.rs @@ -23,6 +23,7 @@ use iggy_common::Client; use iggy_common::Permissions; use iggy_common::UserStatus; use iggy_common::create_user::CreateUser; +use secrecy::{ExposeSecret, SecretString}; use tracing::{Level, event}; pub struct CreateUserCmd { @@ -39,7 +40,7 @@ impl CreateUserCmd { Self { create_user: CreateUser { username, - password, + password: SecretString::from(password), status, permissions, }, @@ -50,31 +51,28 @@ impl CreateUserCmd { #[async_trait] impl CliCommand for CreateUserCmd { fn explain(&self) -> String { - format!( - "create user with username: {} and password: {}", - self.create_user.username, self.create_user.password - ) + format!("create user with username: {}", self.create_user.username) } async fn execute_cmd(&mut self, client: &dyn Client) -> anyhow::Result<(), anyhow::Error> { client .create_user( &self.create_user.username, - &self.create_user.password, + self.create_user.password.expose_secret(), self.create_user.status, self.create_user.permissions.clone(), ) .await .with_context(|| { format!( - "Problem creating user (username: {} and password: {})", - self.create_user.username, self.create_user.password + "Problem creating user (username: {})", + self.create_user.username ) })?; event!(target: PRINT_TARGET, Level::INFO, - "User with username: {} and password: {} created", - self.create_user.username, self.create_user.password + "User with username: {} created", + self.create_user.username ); Ok(()) diff --git a/core/cli/src/credentials.rs b/core/cli/src/credentials.rs index 4760218d39..0a92746c84 100644 --- a/core/cli/src/credentials.rs +++ b/core/cli/src/credentials.rs @@ -23,6 +23,7 @@ use iggy::clients::client::IggyClient; use iggy::prelude::{Args, IggyError, PersonalAccessTokenClient, UserClient}; use iggy_cli::commands::binary_system::session::ServerSession; use passterm::{Stream, isatty, prompt_password_stdin, prompt_password_tty}; +use secrecy::{ExposeSecret, SecretString}; use std::env::var; #[cfg(feature = "login-session")] @@ -40,13 +41,13 @@ static ENV_IGGY_PASSWORD: &str = "IGGY_PASSWORD"; struct IggyUserClient { username: String, - password: String, + password: SecretString, } enum Credentials { UserNameAndPassword(IggyUserClient), - PersonalAccessToken(String), - SessionWithToken(String, String), + PersonalAccessToken(SecretString), + SessionWithToken(SecretString, String), } pub(crate) struct IggyCredentials<'a> { @@ -73,7 +74,10 @@ impl<'a> IggyCredentials<'a> { let server_session = ServerSession::new(server_address.clone()); if let Some(token) = server_session.get_token() { return Ok(Self { - credentials: Some(Credentials::SessionWithToken(token, server_address)), + credentials: Some(Credentials::SessionWithToken( + SecretString::from(token), + server_address, + )), iggy_client: None, login_required, }); @@ -91,7 +95,9 @@ impl<'a> IggyCredentials<'a> { let token = entry.get_password()?; Ok(Self { - credentials: Some(Credentials::PersonalAccessToken(token)), + credentials: Some(Credentials::PersonalAccessToken(SecretString::from( + token, + ))), iggy_client: None, login_required, }) @@ -102,19 +108,22 @@ impl<'a> IggyCredentials<'a> { if let Some(token) = &cli_options.token { Ok(Self { - credentials: Some(Credentials::PersonalAccessToken(token.clone())), + credentials: Some(Credentials::PersonalAccessToken(SecretString::from( + token.clone(), + ))), iggy_client: None, login_required, }) } else if let Some(username) = &cli_options.username { let password = match &cli_options.password { - Some(password) => password.clone(), + Some(password) => SecretString::from(password.clone()), None => { - if isatty(Stream::Stdin) { + let pwd = if isatty(Stream::Stdin) { prompt_password_tty(Some("Password: "))? } else { prompt_password_stdin(None, Stream::Stdout)? - } + }; + SecretString::from(pwd) } }; @@ -130,7 +139,7 @@ impl<'a> IggyCredentials<'a> { Ok(Self { credentials: Some(Credentials::UserNameAndPassword(IggyUserClient { username: var(ENV_IGGY_USERNAME)?, - password: var(ENV_IGGY_PASSWORD)?, + password: SecretString::from(var(ENV_IGGY_PASSWORD)?), })), iggy_client: None, login_required, @@ -154,7 +163,7 @@ impl<'a> IggyCredentials<'a> { let _ = client .login_user( &username_and_password.username, - &username_and_password.password, + username_and_password.password.expose_secret(), ) .await .with_context(|| { @@ -166,14 +175,14 @@ impl<'a> IggyCredentials<'a> { } Credentials::PersonalAccessToken(token_value) => { let _ = client - .login_with_personal_access_token(token_value) + .login_with_personal_access_token(token_value.expose_secret()) .await - .with_context(|| { - format!("Problem with server login with token: {token_value}") - })?; + .with_context(|| "Problem with server login with token".to_string())?; } Credentials::SessionWithToken(token_value, server_address) => { - let login_result = client.login_with_personal_access_token(token_value).await; + let login_result = client + .login_with_personal_access_token(token_value.expose_secret()) + .await; if let Err(err) = login_result { if matches!( err, @@ -187,7 +196,7 @@ impl<'a> IggyCredentials<'a> { "Login session expired for Iggy server: {server_address}, please login again or use other authentication method" ); } else { - bail!("Problem with server login with token: {token_value}"); + bail!("Problem with server login with token"); } } } diff --git a/core/common/Cargo.toml b/core/common/Cargo.toml index 0e1d0bd1c3..74d9dab1e9 100644 --- a/core/common/Cargo.toml +++ b/core/common/Cargo.toml @@ -55,6 +55,7 @@ papaya = { workspace = true } rcgen = { workspace = true } ring = { workspace = true } rustls = { workspace = true } +secrecy = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } serde_with = { workspace = true, features = ["base64"] } diff --git a/core/common/src/commands/personal_access_tokens/login_with_personal_access_token.rs b/core/common/src/commands/personal_access_tokens/login_with_personal_access_token.rs index 59fafa9c1a..73dedb34b6 100644 --- a/core/common/src/commands/personal_access_tokens/login_with_personal_access_token.rs +++ b/core/common/src/commands/personal_access_tokens/login_with_personal_access_token.rs @@ -22,6 +22,7 @@ use crate::defaults::*; use crate::error::IggyError; use crate::{Command, LOGIN_WITH_PERSONAL_ACCESS_TOKEN_CODE}; use bytes::{BufMut, Bytes, BytesMut}; +use secrecy::{ExposeSecret, SecretString}; use serde::{Deserialize, Serialize}; use std::fmt::{Display, Formatter}; use std::str::from_utf8; @@ -29,10 +30,11 @@ use std::str::from_utf8; /// `LoginWithPersonalAccessToken` command is used to login the user with a personal access token, instead of the username and password. /// It has additional payload: /// - `token` - personal access token -#[derive(Debug, Serialize, Deserialize, PartialEq)] +#[derive(Debug, Serialize, Deserialize)] pub struct LoginWithPersonalAccessToken { /// Personal access token - pub token: String, + #[serde(serialize_with = "crate::utils::serde_secret::serialize_secret")] + pub token: SecretString, } impl Command for LoginWithPersonalAccessToken { @@ -44,14 +46,15 @@ impl Command for LoginWithPersonalAccessToken { impl Default for LoginWithPersonalAccessToken { fn default() -> Self { LoginWithPersonalAccessToken { - token: "token".to_string(), + token: SecretString::from("token"), } } } impl Validatable for LoginWithPersonalAccessToken { fn validate(&self) -> Result<(), IggyError> { - if self.token.is_empty() || self.token.len() > MAX_PAT_LENGTH { + let token = self.token.expose_secret(); + if token.is_empty() || token.len() > MAX_PAT_LENGTH { return Err(IggyError::InvalidPersonalAccessToken); } @@ -61,10 +64,11 @@ impl Validatable for LoginWithPersonalAccessToken { impl BytesSerializable for LoginWithPersonalAccessToken { fn to_bytes(&self) -> Bytes { - let mut bytes = BytesMut::with_capacity(5 + self.token.len()); + let token = self.token.expose_secret(); + let mut bytes = BytesMut::with_capacity(5 + token.len()); #[allow(clippy::cast_possible_truncation)] - bytes.put_u8(self.token.len() as u8); - bytes.put_slice(self.token.as_bytes()); + bytes.put_u8(token.len() as u8); + bytes.put_slice(token.as_bytes()); bytes.freeze() } @@ -82,14 +86,16 @@ impl BytesSerializable for LoginWithPersonalAccessToken { .map_err(|_| IggyError::InvalidUtf8)? .to_string(); - let command = LoginWithPersonalAccessToken { token }; + let command = LoginWithPersonalAccessToken { + token: SecretString::from(token), + }; Ok(command) } } impl Display for LoginWithPersonalAccessToken { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", self.token) + write!(f, "******") } } @@ -100,14 +106,14 @@ mod tests { #[test] fn should_be_serialized_as_bytes() { let command = LoginWithPersonalAccessToken { - token: "test".to_string(), + token: SecretString::from("test"), }; let bytes = command.to_bytes(); let token_length = bytes[0]; let token = from_utf8(&bytes[1..1 + token_length as usize]).unwrap(); assert!(!bytes.is_empty()); - assert_eq!(token, command.token); + assert_eq!(token, command.token.expose_secret()); } #[test] @@ -148,6 +154,20 @@ mod tests { assert!(command.is_ok()); let command = command.unwrap(); - assert_eq!(command.token, token); + assert_eq!(command.token.expose_secret(), token); + } + + #[test] + fn should_fail_validation_for_empty_token() { + let command = LoginWithPersonalAccessToken { + token: SecretString::from(""), + }; + assert!(command.validate().is_err()); + } + + #[test] + fn should_pass_validation_for_valid_token() { + let command = LoginWithPersonalAccessToken::default(); + assert!(command.validate().is_ok()); } } diff --git a/core/common/src/commands/users/change_password.rs b/core/common/src/commands/users/change_password.rs index ba8e251db3..9d5272433e 100644 --- a/core/common/src/commands/users/change_password.rs +++ b/core/common/src/commands/users/change_password.rs @@ -24,6 +24,7 @@ use crate::Validatable; use crate::error::IggyError; use crate::{CHANGE_PASSWORD_CODE, Command}; use bytes::{BufMut, Bytes, BytesMut}; +use secrecy::{ExposeSecret, SecretString}; use serde::{Deserialize, Serialize}; use std::fmt::Display; use std::str::from_utf8; @@ -33,15 +34,17 @@ use std::str::from_utf8; /// - `user_id` - unique user ID (numeric or name). /// - `current_password` - current password, must be between 3 and 100 characters long. /// - `new_password` - new password, must be between 3 and 100 characters long. -#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)] +#[derive(Debug, Serialize, Deserialize, Clone)] pub struct ChangePassword { /// Unique user ID (numeric or name). #[serde(skip)] pub user_id: Identifier, /// Current password, must be between 3 and 100 characters long. - pub current_password: String, + #[serde(serialize_with = "crate::utils::serde_secret::serialize_secret")] + pub current_password: SecretString, /// New password, must be between 3 and 100 characters long. - pub new_password: String, + #[serde(serialize_with = "crate::utils::serde_secret::serialize_secret")] + pub new_password: SecretString, } impl Command for ChangePassword { @@ -54,24 +57,26 @@ impl Default for ChangePassword { fn default() -> Self { ChangePassword { user_id: Identifier::default(), - current_password: "secret".to_string(), - new_password: "topsecret".to_string(), + current_password: SecretString::from("secret"), + new_password: SecretString::from("topsecret"), } } } impl Validatable for ChangePassword { fn validate(&self) -> Result<(), IggyError> { - if self.current_password.is_empty() - || self.current_password.len() > MAX_PASSWORD_LENGTH - || self.current_password.len() < MIN_PASSWORD_LENGTH + let current_password = self.current_password.expose_secret(); + if current_password.is_empty() + || current_password.len() > MAX_PASSWORD_LENGTH + || current_password.len() < MIN_PASSWORD_LENGTH { return Err(IggyError::InvalidPassword); } - if self.new_password.is_empty() - || self.new_password.len() > MAX_PASSWORD_LENGTH - || self.new_password.len() < MIN_PASSWORD_LENGTH + let new_password = self.new_password.expose_secret(); + if new_password.is_empty() + || new_password.len() > MAX_PASSWORD_LENGTH + || new_password.len() < MIN_PASSWORD_LENGTH { return Err(IggyError::InvalidPassword); } @@ -83,14 +88,16 @@ impl Validatable for ChangePassword { impl BytesSerializable for ChangePassword { fn to_bytes(&self) -> Bytes { let user_id_bytes = self.user_id.to_bytes(); + let current_password = self.current_password.expose_secret(); + let new_password = self.new_password.expose_secret(); let mut bytes = BytesMut::new(); bytes.put_slice(&user_id_bytes); #[allow(clippy::cast_possible_truncation)] - bytes.put_u8(self.current_password.len() as u8); - bytes.put_slice(self.current_password.as_bytes()); + bytes.put_u8(current_password.len() as u8); + bytes.put_slice(current_password.as_bytes()); #[allow(clippy::cast_possible_truncation)] - bytes.put_u8(self.new_password.len() as u8); - bytes.put_slice(self.new_password.as_bytes()); + bytes.put_u8(new_password.len() as u8); + bytes.put_slice(new_password.as_bytes()); bytes.freeze() } @@ -123,8 +130,8 @@ impl BytesSerializable for ChangePassword { let command = ChangePassword { user_id, - current_password, - new_password, + current_password: SecretString::from(current_password), + new_password: SecretString::from(new_password), }; Ok(command) } @@ -144,8 +151,8 @@ mod tests { fn should_be_serialized_as_bytes() { let command = ChangePassword { user_id: Identifier::numeric(1).unwrap(), - current_password: "user".to_string(), - new_password: "secret".to_string(), + current_password: SecretString::from("user"), + new_password: SecretString::from("secret"), }; let bytes = command.to_bytes(); @@ -163,8 +170,8 @@ mod tests { assert!(!bytes.is_empty()); assert_eq!(user_id, command.user_id); - assert_eq!(current_password, command.current_password); - assert_eq!(new_password, command.new_password); + assert_eq!(current_password, command.current_password.expose_secret()); + assert_eq!(new_password, command.new_password.expose_secret()); } #[test] @@ -204,7 +211,36 @@ mod tests { let command = command.unwrap(); assert_eq!(command.user_id, user_id); - assert_eq!(command.current_password, current_password); - assert_eq!(command.new_password, new_password); + assert_eq!(command.current_password.expose_secret(), current_password); + assert_eq!(command.new_password.expose_secret(), new_password); + } + + #[test] + fn should_fail_validation_for_invalid_current_password() { + for password in ["", "ab"] { + let command = ChangePassword { + current_password: SecretString::from(password), + ..ChangePassword::default() + }; + assert!( + command.validate().is_err(), + "expected validation error for current_password: {password:?}" + ); + } + } + + #[test] + fn should_fail_validation_for_empty_new_password() { + let command = ChangePassword { + new_password: SecretString::from(""), + ..ChangePassword::default() + }; + assert!(command.validate().is_err()); + } + + #[test] + fn should_pass_validation_for_valid_command() { + let command = ChangePassword::default(); + assert!(command.validate().is_ok()); } } diff --git a/core/common/src/commands/users/create_user.rs b/core/common/src/commands/users/create_user.rs index 6aa510ad6a..3405bd486f 100644 --- a/core/common/src/commands/users/create_user.rs +++ b/core/common/src/commands/users/create_user.rs @@ -24,6 +24,7 @@ use crate::Validatable; use crate::error::IggyError; use crate::{CREATE_USER_CODE, Command}; use bytes::{BufMut, Bytes, BytesMut}; +use secrecy::{ExposeSecret, SecretString}; use serde::{Deserialize, Serialize}; use std::fmt::Display; use std::str::from_utf8; @@ -34,12 +35,13 @@ use std::str::from_utf8; /// - `password` - password of the user, must be between 3 and 100 characters long. /// - `status` - status of the user, can be either `active` or `inactive`. /// - `permissions` - optional permissions of the user. If not provided, user will have no permissions. -#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)] +#[derive(Debug, Serialize, Deserialize, Clone)] pub struct CreateUser { /// Unique name of the user, must be between 3 and 50 characters long. pub username: String, /// Password of the user, must be between 3 and 100 characters long. - pub password: String, + #[serde(serialize_with = "crate::utils::serde_secret::serialize_secret")] + pub password: SecretString, /// Status of the user, can be either `active` or `inactive`. pub status: UserStatus, /// Optional permissions of the user. If not provided, user will have no permissions. @@ -56,7 +58,7 @@ impl Default for CreateUser { fn default() -> Self { CreateUser { username: "user".to_string(), - password: "secret".to_string(), + password: SecretString::from("secret"), status: UserStatus::Active, permissions: None, } @@ -72,9 +74,10 @@ impl Validatable for CreateUser { return Err(IggyError::InvalidUsername); } - if self.password.is_empty() - || self.password.len() > MAX_PASSWORD_LENGTH - || self.password.len() < MIN_PASSWORD_LENGTH + let password = self.password.expose_secret(); + if password.is_empty() + || password.len() > MAX_PASSWORD_LENGTH + || password.len() < MIN_PASSWORD_LENGTH { return Err(IggyError::InvalidPassword); } @@ -85,13 +88,14 @@ impl Validatable for CreateUser { impl BytesSerializable for CreateUser { fn to_bytes(&self) -> Bytes { - let mut bytes = BytesMut::with_capacity(2 + self.username.len() + self.password.len()); + let password = self.password.expose_secret(); + let mut bytes = BytesMut::with_capacity(2 + self.username.len() + password.len()); #[allow(clippy::cast_possible_truncation)] bytes.put_u8(self.username.len() as u8); bytes.put_slice(self.username.as_bytes()); #[allow(clippy::cast_possible_truncation)] - bytes.put_u8(self.password.len() as u8); - bytes.put_slice(self.password.as_bytes()); + bytes.put_u8(password.len() as u8); + bytes.put_slice(password.as_bytes()); bytes.put_u8(self.status.as_code()); if let Some(permissions) = &self.permissions { bytes.put_u8(1); @@ -161,7 +165,7 @@ impl BytesSerializable for CreateUser { let command = CreateUser { username, - password, + password: SecretString::from(password), status, permissions, }; @@ -193,7 +197,7 @@ mod tests { fn should_be_serialized_as_bytes() { let command = CreateUser { username: "user".to_string(), - password: "secret".to_string(), + password: SecretString::from("secret"), status: UserStatus::Active, permissions: Some(Permissions { global: GlobalPermissions { @@ -234,7 +238,7 @@ mod tests { assert!(!bytes.is_empty()); assert_eq!(username, command.username); - assert_eq!(password, command.password); + assert_eq!(password, command.password.expose_secret()); assert_eq!(status, command.status); assert_eq!(has_permissions, 1); assert_eq!(permissions, command.permissions.unwrap()); @@ -306,9 +310,38 @@ mod tests { let command = command.unwrap(); assert_eq!(command.username, username); - assert_eq!(command.password, password); + assert_eq!(command.password.expose_secret(), password); assert_eq!(command.status, status); assert!(command.permissions.is_some()); assert_eq!(command.permissions.unwrap(), permissions); } + + #[test] + fn should_fail_validation_for_empty_username() { + let command = CreateUser { + username: "".to_string(), + ..CreateUser::default() + }; + assert!(command.validate().is_err()); + } + + #[test] + fn should_fail_validation_for_invalid_password() { + for password in ["", "ab"] { + let command = CreateUser { + password: SecretString::from(password), + ..CreateUser::default() + }; + assert!( + command.validate().is_err(), + "expected validation error for password: {password:?}" + ); + } + } + + #[test] + fn should_pass_validation_for_valid_command() { + let command = CreateUser::default(); + assert!(command.validate().is_ok()); + } } diff --git a/core/common/src/commands/users/login_user.rs b/core/common/src/commands/users/login_user.rs index 9fe907f115..ce729d1c04 100644 --- a/core/common/src/commands/users/login_user.rs +++ b/core/common/src/commands/users/login_user.rs @@ -22,6 +22,7 @@ use crate::Validatable; use crate::error::IggyError; use crate::{Command, LOGIN_USER_CODE}; use bytes::{BufMut, Bytes, BytesMut}; +use secrecy::{ExposeSecret, SecretString}; use serde::{Deserialize, Serialize}; use std::fmt::Display; use std::str::from_utf8; @@ -30,12 +31,13 @@ use std::str::from_utf8; /// It has additional payload: /// - `username` - username, must be between 3 and 50 characters long. /// - `password` - password, must be between 3 and 100 characters long. -#[derive(Debug, Serialize, Deserialize, PartialEq)] +#[derive(Debug, Serialize, Deserialize)] pub struct LoginUser { /// Username, must be between 3 and 50 characters long. pub username: String, /// Password, must be between 3 and 100 characters long. - pub password: String, + #[serde(serialize_with = "crate::utils::serde_secret::serialize_secret")] + pub password: SecretString, // Version metadata added by SDK. pub version: Option, // Context metadata added by SDK. @@ -52,7 +54,7 @@ impl Default for LoginUser { fn default() -> Self { LoginUser { username: "user".to_string(), - password: "secret".to_string(), + password: SecretString::from("secret"), version: None, context: None, } @@ -68,9 +70,10 @@ impl Validatable for LoginUser { return Err(IggyError::InvalidUsername); } - if self.password.is_empty() - || self.password.len() > MAX_PASSWORD_LENGTH - || self.password.len() < MIN_PASSWORD_LENGTH + let password = self.password.expose_secret(); + if password.is_empty() + || password.len() > MAX_PASSWORD_LENGTH + || password.len() < MIN_PASSWORD_LENGTH { return Err(IggyError::InvalidPassword); } @@ -81,13 +84,14 @@ impl Validatable for LoginUser { impl BytesSerializable for LoginUser { fn to_bytes(&self) -> Bytes { - let mut bytes = BytesMut::with_capacity(2 + self.username.len() + self.password.len()); + let password = self.password.expose_secret(); + let mut bytes = BytesMut::with_capacity(2 + self.username.len() + password.len()); #[allow(clippy::cast_possible_truncation)] bytes.put_u8(self.username.len() as u8); bytes.put_slice(self.username.as_bytes()); #[allow(clippy::cast_possible_truncation)] - bytes.put_u8(self.password.len() as u8); - bytes.put_slice(self.password.as_bytes()); + bytes.put_u8(password.len() as u8); + bytes.put_slice(password.as_bytes()); match &self.version { Some(version) => { bytes.put_u32_le(version.len() as u32); @@ -200,7 +204,7 @@ impl BytesSerializable for LoginUser { let command = LoginUser { username, - password, + password: SecretString::from(password), version, context, }; @@ -222,7 +226,7 @@ mod tests { fn should_be_serialized_as_bytes() { let command = LoginUser { username: "user".to_string(), - password: "secret".to_string(), + password: SecretString::from("secret"), version: Some("1.0.0".to_string()), context: Some("test".to_string()), }; @@ -253,7 +257,7 @@ mod tests { assert!(!bytes.is_empty()); assert_eq!(username, command.username); - assert_eq!(password, command.password); + assert_eq!(password, command.password.expose_secret()); assert_eq!(version, command.version); assert_eq!(context, command.context); } @@ -267,7 +271,7 @@ mod tests { fn from_bytes_should_fail_on_truncated_input() { let command = LoginUser { username: "user".to_string(), - password: "secret".to_string(), + password: SecretString::from("secret"), version: Some("1.0.0".to_string()), context: Some("test".to_string()), }; @@ -275,7 +279,7 @@ mod tests { // Truncate at every position up to (but not including) the version field. // Positions within username/password must error; positions at or past the // version boundary are valid old-SDK payloads. - let version_offset = 2 + command.username.len() + command.password.len(); + let version_offset = 2 + command.username.len() + command.password.expose_secret().len(); for i in 0..version_offset { let truncated = bytes.slice(..i); assert!( @@ -332,7 +336,7 @@ mod tests { let command = LoginUser::from_bytes(bytes.freeze()).unwrap(); assert_eq!(command.username, username); - assert_eq!(command.password, password); + assert_eq!(command.password.expose_secret(), password); assert_eq!(command.version, None); assert_eq!(command.context, None); } @@ -359,8 +363,37 @@ mod tests { let command = command.unwrap(); assert_eq!(command.username, username); - assert_eq!(command.password, password); + assert_eq!(command.password.expose_secret(), password); assert_eq!(command.version, Some(version)); assert_eq!(command.context, Some(context)); } + + #[test] + fn should_fail_validation_for_empty_username() { + let command = LoginUser { + username: "".to_string(), + ..LoginUser::default() + }; + assert!(command.validate().is_err()); + } + + #[test] + fn should_fail_validation_for_invalid_password() { + for password in ["", "ab"] { + let command = LoginUser { + password: SecretString::from(password), + ..LoginUser::default() + }; + assert!( + command.validate().is_err(), + "expected validation error for password: {password:?}" + ); + } + } + + #[test] + fn should_pass_validation_for_valid_command() { + let command = LoginUser::default(); + assert!(command.validate().is_ok()); + } } diff --git a/core/common/src/lib.rs b/core/common/src/lib.rs index 349704d6cb..b1b8aeb091 100644 --- a/core/common/src/lib.rs +++ b/core/common/src/lib.rs @@ -130,6 +130,7 @@ pub use utils::expiry::IggyExpiry; pub use utils::hash::*; pub use utils::personal_access_token_expiry::PersonalAccessTokenExpiry; pub use utils::random_id; +pub use utils::serde_secret; pub use utils::text; pub use utils::timestamp::*; pub use utils::topic_size::MaxTopicSize; diff --git a/core/common/src/traits/binary_impls/personal_access_tokens.rs b/core/common/src/traits/binary_impls/personal_access_tokens.rs index 1323ac28bb..cd90fe6bfb 100644 --- a/core/common/src/traits/binary_impls/personal_access_tokens.rs +++ b/core/common/src/traits/binary_impls/personal_access_tokens.rs @@ -28,6 +28,7 @@ use crate::{ ClientState, DiagnosticEvent, IdentityInfo, IggyError, PersonalAccessTokenClient, PersonalAccessTokenExpiry, PersonalAccessTokenInfo, RawPersonalAccessToken, }; +use secrecy::SecretString; #[async_trait::async_trait] impl PersonalAccessTokenClient for B { @@ -67,7 +68,7 @@ impl PersonalAccessTokenClient for B { ) -> Result { let response = self .send_with_response(&LoginWithPersonalAccessToken { - token: token.to_string(), + token: SecretString::from(token), }) .await?; self.set_state(ClientState::Authenticated).await; diff --git a/core/common/src/traits/binary_impls/users.rs b/core/common/src/traits/binary_impls/users.rs index 9b4edc8f92..dc2fd1fc24 100644 --- a/core/common/src/traits/binary_impls/users.rs +++ b/core/common/src/traits/binary_impls/users.rs @@ -32,6 +32,7 @@ use crate::{ ClientState, DiagnosticEvent, Identifier, IdentityInfo, IggyError, Permissions, UserClient, UserInfo, UserInfoDetails, UserStatus, }; +use secrecy::SecretString; #[async_trait::async_trait] impl UserClient for B { @@ -66,7 +67,7 @@ impl UserClient for B { let response = self .send_with_response(&CreateUser { username: username.to_string(), - password: password.to_string(), + password: SecretString::from(password), status, permissions, }) @@ -122,8 +123,8 @@ impl UserClient for B { fail_if_not_authenticated(self).await?; self.send_with_response(&ChangePassword { user_id: user_id.clone(), - current_password: current_password.to_string(), - new_password: new_password.to_string(), + current_password: SecretString::from(current_password), + new_password: SecretString::from(new_password), }) .await?; Ok(()) @@ -133,7 +134,7 @@ impl UserClient for B { let response = self .send_with_response(&LoginUser { username: username.to_string(), - password: password.to_string(), + password: SecretString::from(password), version: Some(env!("CARGO_PKG_VERSION").to_string()), context: Some("".to_string()), }) diff --git a/core/common/src/types/configuration/auth_config/auto_login.rs b/core/common/src/types/configuration/auth_config/auto_login.rs index b9d08dd8b5..63ff2e541e 100644 --- a/core/common/src/types/configuration/auth_config/auto_login.rs +++ b/core/common/src/types/configuration/auth_config/auto_login.rs @@ -17,7 +17,7 @@ use crate::Credentials; -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone)] pub enum AutoLogin { Disabled, Enabled(Credentials), diff --git a/core/common/src/types/configuration/auth_config/connection_string.rs b/core/common/src/types/configuration/auth_config/connection_string.rs index 391633d534..f623e69845 100644 --- a/core/common/src/types/configuration/auth_config/connection_string.rs +++ b/core/common/src/types/configuration/auth_config/connection_string.rs @@ -17,6 +17,7 @@ */ use crate::{AutoLogin, ConnectionStringOptions, Credentials, IggyError, TransportProtocol}; +use secrecy::SecretString; use std::str::FromStr; const DEFAULT_CONNECTION_STRING_PREFIX: &str = "iggy://"; @@ -105,7 +106,7 @@ impl ConnectionString { return Ok(ConnectionString { server_address: server_address.to_owned(), auto_login: AutoLogin::Enabled(Credentials::PersonalAccessToken( - pat_token.to_owned(), + SecretString::from(pat_token.to_owned()), )), options: connection_string_options, }); @@ -115,7 +116,7 @@ impl ConnectionString { server_address: server_address.to_owned(), auto_login: AutoLogin::Enabled(Credentials::UsernamePassword( username.to_owned(), - password.to_owned(), + SecretString::from(password.to_owned()), )), options: connection_string_options, }) @@ -160,6 +161,7 @@ mod tests { use super::*; use crate::IggyDuration; use crate::TcpConnectionStringOptions; + use secrecy::ExposeSecret; #[test] fn should_fail_without_username() { @@ -243,13 +245,13 @@ mod tests { connection_string.server_address, format!("{server_address}:{port}") ); - assert_eq!( - connection_string.auto_login, - AutoLogin::Enabled(Credentials::UsernamePassword( - username.to_string(), - password.to_string() - )) - ); + match &connection_string.auto_login { + AutoLogin::Enabled(Credentials::UsernamePassword(u, p)) => { + assert_eq!(u, username); + assert_eq!(p.expose_secret(), password); + } + _ => panic!("Expected UsernamePassword credentials"), + } assert!(connection_string.options.retries().is_none()); assert_eq!( @@ -277,13 +279,13 @@ mod tests { connection_string.server_address, format!("{server_address}:{port}") ); - assert_eq!( - connection_string.auto_login, - AutoLogin::Enabled(Credentials::UsernamePassword( - username.to_string(), - password.to_string() - )) - ); + match &connection_string.auto_login { + AutoLogin::Enabled(Credentials::UsernamePassword(u, p)) => { + assert_eq!(u, username); + assert_eq!(p.expose_secret(), password); + } + _ => panic!("Expected UsernamePassword credentials"), + } assert_eq!(connection_string.options.retries().unwrap(), 3); assert_eq!( @@ -306,10 +308,12 @@ mod tests { connection_string.server_address, format!("{server_address}:{port}") ); - assert_eq!( - connection_string.auto_login, - AutoLogin::Enabled(Credentials::PersonalAccessToken(pat.to_string())) - ); + match &connection_string.auto_login { + AutoLogin::Enabled(Credentials::PersonalAccessToken(token)) => { + assert_eq!(token.expose_secret(), pat); + } + _ => panic!("Expected PersonalAccessToken credentials"), + } assert!(connection_string.options.retries().is_none()); assert_eq!( diff --git a/core/common/src/types/configuration/auth_config/credentials.rs b/core/common/src/types/configuration/auth_config/credentials.rs index ca6e00f673..61e291c6e7 100644 --- a/core/common/src/types/configuration/auth_config/credentials.rs +++ b/core/common/src/types/configuration/auth_config/credentials.rs @@ -15,8 +15,27 @@ // specific language governing permissions and limitations // under the License. -#[derive(Debug, Clone, PartialEq)] +use secrecy::SecretString; +use std::fmt; + +#[derive(Clone)] pub enum Credentials { - UsernamePassword(String, String), - PersonalAccessToken(String), + UsernamePassword(String, SecretString), + PersonalAccessToken(SecretString), +} + +impl fmt::Debug for Credentials { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Credentials::UsernamePassword(username, _) => f + .debug_tuple("UsernamePassword") + .field(username) + .field(&"[REDACTED]") + .finish(), + Credentials::PersonalAccessToken(_) => f + .debug_tuple("PersonalAccessToken") + .field(&"[REDACTED]") + .finish(), + } + } } diff --git a/core/common/src/types/permissions/personal_access_token.rs b/core/common/src/types/permissions/personal_access_token.rs index 0837ba17c1..06ea4c1d1b 100644 --- a/core/common/src/types/permissions/personal_access_token.rs +++ b/core/common/src/types/permissions/personal_access_token.rs @@ -18,16 +18,25 @@ use crate::utils::timestamp::IggyTimestamp; use serde::{Deserialize, Serialize}; +use std::fmt; /// `RawPersonalAccessToken` represents the raw personal access token - the secured token which is returned only once during the creation. /// It consists of the following fields: /// - `token`: the unique token that should be securely stored by the user and can be used for authentication. -#[derive(Debug, Serialize, Deserialize)] +#[derive(Serialize, Deserialize)] pub struct RawPersonalAccessToken { /// The unique token that should be securely stored by the user and can be used for authentication. pub token: String, } +impl fmt::Debug for RawPersonalAccessToken { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("RawPersonalAccessToken") + .field("token", &"[REDACTED]") + .finish() + } +} + /// `PersonalAccessToken` represents the personal access token. It does not contain the token itself, but the information about the token. /// It consists of the following fields: /// - `name`: the unique name of the token. diff --git a/core/common/src/types/user/user_identity_info.rs b/core/common/src/types/user/user_identity_info.rs index 64005eec23..35cec07699 100644 --- a/core/common/src/types/user/user_identity_info.rs +++ b/core/common/src/types/user/user_identity_info.rs @@ -18,6 +18,7 @@ use crate::UserId; use serde::{Deserialize, Serialize}; +use std::fmt; /// `IdentityInfo` represents the information about an identity. /// It consists of the following fields: @@ -35,10 +36,19 @@ pub struct IdentityInfo { /// It consists of the following fields: /// - `token`: the value of token. /// - `expiry`: the expiry of token. -#[derive(Debug, Serialize, Deserialize)] +#[derive(Serialize, Deserialize)] pub struct TokenInfo { /// The value of token. pub token: String, /// The expiry of token. pub expiry: u64, } + +impl fmt::Debug for TokenInfo { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("TokenInfo") + .field("token", &"[REDACTED]") + .field("expiry", &self.expiry) + .finish() + } +} diff --git a/core/common/src/utils/mod.rs b/core/common/src/utils/mod.rs index 00793cad15..0a300136ee 100644 --- a/core/common/src/utils/mod.rs +++ b/core/common/src/utils/mod.rs @@ -25,6 +25,7 @@ pub(crate) mod expiry; pub(crate) mod hash; pub(crate) mod personal_access_token_expiry; pub mod random_id; +pub mod serde_secret; pub mod text; pub(crate) mod timestamp; pub(crate) mod topic_size; diff --git a/core/common/src/utils/serde_secret.rs b/core/common/src/utils/serde_secret.rs new file mode 100644 index 0000000000..7036981609 --- /dev/null +++ b/core/common/src/utils/serde_secret.rs @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +//! Serde serialization helpers for `SecretString` fields. +//! +//! `SecretString` intentionally does not implement `Serialize` to prevent +//! accidental secret exposure. These helpers are for fields that **must** be +//! serialized (e.g., wire protocol payloads, persisted TOML configs, API +//! responses that already expose credentials by design). +//! +//! Usage: +//! ```ignore +//! #[serde(serialize_with = "crate::utils::serde_secret::serialize_secret")] +//! pub password: SecretString, +//! ``` +//! +//! Do **not** add `serialize_with` to fields that should remain redacted in +//! serialized output — rely on `SecretString`'s default behavior instead. + +use secrecy::{ExposeSecret, SecretString}; + +pub fn serialize_secret( + secret: &SecretString, + serializer: S, +) -> Result { + serializer.serialize_str(secret.expose_secret()) +} + +pub fn serialize_optional_secret( + secret: &Option, + serializer: S, +) -> Result { + match secret { + Some(s) => serializer.serialize_some(s.expose_secret()), + None => serializer.serialize_none(), + } +} diff --git a/core/connectors/runtime/Cargo.toml b/core/connectors/runtime/Cargo.toml index cee58dd17e..20d42deeca 100644 --- a/core/connectors/runtime/Cargo.toml +++ b/core/connectors/runtime/Cargo.toml @@ -60,6 +60,7 @@ reqwest = { workspace = true } reqwest-middleware = { workspace = true } reqwest-retry = { workspace = true } reqwest-tracing = { workspace = true } +secrecy = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } serde_with = { workspace = true } diff --git a/core/connectors/runtime/src/api/auth.rs b/core/connectors/runtime/src/api/auth.rs index 6f4439af17..57ccf207a3 100644 --- a/core/connectors/runtime/src/api/auth.rs +++ b/core/connectors/runtime/src/api/auth.rs @@ -24,6 +24,7 @@ use axum::{ middleware::Next, response::Response, }; +use secrecy::ExposeSecret; use std::sync::Arc; const API_KEY_HEADER: &str = "api-key"; @@ -38,7 +39,7 @@ pub async fn resolve_api_key( return Ok(next.run(request).await); } - if context.api_key.is_empty() { + if context.api_key.expose_secret().is_empty() { return Ok(next.run(request).await); }; @@ -50,7 +51,7 @@ pub async fn resolve_api_key( return Err(StatusCode::UNAUTHORIZED); }; - if api_key != context.api_key { + if api_key != context.api_key.expose_secret() { return Err(StatusCode::UNAUTHORIZED); } diff --git a/core/connectors/runtime/src/api/config.rs b/core/connectors/runtime/src/api/config.rs index 2b5bc009ab..e2ecdf1613 100644 --- a/core/connectors/runtime/src/api/config.rs +++ b/core/connectors/runtime/src/api/config.rs @@ -31,7 +31,7 @@ pub const YAML_HEADER: HeaderValue = HeaderValue::from_static("application/yaml" pub const TOML_HEADER: HeaderValue = HeaderValue::from_static("application/toml"); pub const TEXT_HEADER: HeaderValue = HeaderValue::from_static("text/plain"); -#[derive(Debug, Clone, Deserialize, Serialize, ConfigEnv)] +#[derive(Clone, Deserialize, Serialize, ConfigEnv)] pub struct HttpConfig { pub enabled: bool, pub address: String, @@ -42,6 +42,19 @@ pub struct HttpConfig { pub metrics: HttpMetricsConfig, } +impl std::fmt::Debug for HttpConfig { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("HttpConfig") + .field("enabled", &self.enabled) + .field("address", &self.address) + .field("api_key", &"[REDACTED]") + .field("cors", &self.cors) + .field("tls", &self.tls) + .field("metrics", &self.metrics) + .finish() + } +} + #[derive(Debug, Clone, Deserialize, Serialize, ConfigEnv)] pub struct HttpMetricsConfig { pub enabled: bool, @@ -178,8 +191,8 @@ impl std::fmt::Display for HttpConfig { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!( f, - "{{ address: {}, api_key: {}, cors: {}, tls: {}, metrics: {} }}", - self.address, self.api_key, self.cors, self.tls, self.metrics + "{{ address: {}, api_key: ******, cors: {}, tls: {}, metrics: {} }}", + self.address, self.cors, self.tls, self.metrics ) } } diff --git a/core/connectors/runtime/src/context.rs b/core/connectors/runtime/src/context.rs index a87095463b..b5a1c4c411 100644 --- a/core/connectors/runtime/src/context.rs +++ b/core/connectors/runtime/src/context.rs @@ -30,6 +30,7 @@ use crate::{ use iggy_common::IggyTimestamp; use iggy_connector_sdk::api::ConnectorError; use iggy_connector_sdk::api::ConnectorStatus; +use secrecy::SecretString; use std::collections::HashMap; use std::sync::Arc; use tokio::sync::Mutex; @@ -38,7 +39,7 @@ use tracing::error; pub struct RuntimeContext { pub sinks: SinkManager, pub sources: SourceManager, - pub api_key: String, + pub api_key: SecretString, pub config_provider: Arc, pub metrics: Arc, pub start_time: IggyTimestamp, @@ -67,7 +68,7 @@ pub fn init( RuntimeContext { sinks, sources, - api_key: config.http.api_key.to_owned(), + api_key: SecretString::from(config.http.api_key.to_owned()), config_provider: Arc::from(config_provider), metrics, start_time: IggyTimestamp::now(), diff --git a/core/connectors/sinks/elasticsearch_sink/Cargo.toml b/core/connectors/sinks/elasticsearch_sink/Cargo.toml index 2598fcc8ff..40923ea9d4 100644 --- a/core/connectors/sinks/elasticsearch_sink/Cargo.toml +++ b/core/connectors/sinks/elasticsearch_sink/Cargo.toml @@ -42,6 +42,7 @@ elasticsearch = { workspace = true } iggy_common = { workspace = true } iggy_connector_sdk = { workspace = true } once_cell = { workspace = true } +secrecy = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } simd-json = { workspace = true } diff --git a/core/connectors/sinks/elasticsearch_sink/src/lib.rs b/core/connectors/sinks/elasticsearch_sink/src/lib.rs index b9f17d6e80..36c76935cb 100644 --- a/core/connectors/sinks/elasticsearch_sink/src/lib.rs +++ b/core/connectors/sinks/elasticsearch_sink/src/lib.rs @@ -28,6 +28,7 @@ use iggy_common::IggyTimestamp; use iggy_connector_sdk::{ ConsumedMessage, Error, MessagesMetadata, Payload, Sink, TopicMetadata, sink_connector, }; +use secrecy::{ExposeSecret, SecretString}; use serde::{Deserialize, Serialize}; use serde_json::json; use simd_json::{OwnedValue, prelude::*}; @@ -73,7 +74,8 @@ pub struct ElasticsearchSinkConfig { pub url: String, pub index: String, pub username: Option, - pub password: Option, + #[serde(serialize_with = "iggy_common::serde_secret::serialize_optional_secret")] + pub password: Option, pub batch_size: Option, pub timeout_seconds: Option, pub create_index_if_not_exists: Option, @@ -110,7 +112,8 @@ impl ElasticsearchSink { let mut transport_builder = TransportBuilder::new(conn_pool); if let (Some(username), Some(password)) = (&self.config.username, &self.config.password) { - let credentials = Credentials::Basic(username.clone(), password.clone()); + let credentials = + Credentials::Basic(username.clone(), password.expose_secret().to_string()); transport_builder = transport_builder.auth(credentials); } diff --git a/core/connectors/sinks/mongodb_sink/Cargo.toml b/core/connectors/sinks/mongodb_sink/Cargo.toml index 4acbf7a38f..0a4305f2f0 100644 --- a/core/connectors/sinks/mongodb_sink/Cargo.toml +++ b/core/connectors/sinks/mongodb_sink/Cargo.toml @@ -34,8 +34,10 @@ crate-type = ["cdylib", "lib"] [dependencies] async-trait = { workspace = true } humantime = { workspace = true } +iggy_common = { workspace = true } iggy_connector_sdk = { workspace = true } mongodb = { workspace = true } +secrecy = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } tokio = { workspace = true } diff --git a/core/connectors/sinks/mongodb_sink/src/lib.rs b/core/connectors/sinks/mongodb_sink/src/lib.rs index 34b01ca7b2..2a4e89101d 100644 --- a/core/connectors/sinks/mongodb_sink/src/lib.rs +++ b/core/connectors/sinks/mongodb_sink/src/lib.rs @@ -22,6 +22,7 @@ use iggy_connector_sdk::{ ConsumedMessage, Error, MessagesMetadata, Sink, TopicMetadata, sink_connector, }; use mongodb::{Client, Collection, bson, options::ClientOptions}; +use secrecy::{ExposeSecret, SecretString}; use serde::{Deserialize, Serialize}; use std::str::FromStr; use std::sync::atomic::{AtomicU64, Ordering}; @@ -52,7 +53,8 @@ pub struct MongoDbSink { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct MongoDbSinkConfig { - pub connection_uri: String, + #[serde(serialize_with = "iggy_common::serde_secret::serialize_secret")] + pub connection_uri: SecretString, pub database: String, pub collection: String, pub max_pool_size: Option, @@ -169,11 +171,11 @@ impl Sink for MongoDbSink { impl MongoDbSink { /// Build a MongoDB client using ClientOptions so max_pool_size can be applied. async fn connect(&mut self) -> Result<(), Error> { - let redacted = redact_connection_uri(&self.config.connection_uri); + let redacted = redact_connection_uri(self.config.connection_uri.expose_secret()); info!("Connecting to MongoDB: {redacted}"); - let mut options = ClientOptions::parse(&self.config.connection_uri) + let mut options = ClientOptions::parse(self.config.connection_uri.expose_secret()) .await .map_err(|e| Error::InitError(format!("Failed to parse connection URI: {e}")))?; @@ -598,7 +600,7 @@ mod tests { fn given_default_config() -> MongoDbSinkConfig { MongoDbSinkConfig { - connection_uri: "mongodb://localhost:27017".to_string(), + connection_uri: SecretString::from("mongodb://localhost:27017"), database: "test_db".to_string(), collection: "test_collection".to_string(), max_pool_size: None, diff --git a/core/connectors/sinks/postgres_sink/Cargo.toml b/core/connectors/sinks/postgres_sink/Cargo.toml index fb992f1eb0..56ab685a33 100644 --- a/core/connectors/sinks/postgres_sink/Cargo.toml +++ b/core/connectors/sinks/postgres_sink/Cargo.toml @@ -42,6 +42,7 @@ humantime = { workspace = true } iggy_common = { workspace = true } iggy_connector_sdk = { workspace = true } once_cell = { workspace = true } +secrecy = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } simd-json = { workspace = true } diff --git a/core/connectors/sinks/postgres_sink/src/lib.rs b/core/connectors/sinks/postgres_sink/src/lib.rs index 2cbdcd6052..e45572a72a 100644 --- a/core/connectors/sinks/postgres_sink/src/lib.rs +++ b/core/connectors/sinks/postgres_sink/src/lib.rs @@ -22,6 +22,7 @@ use iggy_common::{DateTime, Utc}; use iggy_connector_sdk::{ ConsumedMessage, Error, MessagesMetadata, Sink, TopicMetadata, sink_connector, }; +use secrecy::{ExposeSecret, SecretString}; use serde::{Deserialize, Serialize}; use sqlx::postgres::PgPoolOptions; use sqlx::{Pool, Postgres}; @@ -47,7 +48,8 @@ pub struct PostgresSink { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct PostgresSinkConfig { - pub connection_string: String, + #[serde(serialize_with = "iggy_common::serde_secret::serialize_secret")] + pub connection_string: SecretString, pub target_table: String, pub batch_size: Option, pub max_connections: Option, @@ -159,13 +161,13 @@ impl Sink for PostgresSink { impl PostgresSink { async fn connect(&mut self) -> Result<(), Error> { let max_connections = self.config.max_connections.unwrap_or(10); - let redacted = redact_connection_string(&self.config.connection_string); + let redacted = redact_connection_string(self.config.connection_string.expose_secret()); info!("Connecting to PostgreSQL with max {max_connections} connections: {redacted}"); let pool = PgPoolOptions::new() .max_connections(max_connections) - .connect(&self.config.connection_string) + .connect(self.config.connection_string.expose_secret()) .await .map_err(|e| Error::InitError(format!("Failed to connect to PostgreSQL: {e}")))?; @@ -546,7 +548,7 @@ mod tests { fn test_config() -> PostgresSinkConfig { PostgresSinkConfig { - connection_string: "postgres://localhost/db".to_string(), + connection_string: SecretString::from("postgres://localhost/db"), target_table: "messages".to_string(), batch_size: Some(100), max_connections: None, diff --git a/core/connectors/sources/elasticsearch_source/Cargo.toml b/core/connectors/sources/elasticsearch_source/Cargo.toml index f705bde7fc..d1bb9c654a 100644 --- a/core/connectors/sources/elasticsearch_source/Cargo.toml +++ b/core/connectors/sources/elasticsearch_source/Cargo.toml @@ -42,6 +42,7 @@ humantime = { workspace = true } iggy_common = { workspace = true } iggy_connector_sdk = { workspace = true } once_cell = { workspace = true } +secrecy = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } simd-json = { workspace = true } diff --git a/core/connectors/sources/elasticsearch_source/src/lib.rs b/core/connectors/sources/elasticsearch_source/src/lib.rs index fc934847b8..42466d15b3 100644 --- a/core/connectors/sources/elasticsearch_source/src/lib.rs +++ b/core/connectors/sources/elasticsearch_source/src/lib.rs @@ -26,6 +26,7 @@ use iggy_common::{DateTime, Utc}; use iggy_connector_sdk::{ ConnectorState, Error, ProducedMessage, ProducedMessages, Schema, Source, source_connector, }; +use secrecy::{ExposeSecret, SecretString}; use serde::{Deserialize, Serialize}; use serde_json::{Value, json}; use std::str::FromStr; @@ -95,7 +96,8 @@ pub struct ElasticsearchSourceConfig { pub url: String, pub index: String, pub username: Option, - pub password: Option, + #[serde(serialize_with = "iggy_common::serde_secret::serialize_optional_secret")] + pub password: Option, pub query: Option, pub polling_interval: Option, pub batch_size: Option, @@ -303,7 +305,8 @@ impl ElasticsearchSource { let mut transport_builder = TransportBuilder::new(conn_pool); if let (Some(username), Some(password)) = (&self.config.username, &self.config.password) { - let credentials = Credentials::Basic(username.clone(), password.clone()); + let credentials = + Credentials::Basic(username.clone(), password.expose_secret().to_string()); transport_builder = transport_builder.auth(credentials); } diff --git a/core/connectors/sources/postgres_source/Cargo.toml b/core/connectors/sources/postgres_source/Cargo.toml index 2b0969901e..bbc82bed28 100644 --- a/core/connectors/sources/postgres_source/Cargo.toml +++ b/core/connectors/sources/postgres_source/Cargo.toml @@ -47,6 +47,7 @@ humantime = { workspace = true } iggy_common = { workspace = true } iggy_connector_sdk = { workspace = true } once_cell = { workspace = true } +secrecy = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } simd-json = { workspace = true } diff --git a/core/connectors/sources/postgres_source/src/lib.rs b/core/connectors/sources/postgres_source/src/lib.rs index 798c4b9e98..f01bcba3f2 100644 --- a/core/connectors/sources/postgres_source/src/lib.rs +++ b/core/connectors/sources/postgres_source/src/lib.rs @@ -23,6 +23,7 @@ use iggy_common::{DateTime, Utc}; use iggy_connector_sdk::{ ConnectorState, Error, ProducedMessage, ProducedMessages, Schema, Source, source_connector, }; +use secrecy::{ExposeSecret, SecretString}; use serde::{Deserialize, Serialize}; use sqlx::postgres::PgPoolOptions; use sqlx::{Column, Pool, Postgres, Row, TypeInfo}; @@ -51,7 +52,8 @@ pub struct PostgresSource { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct PostgresSourceConfig { - pub connection_string: String, + #[serde(serialize_with = "iggy_common::serde_secret::serialize_secret")] + pub connection_string: SecretString, pub mode: String, pub tables: Vec, pub poll_interval: Option, @@ -263,13 +265,13 @@ impl Source for PostgresSource { impl PostgresSource { async fn connect(&mut self) -> Result<(), Error> { let max_connections = self.config.max_connections.unwrap_or(10); - let redacted = redact_connection_string(&self.config.connection_string); + let redacted = redact_connection_string(self.config.connection_string.expose_secret()); info!("Connecting to PostgreSQL with max {max_connections} connections: {redacted}"); let pool = PgPoolOptions::new() .max_connections(max_connections) - .connect(&self.config.connection_string) + .connect(self.config.connection_string.expose_secret()) .await .map_err(|e| Error::InitError(format!("Failed to connect to PostgreSQL: {e}")))?; @@ -1143,7 +1145,7 @@ mod tests { fn test_config() -> PostgresSourceConfig { PostgresSourceConfig { - connection_string: "postgres://localhost/db".to_string(), + connection_string: SecretString::from("postgres://localhost/db"), mode: "polling".to_string(), tables: vec!["users".to_string()], poll_interval: Some("5s".to_string()), diff --git a/core/integration/Cargo.toml b/core/integration/Cargo.toml index b0cf3baf92..9851e5be0d 100644 --- a/core/integration/Cargo.toml +++ b/core/integration/Cargo.toml @@ -61,6 +61,7 @@ rmcp = { workspace = true, features = [ "transport-streamable-http-client", "transport-streamable-http-client-reqwest", ] } +secrecy = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } serial_test = { workspace = true } diff --git a/core/integration/tests/cli/user/test_user_create_command.rs b/core/integration/tests/cli/user/test_user_create_command.rs index f880cf46ae..310485a3de 100644 --- a/core/integration/tests/cli/user/test_user_create_command.rs +++ b/core/integration/tests/cli/user/test_user_create_command.rs @@ -101,10 +101,10 @@ impl IggyCmdTestCase for TestUserCreateCmd { } fn verify_command(&self, command_state: Assert) { - command_state - .success() - .stdout(diff(format!("Executing create user with username: {} and password: {}\nUser with username: {} and password: {} created\n", - self.username, self.password, self.username, self.password))); + command_state.success().stdout(diff(format!( + "Executing create user with username: {}\nUser with username: {} created\n", + self.username, self.username + ))); } async fn verify_server_state(&self, client: &dyn Client) { diff --git a/core/integration/tests/server/scenarios/purge_delete_scenario.rs b/core/integration/tests/server/scenarios/purge_delete_scenario.rs index 2e971be407..211f3c5fc2 100644 --- a/core/integration/tests/server/scenarios/purge_delete_scenario.rs +++ b/core/integration/tests/server/scenarios/purge_delete_scenario.rs @@ -19,6 +19,7 @@ use bytes::Bytes; use iggy::prelude::*; use iggy_common::Credentials; use integration::harness::TestHarness; +use secrecy::SecretString; use std::fs::{metadata, read_dir}; use std::path::Path; use std::str::FromStr; @@ -961,7 +962,7 @@ fn build_root_client(harness: &TestHarness) -> IggyClient { .with_server_address(addr.to_string()) .with_auto_sign_in(AutoLogin::Enabled(Credentials::UsernamePassword( DEFAULT_ROOT_USERNAME.to_string(), - DEFAULT_ROOT_PASSWORD.to_string(), + SecretString::from(DEFAULT_ROOT_PASSWORD), ))) .with_reconnection_max_retries(Some(10)) .with_reconnection_interval(interval) diff --git a/core/integration/tests/server/scenarios/stale_client_consumer_group_scenario.rs b/core/integration/tests/server/scenarios/stale_client_consumer_group_scenario.rs index e6fc95650f..b3b83fff59 100644 --- a/core/integration/tests/server/scenarios/stale_client_consumer_group_scenario.rs +++ b/core/integration/tests/server/scenarios/stale_client_consumer_group_scenario.rs @@ -20,6 +20,7 @@ use futures::StreamExt; use iggy::prelude::*; use iggy_common::Credentials; use integration::iggy_harness; +use secrecy::SecretString; use std::str::FromStr; use std::sync::Arc; use std::time::Duration; @@ -50,7 +51,7 @@ async fn create_reconnecting_client(server_addr: &str) -> IggyClient { nodelay: true, auto_login: AutoLogin::Enabled(Credentials::UsernamePassword( DEFAULT_ROOT_USERNAME.to_string(), - DEFAULT_ROOT_PASSWORD.to_string(), + SecretString::from(DEFAULT_ROOT_PASSWORD), )), reconnection: TcpClientReconnectionConfig { enabled: true, diff --git a/core/integration/tests/state/file.rs b/core/integration/tests/state/file.rs index 9b1a9d1813..838c8d4a66 100644 --- a/core/integration/tests/state/file.rs +++ b/core/integration/tests/state/file.rs @@ -21,6 +21,7 @@ use bytes::Bytes; use iggy::prelude::BytesSerializable; use iggy_common::create_stream::CreateStream; use iggy_common::create_user::CreateUser; +use secrecy::SecretString; use server::state::command::EntryCommand; use server::state::entry::StateEntry; use server::state::models::{CreateStreamWithId, CreateUserWithId}; @@ -45,7 +46,7 @@ async fn should_apply_single_entry() { user_id: 1, command: CreateUser { username: "test".to_string(), - password: "secret".to_string(), + password: SecretString::from("secret"), status: Default::default(), permissions: None, }, @@ -71,7 +72,7 @@ async fn should_apply_encrypted_entry() { user_id: 1, command: CreateUser { username: "test".to_string(), - password: "secret".to_string(), + password: SecretString::from("secret"), status: Default::default(), permissions: None, }, @@ -103,7 +104,7 @@ async fn should_apply_multiple_entries() { user_id: created_user_id, command: CreateUser { username: "test".to_string(), - password: "secret".to_string(), + password: SecretString::from("secret"), status: Default::default(), permissions: None, }, diff --git a/core/integration/tests/state/system.rs b/core/integration/tests/state/system.rs index d54146014d..ca567de41e 100644 --- a/core/integration/tests/state/system.rs +++ b/core/integration/tests/state/system.rs @@ -25,6 +25,7 @@ use iggy_common::create_stream::CreateStream; use iggy_common::create_topic::CreateTopic; use iggy_common::create_user::CreateUser; use iggy_common::delete_stream::DeleteStream; +use secrecy::{ExposeSecret, SecretString}; use server::state::command::EntryCommand; use server::state::models::{ CreateConsumerGroupWithId, CreatePersonalAccessTokenWithHash, CreateStreamWithId, @@ -41,13 +42,13 @@ async fn should_be_initialized_based_on_state_entries() { let user_id = 0; let create_user = CreateUser { username: "user".to_string(), - password: "secret".to_string(), + password: SecretString::from("secret"), status: Default::default(), permissions: None, }; let create_user_clone = CreateUser { username: "user".to_string(), - password: "secret".to_string(), + password: SecretString::from("secret"), status: Default::default(), permissions: None, }; @@ -222,7 +223,10 @@ async fn should_be_initialized_based_on_state_entries() { let mut user = system.users.remove(&user_id).unwrap(); assert_eq!(user.id, user_id); assert_eq!(user.username, create_user_clone.username); - assert_eq!(user.password_hash, create_user_clone.password); + assert_eq!( + user.password_hash, + create_user_clone.password.expose_secret() + ); assert_eq!(user.personal_access_tokens.len(), 1); let personal_access_token = user diff --git a/core/metadata/Cargo.toml b/core/metadata/Cargo.toml index 6cdad3ad48..e0d024aaf6 100644 --- a/core/metadata/Cargo.toml +++ b/core/metadata/Cargo.toml @@ -37,6 +37,7 @@ left-right = { workspace = true } message_bus = { workspace = true } paste = { workspace = true } rmp-serde = { workspace = true } +secrecy = { workspace = true } serde = { workspace = true, features = ["derive"] } slab = { workspace = true } tracing = { workspace = true } diff --git a/core/metadata/src/stm/user.rs b/core/metadata/src/stm/user.rs index 8e0ec5d8e7..5ebf2ee5ce 100644 --- a/core/metadata/src/stm/user.rs +++ b/core/metadata/src/stm/user.rs @@ -32,6 +32,7 @@ use iggy_common::{ GlobalPermissions, IggyTimestamp, Permissions, PersonalAccessToken, StreamPermissions, UserId, UserStatus, }; +use secrecy::ExposeSecret; use serde::{Deserialize, Serialize}; use slab::Slab; use std::sync::Arc; @@ -137,7 +138,7 @@ impl StateHandler for CreateUser { let user = User { id: 0, username: username_arc.clone(), - password_hash: Arc::from(self.password.as_str()), + password_hash: Arc::from(self.password.expose_secret()), status: self.status, created_at: iggy_common::IggyTimestamp::now(), permissions: self.permissions.as_ref().map(|p| Arc::new(p.clone())), @@ -214,7 +215,7 @@ impl StateHandler for ChangePassword { }; if let Some(user) = state.items.get_mut(user_id) { - user.password_hash = Arc::from(self.new_password.as_str()); + user.password_hash = Arc::from(self.new_password.expose_secret()); } Bytes::new() } diff --git a/core/sdk/Cargo.toml b/core/sdk/Cargo.toml index 4d2af3e6e8..6bee2175f5 100644 --- a/core/sdk/Cargo.toml +++ b/core/sdk/Cargo.toml @@ -45,6 +45,7 @@ reqwest-middleware = { workspace = true } reqwest-retry = { workspace = true } reqwest-tracing = { workspace = true } rustls = { workspace = true } +secrecy = { workspace = true } serde = { workspace = true } tokio = { workspace = true } tokio-rustls = { workspace = true } diff --git a/core/sdk/src/client_provider.rs b/core/sdk/src/client_provider.rs index fa93fd2c87..6943518159 100644 --- a/core/sdk/src/client_provider.rs +++ b/core/sdk/src/client_provider.rs @@ -30,6 +30,7 @@ use iggy_common::{ AutoLogin, Credentials, TransportProtocol, WebSocketClientConfig, WebSocketClientReconnectionConfig, WebSocketConfig, }; +use secrecy::SecretString; use std::str::FromStr; use std::sync::Arc; @@ -106,7 +107,7 @@ impl ClientProviderConfig { auto_login: if auto_login { AutoLogin::Enabled(Credentials::UsernamePassword( args.username, - args.password, + SecretString::from(args.password), )) } else { AutoLogin::Disabled @@ -150,7 +151,7 @@ impl ClientProviderConfig { auto_login: if auto_login { AutoLogin::Enabled(Credentials::UsernamePassword( args.username, - args.password, + SecretString::from(args.password), )) } else { AutoLogin::Disabled @@ -175,7 +176,7 @@ impl ClientProviderConfig { auto_login: if auto_login { AutoLogin::Enabled(Credentials::UsernamePassword( args.username, - args.password, + SecretString::from(args.password), )) } else { AutoLogin::Disabled diff --git a/core/sdk/src/http/personal_access_tokens.rs b/core/sdk/src/http/personal_access_tokens.rs index f9c6923a2e..902f0baf40 100644 --- a/core/sdk/src/http/personal_access_tokens.rs +++ b/core/sdk/src/http/personal_access_tokens.rs @@ -26,6 +26,7 @@ use iggy_common::PersonalAccessTokenExpiry; use iggy_common::create_personal_access_token::CreatePersonalAccessToken; use iggy_common::login_with_personal_access_token::LoginWithPersonalAccessToken; use iggy_common::{PersonalAccessTokenInfo, RawPersonalAccessToken}; +use secrecy::SecretString; const PATH: &str = "/personal-access-tokens"; @@ -74,7 +75,7 @@ impl PersonalAccessTokenClient for HttpClient { .post( &format!("{PATH}/login"), &LoginWithPersonalAccessToken { - token: token.to_string(), + token: SecretString::from(token), }, ) .await?; diff --git a/core/sdk/src/http/users.rs b/core/sdk/src/http/users.rs index 496e94e867..08ad3b504a 100644 --- a/core/sdk/src/http/users.rs +++ b/core/sdk/src/http/users.rs @@ -27,6 +27,7 @@ use iggy_common::login_user::LoginUser; use iggy_common::update_permissions::UpdatePermissions; use iggy_common::update_user::UpdateUser; use iggy_common::{IdentityInfo, Permissions, UserInfo, UserInfoDetails, UserStatus}; +use secrecy::SecretString; const PATH: &str = "/users"; @@ -70,7 +71,7 @@ impl UserClient for HttpClient { PATH, &CreateUser { username: username.to_string(), - password: password.to_string(), + password: SecretString::from(password), status, permissions, }, @@ -133,8 +134,8 @@ impl UserClient for HttpClient { &format!("{PATH}/{}/password", &user_id.as_cow_str()), &ChangePassword { user_id: user_id.clone(), - current_password: current_password.to_string(), - new_password: new_password.to_string(), + current_password: SecretString::from(current_password), + new_password: SecretString::from(new_password), }, ) .await?; @@ -147,7 +148,7 @@ impl UserClient for HttpClient { &format!("{PATH}/login"), &LoginUser { username: username.to_string(), - password: password.to_string(), + password: SecretString::from(password), version: Some(env!("CARGO_PKG_VERSION").to_string()), context: Some("".to_string()), }, diff --git a/core/sdk/src/quic/quic_client.rs b/core/sdk/src/quic/quic_client.rs index 6cde885095..a53dab901d 100644 --- a/core/sdk/src/quic/quic_client.rs +++ b/core/sdk/src/quic/quic_client.rs @@ -32,6 +32,7 @@ use iggy_common::{ use quinn::crypto::rustls::QuicClientConfig as QuinnQuicClientConfig; use quinn::{ClientConfig, Connection, Endpoint, IdleTimeout, RecvStream, VarInt}; use rustls::crypto::CryptoProvider; +use secrecy::ExposeSecret; use std::net::{SocketAddr, ToSocketAddrs}; use std::str::FromStr; use std::sync::Arc; @@ -393,7 +394,7 @@ impl QuicClient { self.set_state(ClientState::Authenticating).await; match credentials { Credentials::UsernamePassword(username, password) => { - self.login_user(username, password).await?; + self.login_user(username, password.expose_secret()).await?; self.publish_event(DiagnosticEvent::SignedIn).await; info!( "{NAME} client: {} has signed in with the user credentials, username: {username}", @@ -401,7 +402,8 @@ impl QuicClient { ); } Credentials::PersonalAccessToken(token) => { - self.login_with_personal_access_token(token).await?; + self.login_with_personal_access_token(token.expose_secret()) + .await?; self.publish_event(DiagnosticEvent::SignedIn).await; info!( "{NAME} client: {} has signed in with a personal access token.", @@ -785,13 +787,13 @@ mod tests { quic_client_config.server_address, format!("{server_address}:{port}") ); - assert_eq!( - quic_client_config.auto_login, - AutoLogin::Enabled(Credentials::UsernamePassword( - username.to_string(), - password.to_string() - )) - ); + match &quic_client_config.auto_login { + AutoLogin::Enabled(Credentials::UsernamePassword(u, p)) => { + assert_eq!(u, &username.to_string()); + assert_eq!(p.expose_secret(), password); + } + other => panic!("expected UsernamePassword auto_login, got {other:?}"), + } assert_eq!(quic_client_config.response_buffer_size, 10_000_000); assert_eq!(quic_client_config.max_concurrent_bidi_streams, 10_000); @@ -840,13 +842,13 @@ mod tests { quic_client_config.server_address, format!("{server_address}:{port}") ); - assert_eq!( - quic_client_config.auto_login, - AutoLogin::Enabled(Credentials::UsernamePassword( - username.to_string(), - password.to_string() - )) - ); + match &quic_client_config.auto_login { + AutoLogin::Enabled(Credentials::UsernamePassword(u, p)) => { + assert_eq!(u, &username.to_string()); + assert_eq!(p.expose_secret(), password); + } + other => panic!("expected UsernamePassword auto_login, got {other:?}"), + } assert_eq!(quic_client_config.response_buffer_size, 10_000_000); assert_eq!(quic_client_config.max_concurrent_bidi_streams, 10_000); @@ -893,10 +895,12 @@ mod tests { quic_client_config.server_address, format!("{server_address}:{port}") ); - assert_eq!( - quic_client_config.auto_login, - AutoLogin::Enabled(Credentials::PersonalAccessToken(pat.to_string())) - ); + match &quic_client_config.auto_login { + AutoLogin::Enabled(Credentials::PersonalAccessToken(t)) => { + assert_eq!(t.expose_secret(), pat); + } + other => panic!("expected PersonalAccessToken auto_login, got {other:?}"), + } assert_eq!(quic_client_config.response_buffer_size, 10_000_000); assert_eq!(quic_client_config.max_concurrent_bidi_streams, 10_000); diff --git a/core/sdk/src/tcp/tcp_client.rs b/core/sdk/src/tcp/tcp_client.rs index bd61df2cab..12ffb1c9a3 100644 --- a/core/sdk/src/tcp/tcp_client.rs +++ b/core/sdk/src/tcp/tcp_client.rs @@ -32,6 +32,7 @@ use iggy_common::{ }; use iggy_common::{BinaryClient, BinaryTransport, PersonalAccessTokenClient, UserClient}; use rustls::pki_types::{CertificateDer, ServerName, pem::PemObject}; +use secrecy::ExposeSecret; use std::net::SocketAddr; use std::str::FromStr; use std::sync::Arc; @@ -443,13 +444,14 @@ impl TcpClient { self.set_state(ClientState::Authenticating).await; match credentials { Credentials::UsernamePassword(username, password) => { - self.login_user(username, password).await?; + self.login_user(username, password.expose_secret()).await?; info!( "{NAME} client: {client_address} has signed in with the user credentials, username: {username}", ); } Credentials::PersonalAccessToken(token) => { - self.login_with_personal_access_token(token).await?; + self.login_with_personal_access_token(token.expose_secret()) + .await?; info!( "{NAME} client: {client_address} has signed in with a personal access token.", ); @@ -766,13 +768,13 @@ mod tests { tcp_client_config.server_address, format!("{server_address}:{port}") ); - assert_eq!( - tcp_client_config.auto_login, - AutoLogin::Enabled(Credentials::UsernamePassword( - username.to_string(), - password.to_string() - )) - ); + match &tcp_client_config.auto_login { + AutoLogin::Enabled(Credentials::UsernamePassword(u, p)) => { + assert_eq!(u, &username.to_string()); + assert_eq!(p.expose_secret(), password); + } + other => panic!("expected UsernamePassword auto_login, got {other:?}"), + } assert!(!tcp_client_config.tls_enabled); assert!(tcp_client_config.tls_domain.is_empty()); @@ -815,13 +817,13 @@ mod tests { tcp_client_config.server_address, format!("{server_address}:{port}") ); - assert_eq!( - tcp_client_config.auto_login, - AutoLogin::Enabled(Credentials::UsernamePassword( - username.to_string(), - password.to_string() - )) - ); + match &tcp_client_config.auto_login { + AutoLogin::Enabled(Credentials::UsernamePassword(u, p)) => { + assert_eq!(u, &username.to_string()); + assert_eq!(p.expose_secret(), password); + } + other => panic!("expected UsernamePassword auto_login, got {other:?}"), + } assert!(!tcp_client_config.tls_enabled); assert!(tcp_client_config.tls_domain.is_empty()); @@ -862,10 +864,12 @@ mod tests { tcp_client_config.server_address, format!("{server_address}:{port}") ); - assert_eq!( - tcp_client_config.auto_login, - AutoLogin::Enabled(Credentials::PersonalAccessToken(pat.to_string())) - ); + match &tcp_client_config.auto_login { + AutoLogin::Enabled(Credentials::PersonalAccessToken(t)) => { + assert_eq!(t.expose_secret(), pat); + } + other => panic!("expected PersonalAccessToken auto_login, got {other:?}"), + } assert!(!tcp_client_config.tls_enabled); assert!(tcp_client_config.tls_domain.is_empty()); diff --git a/core/sdk/src/websocket/websocket_client.rs b/core/sdk/src/websocket/websocket_client.rs index 0cbeb6b288..f86856d98b 100644 --- a/core/sdk/src/websocket/websocket_client.rs +++ b/core/sdk/src/websocket/websocket_client.rs @@ -32,6 +32,7 @@ use iggy_common::{ WebSocketConnectionStringOptions, }; use iggy_common::{BinaryClient, BinaryTransport, PersonalAccessTokenClient, UserClient}; +use secrecy::ExposeSecret; use std::net::SocketAddr; use std::sync::Arc; use tokio::net::TcpStream; @@ -496,14 +497,15 @@ impl WebSocketClient { self.set_state(ClientState::Authenticating).await; match credentials { Credentials::UsernamePassword(username, password) => { - self.login_user(username, password).await?; + self.login_user(username, password.expose_secret()).await?; info!( "{NAME} client: {client_address} has signed in with the user credentials, username: {username}", ); Ok(()) } Credentials::PersonalAccessToken(token) => { - self.login_with_personal_access_token(token).await?; + self.login_with_personal_access_token(token.expose_secret()) + .await?; info!( "{NAME} client: {client_address} has signed in with a personal access token.", ); diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml index c16ce5a6c1..b1eb2a74f0 100644 --- a/core/server/Cargo.toml +++ b/core/server/Cargo.toml @@ -85,6 +85,7 @@ rolling-file = { workspace = true } rust-embed = { workspace = true, optional = true } rustls = { workspace = true } rustls-pemfile = { workspace = true } +secrecy = { workspace = true } send_wrapper = { workspace = true } serde = { workspace = true } slab = { workspace = true } diff --git a/core/server/src/binary/command.rs b/core/server/src/binary/command.rs index 78a2b4417d..cac319f59e 100644 --- a/core/server/src/binary/command.rs +++ b/core/server/src/binary/command.rs @@ -424,9 +424,9 @@ mod tests { let mut bytes = BytesMut::with_capacity(payload.len()); bytes.put_slice(&payload); let bytes = Bytes::from(bytes); - assert_eq!( - &ServerCommand::from_code_and_payload(command_id, bytes).unwrap(), - command - ); + let deserialized = ServerCommand::from_code_and_payload(command_id, bytes).unwrap(); + // SecretString doesn't implement PartialEq, so we compare serialized bytes instead. + // This is sufficient since to_bytes() covers all wire-protocol-relevant fields. + assert_eq!(deserialized.to_bytes(), command.to_bytes()); } } diff --git a/core/server/src/binary/handlers/personal_access_tokens/login_with_personal_access_token_handler.rs b/core/server/src/binary/handlers/personal_access_tokens/login_with_personal_access_token_handler.rs index db8a94f51d..3a7475d3b6 100644 --- a/core/server/src/binary/handlers/personal_access_tokens/login_with_personal_access_token_handler.rs +++ b/core/server/src/binary/handlers/personal_access_tokens/login_with_personal_access_token_handler.rs @@ -28,6 +28,7 @@ use crate::streaming::session::Session; use err_trail::ErrContext; use iggy_common::login_with_personal_access_token::LoginWithPersonalAccessToken; use iggy_common::{IggyError, SenderKind}; +use secrecy::ExposeSecret; use tracing::{debug, instrument}; impl ServerCommandHandler for LoginWithPersonalAccessToken { @@ -44,16 +45,12 @@ impl ServerCommandHandler for LoginWithPersonalAccessToken { shard: &Rc, ) -> Result { debug!("session: {session}, command: {self}"); + let token = self.token.expose_secret(); let user = shard - .login_with_personal_access_token(&self.token, Some(session)) + .login_with_personal_access_token(token, Some(session)) .error(|e: &IggyError| { - let redacted_token = if self.token.len() > 4 { - format!("{}****", &self.token[..4]) - } else { - "****".to_string() - }; format!( - "{COMPONENT} (error: {e}) - failed to login with personal access token: {redacted_token}, session: {session}", + "{COMPONENT} (error: {e}) - failed to login with personal access token, session: {session}", ) })?; let identity_info = mapper::map_identity_info(user.id); diff --git a/core/server/src/binary/handlers/users/login_user_handler.rs b/core/server/src/binary/handlers/users/login_user_handler.rs index 89a981c2a5..4836ffc7cd 100644 --- a/core/server/src/binary/handlers/users/login_user_handler.rs +++ b/core/server/src/binary/handlers/users/login_user_handler.rs @@ -27,6 +27,7 @@ use crate::streaming::session::Session; use err_trail::ErrContext; use iggy_common::login_user::LoginUser; use iggy_common::{IggyError, SenderKind}; +use secrecy::ExposeSecret; use std::rc::Rc; use tracing::{debug, info, instrument, warn}; @@ -54,7 +55,7 @@ impl ServerCommandHandler for LoginUser { } = self; info!("Logging in user: {} ...", &username); let user = shard - .login_user(&username, &password, Some(session)) + .login_user(&username, password.expose_secret(), Some(session)) .error(|e: &IggyError| { format!( "{COMPONENT} (error: {e}) - failed to login user with name: {}, session: {session}", diff --git a/core/server/src/binary/macros.rs b/core/server/src/binary/macros.rs index 22e5a66779..655705088c 100644 --- a/core/server/src/binary/macros.rs +++ b/core/server/src/binary/macros.rs @@ -33,7 +33,7 @@ macro_rules! define_server_command_enum { );* $(;)? ) => { #[enum_dispatch(ServerCommandHandler)] - #[derive(Debug, PartialEq, EnumString, EnumIter)] + #[derive(Debug, EnumString, EnumIter)] pub enum ServerCommand { $( $variant($ty), diff --git a/core/server/src/http/personal_access_tokens.rs b/core/server/src/http/personal_access_tokens.rs index 209d9237a1..fbdf7ab3ff 100644 --- a/core/server/src/http/personal_access_tokens.rs +++ b/core/server/src/http/personal_access_tokens.rs @@ -36,6 +36,7 @@ use iggy_common::create_personal_access_token::CreatePersonalAccessToken; use iggy_common::delete_personal_access_token::DeletePersonalAccessToken; use iggy_common::login_with_personal_access_token::LoginWithPersonalAccessToken; use iggy_common::{IggyError, RawPersonalAccessToken}; +use secrecy::ExposeSecret; use std::sync::Arc; use tracing::instrument; @@ -129,7 +130,7 @@ async fn login_with_personal_access_token( let user = state .shard .shard() - .login_with_personal_access_token(&command.token, None) + .login_with_personal_access_token(command.token.expose_secret(), None) .error(|e: &IggyError| { format!("{COMPONENT} (error: {e}) - failed to login with personal access token") })?; diff --git a/core/server/src/http/users.rs b/core/server/src/http/users.rs index ea0db9b9fb..b8d5773429 100644 --- a/core/server/src/http/users.rs +++ b/core/server/src/http/users.rs @@ -41,6 +41,7 @@ use iggy_common::Identifier; use iggy_common::IdentityInfo; use iggy_common::Validatable; use iggy_common::{IggyError, UserInfo, UserInfoDetails}; +use secrecy::ExposeSecret; use send_wrapper::SendWrapper; use serde::Deserialize; use std::sync::Arc; @@ -229,7 +230,7 @@ async fn login_user( let user = state .shard .shard() - .login_user(&command.username, &command.password, None) + .login_user(&command.username, command.password.expose_secret(), None) .error(|e: &IggyError| { format!( "{COMPONENT} (error: {e}) - failed to login, username: {}", diff --git a/core/server/src/shard/execution.rs b/core/server/src/shard/execution.rs index 2989b2c6db..1cdaea83d1 100644 --- a/core/server/src/shard/execution.rs +++ b/core/server/src/shard/execution.rs @@ -46,6 +46,7 @@ use iggy_common::{ purge_stream::PurgeStream, purge_topic::PurgeTopic, update_permissions::UpdatePermissions, update_stream::UpdateStream, update_topic::UpdateTopic, update_user::UpdateUser, }; +use secrecy::{ExposeSecret, SecretString}; pub struct DeleteStreamResult { pub stream_id: usize, } @@ -558,7 +559,7 @@ pub async fn execute_create_user( let user = shard.create_user( &command.username, - &command.password, + command.password.expose_secret(), command.status, command.permissions.clone(), )?; @@ -570,7 +571,9 @@ pub async fn execute_create_user( &EntryCommand::CreateUser(CreateUserWithId { user_id: user.id, command: iggy_common::create_user::CreateUser { - password: crypto::hash_password(&command.password), + password: SecretString::from(crypto::hash_password( + command.password.expose_secret(), + )), ..command }, }), @@ -626,8 +629,8 @@ pub async fn execute_change_password( shard.change_password( &command.user_id, - &command.current_password, - &command.new_password, + command.current_password.expose_secret(), + command.new_password.expose_secret(), )?; shard @@ -635,8 +638,10 @@ pub async fn execute_change_password( .apply( user_id, &EntryCommand::ChangePassword(ChangePassword { - current_password: String::new(), - new_password: crypto::hash_password(&command.new_password), + current_password: SecretString::from(String::new()), + new_password: SecretString::from(crypto::hash_password( + command.new_password.expose_secret(), + )), ..command }), ) diff --git a/core/server/src/state/command.rs b/core/server/src/state/command.rs index 0cf32621fb..b9783d2a6c 100644 --- a/core/server/src/state/command.rs +++ b/core/server/src/state/command.rs @@ -47,7 +47,7 @@ use iggy_common::{ }; use std::fmt::{Display, Formatter}; -#[derive(Debug, PartialEq)] +#[derive(Debug)] pub enum EntryCommand { CreateStream(CreateStreamWithId), UpdateStream(UpdateStream), diff --git a/core/server/src/state/models.rs b/core/server/src/state/models.rs index 8af43ec88b..e0b97106f3 100644 --- a/core/server/src/state/models.rs +++ b/core/server/src/state/models.rs @@ -51,7 +51,7 @@ pub struct CreateConsumerGroupWithId { pub command: CreateConsumerGroup, } -#[derive(Debug, PartialEq, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize)] pub struct CreateUserWithId { pub user_id: u32, pub command: CreateUser, diff --git a/core/server/src/state/system.rs b/core/server/src/state/system.rs index 35a5601640..7fc23484f9 100644 --- a/core/server/src/state/system.rs +++ b/core/server/src/state/system.rs @@ -31,6 +31,7 @@ use iggy_common::PersonalAccessToken; use iggy_common::create_user::CreateUser; use iggy_common::defaults::DEFAULT_ROOT_USER_ID; use iggy_common::{IdKind, Identifier, Permissions, UserStatus}; +use secrecy::{ExposeSecret, SecretString}; use std::collections::BTreeMap; use std::fmt::Display; use tracing::{debug, error, info}; @@ -122,7 +123,7 @@ impl SystemState { let root = create_root_user(); let command = CreateUser { username: root.username.clone(), - password: root.password.clone(), + password: SecretString::from(root.password.clone()), status: root.status, permissions: root.permissions.clone(), }; @@ -379,7 +380,7 @@ impl SystemState { let user = UserState { id: user_id, username: command.username, - password_hash: command.password, // This is already hashed + password_hash: command.password.expose_secret().to_owned(), // This is already hashed status: command.status, created_at: entry.timestamp, permissions: command.permissions, @@ -408,7 +409,7 @@ impl SystemState { let user = users .get_mut(&user_id) .unwrap_or_else(|| panic!("{}", format!("User: {user_id} not found"))); - user.password_hash = command.new_password // This is already hashed + user.password_hash = command.new_password.expose_secret().to_owned() // This is already hashed } EntryCommand::UpdatePermissions(command) => { let user_id = find_user_id(&users, &command.user_id); From 66c4c42f8ad02e90c9e352d474641abc82690b20 Mon Sep 17 00:00:00 2001 From: grainier Date: Mon, 16 Mar 2026 09:47:48 +0530 Subject: [PATCH 2/3] fix(repo): address PR review feedback for SecretString migration --- .../create_personal_access_token.rs | 7 +-- core/cli/src/commands/binary_system/login.rs | 3 +- core/common/src/traits/binary_mapper.rs | 5 +- .../auth_config/connection_string.rs | 4 +- .../permissions/personal_access_token.rs | 5 +- .../src/types/user/user_identity_info.rs | 5 +- core/common/src/utils/serde_secret.rs | 53 +++++++++++++++++++ core/connectors/runtime/src/api/config.rs | 9 ++-- core/connectors/runtime/src/context.rs | 2 +- .../test_pat_login_options.rs | 7 +-- core/integration/tests/mcp/mod.rs | 3 +- .../scenarios/authentication_scenario.rs | 3 +- .../scenarios/cross_protocol_pat_scenario.rs | 5 +- .../tests/server/scenarios/user_scenario.rs | 9 ++-- core/sdk/src/http/http_client.rs | 3 +- core/server/src/http/mapper.rs | 3 +- .../server/src/http/personal_access_tokens.rs | 6 ++- core/server/src/state/system.rs | 28 +++++++++- 18 files changed, 130 insertions(+), 30 deletions(-) diff --git a/core/cli/src/commands/binary_personal_access_tokens/create_personal_access_token.rs b/core/cli/src/commands/binary_personal_access_tokens/create_personal_access_token.rs index 4fa96f7000..bf0aef95fe 100644 --- a/core/cli/src/commands/binary_personal_access_tokens/create_personal_access_token.rs +++ b/core/cli/src/commands/binary_personal_access_tokens/create_personal_access_token.rs @@ -23,6 +23,7 @@ use iggy_common::Client; use iggy_common::PersonalAccessTokenExpiry; use iggy_common::create_personal_access_token::CreatePersonalAccessToken; use keyring::Entry; +use secrecy::ExposeSecret; use tracing::{Level, event}; pub struct CreatePersonalAccessTokenCmd { @@ -84,7 +85,7 @@ impl CliCommand for CreatePersonalAccessTokenCmd { if self.store_token { let server_address = format!("iggy:{}", self.server_address); let entry = Entry::new(&server_address, &self.create_token.name)?; - entry.set_password(&token.token)?; + entry.set_password(token.token.expose_secret())?; event!(target: PRINT_TARGET, Level::DEBUG,"Stored token under service: {} and name: {}", server_address, self.create_token.name); event!(target: PRINT_TARGET, Level::INFO, @@ -96,7 +97,7 @@ impl CliCommand for CreatePersonalAccessTokenCmd { }, ); } else if self.quiet_mode { - println!("{}", token.token); + println!("{}", token.token.expose_secret()); } else { event!(target: PRINT_TARGET, Level::INFO, "Personal access token with name: {} and {} created", @@ -107,7 +108,7 @@ impl CliCommand for CreatePersonalAccessTokenCmd { }, ); event!(target: PRINT_TARGET, Level::INFO,"Token: {}", - token.token); + token.token.expose_secret()); } Ok(()) diff --git a/core/cli/src/commands/binary_system/login.rs b/core/cli/src/commands/binary_system/login.rs index 7c99c1db97..efac416323 100644 --- a/core/cli/src/commands/binary_system/login.rs +++ b/core/cli/src/commands/binary_system/login.rs @@ -23,6 +23,7 @@ use anyhow::Context; use async_trait::async_trait; use iggy_common::Client; use iggy_common::SEC_IN_MICRO; +use secrecy::ExposeSecret; use tracing::{Level, event}; const DEFAULT_LOGIN_SESSION_TIMEOUT: u64 = SEC_IN_MICRO * 15 * 60; @@ -94,7 +95,7 @@ impl CliCommand for LoginCmd { ) })?; - self.server_session.store(&token.token)?; + self.server_session.store(token.token.expose_secret())?; event!(target: PRINT_TARGET, Level::INFO, "Successfully logged into Iggy server {}", diff --git a/core/common/src/traits/binary_mapper.rs b/core/common/src/traits/binary_mapper.rs index 727f16aa15..f211febaf3 100644 --- a/core/common/src/traits/binary_mapper.rs +++ b/core/common/src/traits/binary_mapper.rs @@ -24,6 +24,7 @@ use crate::{ Stream, StreamDetails, Topic, TopicDetails, UserInfo, UserInfoDetails, UserStatus, }; use bytes::Bytes; +use secrecy::SecretString; use std::collections::HashMap; use std::str::from_utf8; @@ -469,7 +470,9 @@ pub fn map_raw_pat(payload: Bytes) -> Result let token = from_utf8(&payload[1..1 + token_length as usize]) .map_err(|_| IggyError::InvalidUtf8)? .to_string(); - Ok(RawPersonalAccessToken { token }) + Ok(RawPersonalAccessToken { + token: SecretString::from(token), + }) } pub fn map_client(payload: Bytes) -> Result { diff --git a/core/common/src/types/configuration/auth_config/connection_string.rs b/core/common/src/types/configuration/auth_config/connection_string.rs index f623e69845..35332474a6 100644 --- a/core/common/src/types/configuration/auth_config/connection_string.rs +++ b/core/common/src/types/configuration/auth_config/connection_string.rs @@ -106,7 +106,7 @@ impl ConnectionString { return Ok(ConnectionString { server_address: server_address.to_owned(), auto_login: AutoLogin::Enabled(Credentials::PersonalAccessToken( - SecretString::from(pat_token.to_owned()), + SecretString::from(pat_token), )), options: connection_string_options, }); @@ -116,7 +116,7 @@ impl ConnectionString { server_address: server_address.to_owned(), auto_login: AutoLogin::Enabled(Credentials::UsernamePassword( username.to_owned(), - SecretString::from(password.to_owned()), + SecretString::from(password), )), options: connection_string_options, }) diff --git a/core/common/src/types/permissions/personal_access_token.rs b/core/common/src/types/permissions/personal_access_token.rs index 06ea4c1d1b..6fc5be45c9 100644 --- a/core/common/src/types/permissions/personal_access_token.rs +++ b/core/common/src/types/permissions/personal_access_token.rs @@ -16,7 +16,9 @@ * under the License. */ +use crate::utils::serde_secret::serialize_secret; use crate::utils::timestamp::IggyTimestamp; +use secrecy::SecretString; use serde::{Deserialize, Serialize}; use std::fmt; @@ -26,7 +28,8 @@ use std::fmt; #[derive(Serialize, Deserialize)] pub struct RawPersonalAccessToken { /// The unique token that should be securely stored by the user and can be used for authentication. - pub token: String, + #[serde(serialize_with = "serialize_secret")] + pub token: SecretString, } impl fmt::Debug for RawPersonalAccessToken { diff --git a/core/common/src/types/user/user_identity_info.rs b/core/common/src/types/user/user_identity_info.rs index 35cec07699..fef4946d3d 100644 --- a/core/common/src/types/user/user_identity_info.rs +++ b/core/common/src/types/user/user_identity_info.rs @@ -17,6 +17,8 @@ */ use crate::UserId; +use crate::utils::serde_secret::serialize_secret; +use secrecy::SecretString; use serde::{Deserialize, Serialize}; use std::fmt; @@ -39,7 +41,8 @@ pub struct IdentityInfo { #[derive(Serialize, Deserialize)] pub struct TokenInfo { /// The value of token. - pub token: String, + #[serde(serialize_with = "serialize_secret")] + pub token: SecretString, /// The expiry of token. pub expiry: u64, } diff --git a/core/common/src/utils/serde_secret.rs b/core/common/src/utils/serde_secret.rs index 7036981609..8c6a7700c3 100644 --- a/core/common/src/utils/serde_secret.rs +++ b/core/common/src/utils/serde_secret.rs @@ -51,3 +51,56 @@ pub fn serialize_optional_secret( None => serializer.serialize_none(), } } + +#[cfg(test)] +mod tests { + use super::*; + use serde::{Deserialize, Serialize}; + + #[derive(Serialize, Deserialize)] + struct WithSecret { + #[serde(serialize_with = "serialize_secret")] + password: SecretString, + } + + #[derive(Serialize, Deserialize)] + struct WithOptionalSecret { + #[serde(serialize_with = "serialize_optional_secret")] + token: Option, + } + + #[test] + fn serialize_secret_preserves_value_in_json() { + let s = WithSecret { + password: SecretString::from("my_password"), + }; + let json = serde_json::to_string(&s).unwrap(); + assert_eq!(json, r#"{"password":"my_password"}"#); + } + + #[test] + fn serialize_secret_roundtrips_through_json() { + let original = WithSecret { + password: SecretString::from("roundtrip"), + }; + let json = serde_json::to_string(&original).unwrap(); + let restored: WithSecret = serde_json::from_str(&json).unwrap(); + assert_eq!(restored.password.expose_secret(), "roundtrip"); + } + + #[test] + fn serialize_optional_secret_with_some_value() { + let s = WithOptionalSecret { + token: Some(SecretString::from("tok_123")), + }; + let json = serde_json::to_string(&s).unwrap(); + assert_eq!(json, r#"{"token":"tok_123"}"#); + } + + #[test] + fn serialize_optional_secret_with_none() { + let s = WithOptionalSecret { token: None }; + let json = serde_json::to_string(&s).unwrap(); + assert_eq!(json, r#"{"token":null}"#); + } +} diff --git a/core/connectors/runtime/src/api/config.rs b/core/connectors/runtime/src/api/config.rs index e2ecdf1613..72b48f9a07 100644 --- a/core/connectors/runtime/src/api/config.rs +++ b/core/connectors/runtime/src/api/config.rs @@ -21,6 +21,8 @@ use crate::configs::connectors::ConfigFormat; use crate::error::RuntimeError; use axum::http::{HeaderValue, Method}; use configs_derive::ConfigEnv; +use iggy_common::serde_secret::serialize_secret; +use secrecy::SecretString; use serde::{Deserialize, Serialize}; use std::fmt::Formatter; use tower_http::cors::{AllowOrigin, CorsLayer}; @@ -35,8 +37,9 @@ pub const TEXT_HEADER: HeaderValue = HeaderValue::from_static("text/plain"); pub struct HttpConfig { pub enabled: bool, pub address: String, - #[config_env(secret)] - pub api_key: String, + #[config_env(secret, leaf)] + #[serde(serialize_with = "serialize_secret")] + pub api_key: SecretString, pub cors: HttpCorsConfig, pub tls: HttpTlsConfig, pub metrics: HttpMetricsConfig, @@ -179,7 +182,7 @@ impl Default for HttpConfig { Self { enabled: true, address: "localhost:8081".to_owned(), - api_key: "".to_owned(), + api_key: SecretString::from(""), cors: HttpCorsConfig::default(), tls: HttpTlsConfig::default(), metrics: HttpMetricsConfig::default(), diff --git a/core/connectors/runtime/src/context.rs b/core/connectors/runtime/src/context.rs index b5a1c4c411..a15d4bea67 100644 --- a/core/connectors/runtime/src/context.rs +++ b/core/connectors/runtime/src/context.rs @@ -68,7 +68,7 @@ pub fn init( RuntimeContext { sinks, sources, - api_key: SecretString::from(config.http.api_key.to_owned()), + api_key: config.http.api_key.clone(), config_provider: Arc::from(config_provider), metrics, start_time: IggyTimestamp::now(), diff --git a/core/integration/tests/cli/personal_access_token/test_pat_login_options.rs b/core/integration/tests/cli/personal_access_token/test_pat_login_options.rs index 1441639982..2709c5254e 100644 --- a/core/integration/tests/cli/personal_access_token/test_pat_login_options.rs +++ b/core/integration/tests/cli/personal_access_token/test_pat_login_options.rs @@ -23,6 +23,7 @@ use iggy::prelude::Client; use iggy::prelude::PersonalAccessTokenExpiry; use keyring::Entry; use predicates::str::{contains, starts_with}; +use secrecy::ExposeSecret; use serial_test::parallel; use std::fmt::{Display, Formatter, Result}; @@ -88,11 +89,11 @@ impl IggyCmdTestCase for TestLoginOptions { .await; assert!(token.is_ok()); let token = token.unwrap(); - let token_value = token.token.clone(); - self.token_value = Some(token.token); + let token_value = token.token.expose_secret().to_owned(); self.keyring - .set_password(token_value.as_str()) + .set_password(&token_value) .expect("Failed to set token"); + self.token_value = Some(token_value); } fn get_command(&self) -> IggyCmdCommand { diff --git a/core/integration/tests/mcp/mod.rs b/core/integration/tests/mcp/mod.rs index 2d08b4b4fb..61560bb8cc 100644 --- a/core/integration/tests/mcp/mod.rs +++ b/core/integration/tests/mcp/mod.rs @@ -31,6 +31,7 @@ use rmcp::{ serde::de::DeserializeOwned, serde_json::{self, json}, }; +use secrecy::ExposeSecret; async fn invoke( client: &McpClient, @@ -512,7 +513,7 @@ async fn should_create_personal_access_token(harness: &TestHarness) { ) .await; - assert!(!token.token.is_empty()); + assert!(!token.token.expose_secret().is_empty()); } #[iggy_harness(server(mcp), seed = seeds::mcp_standard)] diff --git a/core/integration/tests/server/scenarios/authentication_scenario.rs b/core/integration/tests/server/scenarios/authentication_scenario.rs index 4dc440c4f2..f7b08b5201 100644 --- a/core/integration/tests/server/scenarios/authentication_scenario.rs +++ b/core/integration/tests/server/scenarios/authentication_scenario.rs @@ -30,6 +30,7 @@ use crate::server::scenarios::create_client; use bytes::Bytes; use iggy::prelude::*; use integration::harness::{TestHarness, login_root}; +use secrecy::ExposeSecret; use server::binary::command::ServerCommand; use strum::IntoEnumIterator; @@ -90,7 +91,7 @@ pub async fn run(harness: &TestHarness) { ); let identity = client - .login_with_personal_access_token(&raw_pat.token) + .login_with_personal_access_token(raw_pat.token.expose_secret()) .await .expect("PAT login should work"); assert_eq!(identity.user_id, 0, "PAT should authenticate as root"); diff --git a/core/integration/tests/server/scenarios/cross_protocol_pat_scenario.rs b/core/integration/tests/server/scenarios/cross_protocol_pat_scenario.rs index bedfcfe678..130c557a6e 100644 --- a/core/integration/tests/server/scenarios/cross_protocol_pat_scenario.rs +++ b/core/integration/tests/server/scenarios/cross_protocol_pat_scenario.rs @@ -24,6 +24,7 @@ use iggy::prelude::*; use iggy_common::TransportProtocol; use integration::harness::TestHarness; use integration::iggy_harness; +use secrecy::ExposeSecret; const PAT_NAME: &str = "cross-protocol-test-pat"; const TCP_CLIENT_COUNT: usize = 20; @@ -48,7 +49,7 @@ pub async fn should_see_pat_created_via_http_when_listing_via_tcp(harness: &Test .await .expect("Failed to create PAT via HTTP"); - assert!(!created_pat.token.is_empty()); + assert!(!created_pat.token.expose_secret().is_empty()); let http_pats = http_client .get_personal_access_tokens() @@ -107,7 +108,7 @@ pub async fn should_see_pat_created_via_tcp_when_listing_via_http(harness: &Test .await .expect("Failed to create PAT via TCP"); - assert!(!created_pat.token.is_empty()); + assert!(!created_pat.token.expose_secret().is_empty()); let http_client = create_root_client(harness, TransportProtocol::Http).await; diff --git a/core/integration/tests/server/scenarios/user_scenario.rs b/core/integration/tests/server/scenarios/user_scenario.rs index f582711b32..d1904d6c16 100644 --- a/core/integration/tests/server/scenarios/user_scenario.rs +++ b/core/integration/tests/server/scenarios/user_scenario.rs @@ -24,6 +24,7 @@ use iggy::prelude::defaults::DEFAULT_ROOT_USERNAME; use iggy::prelude::{GlobalPermissions, Permissions}; use iggy::prelude::{PersonalAccessTokenClient, SEC_IN_MICRO, SystemClient, UserClient}; use integration::harness::{TestHarness, assert_clean_system, login_root}; +use secrecy::ExposeSecret; pub async fn run(harness: &TestHarness) { let client = create_client(harness).await; @@ -146,14 +147,14 @@ pub async fn run(harness: &TestHarness) { .await .unwrap(); - assert!(!raw_pat1.token.is_empty()); + assert!(!raw_pat1.token.expose_secret().is_empty()); let raw_pat2 = client .create_personal_access_token(pat_name2, PersonalAccessTokenExpiry::NeverExpire) .await .unwrap(); - assert!(!raw_pat2.token.is_empty()); + assert!(!raw_pat2.token.expose_secret().is_empty()); // 14. Get personal access tokens and verify that the token is there let personal_access_tokens = client.get_personal_access_tokens().await.unwrap(); @@ -164,14 +165,14 @@ pub async fn run(harness: &TestHarness) { // 16. Login with the personal access tokens let identity_info = client - .login_with_personal_access_token(&raw_pat1.token) + .login_with_personal_access_token(raw_pat1.token.expose_secret()) .await .unwrap(); assert_eq!(identity_info.user_id, 1); let identity_info = client - .login_with_personal_access_token(&raw_pat2.token) + .login_with_personal_access_token(raw_pat2.token.expose_secret()) .await .unwrap(); diff --git a/core/sdk/src/http/http_client.rs b/core/sdk/src/http/http_client.rs index a6ddcaa650..e2142f4f52 100644 --- a/core/sdk/src/http/http_client.rs +++ b/core/sdk/src/http/http_client.rs @@ -29,6 +29,7 @@ use reqwest::{Response, StatusCode, Url}; use reqwest_middleware::{ClientBuilder, ClientWithMiddleware}; use reqwest_retry::{RetryTransientMiddleware, policies::ExponentialBackoff}; use reqwest_tracing::{SpanBackendWithUrl, TracingMiddleware}; +use secrecy::ExposeSecret; use serde::Serialize; use std::ops::Deref; use std::str::FromStr; @@ -249,7 +250,7 @@ impl HttpTransport for HttpClient { } let access_token = identity.access_token.as_ref().unwrap(); - self.set_access_token(Some(access_token.token.clone())) + self.set_access_token(Some(access_token.token.expose_secret().to_owned())) .await; Ok(()) } diff --git a/core/server/src/http/mapper.rs b/core/server/src/http/mapper.rs index 879d983382..04bc163967 100644 --- a/core/server/src/http/mapper.rs +++ b/core/server/src/http/mapper.rs @@ -24,6 +24,7 @@ use iggy_common::PersonalAccessToken; use iggy_common::{ConsumerGroupDetails, ConsumerGroupInfo, ConsumerGroupMember, IggyByteSize}; use iggy_common::{IdentityInfo, PersonalAccessTokenInfo, TokenInfo, TopicDetails}; use iggy_common::{UserInfo, UserInfoDetails}; +use secrecy::SecretString; pub fn map_user(user: &User) -> UserInfoDetails { UserInfoDetails { @@ -105,7 +106,7 @@ pub fn map_generated_access_token_to_identity_info(token: GeneratedToken) -> Ide IdentityInfo { user_id: token.user_id, access_token: Some(TokenInfo { - token: token.access_token, + token: SecretString::from(token.access_token), expiry: token.access_token_expiry, }), } diff --git a/core/server/src/http/personal_access_tokens.rs b/core/server/src/http/personal_access_tokens.rs index fbdf7ab3ff..1ff235ddb3 100644 --- a/core/server/src/http/personal_access_tokens.rs +++ b/core/server/src/http/personal_access_tokens.rs @@ -36,7 +36,7 @@ use iggy_common::create_personal_access_token::CreatePersonalAccessToken; use iggy_common::delete_personal_access_token::DeletePersonalAccessToken; use iggy_common::login_with_personal_access_token::LoginWithPersonalAccessToken; use iggy_common::{IggyError, RawPersonalAccessToken}; -use secrecy::ExposeSecret; +use secrecy::{ExposeSecret, SecretString}; use std::sync::Arc; use tracing::instrument; @@ -93,7 +93,9 @@ async fn create_personal_access_token( match state.shard.send_to_control_plane(request).await? { ShardResponse::CreatePersonalAccessTokenResponse(_, token) => { - Ok(Json(RawPersonalAccessToken { token })) + Ok(Json(RawPersonalAccessToken { + token: SecretString::from(token), + })) } ShardResponse::ErrorResponse(err) => Err(err.into()), _ => unreachable!("Expected CreatePersonalAccessTokenResponse"), diff --git a/core/server/src/state/system.rs b/core/server/src/state/system.rs index 7fc23484f9..285df37448 100644 --- a/core/server/src/state/system.rs +++ b/core/server/src/state/system.rs @@ -75,14 +75,24 @@ pub struct PartitionState { pub created_at: IggyTimestamp, } -#[derive(Debug, Clone)] +#[derive(Clone)] pub struct PersonalAccessTokenState { pub name: String, pub token_hash: String, pub expiry_at: Option, } -#[derive(Debug, Clone)] +impl std::fmt::Debug for PersonalAccessTokenState { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("PersonalAccessTokenState") + .field("name", &self.name) + .field("token_hash", &"[REDACTED]") + .field("expiry_at", &self.expiry_at) + .finish() + } +} + +#[derive(Clone)] pub struct UserState { pub id: u32, pub username: String, @@ -93,6 +103,20 @@ pub struct UserState { pub personal_access_tokens: AHashMap, } +impl std::fmt::Debug for UserState { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("UserState") + .field("id", &self.id) + .field("username", &self.username) + .field("password_hash", &"[REDACTED]") + .field("status", &self.status) + .field("created_at", &self.created_at) + .field("permissions", &self.permissions) + .field("personal_access_tokens", &self.personal_access_tokens) + .finish() + } +} + #[derive(Debug, Clone)] pub struct ConsumerGroupState { pub id: u32, From be993fc78f39357511108fed4f762ffcf8a07329 Mon Sep 17 00:00:00 2001 From: grainier Date: Mon, 16 Mar 2026 18:15:12 +0530 Subject: [PATCH 3/3] refactor(server): add TODO for SecretString migration of hash fields --- core/server/src/state/system.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/server/src/state/system.rs b/core/server/src/state/system.rs index 285df37448..666d624759 100644 --- a/core/server/src/state/system.rs +++ b/core/server/src/state/system.rs @@ -75,6 +75,7 @@ pub struct PartitionState { pub created_at: IggyTimestamp, } +// TODO: consider converting token_hash to SecretString (requires updating the full hash flow across crates) #[derive(Clone)] pub struct PersonalAccessTokenState { pub name: String, @@ -92,6 +93,7 @@ impl std::fmt::Debug for PersonalAccessTokenState { } } +// TODO: consider converting password_hash to SecretString (requires updating the full hash flow across crates) #[derive(Clone)] pub struct UserState { pub id: u32,