Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 119 additions & 0 deletions .agent/skills/adding-new-metadata/SKILL.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
---
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

name: adding-new-metadata
description: Guide on how to add and propagate new metadata fields in Apache Beam's WindowedValue, extending protos, windmill persistence, and runner interfaces to avoid metadata loss.
---

# Adding New Metadata to WindowedValue

This skill provides a comprehensive guide on adding new metadata (e.g., CDC metadata, drain mode flags, OpenTelemetry trace context) to Apache Beam's `WindowedValue` and ensuring it propagates correctly through the execution engine. Failing to propagate metadata in all necessary places will result in metadata loss during pipeline execution.

## 1. Extending the Proto Model

When adding new metadata that must cross worker boundaries or be serialized by the Fn API, the proto definitions must be updated.

* **Key Files:** `model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto`
* **Action:** Add the new metadata field to the appropriate message (`ElementMetadata`).
* **Note:** Add proper documentation in proto. Type of the field can be different from the type in WindowedValue, see OpenTelemetry trace context for example.

## 2. WindowedValue Interface and Implementations

The `WindowedValue` is the core container for elements flowing through a Beam pipeline. It holds the value, timestamp, windows, pane info, and any additional metadata.

### Core Interface Updates
* **Key File:** `sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValue.java`
* **Action:** Add getter methods for your new metadata.

### Concrete Implementations
You must update **all** concrete implementations of `WindowedValue` to store and return the new metadata. If you miss one, metadata will be silently dropped.
* `ValueInGlobalWindow`
* `ValueInSingleWindow`
* `ValueInEmptyWindows` (often used inside runners, like Dataflow's worker package)
* **Action:** Update constructors, factory methods (`of()`), fields in these classes and coders.

### OutputBuilder vs. Context Output
* **IMPORTANT:** Do **not** add new arguments to legacy methods like `context.outputWindowedValue(...)` or `WindowedValue.of(value, timestamp, windows, pane)`. This causes brittleness and breaks the API for every new metadata field.
* **Action:** Modify `OutputBuilder` (`sdks/java/core/src/main/java/org/apache/beam/sdk/values/OutputBuilder.java`) to accept the new metadata (e.g., `.withDrainMode(...)`, `.withTraceContext(...)`). Use the builder pattern when constructing outputs to propagate offset and record IDs smoothly.

## 3. Windmill Persistence (Dataflow Streaming Engine) Runner v1

For the Dataflow streaming runner, metadata must survive serialization to and from the Windmill backend.

* **Serialization (Sink):**
* **File:** `runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java`
* **Action:** Extract the metadata from the `WindowedValue`, and add it to already created ElementMetadata proto builder.
* **Deserialization (Reader):**
* **Files:** `runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java` and `WindowingWindmillReader.java`
* **Action:** Extract the metadata from ElementMetadata proto and reconstruct the `WindowedValue` using the updated factory methods/builders that include the metadata. This is incremental work, as plenty of metadata is already extracted from the proto.

## 4. Propagation Across Core Classes

Metadata must be explicitly copied or forwarded whenever a `WindowedValue` is transformed, buffered, or processed.

### DoFn Runners (Java Core)
You must ensure that when a DoFn processes an element and outputs a new element, the appropriate metadata from the *input* is propagated to the *output* (unless explicitly changed by the logic).
* `runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java`
* `runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java`
* `runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java`
* `runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java`
* `sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java`

**Action:** When these runners call `outputWindowedValue()`, they should extract the metadata from the input or current context and attach it using the `OutputBuilder` or the new `WindowedValue` interfaces.

### Grouping and Reducing
* `runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java`
* `runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java`
* **Action:** Ensure that during GroupByKey/Combine operations, if metadata needs to be preserved (e.g., `CausedByDrain`), it is correctly passed into the `ReduceFnContextFactory` and propagated when outputting the grouped results.

### Splittable DoFns (SDF)
* `runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java`
* `sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java`

### Timers
If metadata needs to survive timer firings (e.g., knowing an `@OnTimer` fired because of a system drain), it must be added to Timer data structures. This is a bit of uncharted area which was only implemented for CausedByDrain metadata that comes from backend, not from persisted metadata. In order to persist all WindowedValue metadata across timer, more work has to be done, below are some pointers:
* `runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java` and implementations (e.g., `WindmillTimerInternals.java` in Dataflow).
* `runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/KeyedTimerData.java` (or generic `TimerData`).
* **Action:** Add the field to `TimerData`, next to `CausedByDrain`. Propagate it when setting the timer and expose it when the timer fires so it bubbles up.
* Eventually, metadata from Timer lands in WindowedValue, so it can be exposed to users. Keep field names, types, and getters similar to WindowedValue as much as possible, as common interface may be introduced eventually.

## 5. Exposing Metadata to the User (DoFn Signatures)

User needs to access the metadata in their `DoFn` (e.g., `@ProcessElement public void process(ProcessContext c, CausedByDrain drain) { ... }`), you must update the reflection and bytecode generation logic.

* **Files:**
* `sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java`
* `sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java`
* `sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java`
* `sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java`
* **Action:** Add logic to detect the new parameter type in the DoFn method signature. Generate bytecode using ByteBuddy to extract the property from the `WindowedValue` or context and pass it as an argument during method invocation.

## Checklist for Adding New Metadata

1. [ ] Define the metadata in `beam_fn_api.proto` (if applicable).
2. [ ] Add getters to the `WindowedValue` interface.
3. [ ] Update `ValueInGlobalWindow`, `ValueInSingleWindow`, `ValueInEmptyWindows` to store the metadata.
4. [ ] Update `OutputBuilder` to accept the metadata.
5. [ ] Update `WindmillSink` to serialize the metadata to the backend.
6. [ ] Update `UngroupedWindmillReader` and `WindowingWindmillReader` to deserialize the metadata.
7. [ ] Update `WindmillKeyedWorkItem`.
8. [ ] Update `SimpleDoFnRunner`, `StatefulDoFnRunner`, and `FnApiDoFnRunner` to propagate the metadata from input to output.
9. [ ] Update `ReduceFnRunner` and `OutputAndTimeBoundedSplittableProcessElementInvoker` for complex transform propagation.
10. [ ] If required by timers, update `TimerData` and `TimerInternals`.
11. [ ] If exposed to the user, update `DoFnSignatures` and `ByteBuddyDoFnInvokerFactory`.
12. [ ] Update other runners (Flink, Spark, Samza) to ensure they propagate the new `WindowedValue` fields correctly in their specific operators/runners.
Loading