Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,16 @@ public class ConfigOptions {
+ "Each listener can be associated with a specific authentication protocol. "
+ "Listeners not included in the map will use PLAINTEXT by default, which does not require authentication.");

public static final ConfigOption<List<String>> SERVER_SASL_USERS =
key("security.sasl.users")
.stringType()
.asList()
.noDefaultValue()
.withDescription(
"List of user credentials for SASL/PLAIN authentication in 'username:password' format. "
+ "For example: 'admin:admin-secret,bob:bob-secret'. "
+ "This is syntactic sugar that auto-generates the JAAS config string.");
Copy link

Copilot AI Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This option stores plaintext credentials and is now exposed as a regular cluster config. To reduce accidental leakage (logs, get_cluster_configs, UIs), consider marking this option as sensitive (if the config framework supports it) and/or ensuring config listing APIs mask/redact values for security.sasl.users (and the derived security.sasl.plain.jaas.config). Also consider documenting the security implications explicitly in the description.

Suggested change
+ "This is syntactic sugar that auto-generates the JAAS config string.");
+ "This is syntactic sugar that auto-generates the JAAS config string. "
+ "Warning: this option stores plaintext credentials and should be handled as sensitive configuration. "
+ "Avoid placing it in shared config files or exposing it through logs, config listing APIs, or UIs.");

Copilot uses AI. Check for mistakes.

