diff --git a/fluss-filesystems/fluss-fs-s3/src/main/java/org/apache/fluss/fs/s3/token/S3DelegationTokenProvider.java b/fluss-filesystems/fluss-fs-s3/src/main/java/org/apache/fluss/fs/s3/token/S3DelegationTokenProvider.java index 33caaacddc..74c178a34b 100644 --- a/fluss-filesystems/fluss-fs-s3/src/main/java/org/apache/fluss/fs/s3/token/S3DelegationTokenProvider.java +++ b/fluss-filesystems/fluss-fs-s3/src/main/java/org/apache/fluss/fs/s3/token/S3DelegationTokenProvider.java @@ -22,17 +22,23 @@ import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.client.builder.AwsClientBuilder; import com.amazonaws.services.securitytoken.AWSSecurityTokenService; import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder; +import com.amazonaws.services.securitytoken.model.AssumeRoleRequest; +import com.amazonaws.services.securitytoken.model.AssumeRoleResult; import com.amazonaws.services.securitytoken.model.Credentials; import com.amazonaws.services.securitytoken.model.GetSessionTokenResult; import org.apache.hadoop.conf.Configuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + import java.util.Arrays; import java.util.HashMap; import java.util.Map; +import java.util.UUID; import static org.apache.fluss.utils.Preconditions.checkNotNull; @@ -47,10 +53,15 @@ public class S3DelegationTokenProvider { private static final String REGION_KEY = "fs.s3a.region"; private static final String ENDPOINT_KEY = "fs.s3a.endpoint"; + private static final String ROLE_ARN_KEY = "fs.s3a.assumed.role.arn"; + private static final String STS_ENDPOINT_KEY = "fs.s3a.assumed.role.sts.endpoint"; + private final String scheme; private final String region; private final String accessKey; private final String secretKey; + @Nullable private final String roleArn; + @Nullable private final String stsEndpoint; private final Map additionInfos; public S3DelegationTokenProvider(String scheme, Configuration conf) { @@ -59,6 +70,8 @@ public S3DelegationTokenProvider(String scheme, Configuration conf) { checkNotNull(region, "Region is not set."); this.accessKey = conf.get(ACCESS_KEY_ID); this.secretKey = conf.get(ACCESS_KEY_SECRET); + this.roleArn = conf.get(ROLE_ARN_KEY); + this.stsEndpoint = conf.get(STS_ENDPOINT_KEY); this.additionInfos = new HashMap<>(); for (String key : Arrays.asList(REGION_KEY, ENDPOINT_KEY)) { if (conf.get(key) != null) { @@ -68,25 +81,59 @@ public S3DelegationTokenProvider(String scheme, Configuration conf) { } public ObtainedSecurityToken obtainSecurityToken() { - LOG.info("Obtaining session credentials token with access key: {}", accessKey); + AWSSecurityTokenService stsClient = buildStsClient(); + try { + Credentials credentials; + + if (roleArn != null) { + LOG.info( + "Obtaining session credentials via AssumeRole with access key: {}, role: {}", + accessKey, + roleArn); + AssumeRoleRequest request = + new AssumeRoleRequest() + .withRoleArn(roleArn) + .withRoleSessionName("fluss-" + UUID.randomUUID()); + AssumeRoleResult result = stsClient.assumeRole(request); + credentials = result.getCredentials(); + } else { + LOG.info( + "Obtaining session credentials via GetSessionToken with access key: {}", + accessKey); + GetSessionTokenResult result = stsClient.getSessionToken(); + credentials = result.getCredentials(); + } - AWSSecurityTokenService stsClient = + LOG.info( + "Session credentials obtained successfully with access key: {} expiration: {}", + credentials.getAccessKeyId(), + credentials.getExpiration()); + + return new ObtainedSecurityToken( + scheme, + toJson(credentials), + credentials.getExpiration().getTime(), + additionInfos); + } finally { + stsClient.shutdown(); + } + } + + private AWSSecurityTokenService buildStsClient() { + AWSSecurityTokenServiceClientBuilder builder = AWSSecurityTokenServiceClientBuilder.standard() - .withRegion(region) .withCredentials( new AWSStaticCredentialsProvider( - new BasicAWSCredentials(accessKey, secretKey))) - .build(); - GetSessionTokenResult sessionTokenResult = stsClient.getSessionToken(); - Credentials credentials = sessionTokenResult.getCredentials(); - - LOG.info( - "Session credentials obtained successfully with access key: {} expiration: {}", - credentials.getAccessKeyId(), - credentials.getExpiration()); - - return new ObtainedSecurityToken( - scheme, toJson(credentials), credentials.getExpiration().getTime(), additionInfos); + new BasicAWSCredentials(accessKey, secretKey))); + + if (stsEndpoint != null) { + builder.withEndpointConfiguration( + new AwsClientBuilder.EndpointConfiguration(stsEndpoint, region)); + } else { + builder.withRegion(region); + } + + return builder.build(); } private byte[] toJson(Credentials credentials) { diff --git a/website/docs/maintenance/filesystems/s3.md b/website/docs/maintenance/filesystems/s3.md index db0e89531c..1394c0341e 100644 --- a/website/docs/maintenance/filesystems/s3.md +++ b/website/docs/maintenance/filesystems/s3.md @@ -21,3 +21,42 @@ s3.secret-key: # region s3.region: ``` + +## S3-Compatible Storage (RustFS, MinIO, etc.) + +For S3-compatible storage services such as [RustFS](https://github.com/rustfs/rustfs) or MinIO, you need to configure a custom endpoint and enable path-style access: + +```yaml +remote.data.dir: s3:///path/to/remote/storage +s3.endpoint: http://:9000 +s3.access-key: +s3.secret-key: +s3.region: us-east-1 +s3.path-style-access: true +``` + +### AssumeRole STS Configuration + +Some S3-compatible services (such as RustFS) require the use of [AssumeRole](https://docs.aws.amazon.com/STS/latest/APIReference/API_AssumeRole.html) instead of `GetSessionToken` to obtain temporary security credentials. This is necessary for features like KV snapshots that rely on delegation tokens. + +To enable AssumeRole, add the following configurations alongside the base S3 settings above: + +```yaml +remote.data.dir: s3:///path/to/remote/storage +s3.endpoint: http://:9000 +s3.access-key: +s3.secret-key: +s3.region: us-east-1 +s3.path-style-access: true +s3.assumed.role.arn: +s3.assumed.role.sts.endpoint: http://:9000 +``` + +| Configuration | Description | +|---|---| +| `s3.assumed.role.arn` | The ARN of the IAM role to assume. When set, Fluss uses `AssumeRole` instead of `GetSessionToken` to obtain temporary credentials. The `s3.access-key` and `s3.secret-key` are still required — they authenticate the AssumeRole call itself. | +| `s3.assumed.role.sts.endpoint` | Custom STS endpoint URL. Required for S3-compatible services that host their own STS API. When not set, the default AWS STS endpoint is used. | + +:::note +Without `s3.assumed.role.arn`, Fluss falls back to `GetSessionToken` (the default AWS behavior). This is fully backward compatible — existing AWS users do not need to change their configuration. +::: diff --git a/website/docs/quickstart/flink.md b/website/docs/quickstart/flink.md index f94161f565..276d07b0e6 100644 --- a/website/docs/quickstart/flink.md +++ b/website/docs/quickstart/flink.md @@ -92,7 +92,10 @@ services: s3.endpoint: http://rustfs:9000 s3.access-key: rustfsadmin s3.secret-key: rustfsadmin + s3.region: us-east-1 s3.path-style-access: true + s3.assumed.role.arn: arn:aws:iam::000000000000:role/rustfsadmin + s3.assumed.role.sts.endpoint: http://rustfs:9000 tablet-server: image: apache/fluss:$FLUSS_DOCKER_VERSION$ command: tabletServer @@ -108,8 +111,10 @@ services: s3.endpoint: http://rustfs:9000 s3.access-key: rustfsadmin s3.secret-key: rustfsadmin + s3.region: us-east-1 s3.path-style-access: true - kv.snapshot.interval: 0s + s3.assumed.role.arn: arn:aws:iam::000000000000:role/rustfsadmin + s3.assumed.role.sts.endpoint: http://rustfs:9000 zookeeper: restart: always image: zookeeper:3.9.2 @@ -161,7 +166,7 @@ volumes: The Docker Compose environment consists of the following containers: - **RustFS:** an S3-compatible object storage for tiered storage. You can access the RustFS console at http://localhost:9001 with credentials `rustfsadmin/rustfsadmin`. An init container (`rustfs-init`) automatically creates the `fluss` bucket on startup. - **Fluss Cluster:** a Fluss `CoordinatorServer`, a Fluss `TabletServer` and a `ZooKeeper` server. - - Credentials are configured directly with `s3.access-key` and `s3.secret-key`. Production systems should use CredentialsProvider chain specific to cloud environments. + - Credentials are configured directly with `s3.access-key` and `s3.secret-key`. The `s3.assumed.role.arn` and `s3.assumed.role.sts.endpoint` options configure [AssumeRole STS](/maintenance/filesystems/s3.md#assumerole-sts-configuration) which is required by RustFS for delegation token support. Production systems should use CredentialsProvider chain specific to cloud environments. - **Flink Cluster**: a Flink `JobManager`, a Flink `TaskManager`, and a Flink SQL client container to execute queries. :::tip diff --git a/website/docs/quickstart/lakehouse.md b/website/docs/quickstart/lakehouse.md index 6223c22d45..f8fa6ae8ab 100644 --- a/website/docs/quickstart/lakehouse.md +++ b/website/docs/quickstart/lakehouse.md @@ -110,7 +110,10 @@ services: s3.endpoint: http://rustfs:9000 s3.access-key: rustfsadmin s3.secret-key: rustfsadmin - s3.path.style.access: true + s3.region: us-east-1 + s3.path-style-access: true + s3.assumed.role.arn: arn:aws:iam::000000000000:role/rustfsadmin + s3.assumed.role.sts.endpoint: http://rustfs:9000 datalake.format: paimon datalake.paimon.metastore: filesystem datalake.paimon.warehouse: s3://fluss/paimon @@ -135,8 +138,10 @@ services: s3.endpoint: http://rustfs:9000 s3.access-key: rustfsadmin s3.secret-key: rustfsadmin - s3.path.style.access: true - kv.snapshot.interval: 0s + s3.region: us-east-1 + s3.path-style-access: true + s3.assumed.role.arn: arn:aws:iam::000000000000:role/rustfsadmin + s3.assumed.role.sts.endpoint: http://rustfs:9000 datalake.format: paimon datalake.paimon.metastore: filesystem datalake.paimon.warehouse: s3://fluss/paimon @@ -327,7 +332,10 @@ services: s3.endpoint: http://rustfs:9000 s3.access-key: rustfsadmin s3.secret-key: rustfsadmin - s3.path.style.access: true + s3.region: us-east-1 + s3.path-style-access: true + s3.assumed.role.arn: arn:aws:iam::000000000000:role/rustfsadmin + s3.assumed.role.sts.endpoint: http://rustfs:9000 datalake.format: iceberg datalake.iceberg.catalog-impl: org.apache.iceberg.jdbc.JdbcCatalog datalake.iceberg.name: fluss_catalog @@ -356,12 +364,14 @@ services: zookeeper.address: zookeeper:2181 bind.listeners: FLUSS://tablet-server:9123 data.dir: /tmp/fluss/data - kv.snapshot.interval: 0s remote.data.dir: s3://fluss/remote-data s3.endpoint: http://rustfs:9000 s3.access-key: rustfsadmin s3.secret-key: rustfsadmin - s3.path.style.access: true + s3.region: us-east-1 + s3.path-style-access: true + s3.assumed.role.arn: arn:aws:iam::000000000000:role/rustfsadmin + s3.assumed.role.sts.endpoint: http://rustfs:9000 datalake.format: iceberg datalake.iceberg.catalog-impl: org.apache.iceberg.jdbc.JdbcCatalog datalake.iceberg.name: fluss_catalog diff --git a/website/package.json b/website/package.json index 91c57d3c44..b95957884f 100644 --- a/website/package.json +++ b/website/package.json @@ -49,5 +49,9 @@ }, "engines": { "node": ">=20.0" - } + }, + "overrides": { + "webpackbar": "^7.0.0" + }, + "comment:overrides": "webpackbar 6.x passes its own options (name, color) as this.options, which webpack 5.106.0+ rejects via strict ProgressPlugin schema validation. webpackbar 7.0.0 fixes this. Remove this override once Docusaurus bumps its webpackbar dependency to ^7.0.0." }