diff --git a/CHANGELOG.md b/CHANGELOG.md index 4b6d5799..3bb0c05f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,7 +15,7 @@ All notable changes to this project will be documented in this file. ### Changed - Refactor: move server configuration properties from the command line to configuration files. ([#911]). -- Add support for ZooKeeper to KRaft migration ([#923]). +- Add support for ZooKeeper to KRaft migration ([#923]), ([#933]). - Bump testing-tools to `0.3.0-stackable0.0.0-dev` ([#925]). ### Removed @@ -34,6 +34,7 @@ All notable changes to this project will be documented in this file. [#929]: https://github.com/stackabletech/kafka-operator/pull/929 [#930]: https://github.com/stackabletech/kafka-operator/pull/930 [#932]: https://github.com/stackabletech/kafka-operator/pull/932 +[#933]: https://github.com/stackabletech/kafka-operator/pull/933 ## [25.11.0] - 2025-11-07 diff --git a/rust/operator-binary/src/crd/mod.rs b/rust/operator-binary/src/crd/mod.rs index fcfc50b1..b1754b57 100644 --- a/rust/operator-binary/src/crd/mod.rs +++ b/rust/operator-binary/src/crd/mod.rs @@ -254,7 +254,7 @@ impl v1alpha1::KafkaCluster { match &self.spec.cluster_config.metadata_manager { Some(manager) => match manager.clone() { MetadataManager::ZooKeeper => { - if self.spec.image.product_version().starts_with("4\\.") { + if !self.spec.image.product_version().starts_with("3.") { Err(Error::Kafka4RequiresKraftMetadataManager) } else { Ok(MetadataManager::ZooKeeper) @@ -263,10 +263,10 @@ impl v1alpha1::KafkaCluster { _ => Ok(MetadataManager::KRaft), }, None => { - if self.spec.image.product_version().starts_with("4\\.") { - Ok(MetadataManager::KRaft) - } else { + if self.spec.image.product_version().starts_with("3.") { Ok(MetadataManager::ZooKeeper) + } else { + Ok(MetadataManager::KRaft) } } } @@ -508,6 +508,8 @@ pub enum MetadataManager { #[cfg(test)] mod tests { + use rstest::rstest; + use super::*; fn get_server_secret_class(kafka: &v1alpha1::KafkaCluster) -> Option { @@ -686,4 +688,66 @@ mod tests { tls::internal_tls_default() ); } + + #[rstest] + #[case("3.9.1", None, Ok(MetadataManager::ZooKeeper))] + #[case( + "3.9.1", + Some(MetadataManager::ZooKeeper), + Ok(MetadataManager::ZooKeeper) + )] + #[case("3.9.1", Some(MetadataManager::KRaft), Ok(MetadataManager::KRaft))] + #[case("4.1.1", None, Ok(MetadataManager::KRaft))] + #[case( + "4.1.1", + Some(MetadataManager::ZooKeeper), + Err(Error::Kafka4RequiresKraftMetadataManager) + )] + #[case("4.1.1", Some(MetadataManager::KRaft), Ok(MetadataManager::KRaft))] + fn test_effective_metadata_manager( + #[case] product_version: &str, + #[case] metadata_manager: Option, + #[case] expected: Result, + ) { + let input = format!( + r#" + apiVersion: kafka.stackable.tech/v1alpha1 + kind: KafkaCluster + metadata: + name: kafka + spec: + image: + productVersion: {product_version} + controllers: + roleGroups: + default: + replicas: 1 + brokers: + roleGroups: + default: + replicas: 1 + "# + ); + let mut kafka: v1alpha1::KafkaCluster = + serde_yaml::from_str(&input).expect("illegal test input"); + + if metadata_manager.is_some() { + kafka.spec.cluster_config.metadata_manager = metadata_manager; + } + + match kafka.effective_metadata_manager() { + Ok(manager) => match expected { + Ok(expected_manager) => assert_eq!(manager, expected_manager), + Err(_) => { + panic!("expected error but got metadata manager : {}", manager) + } + }, + Err(err) => match expected { + Ok(_) => panic!("expected Ok but got error: {}", err), + Err(expected_err) => { + assert_eq!(format!("{}", err), format!("{}", expected_err)) + } + }, + }; + } }