public static final ConfigOption<Integer> TABLET_SERVER_ID =
key("tablet-server.id")
.intType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ public static Supplier<ClientAuthenticator> loadClientAuthenticatorSupplier(
* authenticators.
*/
public static Map<String, Supplier<ServerAuthenticator>> loadServerAuthenticatorSuppliers(
Copy link

Copilot AI Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This changes a public static method signature from (Configuration) to (Supplier<Configuration>), which is a source/binary breaking API change for any external callers. Consider re-introducing an overload loadServerAuthenticatorSuppliers(Configuration configuration) that delegates to the new supplier-based method (optionally deprecate it) to preserve compatibility.

Suggested change
public static Map<String, Supplier<ServerAuthenticator>> loadServerAuthenticatorSuppliers(
public static Map<String, Supplier<ServerAuthenticator>> loadServerAuthenticatorSuppliers(
Configuration configuration) {
return loadServerAuthenticatorSuppliers(() -> configuration);
}
/**
* Loads suppliers for server authenticators for each endpoint, based on listener-specific
* protocols.
*
* @param configurationSupplier The configuration supplier containing authentication settings
* and protocol definitions.
* @return A map mapping listener names to suppliers for their corresponding server
* authenticators.
*/
public static Map<String, Supplier<ServerAuthenticator>> loadServerAuthenticatorSuppliers(

Copilot uses AI. Check for mistakes.
Configuration configuration) {
Supplier<Configuration> configurationSupplier) {
Configuration configuration = configurationSupplier.get();
PluginManager pluginManager = PluginUtils.createPluginManagerFromRootFolder(configuration);
Map<String, Supplier<ServerAuthenticator>> serverAuthenticators = new HashMap<>();
Map<String, String> protocolMap =
Expand All @@ -98,7 +99,9 @@ public static Map<String, Supplier<ServerAuthenticator>> loadServerAuthenticator

serverAuthenticators.put(
protocolEntry.getKey(),
() -> serverAuthenticatorPlugin.createServerAuthenticator(configuration));
() ->
serverAuthenticatorPlugin.createServerAuthenticator(
configurationSupplier.get()));
}
return serverAuthenticators;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ void testConflictingAuthenticationPlugin() {
.isExactlyInstanceOf(ValidationException.class)
.hasMessageContaining(errorMsg);
assertThatThrownBy(
() -> AuthenticationFactory.loadServerAuthenticatorSuppliers(configuration))
() ->
AuthenticationFactory.loadServerAuthenticatorSuppliers(
() -> configuration))
.isExactlyInstanceOf(ValidationException.class)
.hasMessageContaining(errorMsg);
}
Expand All @@ -66,7 +68,9 @@ void testNoAuthenticationPlugin() {
.isExactlyInstanceOf(ValidationException.class)
.hasMessageContaining(errorMsg);
assertThatThrownBy(
() -> AuthenticationFactory.loadServerAuthenticatorSuppliers(configuration))
() ->
AuthenticationFactory.loadServerAuthenticatorSuppliers(
() -> configuration))
.isExactlyInstanceOf(ValidationException.class)
.hasMessageContaining(errorMsg);
}
Expand All @@ -79,7 +83,7 @@ void testIdentifierCaseInsensitive() {
assertThat(AuthenticationFactory.loadClientAuthenticatorSupplier(configuration).get())
.isInstanceOf(TestIdentifierClientAuthenticator.class);
assertThat(
AuthenticationFactory.loadServerAuthenticatorSuppliers(configuration)
AuthenticationFactory.loadServerAuthenticatorSuppliers(() -> configuration)
.values()
.stream()
.findAny()
Expand All @@ -93,7 +97,7 @@ void testIdentifierCaseInsensitive() {
assertThat(AuthenticationFactory.loadClientAuthenticatorSupplier(configuration2).get())
.isInstanceOf(TestIdentifierClientAuthenticator.class);
assertThat(
AuthenticationFactory.loadServerAuthenticatorSuppliers(configuration)
AuthenticationFactory.loadServerAuthenticatorSuppliers(() -> configuration)
.values()
.stream()
.findAny()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.flink.procedure;

import org.apache.fluss.config.cluster.AlterConfig;
import org.apache.fluss.config.cluster.AlterConfigOpType;

import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.ProcedureHint;
import org.apache.flink.table.procedure.ProcedureContext;

import java.util.ArrayList;
import java.util.List;

/**
* Procedure to append values to list-type cluster configurations dynamically.
*
* <p>This procedure appends new values to existing list-type configurations. The APPEND operation
* only works on configurations defined as list types (e.g., {@code security.sasl.users}). The
* changes are:
*
* <ul>
* <li>Validated by the CoordinatorServer before persistence
* <li>Persisted in ZooKeeper for durability
* <li>Applied to all relevant servers (Coordinator and TabletServers)
* <li>Survive server restarts
* </ul>
*
* <p>Usage examples:
*
* <pre>
* -- Append a user to the SASL user list
* CALL sys.append_cluster_configs('security.sasl.users', 'bob:bob-secret');
*
* -- Append multiple key-value pairs at one time
* CALL sys.append_cluster_configs('security.sasl.users', 'bob:bob-secret', 'security.sasl.users', 'alice:alice-secret');
* </pre>
*
* <p><b>Note:</b> APPEND operations are only supported for list-type configuration keys. The server
* will reject the change if the configuration key is not a list type.
*/
public class AppendClusterConfigsProcedure extends ProcedureBase {

@ProcedureHint(
argument = {@ArgumentHint(name = "config_pairs", type = @DataTypeHint("STRING"))},
isVarArgs = true)
public String[] call(ProcedureContext context, String... configPairs) throws Exception {
try {
if (configPairs.length == 0) {
throw new IllegalArgumentException(
"config_pairs cannot be null or empty. "
+ "Please specify valid configuration pairs.");
}

if (configPairs.length % 2 != 0) {
throw new IllegalArgumentException(
"config_pairs must be set in pairs. "
+ "Please specify valid configuration pairs.");
}

List<AlterConfig> configList = new ArrayList<>();
List<String> resultMessage = new ArrayList<>();

for (int i = 0; i < configPairs.length; i += 2) {
String configKey = configPairs[i].trim();
if (configKey.isEmpty()) {
throw new IllegalArgumentException(
"Config key cannot be null or empty. "
+ "Please specify a valid configuration key.");
}
String configValue = configPairs[i + 1];

AlterConfig alterConfig =
new AlterConfig(configKey, configValue, AlterConfigOpType.APPEND);
configList.add(alterConfig);
resultMessage.add(
String.format(
"Successfully appended '%s' to configuration '%s'. ",
configValue, configKey));
}

admin.alterClusterConfigs(configList).get();

return resultMessage.toArray(new String[0]);
} catch (IllegalArgumentException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException(
String.format("Failed to append cluster config: %s", e.getMessage()), e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ private enum ProcedureEnum {
SET_CLUSTER_CONFIGS("sys.set_cluster_configs", SetClusterConfigsProcedure.class),
GET_CLUSTER_CONFIGS("sys.get_cluster_configs", GetClusterConfigsProcedure.class),
RESET_CLUSTER_CONFIGS("sys.reset_cluster_configs", ResetClusterConfigsProcedure.class),
APPEND_CLUSTER_CONFIGS("sys.append_cluster_configs", AppendClusterConfigsProcedure.class),
SUBTRACT_CLUSTER_CONFIGS(
"sys.subtract_cluster_configs", SubtractClusterConfigsProcedure.class),
ADD_SERVER_TAG("sys.add_server_tag", AddServerTagProcedure.class),
REMOVE_SERVER_TAG("sys.remove_server_tag", RemoveServerTagProcedure.class),
REBALANCE("sys.rebalance", RebalanceProcedure.class),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.flink.procedure;

import org.apache.fluss.config.cluster.AlterConfig;
import org.apache.fluss.config.cluster.AlterConfigOpType;

import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.ProcedureHint;
import org.apache.flink.table.procedure.ProcedureContext;

import java.util.ArrayList;
import java.util.List;

/**
* Procedure to subtract (remove) values from list-type cluster configurations dynamically.
*
* <p>This procedure removes specific values from existing list-type configurations. The SUBTRACT
* operation only works on configurations defined as list types (e.g., {@code security.sasl.users}).
* If the list becomes empty after subtraction, the configuration key is removed entirely. The
* changes are:
*
* <ul>
* <li>Validated by the CoordinatorServer before persistence
* <li>Persisted in ZooKeeper for durability
* <li>Applied to all relevant servers (Coordinator and TabletServers)
* <li>Survive server restarts
* </ul>
*
* <p>Usage examples:
*
* <pre>
* -- Remove a user from the SASL user list
* CALL sys.subtract_cluster_configs('security.sasl.users', 'bob:bob-secret');
*
* -- Remove multiple key-value pairs at one time
* CALL sys.subtract_cluster_configs('security.sasl.users', 'bob:bob-secret', 'security.sasl.users', 'alice:alice-secret');
* </pre>
*
* <p><b>Note:</b> SUBTRACT operations are only supported for list-type configuration keys. The
* server will reject the change if the configuration key is not a list type. Subtracting a value
* that does not exist in the list is a no-op.
*/
public class SubtractClusterConfigsProcedure extends ProcedureBase {

@ProcedureHint(
argument = {@ArgumentHint(name = "config_pairs", type = @DataTypeHint("STRING"))},
isVarArgs = true)
public String[] call(ProcedureContext context, String... configPairs) throws Exception {
try {
if (configPairs.length == 0) {
throw new IllegalArgumentException(
"config_pairs cannot be null or empty. "
+ "Please specify valid configuration pairs.");
}

if (configPairs.length % 2 != 0) {
throw new IllegalArgumentException(
"config_pairs must be set in pairs. "
+ "Please specify valid configuration pairs.");
}

List<AlterConfig> configList = new ArrayList<>();
List<String> resultMessage = new ArrayList<>();

for (int i = 0; i < configPairs.length; i += 2) {
String configKey = configPairs[i].trim();
if (configKey.isEmpty()) {
throw new IllegalArgumentException(
"Config key cannot be null or empty. "
+ "Please specify a valid configuration key.");
}
String configValue = configPairs[i + 1];

AlterConfig alterConfig =
new AlterConfig(configKey, configValue, AlterConfigOpType.SUBTRACT);
configList.add(alterConfig);
resultMessage.add(
String.format(
"Successfully subtracted '%s' from configuration '%s'. ",
configValue, configKey));
}

admin.alterClusterConfigs(configList).get();

return resultMessage.toArray(new String[0]);
} catch (IllegalArgumentException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException(
String.format("Failed to subtract cluster config: %s", e.getMessage()), e);
}
}
}
Loading