diff --git a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/ActorSystemBootstrapTools.java b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/ActorSystemBootstrapTools.java index 517a48669e688..4e913dd4100e1 100644 --- a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/ActorSystemBootstrapTools.java +++ b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/ActorSystemBootstrapTools.java @@ -19,6 +19,7 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ConfigurationUtils; import org.apache.flink.configuration.RpcOptions; import org.apache.flink.runtime.rpc.RpcSystem; import org.apache.flink.util.NetUtils; @@ -32,7 +33,9 @@ import java.io.IOException; import java.net.BindException; import java.util.Iterator; +import java.util.Map; import java.util.Optional; +import java.util.stream.Collectors; /** Tools for starting the Actor Systems used to run the JobManager and TaskManager actors. */ public class ActorSystemBootstrapTools { @@ -241,6 +244,21 @@ public static ActorSystem startLocalActorSystem( } } + /** + * Converts the given Pekko {@link Config} into a flattened {@link Map}. + * + * @param config The Pekko configuration + * @return A map of configuration keys to string values + */ + static Map toMaskedMap(Config config) { + return ConfigurationUtils.hideSensitiveValues( + config.entrySet().stream() + .collect( + Collectors.toMap( + Map.Entry::getKey, + entry -> String.valueOf(entry.getValue().unwrapped())))); + } + /** * Starts an Actor System with given Pekko config. * @@ -251,7 +269,9 @@ public static ActorSystem startLocalActorSystem( */ private static ActorSystem startActorSystem( Config config, String actorSystemName, Logger logger) { - logger.debug("Using pekko configuration\n {}", config); + if (logger.isDebugEnabled()) { + logger.debug("Using pekko configuration\n {}", toMaskedMap(config)); + } ActorSystem actorSystem = PekkoUtils.createActorSystem(actorSystemName, config); logger.info("Actor system started at {}", PekkoUtils.getAddress(actorSystem)); diff --git a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/pekko/ActorSystemBootstrapToolsTest.java b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/pekko/ActorSystemBootstrapToolsTest.java index 5343c4011cf1c..1ebecb27fb925 100644 --- a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/pekko/ActorSystemBootstrapToolsTest.java +++ b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/pekko/ActorSystemBootstrapToolsTest.java @@ -18,11 +18,14 @@ package org.apache.flink.runtime.rpc.pekko; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.core.testutils.FlinkAssertions; import org.apache.flink.util.ExecutorUtils; import org.apache.flink.util.concurrent.FutureUtils; import org.apache.flink.util.function.CheckedSupplier; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; import org.apache.pekko.actor.ActorSystem; import org.junit.jupiter.api.Test; import org.slf4j.Logger; @@ -32,6 +35,7 @@ import java.net.InetAddress; import java.net.ServerSocket; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; @@ -40,6 +44,7 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Tests for the {@link ActorSystemBootstrapTools}. */ @@ -111,4 +116,35 @@ void testActorSystemInstantiationFailureWhenPortOccupied() throws Exception { portOccupier.close(); } } + + @Test + void testToMaskedMapMasksOnlySensitiveKeys() { + Config config = + ConfigFactory.parseMap( + Map.of( + "pekko.loglevel", "OFF", + "pekko.remote.artery.enabled", "false", + "pekko.remote.classic.netty.ssl.security.key-password", "secret", + "pekko.remote.classic.netty.ssl.security.key-store-password", + "secret2", + "pekko.remote.classic.netty.ssl.security.trust-store-password", + "secret3")); + + Map result = ActorSystemBootstrapTools.toMaskedMap(config); + + // Non-sensitive values should remain the same + assertThat(result.get("pekko.loglevel")).isEqualTo("OFF"); + assertThat(result.get("pekko.remote.artery.enabled")).isEqualTo("false"); + + // Sensitive values should be masked + assertThat(result.get("pekko.remote.classic.netty.ssl.security.key-password")) + .isNotEqualTo("secret") + .isEqualTo(GlobalConfiguration.HIDDEN_CONTENT); + assertThat(result.get("pekko.remote.classic.netty.ssl.security.key-store-password")) + .isNotEqualTo("secret2") + .isEqualTo(GlobalConfiguration.HIDDEN_CONTENT); + assertThat(result.get("pekko.remote.classic.netty.ssl.security.trust-store-password")) + .isNotEqualTo("secret3") + .isEqualTo(GlobalConfiguration.HIDDEN_CONTENT); + } }