Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,23 @@

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.securitytoken.AWSSecurityTokenService;
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder;
import com.amazonaws.services.securitytoken.model.AssumeRoleRequest;
import com.amazonaws.services.securitytoken.model.AssumeRoleResult;
import com.amazonaws.services.securitytoken.model.Credentials;
import com.amazonaws.services.securitytoken.model.GetSessionTokenResult;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

import static org.apache.fluss.utils.Preconditions.checkNotNull;

Expand All @@ -47,10 +53,15 @@ public class S3DelegationTokenProvider {
private static final String REGION_KEY = "fs.s3a.region";
private static final String ENDPOINT_KEY = "fs.s3a.endpoint";

private static final String ROLE_ARN_KEY = "fs.s3a.assumed.role.arn";
private static final String STS_ENDPOINT_KEY = "fs.s3a.assumed.role.sts.endpoint";

private final String scheme;
private final String region;
private final String accessKey;
private final String secretKey;
@Nullable private final String roleArn;
@Nullable private final String stsEndpoint;
private final Map<String, String> additionInfos;

public S3DelegationTokenProvider(String scheme, Configuration conf) {
Expand All @@ -59,6 +70,8 @@ public S3DelegationTokenProvider(String scheme, Configuration conf) {
checkNotNull(region, "Region is not set.");
this.accessKey = conf.get(ACCESS_KEY_ID);
this.secretKey = conf.get(ACCESS_KEY_SECRET);
this.roleArn = conf.get(ROLE_ARN_KEY);
this.stsEndpoint = conf.get(STS_ENDPOINT_KEY);
this.additionInfos = new HashMap<>();
for (String key : Arrays.asList(REGION_KEY, ENDPOINT_KEY)) {
if (conf.get(key) != null) {
Expand All @@ -68,25 +81,59 @@ public S3DelegationTokenProvider(String scheme, Configuration conf) {
}

public ObtainedSecurityToken obtainSecurityToken() {
LOG.info("Obtaining session credentials token with access key: {}", accessKey);
AWSSecurityTokenService stsClient = buildStsClient();
try {
Credentials credentials;

if (roleArn != null) {
LOG.info(
"Obtaining session credentials via AssumeRole with access key: {}, role: {}",
accessKey,
roleArn);
AssumeRoleRequest request =
new AssumeRoleRequest()
.withRoleArn(roleArn)
.withRoleSessionName("fluss-" + UUID.randomUUID());
AssumeRoleResult result = stsClient.assumeRole(request);
credentials = result.getCredentials();
} else {
LOG.info(
"Obtaining session credentials via GetSessionToken with access key: {}",
accessKey);
GetSessionTokenResult result = stsClient.getSessionToken();
credentials = result.getCredentials();
}

AWSSecurityTokenService stsClient =
LOG.info(
"Session credentials obtained successfully with access key: {} expiration: {}",
credentials.getAccessKeyId(),
credentials.getExpiration());

return new ObtainedSecurityToken(
scheme,
toJson(credentials),
credentials.getExpiration().getTime(),
additionInfos);
} finally {
stsClient.shutdown();
}
}

private AWSSecurityTokenService buildStsClient() {
AWSSecurityTokenServiceClientBuilder builder =
AWSSecurityTokenServiceClientBuilder.standard()
.withRegion(region)
.withCredentials(
new AWSStaticCredentialsProvider(
new BasicAWSCredentials(accessKey, secretKey)))
.build();
GetSessionTokenResult sessionTokenResult = stsClient.getSessionToken();
Credentials credentials = sessionTokenResult.getCredentials();

LOG.info(
"Session credentials obtained successfully with access key: {} expiration: {}",
credentials.getAccessKeyId(),
credentials.getExpiration());

return new ObtainedSecurityToken(
scheme, toJson(credentials), credentials.getExpiration().getTime(), additionInfos);
new BasicAWSCredentials(accessKey, secretKey)));

if (stsEndpoint != null) {
builder.withEndpointConfiguration(
new AwsClientBuilder.EndpointConfiguration(stsEndpoint, region));
} else {
builder.withRegion(region);
}

return builder.build();
}

private byte[] toJson(Credentials credentials) {
Expand Down
39 changes: 39 additions & 0 deletions website/docs/maintenance/filesystems/s3.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,42 @@ s3.secret-key: <your-secret-key>
# region
s3.region: <your-s3-region>
```

## S3-Compatible Storage (RustFS, MinIO, etc.)

For S3-compatible storage services such as [RustFS](https://github.com/rustfs/rustfs) or MinIO, you need to configure a custom endpoint and enable path-style access:

```yaml
remote.data.dir: s3://<your-bucket>/path/to/remote/storage
s3.endpoint: http://<your-s3-compatible-endpoint>:9000
s3.access-key: <your-access-key>
s3.secret-key: <your-secret-key>
s3.region: us-east-1
s3.path-style-access: true
```

### AssumeRole STS Configuration

Some S3-compatible services (such as RustFS) require the use of [AssumeRole](https://docs.aws.amazon.com/STS/latest/APIReference/API_AssumeRole.html) instead of `GetSessionToken` to obtain temporary security credentials. This is necessary for features like KV snapshots that rely on delegation tokens.

To enable AssumeRole, add the following configurations alongside the base S3 settings above:

```yaml
remote.data.dir: s3://<your-bucket>/path/to/remote/storage
s3.endpoint: http://<your-s3-compatible-endpoint>:9000
s3.access-key: <your-access-key>
s3.secret-key: <your-secret-key>
s3.region: us-east-1
s3.path-style-access: true
s3.assumed.role.arn: <your-role-arn>
s3.assumed.role.sts.endpoint: http://<your-s3-compatible-endpoint>:9000
```

| Configuration | Description |
|---|---|
| `s3.assumed.role.arn` | The ARN of the IAM role to assume. When set, Fluss uses `AssumeRole` instead of `GetSessionToken` to obtain temporary credentials. The `s3.access-key` and `s3.secret-key` are still required — they authenticate the AssumeRole call itself. |
| `s3.assumed.role.sts.endpoint` | Custom STS endpoint URL. Required for S3-compatible services that host their own STS API. When not set, the default AWS STS endpoint is used. |

:::note
Without `s3.assumed.role.arn`, Fluss falls back to `GetSessionToken` (the default AWS behavior). This is fully backward compatible — existing AWS users do not need to change their configuration.
:::
9 changes: 7 additions & 2 deletions website/docs/quickstart/flink.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,10 @@ services:
s3.endpoint: http://rustfs:9000
s3.access-key: rustfsadmin
s3.secret-key: rustfsadmin
s3.region: us-east-1
s3.path-style-access: true
s3.assumed.role.arn: arn:aws:iam::000000000000:role/rustfsadmin
s3.assumed.role.sts.endpoint: http://rustfs:9000
tablet-server:
image: apache/fluss:$FLUSS_DOCKER_VERSION$
command: tabletServer
Expand All @@ -108,8 +111,10 @@ services:
s3.endpoint: http://rustfs:9000
s3.access-key: rustfsadmin
s3.secret-key: rustfsadmin
s3.region: us-east-1
s3.path-style-access: true
kv.snapshot.interval: 0s
s3.assumed.role.arn: arn:aws:iam::000000000000:role/rustfsadmin
s3.assumed.role.sts.endpoint: http://rustfs:9000
zookeeper:
restart: always
image: zookeeper:3.9.2
Expand Down Expand Up @@ -161,7 +166,7 @@ volumes:
The Docker Compose environment consists of the following containers:
- **RustFS:** an S3-compatible object storage for tiered storage. You can access the RustFS console at http://localhost:9001 with credentials `rustfsadmin/rustfsadmin`. An init container (`rustfs-init`) automatically creates the `fluss` bucket on startup.
- **Fluss Cluster:** a Fluss `CoordinatorServer`, a Fluss `TabletServer` and a `ZooKeeper` server.
- Credentials are configured directly with `s3.access-key` and `s3.secret-key`. Production systems should use CredentialsProvider chain specific to cloud environments.
- Credentials are configured directly with `s3.access-key` and `s3.secret-key`. The `s3.assumed.role.arn` and `s3.assumed.role.sts.endpoint` options configure [AssumeRole STS](/maintenance/filesystems/s3.md#assumerole-sts-configuration) which is required by RustFS for delegation token support. Production systems should use CredentialsProvider chain specific to cloud environments.
- **Flink Cluster**: a Flink `JobManager`, a Flink `TaskManager`, and a Flink SQL client container to execute queries.

:::tip
Expand Down
22 changes: 16 additions & 6 deletions website/docs/quickstart/lakehouse.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,10 @@ services:
s3.endpoint: http://rustfs:9000
s3.access-key: rustfsadmin
s3.secret-key: rustfsadmin
s3.path.style.access: true
s3.region: us-east-1
s3.path-style-access: true
s3.assumed.role.arn: arn:aws:iam::000000000000:role/rustfsadmin
s3.assumed.role.sts.endpoint: http://rustfs:9000
datalake.format: paimon
datalake.paimon.metastore: filesystem
datalake.paimon.warehouse: s3://fluss/paimon
Expand All @@ -135,8 +138,10 @@ services:
s3.endpoint: http://rustfs:9000
s3.access-key: rustfsadmin
s3.secret-key: rustfsadmin
s3.path.style.access: true
kv.snapshot.interval: 0s
s3.region: us-east-1
s3.path-style-access: true
s3.assumed.role.arn: arn:aws:iam::000000000000:role/rustfsadmin
s3.assumed.role.sts.endpoint: http://rustfs:9000
datalake.format: paimon
datalake.paimon.metastore: filesystem
datalake.paimon.warehouse: s3://fluss/paimon
Expand Down Expand Up @@ -327,7 +332,10 @@ services:
s3.endpoint: http://rustfs:9000
s3.access-key: rustfsadmin
s3.secret-key: rustfsadmin
s3.path.style.access: true
s3.region: us-east-1
s3.path-style-access: true
s3.assumed.role.arn: arn:aws:iam::000000000000:role/rustfsadmin
s3.assumed.role.sts.endpoint: http://rustfs:9000
datalake.format: iceberg
datalake.iceberg.catalog-impl: org.apache.iceberg.jdbc.JdbcCatalog
datalake.iceberg.name: fluss_catalog
Expand Down Expand Up @@ -356,12 +364,14 @@ services:
zookeeper.address: zookeeper:2181
bind.listeners: FLUSS://tablet-server:9123
data.dir: /tmp/fluss/data
kv.snapshot.interval: 0s
remote.data.dir: s3://fluss/remote-data
s3.endpoint: http://rustfs:9000
s3.access-key: rustfsadmin
s3.secret-key: rustfsadmin
s3.path.style.access: true
s3.region: us-east-1
s3.path-style-access: true
s3.assumed.role.arn: arn:aws:iam::000000000000:role/rustfsadmin
s3.assumed.role.sts.endpoint: http://rustfs:9000
datalake.format: iceberg
datalake.iceberg.catalog-impl: org.apache.iceberg.jdbc.JdbcCatalog
datalake.iceberg.name: fluss_catalog
Expand Down
6 changes: 5 additions & 1 deletion website/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -49,5 +49,9 @@
},
"engines": {
"node": ">=20.0"
}
},
"overrides": {
"webpackbar": "^7.0.0"
},
"comment:overrides": "webpackbar 6.x passes its own options (name, color) as this.options, which webpack 5.106.0+ rejects via strict ProgressPlugin schema validation. webpackbar 7.0.0 fixes this. Remove this override once Docusaurus bumps its webpackbar dependency to ^7.0.0."
}