Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions api/v1alpha1/apikey_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,15 @@ type APIKeySpec struct {

// Revoke indicates whether this API key should be revoked
// +optional
Revoke bool `json:"revoke,omitempty"`
Revoke bool `json:"revoke"`

// EncryptionKey contains the public key used to encrypt the token
// +optional
EncryptionKey *EncryptionKey `json:"encryptionKey,omitempty"`

// ExportPlaintextToken indicates whether the token should be exported in plaintext
// +optional
ExportPlaintextToken *bool `json:"exportPlaintextToken,omitempty"`
ExportPlaintextToken *bool `json:"exportPlaintextToken"`
}

// EncryptionKey contains a public key used for encryption
Expand Down
6 changes: 3 additions & 3 deletions api/v1alpha1/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ type ProducerConfig struct {
MaxPendingMessagesAcrossPartitions int `json:"maxPendingMessagesAcrossPartitions,omitempty" yaml:"maxPendingMessagesAcrossPartitions"`

// +optional
UseThreadLocalProducers bool `json:"useThreadLocalProducers,omitempty" yaml:"useThreadLocalProducers"`
UseThreadLocalProducers bool `json:"useThreadLocalProducers" yaml:"useThreadLocalProducers"`

// +optional
CryptoConfig *CryptoConfig `json:"cryptoConfig,omitempty" yaml:"cryptoConfig"`
Expand All @@ -191,7 +191,7 @@ type ConsumerConfig struct {
SerdeClassName string `json:"serdeClassName,omitempty" yaml:"serdeClassName"`

// +optional
RegexPattern bool `json:"regexPattern,omitempty" yaml:"regexPattern"`
RegexPattern bool `json:"regexPattern" yaml:"regexPattern"`

// +optional
ReceiverQueueSize int `json:"receiverQueueSize,omitempty" yaml:"receiverQueueSize"`
Expand All @@ -206,7 +206,7 @@ type ConsumerConfig struct {
CryptoConfig *CryptoConfig `json:"cryptoConfig,omitempty" yaml:"cryptoConfig"`

// +optional
PoolMessages bool `json:"poolMessages,omitempty" yaml:"poolMessages"`
PoolMessages bool `json:"poolMessages" yaml:"poolMessages"`
}

// CryptoConfig represents the configuration for the crypto of the pulsar functions and connectors
Expand Down
2 changes: 1 addition & 1 deletion api/v1alpha1/computeflinkdeployment_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ type ComputeFlinkDeploymentList struct {

// VvpRestoreStrategy defines the restore strategy of the deployment
type VvpRestoreStrategy struct {
AllowNonRestoredState bool `json:"allowNonRestoredState,omitempty"`
AllowNonRestoredState bool `json:"allowNonRestoredState"`
Kind string `json:"kind,omitempty"`
}

Expand Down
2 changes: 1 addition & 1 deletion api/v1alpha1/computeworkspace_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type ComputeWorkspaceSpec struct {

// UseExternalAccess is the flag to indicate whether the workspace will use external access.
// +optional
UseExternalAccess *bool `json:"useExternalAccess,omitempty"`
UseExternalAccess *bool `json:"useExternalAccess"`

// FlinkBlobStorage is the configuration for the Flink blob storage.
// +optional
Expand Down
4 changes: 2 additions & 2 deletions api/v1alpha1/pulsarconnection_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,11 @@ type PulsarConnectionSpec struct {
// TLSEnableHostnameVerification indicates whether to verify the hostname of the broker.
// Only used when using secure urls.
// +optional
TLSEnableHostnameVerification bool `json:"tlsEnableHostnameVerification,omitempty"`
TLSEnableHostnameVerification bool `json:"tlsEnableHostnameVerification"`

// TLSAllowInsecureConnection indicates whether to allow insecure connection to the broker.
// +optional
TLSAllowInsecureConnection bool `json:"tlsAllowInsecureConnection,omitempty"`
TLSAllowInsecureConnection bool `json:"tlsAllowInsecureConnection"`

// TLSTrustCertsFilePath Path for the TLS certificate used to validate the broker endpoint when using TLS.
// +optional
Expand Down
14 changes: 7 additions & 7 deletions api/v1alpha1/pulsarfunction_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,27 +38,27 @@ type PulsarFunctionSpec struct {

// CleanupSubscription is the flag to indicate whether the subscription should be cleaned up when the function is deleted
// +optional
CleanupSubscription *bool `json:"cleanupSubscription,omitempty"`
CleanupSubscription *bool `json:"cleanupSubscription"`

// RetainOrdering is the flag to indicate whether the function should retain ordering
// +optional
RetainOrdering *bool `json:"retainOrdering,omitempty"`
RetainOrdering *bool `json:"retainOrdering"`

// RetainKeyOrdering is the flag to indicate whether the function should retain key ordering
// +optional
RetainKeyOrdering *bool `json:"retainKeyOrdering,omitempty"`
RetainKeyOrdering *bool `json:"retainKeyOrdering"`

// BatchBuilder is the batch builder that the function uses
// +optional
BatchBuilder *string `json:"batchBuilder,omitempty"`

// ForwardSourceMessageProperty is the flag to indicate whether the function should forward source message properties
// +optional
ForwardSourceMessageProperty *bool `json:"forwardSourceMessageProperty,omitempty"`
ForwardSourceMessageProperty *bool `json:"forwardSourceMessageProperty"`

// AutoAck is the flag to indicate whether the function should auto ack
// +optional
AutoAck *bool `json:"autoAck,omitempty"`
AutoAck *bool `json:"autoAck"`

// Parallelism is the parallelism of the function
// +optional
Expand Down Expand Up @@ -186,11 +186,11 @@ type PulsarFunctionSpec struct {

// ExposePulsarAdminClientEnabled is the flag to indicate whether the function should expose pulsar admin client
// +optional
ExposePulsarAdminClientEnabled *bool `json:"exposePulsarAdminClientEnabled,omitempty"`
ExposePulsarAdminClientEnabled *bool `json:"exposePulsarAdminClientEnabled"`

// SkipToLatest is the flag to indicate whether the function should skip to latest
// +optional
SkipToLatest *bool `json:"skipToLatest,omitempty"`
SkipToLatest *bool `json:"skipToLatest"`

// SubscriptionPosition is the subscription position of the function
// +optional
Expand Down
16 changes: 8 additions & 8 deletions api/v1alpha1/pulsarnamespace_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
// TopicAutoCreationConfig defines the configuration for automatic topic creation
type TopicAutoCreationConfig struct {
// Allow specifies whether to allow automatic topic creation
Allow bool `json:"allow,omitempty"`
Allow bool `json:"allow"`

// Type specifies the type of automatically created topics
// +kubebuilder:validation:Enum=partitioned;non-partitioned
Expand Down Expand Up @@ -125,7 +125,7 @@ type InactiveTopicPolicies struct {

// DeleteWhileInactive specifies whether to delete topics while they are inactive
// +optional
DeleteWhileInactive *bool `json:"deleteWhileInactive,omitempty"`
DeleteWhileInactive *bool `json:"deleteWhileInactive"`
}

// EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN!
Expand Down Expand Up @@ -167,7 +167,7 @@ type PulsarNamespaceSpec struct {
// When enabled, producers must provide a schema when publishing messages.
// If not specified, the cluster's default schema validation enforcement setting will be used.
// +optional
SchemaValidationEnforced *bool `json:"schemaValidationEnforced,omitempty"`
SchemaValidationEnforced *bool `json:"schemaValidationEnforced"`

// MaxProducersPerTopic sets the maximum number of producers allowed on a single topic in the namespace.
// +optional
Expand Down Expand Up @@ -246,7 +246,7 @@ type PulsarNamespaceSpec struct {

// Deduplication controls whether to enable message deduplication for the namespace.
// +optional
Deduplication *bool `json:"deduplication,omitempty"`
Deduplication *bool `json:"deduplication"`

// BookieAffinityGroup is the name of the namespace isolation policy to apply to the namespace.
BookieAffinityGroup *BookieAffinityGroupData `json:"bookieAffinityGroup,omitempty"`
Expand Down Expand Up @@ -309,17 +309,17 @@ type PulsarNamespaceSpec struct {
// IsAllowAutoUpdateSchema specifies whether to allow automatic schema updates.
// When enabled, producers can automatically update schemas without manual approval.
// +optional
IsAllowAutoUpdateSchema *bool `json:"isAllowAutoUpdateSchema,omitempty"`
IsAllowAutoUpdateSchema *bool `json:"isAllowAutoUpdateSchema"`

// ValidateProducerName specifies whether to validate producer names.
// When enabled, producer names must follow specific naming conventions.
// +optional
ValidateProducerName *bool `json:"validateProducerName,omitempty"`
ValidateProducerName *bool `json:"validateProducerName"`

// EncryptionRequired specifies whether message encryption is required for this namespace.
// When enabled, all messages published to topics in this namespace must be encrypted.
// +optional
EncryptionRequired *bool `json:"encryptionRequired,omitempty"`
EncryptionRequired *bool `json:"encryptionRequired"`

// SubscriptionAuthMode specifies the subscription authentication mode for this namespace.
// Valid values are "None" and "Prefix".
Expand Down Expand Up @@ -370,7 +370,7 @@ type PulsarNamespaceStatus struct {
// GeoReplicationEnabled indicates whether geo-replication between two Pulsar instances (via PulsarGeoReplication)
// is enabled for the namespace
// +optional
GeoReplicationEnabled bool `json:"geoReplicationEnabled,omitempty"`
GeoReplicationEnabled bool `json:"geoReplicationEnabled"`
}

//+kubebuilder:object:root=true
Expand Down
8 changes: 4 additions & 4 deletions api/v1alpha1/pulsarsink_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,19 +42,19 @@ type PulsarSinkSpec struct {

// CleanupSubscription is the flag to enable or disable the cleanup of subscription
// +optional
CleanupSubscription *bool `json:"cleanupSubscription,omitempty"`
CleanupSubscription *bool `json:"cleanupSubscription"`

// RetainOrdering is the flag to enable or disable the retain ordering
// +optional
RetainOrdering *bool `json:"retainOrdering,omitempty"`
RetainOrdering *bool `json:"retainOrdering"`

// RetainKeyOrdering is the flag to enable or disable the retain key ordering
// +optional
RetainKeyOrdering *bool `json:"retainKeyOrdering,omitempty"`
RetainKeyOrdering *bool `json:"retainKeyOrdering"`

// AutoAck is the flag to enable or disable the auto ack
// +optional
AutoAck *bool `json:"autoAck,omitempty"`
AutoAck *bool `json:"autoAck"`

// Parallelism is the parallelism of the PulsarSink
// +optional
Expand Down
12 changes: 6 additions & 6 deletions api/v1alpha1/pulsartopic_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type PulsarTopicSpec struct {
// Defaults to true if not specified.
// +kubebuilder:default=true
// +optional
Persistent *bool `json:"persistent,omitempty"`
Persistent *bool `json:"persistent"`

// Partitions specifies the number of partitions for a partitioned topic.
// Set to 0 for a non-partitioned topic.
Expand Down Expand Up @@ -141,7 +141,7 @@ type PulsarTopicSpec struct {

// Deduplication controls whether to enable message deduplication for the topic.
// +optional
Deduplication *bool `json:"deduplication,omitempty"`
Deduplication *bool `json:"deduplication"`

// CompactionThreshold specifies the size threshold in bytes for automatic topic compaction.
// When the topic reaches this size, compaction will be triggered automatically.
Expand Down Expand Up @@ -194,7 +194,7 @@ type PulsarTopicSpec struct {
// SchemaValidationEnforced determines whether schema validation is enforced for the topic.
// When enabled, only messages that conform to the topic's schema will be accepted.
// +optional
SchemaValidationEnforced *bool `json:"schemaValidationEnforced,omitempty"`
SchemaValidationEnforced *bool `json:"schemaValidationEnforced"`

// SubscriptionDispatchRate defines the message dispatch rate limiting policy for subscriptions.
// This controls the rate at which messages are delivered to consumers per subscription.
Expand Down Expand Up @@ -237,7 +237,7 @@ type PulsarTopicSpec struct {
type DelayedDeliveryData struct {
// Active determines whether delayed delivery is enabled for the topic
// +optional
Active *bool `json:"active,omitempty"`
Active *bool `json:"active"`

// TickTimeMillis specifies the tick time for delayed message delivery in milliseconds
// +optional
Expand Down Expand Up @@ -288,7 +288,7 @@ type OffloadPolicies struct {
// This is a local type definition that mirrors the external library's AutoSubscriptionCreationOverride
// to ensure proper Kubernetes deep copy generation.
type AutoSubscriptionCreationOverride struct {
AllowAutoSubscriptionCreation bool `json:"allowAutoSubscriptionCreation,omitempty"`
AllowAutoSubscriptionCreation bool `json:"allowAutoSubscriptionCreation"`
}

// SchemaCompatibilityStrategy defines the schema compatibility strategy for a topic.
Expand Down Expand Up @@ -321,7 +321,7 @@ type PulsarTopicStatus struct {
// GeoReplicationEnabled indicates whether geo-replication is enabled for this topic.
// This is set to true when GeoReplicationRefs are configured in the spec and successfully applied.
// +optional
GeoReplicationEnabled bool `json:"geoReplicationEnabled,omitempty"`
GeoReplicationEnabled bool `json:"geoReplicationEnabled"`
}

//+kubebuilder:object:root=true
Expand Down
84 changes: 84 additions & 0 deletions controllers/pulsartopic_boolean_persistence_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Copyright 2026 StreamNative
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package controllers

import (
"context"
"encoding/json"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"

resourcev1alpha1 "github.com/streamnative/pulsar-resources-operator/api/v1alpha1"
)

var _ = Describe("PulsarTopic Boolean Field Persistence", func() {
Context("when PulsarTopic status has GeoReplicationEnabled set to false", func() {
It("should preserve false value in JSON serialization", func() {
ctx := context.Background()
namespace := "default"
topicName := "test-geo-replication-false"
connectionName := "test-connection"

connection := &resourcev1alpha1.PulsarConnection{
ObjectMeta: metav1.ObjectMeta{
Name: connectionName,
Namespace: namespace,
},
Spec: resourcev1alpha1.PulsarConnectionSpec{
AdminServiceURL: "http://localhost:8080",
BrokerServiceURL: "pulsar://localhost:6650",
},
}
Expect(k8sClient.Create(ctx, connection)).Should(Succeed())

topic := &resourcev1alpha1.PulsarTopic{
ObjectMeta: metav1.ObjectMeta{
Name: topicName,
Namespace: namespace,
},
Spec: resourcev1alpha1.PulsarTopicSpec{
Name: "persistent://public/default/" + topicName,
ConnectionRef: corev1.LocalObjectReference{
Name: connectionName,
},
},
Status: resourcev1alpha1.PulsarTopicStatus{
GeoReplicationEnabled: false,
},
}
Expect(k8sClient.Create(ctx, topic)).Should(Succeed())

// Get the created topic and verify status
createdTopic := &resourcev1alpha1.PulsarTopic{}
topicKey := types.NamespacedName{Name: topicName, Namespace: namespace}
Expect(k8sClient.Get(ctx, topicKey, createdTopic)).Should(Succeed())
Expect(createdTopic.Status.GeoReplicationEnabled).Should(Equal(false))

// Verify the serialization
statusJSON, err := json.Marshal(createdTopic.Status)
Expect(err).Should(Succeed())
var statusMap map[string]interface{}
Expect(json.Unmarshal(statusJSON, &statusMap)).Should(Succeed())
Expect(statusMap).Should(HaveKey("geoReplicationEnabled"))
geoRepValue, exists := statusMap["geoReplicationEnabled"]
Expect(exists).Should(BeTrue())
Expect(geoRepValue).Should(Equal(false))
})
})
})
Loading