From 4351e0a54e37be83c1483ed3de0308df55abb724 Mon Sep 17 00:00:00 2001 From: Anton Borisov Date: Fri, 3 Apr 2026 01:08:36 +0100 Subject: [PATCH 1/2] [filesystem] Support AssumeRole STS for RustFS --- .../s3/token/S3DelegationTokenProvider.java | 62 +++++++++++++++---- 1 file changed, 51 insertions(+), 11 deletions(-) diff --git a/fluss-filesystems/fluss-fs-s3/src/main/java/org/apache/fluss/fs/s3/token/S3DelegationTokenProvider.java b/fluss-filesystems/fluss-fs-s3/src/main/java/org/apache/fluss/fs/s3/token/S3DelegationTokenProvider.java index 33caaacddc..1d586b456a 100644 --- a/fluss-filesystems/fluss-fs-s3/src/main/java/org/apache/fluss/fs/s3/token/S3DelegationTokenProvider.java +++ b/fluss-filesystems/fluss-fs-s3/src/main/java/org/apache/fluss/fs/s3/token/S3DelegationTokenProvider.java @@ -22,17 +22,23 @@ import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.client.builder.AwsClientBuilder; import com.amazonaws.services.securitytoken.AWSSecurityTokenService; import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder; +import com.amazonaws.services.securitytoken.model.AssumeRoleRequest; +import com.amazonaws.services.securitytoken.model.AssumeRoleResult; import com.amazonaws.services.securitytoken.model.Credentials; import com.amazonaws.services.securitytoken.model.GetSessionTokenResult; import org.apache.hadoop.conf.Configuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + import java.util.Arrays; import java.util.HashMap; import java.util.Map; +import java.util.UUID; import static org.apache.fluss.utils.Preconditions.checkNotNull; @@ -47,10 +53,15 @@ public class S3DelegationTokenProvider { private static final String REGION_KEY = "fs.s3a.region"; private static final String ENDPOINT_KEY = "fs.s3a.endpoint"; + private static final String ROLE_ARN_KEY = "fs.s3a.assumed.role.arn"; + private static final String STS_ENDPOINT_KEY = "fs.s3a.assumed.role.sts.endpoint"; + private final String scheme; private final String region; private final String accessKey; private final String secretKey; + @Nullable private final String roleArn; + @Nullable private final String stsEndpoint; private final Map additionInfos; public S3DelegationTokenProvider(String scheme, Configuration conf) { @@ -59,6 +70,8 @@ public S3DelegationTokenProvider(String scheme, Configuration conf) { checkNotNull(region, "Region is not set."); this.accessKey = conf.get(ACCESS_KEY_ID); this.secretKey = conf.get(ACCESS_KEY_SECRET); + this.roleArn = conf.get(ROLE_ARN_KEY); + this.stsEndpoint = conf.get(STS_ENDPOINT_KEY); this.additionInfos = new HashMap<>(); for (String key : Arrays.asList(REGION_KEY, ENDPOINT_KEY)) { if (conf.get(key) != null) { @@ -68,17 +81,27 @@ public S3DelegationTokenProvider(String scheme, Configuration conf) { } public ObtainedSecurityToken obtainSecurityToken() { - LOG.info("Obtaining session credentials token with access key: {}", accessKey); - - AWSSecurityTokenService stsClient = - AWSSecurityTokenServiceClientBuilder.standard() - .withRegion(region) - .withCredentials( - new AWSStaticCredentialsProvider( - new BasicAWSCredentials(accessKey, secretKey))) - .build(); - GetSessionTokenResult sessionTokenResult = stsClient.getSessionToken(); - Credentials credentials = sessionTokenResult.getCredentials(); + AWSSecurityTokenService stsClient = buildStsClient(); + Credentials credentials; + + if (roleArn != null) { + LOG.info( + "Obtaining session credentials via AssumeRole with access key: {}, role: {}", + accessKey, + roleArn); + AssumeRoleRequest request = + new AssumeRoleRequest() + .withRoleArn(roleArn) + .withRoleSessionName("fluss-" + UUID.randomUUID()); + AssumeRoleResult result = stsClient.assumeRole(request); + credentials = result.getCredentials(); + } else { + LOG.info( + "Obtaining session credentials via GetSessionToken with access key: {}", + accessKey); + GetSessionTokenResult result = stsClient.getSessionToken(); + credentials = result.getCredentials(); + } LOG.info( "Session credentials obtained successfully with access key: {} expiration: {}", @@ -89,6 +112,23 @@ public ObtainedSecurityToken obtainSecurityToken() { scheme, toJson(credentials), credentials.getExpiration().getTime(), additionInfos); } + private AWSSecurityTokenService buildStsClient() { + AWSSecurityTokenServiceClientBuilder builder = + AWSSecurityTokenServiceClientBuilder.standard() + .withCredentials( + new AWSStaticCredentialsProvider( + new BasicAWSCredentials(accessKey, secretKey))); + + if (stsEndpoint != null) { + builder.withEndpointConfiguration( + new AwsClientBuilder.EndpointConfiguration(stsEndpoint, region)); + } else { + builder.withRegion(region); + } + + return builder.build(); + } + private byte[] toJson(Credentials credentials) { org.apache.fluss.fs.token.Credentials flussCredentials = new org.apache.fluss.fs.token.Credentials( From bae50269acd27b4f2d70b87c0472758c60137b93 Mon Sep 17 00:00:00 2001 From: Anton Borisov Date: Sat, 4 Apr 2026 10:50:00 +0100 Subject: [PATCH 2/2] address comment --- .../s3/token/S3DelegationTokenProvider.java | 57 +++++++++++-------- 1 file changed, 32 insertions(+), 25 deletions(-) diff --git a/fluss-filesystems/fluss-fs-s3/src/main/java/org/apache/fluss/fs/s3/token/S3DelegationTokenProvider.java b/fluss-filesystems/fluss-fs-s3/src/main/java/org/apache/fluss/fs/s3/token/S3DelegationTokenProvider.java index 1d586b456a..74c178a34b 100644 --- a/fluss-filesystems/fluss-fs-s3/src/main/java/org/apache/fluss/fs/s3/token/S3DelegationTokenProvider.java +++ b/fluss-filesystems/fluss-fs-s3/src/main/java/org/apache/fluss/fs/s3/token/S3DelegationTokenProvider.java @@ -82,34 +82,41 @@ public S3DelegationTokenProvider(String scheme, Configuration conf) { public ObtainedSecurityToken obtainSecurityToken() { AWSSecurityTokenService stsClient = buildStsClient(); - Credentials credentials; + try { + Credentials credentials; + + if (roleArn != null) { + LOG.info( + "Obtaining session credentials via AssumeRole with access key: {}, role: {}", + accessKey, + roleArn); + AssumeRoleRequest request = + new AssumeRoleRequest() + .withRoleArn(roleArn) + .withRoleSessionName("fluss-" + UUID.randomUUID()); + AssumeRoleResult result = stsClient.assumeRole(request); + credentials = result.getCredentials(); + } else { + LOG.info( + "Obtaining session credentials via GetSessionToken with access key: {}", + accessKey); + GetSessionTokenResult result = stsClient.getSessionToken(); + credentials = result.getCredentials(); + } - if (roleArn != null) { - LOG.info( - "Obtaining session credentials via AssumeRole with access key: {}, role: {}", - accessKey, - roleArn); - AssumeRoleRequest request = - new AssumeRoleRequest() - .withRoleArn(roleArn) - .withRoleSessionName("fluss-" + UUID.randomUUID()); - AssumeRoleResult result = stsClient.assumeRole(request); - credentials = result.getCredentials(); - } else { LOG.info( - "Obtaining session credentials via GetSessionToken with access key: {}", - accessKey); - GetSessionTokenResult result = stsClient.getSessionToken(); - credentials = result.getCredentials(); + "Session credentials obtained successfully with access key: {} expiration: {}", + credentials.getAccessKeyId(), + credentials.getExpiration()); + + return new ObtainedSecurityToken( + scheme, + toJson(credentials), + credentials.getExpiration().getTime(), + additionInfos); + } finally { + stsClient.shutdown(); } - - LOG.info( - "Session credentials obtained successfully with access key: {} expiration: {}", - credentials.getAccessKeyId(), - credentials.getExpiration()); - - return new ObtainedSecurityToken( - scheme, toJson(credentials), credentials.getExpiration().getTime(), additionInfos); } private AWSSecurityTokenService buildStsClient() {