Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions flink-filesystems/flink-s3-fs-native/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,29 @@ input.sinkTo(FileSink.forRowFormat(new Path("s3://my-bucket/output"),
| s3.assume-role.session-name | flink-s3-session | Session name for the assumed role |
| s3.assume-role.session-duration | 3600 | Session duration in seconds (900-43200) |

## Bucket-Level Configuration

The Native S3 FileSystem supports per-bucket configuration overrides, allowing different S3 buckets to use different connection settings within the same Flink cluster. This enables scenarios like:

- **Checkpointing to one bucket** with specific credentials
- **Savepoints to another bucket** with different region/endpoint
- **Data sinks to third-party buckets** with cross-account IAM roles

### Format

Bucket-level configuration uses the format: `s3.bucket.<bucket-name>.<property>`

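Bucket names containing dots (e.g., `my.company.data`) are fully supported through longest-suffix matching: the longest supported property name that the key ends with is taken as `<property>`, and everything between the `s3.bucket.` prefix and that property is the bucket name. For example, `s3.bucket.my.company.data.sse.type` sets `sse.type` for the bucket `my.company.data`.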

### Supported Properties

All global S3 configuration properties can be overridden at the bucket level:

- **Connection:** `region`, `endpoint`, `path-style-access`
- **Credentials:** `access-key`, `secret-key`, `credentials.provider`
- **Encryption:** `sse.type`, `sse.kms-key-id`
- **IAM Assume Role:** `assume-role.arn`, `assume-role.external-id`, `assume-role.session-name`, `assume-role.session-duration`

## Server-Side Encryption (SSE)

The filesystem supports server-side encryption for data at rest:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.fs.s3native;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

/**
* Parses bucket-specific S3 configuration using format {@code s3.bucket.<bucket-name>.<property>}.
*
* <p>Enables per-bucket overrides for endpoints, credentials, encryption, and IAM roles. Bucket
* names containing dots are supported; properties are matched by longest suffix first.
*
* <p>Immutable and thread-safe after construction.
*/
@Internal
final class BucketConfigProvider {

private static final Logger LOG = LoggerFactory.getLogger(BucketConfigProvider.class);
static final String BUCKET_CONFIG_PREFIX = "s3.bucket.";
static final Map<String, BiConsumer<S3BucketConfig.Builder, String>> PROPERTY_APPLICATORS;
static final List<String> KNOWN_PROPERTIES_BY_LENGTH;

static {
final Map<String, BiConsumer<S3BucketConfig.Builder, String>> applicators =
new LinkedHashMap<>();
applicators.put("access-key", (b, v) -> b.accessKey(v));
applicators.put("assume-role.arn", (b, v) -> b.assumeRoleArn(v));
applicators.put("assume-role.external-id", (b, v) -> b.assumeRoleExternalId(v));
applicators.put(
"assume-role.session-duration",
(b, v) -> {
try {
b.assumeRoleSessionDurationSeconds(Integer.parseInt(v));
} catch (NumberFormatException e) {
throw new IllegalConfigurationException(
String.format(
"Invalid assume-role.session-duration '%s' for bucket '%s'. "
+ "Must be a valid integer (e.g., 3600)",
v, b.getBucketName()),
e);
}
});
applicators.put("assume-role.session-name", (b, v) -> b.assumeRoleSessionName(v));
applicators.put("credentials.provider", (b, v) -> b.credentialsProvider(v));
applicators.put("endpoint", (b, v) -> b.endpoint(v));
applicators.put("path-style-access", (b, v) -> b.pathStyleAccess(Boolean.parseBoolean(v)));
applicators.put("region", (b, v) -> b.region(v));
applicators.put("sse.kms-key-id", (b, v) -> b.sseKmsKeyId(v));
applicators.put("sse.type", (b, v) -> b.sseType(v));
applicators.put("secret-key", (b, v) -> b.secretKey(v));
PROPERTY_APPLICATORS = Collections.unmodifiableMap(applicators);

KNOWN_PROPERTIES_BY_LENGTH =
applicators.keySet().stream()
.sorted(Comparator.comparingInt(String::length).reversed())
.collect(Collectors.toList());
}

private final Map<String, S3BucketConfig> bucketConfigs;

BucketConfigProvider(Configuration flinkConfig) {
this.bucketConfigs = Collections.unmodifiableMap(parseBucketConfigs(flinkConfig));
}

@Nullable
S3BucketConfig getBucketConfig(String bucketName) {
return bucketConfigs.get(bucketName);
}

@VisibleForTesting
boolean hasBucketConfig(String bucketName) {
return bucketConfigs.containsKey(bucketName);
}

@VisibleForTesting
int size() {
return bucketConfigs.size();
}

private static Map<String, S3BucketConfig> parseBucketConfigs(Configuration flinkConfig) {
final Map<String, Map<String, String>> rawConfigs = new HashMap<>();

for (final String key : flinkConfig.keySet()) {
if (!key.startsWith(BUCKET_CONFIG_PREFIX)) {
Comment thread
Samrat002 marked this conversation as resolved.
continue;
}
final String suffix = key.substring(BUCKET_CONFIG_PREFIX.length());
final String value = flinkConfig.getString(key, null);
if (value == null) {
continue;
}

for (final String prop : KNOWN_PROPERTIES_BY_LENGTH) {
if (suffix.endsWith("." + prop)) {
final String bucketName =
suffix.substring(0, suffix.length() - prop.length() - 1);
if (bucketName.isEmpty()) {
LOG.warn(
"Ignoring bucket config key '{}': "
+ "resolved bucket name is empty (missing bucket name between "
+ "'s3.bucket.' prefix and '.{}' property?).",
key,
prop);
} else {
rawConfigs
.computeIfAbsent(bucketName, k -> new HashMap<>())
.put(prop, value);
}
break;
}
}
}

final Map<String, S3BucketConfig> result = new HashMap<>();
for (final Map.Entry<String, Map<String, String>> entry : rawConfigs.entrySet()) {
final String bucketName = entry.getKey();
final Map<String, String> props = entry.getValue();

final S3BucketConfig bucketConfig = buildBucketConfig(bucketName, props);
if (bucketConfig.hasAnyOverride()) {
result.put(bucketName, bucketConfig);
LOG.info(
Comment thread
Samrat002 marked this conversation as resolved.
"Registered bucket-specific configuration for bucket '{}': {}",
bucketName,
bucketConfig);
}
}

return result;
}

private static S3BucketConfig buildBucketConfig(String bucketName, Map<String, String> props) {
final S3BucketConfig.Builder builder = S3BucketConfig.builder(bucketName);

for (final Map.Entry<String, BiConsumer<S3BucketConfig.Builder, String>> entry :
PROPERTY_APPLICATORS.entrySet()) {
final String value = props.get(entry.getKey());
if (value != null) {
entry.getValue().accept(builder, value);
}
}

return builder.build();
}
}
Loading