Skip to content
Open
36 changes: 36 additions & 0 deletions docs/docs/kafka-connect.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ for exactly-once semantics. This requires Kafka 2.5 or later.
| iceberg.control.commit.interval-ms | Commit interval in msec, default is 300,000 (5 min) |
| iceberg.control.commit.timeout-ms | Commit timeout interval in msec, default is 30,000 (30 sec) |
| iceberg.control.commit.threads | Number of threads to use for commits, default is (`cores * 2`) |
| iceberg.control.commit.stale-max-blocking-retries | Number of times a failed stale commit group is retried before the connector fails, default is 3. Set to 0 to fail on the first failure. |
| iceberg.coordinator.transactional.prefix | Prefix for the transactional id to use for the coordinator producer, default is to use no/empty prefix |
| iceberg.catalog | Name of the catalog, default is `iceberg` |
| iceberg.catalog.* | Properties passed through to Iceberg catalog initialization |
Expand Down Expand Up @@ -364,6 +365,41 @@ See above for creating two tables.
}
```

## Commit behavior

The coordinator commits data to Iceberg tables in cycles. Each cycle, the coordinator
sends a `StartCommit` to the control topic, waits for worker responses (`DataWritten`
and `DataComplete`), and then commits the collected data files to Iceberg.

### Partial commits and stale events

When a commit times out (a worker hasn't responded within `commit.timeout-ms`), the
coordinator performs a **partial commit** with the data files received so far. Workers
that finish after the timeout send their `DataWritten` events to the control topic,
which are consumed in the next commit cycle.

These late-arriving ("stale") events carry the previous cycle's `commitId`. The
coordinator separates them from current-cycle events and commits each group in a
separate `RowDelta` with its own Iceberg sequence number. This ensures equality
deletes from the current cycle correctly apply to stale data files (because
`data_sequence_number < delete_sequence_number` is required).

### Stale event error handling

If a stale group fails to commit, the coordinator retries it on subsequent commit cycles, up to
`stale-max-blocking-retries` times (default 3). While retries remain, the stale group blocks
subsequent groups for the same table to preserve sequence number ordering. Once the retries are
exhausted, the coordinator throws a `ConnectException`, moving the connector to the `FAILED`
state so an operator can intervene.

### JMX monitoring

The coordinator registers a `CommitStateMXBean` under
`org.apache.iceberg.connect:type=CommitState,connector=<name>` exposing:

- `StaleGroupCount`: Number of distinct stale commitId groups in the buffer.
- `BufferSize`: Total number of envelopes in the commit buffer.

## SMTs for the Apache Iceberg Sink Connector

This project contains some SMTs that could be useful when transforming Kafka data for use by
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ public class IcebergSinkConfig extends AbstractConfig {
private static final String COMMIT_TIMEOUT_MS_PROP = "iceberg.control.commit.timeout-ms";
private static final int COMMIT_TIMEOUT_MS_DEFAULT = 30_000;
private static final String COMMIT_THREADS_PROP = "iceberg.control.commit.threads";
private static final String COMMIT_STALE_MAX_BLOCKING_RETRIES_PROP =
"iceberg.control.commit.stale-max-blocking-retries";
private static final int COMMIT_STALE_MAX_BLOCKING_RETRIES_DEFAULT = 3;
private static final String CONNECT_GROUP_ID_PROP = "iceberg.connect.group-id";
private static final String TRANSACTIONAL_PREFIX_PROP =
"iceberg.coordinator.transactional.prefix";
Expand Down Expand Up @@ -217,6 +220,14 @@ private static ConfigDef newConfigDef() {
Runtime.getRuntime().availableProcessors() * 2,
Importance.MEDIUM,
"Coordinator threads to use for table commits, default is (cores * 2)");
configDef.define(
COMMIT_STALE_MAX_BLOCKING_RETRIES_PROP,
ConfigDef.Type.INT,
COMMIT_STALE_MAX_BLOCKING_RETRIES_DEFAULT,
ConfigDef.Range.atLeast(0),
Importance.LOW,
"Number of times a stale commit group will block subsequent groups before "
+ "failing the connector. Set to 0 to fail immediately.");
configDef.define(
TRANSACTIONAL_PREFIX_PROP,
ConfigDef.Type.STRING,
Expand Down Expand Up @@ -419,6 +430,10 @@ public int commitThreads() {
return getInt(COMMIT_THREADS_PROP);
}

/**
 * Maximum number of times a failed stale commit group may block subsequent groups before the
 * connector is failed; 0 means fail on the first stale-group commit failure.
 */
public int commitStaleMaxBlockingRetries() {
return getInt(COMMIT_STALE_MAX_BLOCKING_RETRIES_PROP);
}

public String transactionalPrefix() {
String result = getString(TRANSACTIONAL_PREFIX_PROP);
if (result != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,11 +143,14 @@ protected Map<Integer, Long> controlTopicOffsets() {
}

/** Commits the consumer's control topic offsets at the positions currently read. */
protected void commitConsumerOffsets() {
commitConsumerOffsetsTo(controlTopicOffsets());
}

/**
 * Commits the given control topic offsets (partition -> offset) for this channel's consumer.
 * Callers may pass a subset/minimum of the read positions (e.g. the minimum offsets of
 * envelopes still awaiting commit) so that uncommitted events are re-consumed after a restart.
 *
 * @param specificOffsets map of control topic partition to the offset to commit
 */
protected void commitConsumerOffsetsTo(Map<Integer, Long> specificOffsets) {
  Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = Maps.newHashMap();
  // Commit ONLY the offsets the caller supplied; do not fall back to controlTopicOffsets(),
  // which would over-advance past envelopes the caller intends to keep.
  specificOffsets.forEach(
      (k, v) ->
          offsetsToCommit.put(new TopicPartition(controlTopic, k), new OffsetAndMetadata(v)));
  consumer.commitSync(offsetsToCommit);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@
package org.apache.iceberg.connect.channel;

import java.time.OffsetDateTime;
import java.util.Collection;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.iceberg.connect.IcebergSinkConfig;
Expand All @@ -30,14 +33,17 @@
import org.apache.iceberg.connect.events.TableReference;
import org.apache.iceberg.connect.events.TopicPartitionOffset;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class CommitState {
class CommitState implements CommitStateMXBean {
private static final Logger LOG = LoggerFactory.getLogger(CommitState.class);

private final List<Envelope> commitBuffer = Lists.newArrayList();
private final List<DataComplete> readyBuffer = Lists.newArrayList();
private final Map<UUID, Integer> groupRetryCount = Maps.newConcurrentMap();
private long startTime;
private UUID currentCommitId;
private final IcebergSinkConfig config;
Expand All @@ -48,8 +54,8 @@ class CommitState {

void addResponse(Envelope envelope) {
commitBuffer.add(envelope);
DataWritten dataWritten = (DataWritten) envelope.event().payload();
if (!isCommitInProgress()) {
DataWritten dataWritten = (DataWritten) envelope.event().payload();
LOG.warn(
"Received commit response when no commit in progress, this can happen during recovery. Commit ID: {}",
dataWritten.commitId());
Expand Down Expand Up @@ -84,6 +90,9 @@ boolean isCommitIntervalReached() {
}

/** Begins a new commit cycle: assigns a fresh commitId and resets the cycle start time. */
void startNewCommit() {
// Do NOT clear commitBuffer. Stale events from prior failed or timed-out cycles are
// retained for retry. Successfully committed groups are removed selectively by
// removeEnvelopes() after each group's RowDelta commit succeeds.
currentCommitId = UUID.randomUUID();
startTime = System.currentTimeMillis();
}
Expand All @@ -97,6 +106,42 @@ void clearResponses() {
commitBuffer.clear();
}

/**
 * Drains the given already-committed envelopes from the commit buffer, leaving events from
 * failed or skipped groups in place for retry next cycle. Also drops retry-count entries for
 * any commitId that no longer has buffered envelopes.
 */
void removeEnvelopes(Collection<Envelope> committed) {
  Set<Envelope> drained = Sets.newHashSet(committed);
  commitBuffer.removeAll(drained);

  // Retain retry counts only for commitIds still represented in the buffer.
  Set<UUID> liveCommitIds = Sets.newHashSet();
  for (Envelope env : commitBuffer) {
    liveCommitIds.add(((DataWritten) env.event().payload()).commitId());
  }
  groupRetryCount.keySet().retainAll(liveCommitIds);
}

/** Increments the consecutive-failure count for the given commit group. */
void recordGroupFailure(UUID commitId) {
groupRetryCount.merge(commitId, 1, Integer::sum);
}

/** Clears the failure count for a commit group that committed successfully. */
void recordGroupSuccess(UUID commitId) {
groupRetryCount.remove(commitId);
}

/**
 * Returns true while the group's failure count is within the configured retry budget
 * ({@code stale-max-blocking-retries}); such a group continues to block subsequent groups. A
 * group with no recorded failures always blocks. With the config set to 0, the first recorded
 * failure makes this return false, so the connector fails immediately.
 */
boolean isGroupBlocking(UUID commitId) {
return groupRetryCount.getOrDefault(commitId, 0) <= config.commitStaleMaxBlockingRetries();
}

/** Returns the number of recorded failures for the given commit group (0 if none). */
int getRetryCount(UUID commitId) {
return groupRetryCount.getOrDefault(commitId, 0);
}

/** Returns true when no envelopes remain in the commit buffer. */
boolean isBufferEmpty() {
return commitBuffer.isEmpty();
}

boolean isCommitTimedOut() {
if (!isCommitInProgress()) {
return false;
Expand Down Expand Up @@ -137,8 +182,150 @@ boolean isCommitReady(int expectedPartitionCount) {
return false;
}

Map<TableReference, List<Envelope>> tableCommitMap() {
return commitBuffer.stream()
// ── MXBean interface methods ──

/**
 * MXBean accessor: distinct stale commitId groups currently buffered. NOTE(review): this is
 * invoked from the JMX server thread while the coordinator mutates commitBuffer — confirm the
 * access is externally synchronized, since the buffer is a plain ArrayList.
 */
@Override
public int getStaleGroupCount() {
return staleGroupCount();
}

/**
 * MXBean accessor: total envelopes in the commit buffer. NOTE(review): read from the JMX
 * server thread; confirm safe publication of commitBuffer mutations.
 */
@Override
public int getBufferSize() {
return bufferSize();
}

// ── Internal accessors ──

/**
 * Counts the distinct commitIds in the buffer other than the current one. Returns 0 when no
 * commit cycle is active, since "stale" is only meaningful relative to a current commitId.
 */
int staleGroupCount() {
  if (currentCommitId == null) {
    return 0;
  }
  Set<UUID> staleIds = Sets.newHashSet();
  for (Envelope env : commitBuffer) {
    UUID cid = ((DataWritten) env.event().payload()).commitId();
    if (!currentCommitId.equals(cid)) {
      staleIds.add(cid);
    }
  }
  return staleIds.size();
}

/** Total number of envelopes currently held in the commit buffer. */
int bufferSize() {
return commitBuffer.size();
}

/**
 * Computes, per control topic partition, the smallest offset among envelopes remaining in the
 * buffer. Committing the consumer at these offsets lets already-committed events be skipped on
 * restart while uncommitted events survive and are re-consumed.
 */
Map<Integer, Long> remainingEnvelopeMinOffsets() {
  return commitBuffer.stream()
      .collect(Collectors.toMap(Envelope::partition, Envelope::offset, Long::min));
}

/**
 * Splits the commit buffer into per-commitId table maps, ordered for sequential RowDelta
 * commits.
 *
 * <p>After a partial commit (timeout), late-arriving DataWritten events from the previous
 * cycle may sit in the buffer next to current-cycle events. One merged RowDelta would give
 * both the same sequence number, breaking equality delete semantics (which require
 * {@code data_sequence_number < delete_sequence_number}).
 *
 * <p>Stale commitId groups are returned first, in control topic consumption order; the current
 * commitId's group comes last so it receives the highest sequence number. Each map should be
 * committed as its own RowDelta.
 */
List<Map<TableReference, List<Envelope>>> tableCommitMaps() {
  // Insertion order of this map mirrors control topic consumption order.
  Map<UUID, List<Envelope>> grouped = new LinkedHashMap<>();
  for (Envelope envelope : commitBuffer) {
    UUID id = ((DataWritten) envelope.event().payload()).commitId();
    grouped.computeIfAbsent(id, k -> Lists.newArrayList()).add(envelope);
  }

  List<Map<TableReference, List<Envelope>>> ordered = Lists.newArrayList();

  // Emit every non-current (stale) group first, preserving consumption order.
  for (Map.Entry<UUID, List<Envelope>> entry : grouped.entrySet()) {
    UUID id = entry.getKey();
    if (currentCommitId != null && id.equals(currentCommitId)) {
      continue;
    }
    LOG.warn(
        "Stale DataWritten detected: commitId={} (current={}), envelopes={}, "
            + "will commit in separate RowDelta to preserve sequence number ordering",
        id,
        currentCommitId,
        entry.getValue().size());
    ordered.add(toTableMap(entry.getValue()));
  }

  // The current cycle's group goes last so its RowDelta gets the highest sequence number.
  if (currentCommitId != null) {
    List<Envelope> current = grouped.get(currentCommitId);
    if (current != null && !current.isEmpty()) {
      ordered.add(toTableMap(current));
    }
  }

  return ordered;
}

/**
 * Groups commit buffer entries by table, then by commitId within each table. CommitId groups
 * are ordered by first appearance in the buffer (insertion order), which matches control topic
 * consumption order, so stale groups from prior failed or timed-out cycles sort before the
 * current cycle's group.
 *
 * <p>Each group becomes a separate RowDelta commit with its own Iceberg sequence number,
 * allowing equality deletes from newer groups to apply to data files from older groups.
 *
 * @return tables in buffer insertion order, each mapped to its ordered commit groups
 */
Map<TableReference, List<CommitGroup>> tableCommitGroups() {
  // Use a LinkedHashMap map factory: groupingBy's default HashMap would make the per-table
  // iteration (and thus commit) order unspecified, while the rest of this class deliberately
  // preserves control topic insertion order.
  Map<TableReference, List<Envelope>> byTable =
      commitBuffer.stream()
          .collect(
              Collectors.groupingBy(
                  envelope -> ((DataWritten) envelope.event().payload()).tableReference(),
                  LinkedHashMap::new,
                  Collectors.toList()));

  Map<TableReference, List<CommitGroup>> result = new LinkedHashMap<>();
  for (Map.Entry<TableReference, List<Envelope>> entry : byTable.entrySet()) {
    // Within a table, bucket envelopes by commitId in first-appearance order.
    Map<UUID, List<Envelope>> byCommitId = new LinkedHashMap<>();
    for (Envelope env : entry.getValue()) {
      UUID cid = ((DataWritten) env.event().payload()).commitId();
      byCommitId.computeIfAbsent(cid, k -> Lists.newArrayList()).add(env);
    }
    List<CommitGroup> groups =
        byCommitId.entrySet().stream()
            .map(e -> new CommitGroup(e.getKey(), e.getValue()))
            .collect(Collectors.toList());
    result.put(entry.getKey(), groups);
  }
  return result;
}

/**
 * Immutable pairing of a commitId with the buffered envelopes that carry it; each instance is
 * intended to be committed as one RowDelta.
 */
static class CommitGroup {
private final UUID commitId;
private final List<Envelope> envelopes;

CommitGroup(UUID commitId, List<Envelope> envelopes) {
this.commitId = commitId;
this.envelopes = envelopes;
}

/** The commitId shared by all envelopes in this group. */
UUID commitId() {
return commitId;
}

/** The envelopes to commit together in a single RowDelta. */
List<Envelope> envelopes() {
return envelopes;
}
}

private Map<TableReference, List<Envelope>> toTableMap(List<Envelope> envelopes) {
return envelopes.stream()
.collect(
Collectors.groupingBy(
envelope -> ((DataWritten) envelope.event().payload()).tableReference()));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.connect.channel;

/**
 * JMX MXBean interface for monitoring the commit buffer state. Exposes metrics for buffer size and
 * stale group count. Registered under {@code
 * org.apache.iceberg.connect:type=CommitState,connector=<name>}.
 *
 * <p>NOTE(review): attribute reads arrive on the JMX server thread while the coordinator thread
 * mutates the buffer — confirm the implementation's reads are safe under concurrent mutation.
 */
public interface CommitStateMXBean {
/** Number of distinct stale commitId groups currently in the buffer. */
int getStaleGroupCount();

/** Total number of envelopes currently in the commit buffer. */
int getBufferSize();
}
Loading
Loading