diff --git a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java index 8da2246b0a..6b4b3d274b 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java @@ -476,6 +476,16 @@ public class ConfigOptions { + "Each listener can be associated with a specific authentication protocol. " + "Listeners not included in the map will use PLAINTEXT by default, which does not require authentication."); + public static final ConfigOption> SERVER_SASL_USERS = + key("security.sasl.users") + .stringType() + .asList() + .noDefaultValue() + .withDescription( + "List of user credentials for SASL/PLAIN authentication in 'username:password' format. " + + "For example: 'admin:admin-secret,bob:bob-secret'. " + + "This is syntactic sugar that auto-generates the JAAS config string."); + public static final ConfigOption TABLET_SERVER_ID = key("tablet-server.id") .intType() diff --git a/fluss-common/src/main/java/org/apache/fluss/security/auth/AuthenticationFactory.java b/fluss-common/src/main/java/org/apache/fluss/security/auth/AuthenticationFactory.java index 65cbfff9c5..dc49936894 100644 --- a/fluss-common/src/main/java/org/apache/fluss/security/auth/AuthenticationFactory.java +++ b/fluss-common/src/main/java/org/apache/fluss/security/auth/AuthenticationFactory.java @@ -83,7 +83,8 @@ public static Supplier loadClientAuthenticatorSupplier( * authenticators. */ public static Map> loadServerAuthenticatorSuppliers( - Configuration configuration) { + Supplier configurationSupplier) { + Configuration configuration = configurationSupplier.get(); PluginManager pluginManager = PluginUtils.createPluginManagerFromRootFolder(configuration); Map> serverAuthenticators = new HashMap<>(); Map protocolMap = @@ -98,7 +99,9 @@ public static Map> loadServerAuthenticator serverAuthenticators.put( protocolEntry.getKey(), - () -> serverAuthenticatorPlugin.createServerAuthenticator(configuration)); + () -> + serverAuthenticatorPlugin.createServerAuthenticator( + configurationSupplier.get())); } return serverAuthenticators; } diff --git a/fluss-common/src/test/java/org/apache/fluss/security/auth/AuthenticationFactoryTest.java b/fluss-common/src/test/java/org/apache/fluss/security/auth/AuthenticationFactoryTest.java index c71edeca0d..d313a58c6a 100644 --- a/fluss-common/src/test/java/org/apache/fluss/security/auth/AuthenticationFactoryTest.java +++ b/fluss-common/src/test/java/org/apache/fluss/security/auth/AuthenticationFactoryTest.java @@ -49,7 +49,9 @@ void testConflictingAuthenticationPlugin() { .isExactlyInstanceOf(ValidationException.class) .hasMessageContaining(errorMsg); assertThatThrownBy( - () -> AuthenticationFactory.loadServerAuthenticatorSuppliers(configuration)) + () -> + AuthenticationFactory.loadServerAuthenticatorSuppliers( + () -> configuration)) .isExactlyInstanceOf(ValidationException.class) .hasMessageContaining(errorMsg); } @@ -66,7 +68,9 @@ void testNoAuthenticationPlugin() { .isExactlyInstanceOf(ValidationException.class) .hasMessageContaining(errorMsg); assertThatThrownBy( - () -> AuthenticationFactory.loadServerAuthenticatorSuppliers(configuration)) + () -> + AuthenticationFactory.loadServerAuthenticatorSuppliers( + () -> configuration)) .isExactlyInstanceOf(ValidationException.class) .hasMessageContaining(errorMsg); } @@ -79,7 +83,7 @@ void testIdentifierCaseInsensitive() { assertThat(AuthenticationFactory.loadClientAuthenticatorSupplier(configuration).get()) .isInstanceOf(TestIdentifierClientAuthenticator.class); assertThat( - AuthenticationFactory.loadServerAuthenticatorSuppliers(configuration) + AuthenticationFactory.loadServerAuthenticatorSuppliers(() -> configuration) .values() .stream() .findAny() @@ -93,7 +97,7 @@ void testIdentifierCaseInsensitive() { assertThat(AuthenticationFactory.loadClientAuthenticatorSupplier(configuration2).get()) .isInstanceOf(TestIdentifierClientAuthenticator.class); assertThat( - AuthenticationFactory.loadServerAuthenticatorSuppliers(configuration) + AuthenticationFactory.loadServerAuthenticatorSuppliers(() -> configuration) .values() .stream() .findAny() diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/AppendClusterConfigsProcedure.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/AppendClusterConfigsProcedure.java new file mode 100644 index 0000000000..a7d410a685 --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/AppendClusterConfigsProcedure.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.procedure; + +import org.apache.fluss.config.cluster.AlterConfig; +import org.apache.fluss.config.cluster.AlterConfigOpType; + +import org.apache.flink.table.annotation.ArgumentHint; +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.annotation.ProcedureHint; +import org.apache.flink.table.procedure.ProcedureContext; + +import java.util.ArrayList; +import java.util.List; + +/** + * Procedure to append values to list-type cluster configurations dynamically. + * + *

This procedure appends new values to existing list-type configurations. The APPEND operation + * only works on configurations defined as list types (e.g., {@code security.sasl.users}). The + * changes are: + * + *

    + *
  • Validated by the CoordinatorServer before persistence + *
  • Persisted in ZooKeeper for durability + *
  • Applied to all relevant servers (Coordinator and TabletServers) + *
  • Survive server restarts + *
+ * + *

Usage examples: + * + *

+ * -- Append a user to the SASL user list
+ * CALL sys.append_cluster_configs('security.sasl.users', 'bob:bob-secret');
+ *
+ * -- Append multiple key-value pairs at one time
+ * CALL sys.append_cluster_configs('security.sasl.users', 'bob:bob-secret', 'security.sasl.users', 'alice:alice-secret');
+ * 
+ * + *

Note: APPEND operations are only supported for list-type configuration keys. The server + * will reject the change if the configuration key is not a list type. + */ +public class AppendClusterConfigsProcedure extends ProcedureBase { + + @ProcedureHint( + argument = {@ArgumentHint(name = "config_pairs", type = @DataTypeHint("STRING"))}, + isVarArgs = true) + public String[] call(ProcedureContext context, String... configPairs) throws Exception { + try { + if (configPairs.length == 0) { + throw new IllegalArgumentException( + "config_pairs cannot be null or empty. " + + "Please specify valid configuration pairs."); + } + + if (configPairs.length % 2 != 0) { + throw new IllegalArgumentException( + "config_pairs must be set in pairs. " + + "Please specify valid configuration pairs."); + } + + List configList = new ArrayList<>(); + List resultMessage = new ArrayList<>(); + + for (int i = 0; i < configPairs.length; i += 2) { + String configKey = configPairs[i].trim(); + if (configKey.isEmpty()) { + throw new IllegalArgumentException( + "Config key cannot be null or empty. " + + "Please specify a valid configuration key."); + } + String configValue = configPairs[i + 1]; + + AlterConfig alterConfig = + new AlterConfig(configKey, configValue, AlterConfigOpType.APPEND); + configList.add(alterConfig); + resultMessage.add( + String.format( + "Successfully appended '%s' to configuration '%s'. ", + configValue, configKey)); + } + + admin.alterClusterConfigs(configList).get(); + + return resultMessage.toArray(new String[0]); + } catch (IllegalArgumentException e) { + throw e; + } catch (Exception e) { + throw new RuntimeException( + String.format("Failed to append cluster config: %s", e.getMessage()), e); + } + } +} diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ProcedureManager.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ProcedureManager.java index fc2632e856..654f707c2e 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ProcedureManager.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ProcedureManager.java @@ -73,6 +73,9 @@ private enum ProcedureEnum { SET_CLUSTER_CONFIGS("sys.set_cluster_configs", SetClusterConfigsProcedure.class), GET_CLUSTER_CONFIGS("sys.get_cluster_configs", GetClusterConfigsProcedure.class), RESET_CLUSTER_CONFIGS("sys.reset_cluster_configs", ResetClusterConfigsProcedure.class), + APPEND_CLUSTER_CONFIGS("sys.append_cluster_configs", AppendClusterConfigsProcedure.class), + SUBTRACT_CLUSTER_CONFIGS( + "sys.subtract_cluster_configs", SubtractClusterConfigsProcedure.class), ADD_SERVER_TAG("sys.add_server_tag", AddServerTagProcedure.class), REMOVE_SERVER_TAG("sys.remove_server_tag", RemoveServerTagProcedure.class), REBALANCE("sys.rebalance", RebalanceProcedure.class), diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/SubtractClusterConfigsProcedure.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/SubtractClusterConfigsProcedure.java new file mode 100644 index 0000000000..5434013a27 --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/SubtractClusterConfigsProcedure.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.procedure; + +import org.apache.fluss.config.cluster.AlterConfig; +import org.apache.fluss.config.cluster.AlterConfigOpType; + +import org.apache.flink.table.annotation.ArgumentHint; +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.annotation.ProcedureHint; +import org.apache.flink.table.procedure.ProcedureContext; + +import java.util.ArrayList; +import java.util.List; + +/** + * Procedure to subtract (remove) values from list-type cluster configurations dynamically. + * + *

This procedure removes specific values from existing list-type configurations. The SUBTRACT + * operation only works on configurations defined as list types (e.g., {@code security.sasl.users}). + * If the list becomes empty after subtraction, the configuration key is removed entirely. The + * changes are: + * + *

    + *
  • Validated by the CoordinatorServer before persistence + *
  • Persisted in ZooKeeper for durability + *
  • Applied to all relevant servers (Coordinator and TabletServers) + *
  • Survive server restarts + *
+ * + *

Usage examples: + * + *

+ * -- Remove a user from the SASL user list
+ * CALL sys.subtract_cluster_configs('security.sasl.users', 'bob:bob-secret');
+ *
+ * -- Remove multiple key-value pairs at one time
+ * CALL sys.subtract_cluster_configs('security.sasl.users', 'bob:bob-secret', 'security.sasl.users', 'alice:alice-secret');
+ * 
+ * + *

Note: SUBTRACT operations are only supported for list-type configuration keys. The + * server will reject the change if the configuration key is not a list type. Subtracting a value + * that does not exist in the list is a no-op. + */ +public class SubtractClusterConfigsProcedure extends ProcedureBase { + + @ProcedureHint( + argument = {@ArgumentHint(name = "config_pairs", type = @DataTypeHint("STRING"))}, + isVarArgs = true) + public String[] call(ProcedureContext context, String... configPairs) throws Exception { + try { + if (configPairs.length == 0) { + throw new IllegalArgumentException( + "config_pairs cannot be null or empty. " + + "Please specify valid configuration pairs."); + } + + if (configPairs.length % 2 != 0) { + throw new IllegalArgumentException( + "config_pairs must be set in pairs. " + + "Please specify valid configuration pairs."); + } + + List configList = new ArrayList<>(); + List resultMessage = new ArrayList<>(); + + for (int i = 0; i < configPairs.length; i += 2) { + String configKey = configPairs[i].trim(); + if (configKey.isEmpty()) { + throw new IllegalArgumentException( + "Config key cannot be null or empty. " + + "Please specify a valid configuration key."); + } + String configValue = configPairs[i + 1]; + + AlterConfig alterConfig = + new AlterConfig(configKey, configValue, AlterConfigOpType.SUBTRACT); + configList.add(alterConfig); + resultMessage.add( + String.format( + "Successfully subtracted '%s' from configuration '%s'. ", + configValue, configKey)); + } + + admin.alterClusterConfigs(configList).get(); + + return resultMessage.toArray(new String[0]); + } catch (IllegalArgumentException e) { + throw e; + } catch (Exception e) { + throw new RuntimeException( + String.format("Failed to subtract cluster config: %s", e.getMessage()), e); + } + } +} diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java index 8b2817e32f..f13edb8126 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java @@ -139,6 +139,8 @@ void testShowProcedures() throws Exception { "+I[sys.list_acl]", "+I[sys.set_cluster_configs]", "+I[sys.reset_cluster_configs]", + "+I[sys.append_cluster_configs]", + "+I[sys.subtract_cluster_configs]", "+I[sys.add_server_tag]", "+I[sys.remove_server_tag]", "+I[sys.rebalance]", @@ -783,6 +785,119 @@ void testListRebalanceProgress() throws Exception { }); } + @Test + void testAddAndDeleteUser() throws Exception { + String bobBootstrapServers = + String.join( + ",", + FLUSS_CLUSTER_EXTENSION + .getClientConfig("CLIENT") + .get(ConfigOptions.BOOTSTRAP_SERVERS)); + String bobCatalog = "bob_catalog"; + String createCatalogDDL = + String.format( + "create catalog %s with (" + + "'type' = 'fluss', " + + "'bootstrap.servers' = '%s', " + + "'client.security.protocol' = 'sasl', " + + "'client.security.sasl.mechanism' = 'PLAIN', " + + "'client.security.sasl.username' = 'bob', " + + "'client.security.sasl.password' = 'bob_pass'" + + ")", + bobCatalog, bobBootstrapServers); + + assertThatThrownBy(() -> tEnv.executeSql(createCatalogDDL).await()) + .hasMessageContaining("Invalid username or password"); + + // Step 1: Add user "bob" via append_cluster_configs + try (CloseableIterator resultIterator = + tEnv.executeSql( + String.format( + "Call %s.sys.append_cluster_configs('%s', 'bob:bob_pass')", + CATALOG_NAME, ConfigOptions.SERVER_SASL_USERS.key())) + .collect()) { + List results = CollectionUtil.iteratorToList(resultIterator); + assertThat(results).hasSize(1); + assertThat(results.get(0).getField(0)) + .asString() + .contains("Successfully appended") + .contains(ConfigOptions.SERVER_SASL_USERS.key()); + } + + // Verify user "bob" was added + try (CloseableIterator resultIterator = + tEnv.executeSql( + String.format( + "Call %s.sys.get_cluster_configs('%s')", + CATALOG_NAME, ConfigOptions.SERVER_SASL_USERS.key())) + .collect()) { + List results = CollectionUtil.iteratorToList(resultIterator); + assertThat(results).hasSize(1); + assertThat((String) results.get(0).getField(1)).contains("bob:bob_pass"); + } + + // Verify "bob" can authenticate by creating a catalog with bob's credentials + tEnv.executeSql(createCatalogDDL).await(); + + // Grant bob DESCRIBE permission on cluster so bob can query configs + tEnv.executeSql( + String.format( + "Call %s.sys.add_acl('CLUSTER', 'ALLOW', 'User:bob', 'DESCRIBE', '*')", + CATALOG_NAME)) + .await(); + + // Bob should be able to get cluster configs + try (CloseableIterator resultIterator = + tEnv.executeSql( + String.format( + "Call %s.sys.get_cluster_configs('%s')", + bobCatalog, ConfigOptions.SERVER_SASL_USERS.key())) + .collect()) { + List results = CollectionUtil.iteratorToList(resultIterator); + assertThat(results).hasSize(1); + } + tEnv.executeSql("drop catalog " + bobCatalog); + + // Step 2: Delete user "bob" via subtract_cluster_configs + try (CloseableIterator resultIterator = + tEnv.executeSql( + String.format( + "Call %s.sys.subtract_cluster_configs('%s', 'bob:bob_pass')", + CATALOG_NAME, ConfigOptions.SERVER_SASL_USERS.key())) + .collect()) { + List results = CollectionUtil.iteratorToList(resultIterator); + assertThat(results).hasSize(1); + assertThat(results.get(0).getField(0)) + .asString() + .contains("Successfully subtracted") + .contains(ConfigOptions.SERVER_SASL_USERS.key()); + } + + // Verify "bob" was deleted from config + try (CloseableIterator resultIterator = + tEnv.executeSql( + String.format( + "Call %s.sys.get_cluster_configs('%s')", + CATALOG_NAME, ConfigOptions.SERVER_SASL_USERS.key())) + .collect()) { + List results = CollectionUtil.iteratorToList(resultIterator); + // After subtracting the only dynamically-added entry, the config may be empty + if (!results.isEmpty()) { + assertThat((String) results.get(0).getField(1)).doesNotContain("bob"); + } + } + + // Verify "bob" can no longer authenticate + assertThatThrownBy(() -> tEnv.executeSql(createCatalogDDL).await()) + .hasMessageContaining("Invalid username or password"); + // Cleanup: remove bob's ACL + tEnv.executeSql( + String.format( + "Call %s.sys.drop_acl('CLUSTER', 'ALLOW', 'User:bob', 'DESCRIBE', '*')", + CATALOG_NAME)) + .await(); + } + @Test void testDropKvSnapshotLeaseProcedure() throws Exception { tEnv.executeSql( diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/server/FlussProtocolPlugin.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/server/FlussProtocolPlugin.java index 3754b98f98..77df1c4b19 100644 --- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/server/FlussProtocolPlugin.java +++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/server/FlussProtocolPlugin.java @@ -21,6 +21,8 @@ import org.apache.fluss.cluster.ServerType; import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; +import org.apache.fluss.config.cluster.ServerReconfigurable; +import org.apache.fluss.exception.ConfigException; import org.apache.fluss.rpc.RpcGatewayService; import org.apache.fluss.rpc.protocol.ApiManager; import org.apache.fluss.rpc.protocol.NetworkProtocolPlugin; @@ -28,15 +30,21 @@ import org.apache.fluss.security.auth.PlainTextAuthenticationPlugin; import org.apache.fluss.shaded.netty4.io.netty.channel.ChannelHandler; +import javax.annotation.Nullable; + +import java.util.HashSet; import java.util.List; import java.util.Optional; +import java.util.Set; + +import static org.apache.fluss.utils.Preconditions.checkArgument; /** Build-in protocol plugin for Fluss. */ -public class FlussProtocolPlugin implements NetworkProtocolPlugin { +public class FlussProtocolPlugin implements NetworkProtocolPlugin, ServerReconfigurable { private final ApiManager apiManager; private final List listeners; private final RequestsMetrics requestsMetrics; - private Configuration conf; + private volatile Configuration conf; public FlussProtocolPlugin( ServerType serverType, List listeners, RequestsMetrics requestsMetrics) { @@ -52,7 +60,7 @@ public String name() { @Override public void setup(Configuration conf) { - this.conf = conf; + this.conf = enrichWithJaasConfig(conf); } @Override @@ -71,7 +79,8 @@ public ChannelHandler createChannelHandler( requestsMetrics, conf.get(ConfigOptions.NETTY_CONNECTION_MAX_IDLE_TIME).getSeconds(), Optional.ofNullable( - AuthenticationFactory.loadServerAuthenticatorSuppliers(conf) + AuthenticationFactory.loadServerAuthenticatorSuppliers( + () -> this.conf) .get(listenerName)) .orElse(PlainTextAuthenticationPlugin.PlainTextServerAuthenticator::new)); } @@ -81,8 +90,73 @@ public RequestHandler createRequestHandler(RpcGatewayService service) { return new FlussRequestHandler(service); } + // --- ServerReconfigurable --- + + @Override + public void validate(Configuration newConfig) throws ConfigException { + List users = newConfig.get(ConfigOptions.SERVER_SASL_USERS); + if (users == null) { + return; + } + Set uniqueUsernames = new HashSet<>(); + for (int i = 0; i < users.size(); i++) { + String entry = users.get(i).trim(); + int colonIdx = entry.indexOf(':'); + if (colonIdx <= 0 || colonIdx == entry.length() - 1) { + throw new ConfigException( + String.format( + "security.sasl.users[%d] must be in 'username:password' format, but got '%s'.", + i, entry)); + } + String username = entry.substring(0, colonIdx); + if (!uniqueUsernames.add(username)) { + throw new ConfigException( + "security.sasl.users must not contain duplicate usernames: '" + + username + + "'."); + } + } + } + + @Override + public void reconfigure(Configuration newConfig) throws ConfigException { + this.conf = enrichWithJaasConfig(newConfig); + } + + /** + * Enriches the given configuration with a generated JAAS config string derived from the + * security.sasl.users list (format: 'username:password'). If the list is not present, the + * original configuration is returned unchanged. + */ + private static Configuration enrichWithJaasConfig(Configuration config) { + List users = config.get(ConfigOptions.SERVER_SASL_USERS); + if (users == null) { + return config; + } + StringBuilder sb = + new StringBuilder( + "org.apache.fluss.security.auth.sasl.plain.PlainLoginModule required"); + for (String entry : users) { + int colonIdx = entry.indexOf(':'); + checkArgument(colonIdx > 0, "Invalid user entry format: '%s'", entry); + String username = entry.substring(0, colonIdx); + String password = entry.substring(colonIdx + 1); + sb.append(String.format(" user_%s=\"%s\"", username, password)); + } + sb.append(";"); + Configuration enriched = new Configuration(config); + enriched.setString("security.sasl.plain.jaas.config", sb.toString()); + return enriched; + } + @VisibleForTesting ApiManager getApiManager() { return apiManager; } + + @VisibleForTesting + @Nullable + Configuration getConf() { + return conf; + } } diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/server/NettyServer.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/server/NettyServer.java index 7f2198566c..6b0b7d5cc9 100644 --- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/server/NettyServer.java +++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/server/NettyServer.java @@ -272,6 +272,16 @@ private static NetworkProtocolPlugin loadProtocolPlugin(String protocolName) { protocolName, protocols.keySet())); } + /** Returns the FlussProtocolPlugin instance used by this server. */ + public FlussProtocolPlugin getFlussProtocolPlugin() { + for (NetworkProtocolPlugin protocol : protocols) { + if (protocol instanceof FlussProtocolPlugin) { + return (FlussProtocolPlugin) protocol; + } + } + throw new IllegalStateException("FlussProtocolPlugin not found in loaded protocols."); + } + @Override public ScheduledExecutorService getScheduledExecutor() { checkState(isRunning, "Netty server has not been started yet."); diff --git a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/authenticate/AuthenticationTest.java b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/authenticate/AuthenticationTest.java index e96fa283ab..7ab833746a 100644 --- a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/authenticate/AuthenticationTest.java +++ b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/authenticate/AuthenticationTest.java @@ -223,11 +223,7 @@ private void buildNettyServer() throws Exception { configuration.setString( ConfigOptions.SERVER_SECURITY_PROTOCOL_MAP.key(), "CLIENT1:mutual,CLIENT2:sasl"); configuration.setString("security.sasl.enabled.mechanisms", "plain"); - configuration.setString( - "security.sasl.plain.jaas.config", - "org.apache.fluss.security.auth.sasl.plain.PlainLoginModule required " - + " user_root=\"password\" " - + " user_guest=\"password2\";"); + configuration.setString("security.sasl.users", "root:password,guest:password2"); configuration.set(ConfigOptions.SUPER_USERS, "User:root"); configuration.set(ConfigOptions.AUTHORIZER_ENABLED, true); // 3 worker threads is enough for this test diff --git a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/authenticate/SaslAuthenticationITCase.java b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/authenticate/SaslAuthenticationITCase.java index 92e837e18b..35702ed416 100644 --- a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/authenticate/SaslAuthenticationITCase.java +++ b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/authenticate/SaslAuthenticationITCase.java @@ -30,6 +30,7 @@ import org.apache.fluss.rpc.messages.ListTablesResponse; import org.apache.fluss.rpc.metrics.TestingClientMetricGroup; import org.apache.fluss.rpc.netty.client.NettyClient; +import org.apache.fluss.rpc.netty.server.FlussProtocolPlugin; import org.apache.fluss.rpc.netty.server.NettyServer; import org.apache.fluss.rpc.netty.server.RequestsMetrics; import org.apache.fluss.rpc.protocol.ApiKeys; @@ -180,6 +181,100 @@ void testSimplifyUsernameAndPassword() throws Exception { testAuthentication(clientConfig); } + @Test + void testAddAndDeleteUser() throws Exception { + // Start a server with username/password list config (admin and alice) + Configuration serverConfig = new Configuration(); + serverConfig.setString(ConfigOptions.SERVER_SECURITY_PROTOCOL_MAP.key(), "CLIENT:sasl"); + serverConfig.setString("security.sasl.enabled.mechanisms", "plain"); + serverConfig.setString("security.sasl.users", "admin:admin-secret,alice:alice-secret"); + serverConfig.setString(ConfigOptions.NETTY_SERVER_NUM_WORKER_THREADS.key(), "3"); + + MetricGroup metricGroup = NOPMetricsGroup.newInstance(); + TestingAuthenticateGatewayService service = new TestingAuthenticateGatewayService(); + try (NetUtils.Port port = getAvailablePort(); + NettyServer nettyServer = + new NettyServer( + serverConfig, + Collections.singletonList( + new Endpoint("localhost", port.getPort(), "CLIENT")), + service, + metricGroup, + RequestsMetrics.createCoordinatorServerRequestMetrics( + metricGroup))) { + nettyServer.start(); + ServerNode serverNode = + new ServerNode(1, "localhost", port.getPort(), ServerType.TABLET_SERVER); + FlussProtocolPlugin plugin = nettyServer.getFlussProtocolPlugin(); + + // Verify "admin" can authenticate + try (NettyClient client = createSaslClient("admin", "admin-secret")) { + verifyListTables(client, serverNode); + } + + // Verify "bob" cannot authenticate initially + try (NettyClient client = createSaslClient("bob", "bob-secret")) { + assertThatThrownBy(() -> verifyListTables(client, serverNode)) + .cause() + .isExactlyInstanceOf(AuthenticationException.class) + .hasMessageContaining("Invalid username or password"); + } + + // Add user "bob" via reconfigure + Configuration addBobConfig = new Configuration(); + addBobConfig.setString(ConfigOptions.SERVER_SECURITY_PROTOCOL_MAP.key(), "CLIENT:sasl"); + addBobConfig.setString("security.sasl.enabled.mechanisms", "plain"); + addBobConfig.setString( + "security.sasl.users", "admin:admin-secret,alice:alice-secret,bob:bob-secret"); + plugin.validate(addBobConfig); + plugin.reconfigure(addBobConfig); + + // Verify "bob" can now authenticate + try (NettyClient client = createSaslClient("bob", "bob-secret")) { + verifyListTables(client, serverNode); + } + + // Delete user "admin" via reconfigure + Configuration removeAdminConfig = new Configuration(); + removeAdminConfig.setString( + ConfigOptions.SERVER_SECURITY_PROTOCOL_MAP.key(), "CLIENT:sasl"); + removeAdminConfig.setString("security.sasl.enabled.mechanisms", "plain"); + removeAdminConfig.setString("security.sasl.users", "alice:alice-secret,bob:bob-secret"); + plugin.validate(removeAdminConfig); + plugin.reconfigure(removeAdminConfig); + + // Verify "admin" can no longer authenticate + try (NettyClient client = createSaslClient("admin", "admin-secret")) { + assertThatThrownBy(() -> verifyListTables(client, serverNode)) + .cause() + .isExactlyInstanceOf(AuthenticationException.class) + .hasMessageContaining("Invalid username or password"); + } + + // Verify "bob" still works + try (NettyClient client = createSaslClient("bob", "bob-secret")) { + verifyListTables(client, serverNode); + } + } + } + + private NettyClient createSaslClient(String username, String password) { + Configuration clientConfig = new Configuration(); + clientConfig.setString("client.security.protocol", "sasl"); + clientConfig.setString("client.security.sasl.mechanism", "plain"); + clientConfig.setString("client.security.sasl.username", username); + clientConfig.setString("client.security.sasl.password", password); + return new NettyClient(clientConfig, TestingClientMetricGroup.newInstance()); + } + + private void verifyListTables(NettyClient client, ServerNode serverNode) throws Exception { + ListTablesRequest request = new ListTablesRequest().setDatabaseName("test-database"); + ListTablesResponse response = + (ListTablesResponse) + client.sendRequest(serverNode, ApiKeys.LIST_TABLES, request).get(); + assertThat(response.getTableNamesList()).isEqualTo(Collections.singletonList("test-table")); + } + private void testAuthentication(Configuration clientConfig) throws Exception { testAuthentication(clientConfig, getDefaultServerConfig()); } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/DynamicConfigManager.java b/fluss-server/src/main/java/org/apache/fluss/server/DynamicConfigManager.java index a6b678c2ef..ab23e2d586 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/DynamicConfigManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/DynamicConfigManager.java @@ -18,6 +18,8 @@ package org.apache.fluss.server; import org.apache.fluss.annotation.VisibleForTesting; +import org.apache.fluss.config.ConfigOption; +import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; import org.apache.fluss.config.cluster.AlterConfig; import org.apache.fluss.config.cluster.ConfigEntry; @@ -34,6 +36,7 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Map; @@ -145,6 +148,30 @@ private void prepareIncrementalConfigs( case DELETE: configsProps.remove(configPropName); break; + case APPEND: + validateListType(configPropName); + String existingAppend = configsProps.getOrDefault(configPropName, ""); + if (existingAppend.isEmpty()) { + configsProps.put(configPropName, configPropValue); + } else { + configsProps.put( + configPropName, existingAppend + "," + configPropValue); + } + break; + case SUBTRACT: + validateListType(configPropName); + String existingSubtract = configsProps.get(configPropName); + if (existingSubtract != null) { + List items = + new ArrayList<>(Arrays.asList(existingSubtract.split(","))); + items.remove(configPropValue); + if (items.isEmpty()) { + configsProps.remove(configPropName); + } else { + configsProps.put(configPropName, String.join(",", items)); + } + } + break; default: throw new ConfigException( "Unsupported config operation type " + alterConfigOp.opType()); @@ -160,6 +187,17 @@ protected void alterServerConfigs(Map configsProps) throws Excep zooKeeperClient.upsertServerEntityConfig(configsProps); } + private static void validateListType(String configKey) { + ConfigOption configOption = ConfigOptions.getConfigOption(configKey); + if (configOption == null || !configOption.isList()) { + throw new ConfigException( + String.format( + "APPEND/SUBTRACT operations are only supported for list-typed config keys, " + + "but '%s' is not a list type.", + configKey)); + } + } + private class ConfigChangedNotificationHandler implements ZkNodeChangeNotificationWatcher.NotificationHandler { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java b/fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java index d4430bb50a..e37ca3ee6c 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java @@ -45,6 +45,7 @@ import static org.apache.fluss.config.ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC; import static org.apache.fluss.config.ConfigOptions.KV_SNAPSHOT_INTERVAL; import static org.apache.fluss.config.ConfigOptions.LOG_REPLICA_MIN_IN_SYNC_REPLICAS_NUMBER; +import static org.apache.fluss.config.ConfigOptions.SERVER_SASL_USERS; import static org.apache.fluss.utils.concurrent.LockUtils.inReadLock; import static org.apache.fluss.utils.concurrent.LockUtils.inWriteLock; @@ -64,7 +65,8 @@ class DynamicServerConfig { DATALAKE_FORMAT.key(), LOG_REPLICA_MIN_IN_SYNC_REPLICAS_NUMBER.key(), KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key(), - KV_SNAPSHOT_INTERVAL.key())); + KV_SNAPSHOT_INTERVAL.key(), + SERVER_SASL_USERS.key())); private static final Set ALLOWED_CONFIG_PREFIXES = Collections.singleton("datalake."); private final ReadWriteLock lock = new ReentrantReadWriteLock(); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java index 25a0d866fe..db3912fed1 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java @@ -27,6 +27,7 @@ import org.apache.fluss.rpc.RpcClient; import org.apache.fluss.rpc.RpcServer; import org.apache.fluss.rpc.metrics.ClientMetricGroup; +import org.apache.fluss.rpc.netty.server.NettyServer; import org.apache.fluss.rpc.netty.server.RequestsMetrics; import org.apache.fluss.server.DynamicConfigManager; import org.apache.fluss.server.ServerBase; @@ -285,6 +286,8 @@ protected void initCoordinatorStandby() throws Exception { serverMetricGroup, RequestsMetrics.createCoordinatorServerRequestMetrics( serverMetricGroup)); + // Register FlussProtocolPlugin for dynamic SASL config updates + dynamicConfigManager.register(((NettyServer) rpcServer).getFlussProtocolPlugin()); rpcServer.start(); registerCoordinatorServer(); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java index 26f45d23c9..cc2feac05a 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java @@ -32,6 +32,7 @@ import org.apache.fluss.rpc.messages.ControlledShutdownRequest; import org.apache.fluss.rpc.messages.ControlledShutdownResponse; import org.apache.fluss.rpc.metrics.ClientMetricGroup; +import org.apache.fluss.rpc.netty.server.NettyServer; import org.apache.fluss.rpc.netty.server.RequestsMetrics; import org.apache.fluss.server.DynamicConfigManager; import org.apache.fluss.server.ServerBase; @@ -301,6 +302,8 @@ protected void startServices() throws Exception { tabletService, tabletServerMetricGroup, requestsMetrics); + // Register FlussProtocolPlugin for dynamic SASL config updates + dynamicConfigManager.register(((NettyServer) rpcServer).getFlussProtocolPlugin()); rpcServer.start(); registerTabletServer(); diff --git a/fluss-server/src/test/java/org/apache/fluss/server/DynamicConfigChangeTest.java b/fluss-server/src/test/java/org/apache/fluss/server/DynamicConfigChangeTest.java index b20f39bc59..03ce47e95b 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/DynamicConfigChangeTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/DynamicConfigChangeTest.java @@ -41,6 +41,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -528,4 +529,109 @@ public void reconfigure(Configuration newConfig) { // Verify the reconfigurable was notified with the new value assertThat(reconfiguredValue.get()).isEqualTo(2); } + + @Test + void testAppendAndSubtractOnListConfig() throws Exception { + Configuration configuration = new Configuration(); + DynamicConfigManager dynamicConfigManager = + new DynamicConfigManager(zookeeperClient, configuration, true); + + AtomicReference> reconfiguredUsers = new AtomicReference<>(); + dynamicConfigManager.register( + new ServerReconfigurable() { + @Override + public void validate(Configuration newConfig) throws ConfigException {} + + @Override + public void reconfigure(Configuration newConfig) { + reconfiguredUsers.set(newConfig.get(ConfigOptions.SERVER_SASL_USERS)); + } + }); + dynamicConfigManager.startup(); + + // APPEND first user + dynamicConfigManager.alterConfigs( + Collections.singletonList( + new AlterConfig( + ConfigOptions.SERVER_SASL_USERS.key(), + "admin:admin-secret", + AlterConfigOpType.APPEND))); + + Map zkConfig = zookeeperClient.fetchEntityConfig(); + assertThat(zkConfig.get(ConfigOptions.SERVER_SASL_USERS.key())) + .isEqualTo("admin:admin-secret"); + assertThat(reconfiguredUsers.get()).containsExactly("admin:admin-secret"); + + // APPEND second user + dynamicConfigManager.alterConfigs( + Collections.singletonList( + new AlterConfig( + ConfigOptions.SERVER_SASL_USERS.key(), + "bob:bob-secret", + AlterConfigOpType.APPEND))); + + zkConfig = zookeeperClient.fetchEntityConfig(); + assertThat(zkConfig.get(ConfigOptions.SERVER_SASL_USERS.key())) + .isEqualTo("admin:admin-secret,bob:bob-secret"); + assertThat(reconfiguredUsers.get()).containsExactly("admin:admin-secret", "bob:bob-secret"); + + // SUBTRACT first user + dynamicConfigManager.alterConfigs( + Collections.singletonList( + new AlterConfig( + ConfigOptions.SERVER_SASL_USERS.key(), + "admin:admin-secret", + AlterConfigOpType.SUBTRACT))); + + zkConfig = zookeeperClient.fetchEntityConfig(); + assertThat(zkConfig.get(ConfigOptions.SERVER_SASL_USERS.key())).isEqualTo("bob:bob-secret"); + assertThat(reconfiguredUsers.get()).containsExactly("bob:bob-secret"); + + // SUBTRACT last user - should remove key entirely + dynamicConfigManager.alterConfigs( + Collections.singletonList( + new AlterConfig( + ConfigOptions.SERVER_SASL_USERS.key(), + "bob:bob-secret", + AlterConfigOpType.SUBTRACT))); + + zkConfig = zookeeperClient.fetchEntityConfig(); + assertThat(zkConfig.containsKey(ConfigOptions.SERVER_SASL_USERS.key())).isFalse(); + assertThat(reconfiguredUsers.get()).isNull(); + } + + @Test + void testAppendOnNonListConfigIsRejected() throws Exception { + Configuration configuration = new Configuration(); + DynamicConfigManager dynamicConfigManager = + new DynamicConfigManager(zookeeperClient, configuration, true); + dynamicConfigManager.startup(); + + // APPEND on a non-list config (kv.snapshot.interval is Duration type) + // should be rejected immediately by the list-type check. + assertThatThrownBy( + () -> + dynamicConfigManager.alterConfigs( + Collections.singletonList( + new AlterConfig( + ConfigOptions.KV_SNAPSHOT_INTERVAL.key(), + "5min", + AlterConfigOpType.APPEND)))) + .isInstanceOf(ConfigException.class) + .hasMessageContaining( + "APPEND/SUBTRACT operations are only supported for list-typed config keys"); + + // SUBTRACT on a non-list config should also be rejected. + assertThatThrownBy( + () -> + dynamicConfigManager.alterConfigs( + Collections.singletonList( + new AlterConfig( + ConfigOptions.KV_SNAPSHOT_INTERVAL.key(), + "5min", + AlterConfigOpType.SUBTRACT)))) + .isInstanceOf(ConfigException.class) + .hasMessageContaining( + "APPEND/SUBTRACT operations are only supported for list-typed config keys"); + } }