From a05482e92d977a0ea6b8dd39f681f1a58d4c87e8 Mon Sep 17 00:00:00 2001 From: Radek Stankiewicz Date: Fri, 13 Mar 2026 17:05:47 +0100 Subject: [PATCH 1/4] - adds new outputWindowedValue(WV) public interface with implementations - add tests to cover metadata propagation across parDos in single and mulitple stages for direct runner - plumbs metadata propagation, where it was lost previously. --- ...oundedSplittableProcessElementInvoker.java | 20 +++++ .../beam/runners/core/SimpleDoFnRunner.java | 30 +++++++ .../org/apache/beam/sdk/transforms/DoFn.java | 5 ++ .../sdk/transforms/DoFnOutputReceivers.java | 14 +-- .../beam/sdk/transforms/DoFnTester.java | 32 +++++++ .../beam/sdk/transforms/Redistribute.java | 1 + .../org/apache/beam/sdk/transforms/Reify.java | 11 ++- .../SplittableParDoNaiveBounded.java | 19 ++-- .../beam/sdk/values/ValueInSingleWindow.java | 4 + .../beam/sdk/values/WindowedValues.java | 27 ++++-- .../transforms/MetadataPropagationTest.java | 87 +++++++++++++++++++ .../beam/fn/harness/FnApiDoFnRunner.java | 40 +++++++++ 12 files changed, 263 insertions(+), 27 deletions(-) create mode 100644 sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MetadataPropagationTest.java diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java index d39801a75587..445bd723f9ed 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java @@ -477,6 +477,26 @@ public void outputWindowedValue( element.causedByDrain())); } + @Override + public void outputWindowedValue(WindowedValue windowedValue) { + noteOutput(); + if (watermarkEstimator instanceof TimestampObservingWatermarkEstimator) { + ((TimestampObservingWatermarkEstimator) watermarkEstimator) + .observeTimestamp(windowedValue.getTimestamp()); + } + outputReceiver.output(mainOutputTag, windowedValue); + } + + @Override + public void outputWindowedValue(TupleTag tag, WindowedValue windowedValue) { + noteOutput(); + if (watermarkEstimator instanceof TimestampObservingWatermarkEstimator) { + ((TimestampObservingWatermarkEstimator) watermarkEstimator) + .observeTimestamp(windowedValue.getTimestamp()); + } + outputReceiver.output(tag, windowedValue); + } + private void noteOutput() { checkState(!hasClaimFailed, "Output is not allowed after a failed tryClaim()"); checkState(numClaimedBlocks > 0, "Output is not allowed before tryClaim()"); diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index 74f5a4d09001..cbd58f51e45f 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -443,6 +443,16 @@ public void output(TupleTag tag, T output) { SimpleDoFnRunner.this.outputWindowedValue(tag, elem.withValue(output)); } + @Override + public void outputWindowedValue(WindowedValue windowedValue) { + SimpleDoFnRunner.this.outputWindowedValue(mainOutputTag, windowedValue); + } + + @Override + public void outputWindowedValue(TupleTag tag, WindowedValue windowedValue) { + SimpleDoFnRunner.this.outputWindowedValue(tag, windowedValue); + } + @Override public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp) { checkNotNull(tag, "Tag passed to outputWithTimestamp cannot be null"); @@ -1027,6 +1037,16 @@ public void outputWindowedValue( .output(); } + @Override + public void outputWindowedValue(WindowedValue windowedValue) { + SimpleDoFnRunner.this.outputWindowedValue(mainOutputTag, windowedValue); + } + + @Override + public void outputWindowedValue(TupleTag tag, WindowedValue windowedValue) { + SimpleDoFnRunner.this.outputWindowedValue(tag, windowedValue); + } + @Override public BundleFinalizer bundleFinalizer() { throw new UnsupportedOperationException( @@ -1286,6 +1306,16 @@ public void outputWindowedValue( .output(); } + @Override + public void outputWindowedValue(WindowedValue windowedValue) { + SimpleDoFnRunner.this.outputWindowedValue(mainOutputTag, windowedValue); + } + + @Override + public void outputWindowedValue(TupleTag tag, WindowedValue windowedValue) { + SimpleDoFnRunner.this.outputWindowedValue(tag, windowedValue); + } + @Override public BundleFinalizer bundleFinalizer() { throw new UnsupportedOperationException( diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java index 41beb93a5cbe..0d892ab12d33 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java @@ -51,6 +51,7 @@ import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.sdk.values.WindowingStrategy; import org.checkerframework.checker.nullness.qual.Nullable; import org.checkerframework.dataflow.qual.Pure; @@ -284,6 +285,10 @@ public abstract void outputWindowedValue( Instant timestamp, Collection windows, PaneInfo paneInfo); + + public abstract void outputWindowedValue(TupleTag tag, WindowedValue windowedValue); + + public abstract void outputWindowedValue(WindowedValue windowedValue); } /** Information accessible when running a {@link DoFn.ProcessElement} method. */ diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnOutputReceivers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnOutputReceivers.java index e2c0825e0274..6873c1792de5 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnOutputReceivers.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnOutputReceivers.java @@ -120,19 +120,9 @@ public OutputBuilder builder(T value) { @Override public void output(WindowedValue windowedValue) { if (outputTag != null) { - context.outputWindowedValue( - outputTag, - windowedValue.getValue(), - windowedValue.getTimestamp(), - windowedValue.getWindows(), - windowedValue.getPaneInfo()); + context.outputWindowedValue(outputTag, windowedValue); } else { - ((DoFn.WindowedContext) context) - .outputWindowedValue( - windowedValue.getValue(), - windowedValue.getTimestamp(), - windowedValue.getWindows(), - windowedValue.getPaneInfo()); + ((DoFn.WindowedContext) context).outputWindowedValue(windowedValue); } } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java index a4f3cba21050..5eccffe94234 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java @@ -649,6 +649,38 @@ public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp CausedByDrain.NORMAL)); } + @Override + public void outputWindowedValue(WindowedValue windowedValue) { + for (BoundedWindow w : windowedValue.getWindows()) { + getMutableOutput(mainOutputTag) + .add( + ValueInSingleWindow.of( + windowedValue.getValue(), + windowedValue.getTimestamp(), + w, + windowedValue.getPaneInfo(), + windowedValue.getRecordId(), + windowedValue.getRecordOffset(), + windowedValue.causedByDrain())); + } + } + + @Override + public void outputWindowedValue(TupleTag tag, WindowedValue windowedValue) { + for (BoundedWindow w : windowedValue.getWindows()) { + getMutableOutput(tag) + .add( + ValueInSingleWindow.of( + windowedValue.getValue(), + windowedValue.getTimestamp(), + w, + windowedValue.getPaneInfo(), + windowedValue.getRecordId(), + windowedValue.getRecordOffset(), + windowedValue.causedByDrain())); + } + } + @Override public void outputWindowedValue( TupleTag tag, diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Redistribute.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Redistribute.java index 5fb4ea61e8ef..44f27824382d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Redistribute.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Redistribute.java @@ -187,6 +187,7 @@ public void processElement( .setTimestamp(kv.getValue().getTimestamp()) .setWindow(kv.getValue().getWindow()) .setPaneInfo(kv.getValue().getPaneInfo()) + .setCausedByDrain(kv.getValue().getCausedByDrain()) .output(); } })); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reify.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reify.java index af125d9e63e8..9974d29bfa0f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reify.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reify.java @@ -22,6 +22,7 @@ import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; @@ -141,16 +142,24 @@ public PCollection>> expand(PCollection> i new DoFn, KV>>() { @ProcessElement public void processElement( + ProcessContext pc, @Element KV element, @DoFn.Timestamp Instant timestamp, BoundedWindow window, PaneInfo paneInfo, + CausedByDrain causedByDrain, OutputReceiver>> r) { r.output( KV.of( element.getKey(), ValueInSingleWindow.of( - element.getValue(), timestamp, window, paneInfo))); + element.getValue(), + timestamp, + window, + paneInfo, + pc.currentRecordId(), + pc.currentRecordOffset(), + causedByDrain))); } })) .setCoder( diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java index 6d058b3b6ada..201040d25f16 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java @@ -56,6 +56,7 @@ import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.sdk.values.WindowedValues; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Uninterruptibles; import org.checkerframework.checker.nullness.qual.Nullable; @@ -560,13 +561,7 @@ public OutputReceiver outputReceiver(DoFn doFn) { public OutputBuilder builder(OutputT value) { return outputBuilderSupplier .builder(value) - .setReceiver( - windowedValue -> - outerContext.outputWindowedValue( - windowedValue.getValue(), - windowedValue.getTimestamp(), - windowedValue.getWindows(), - windowedValue.getPaneInfo())); + .setReceiver(windowedValue -> outerContext.outputWindowedValue(windowedValue)); } }; } @@ -659,6 +654,16 @@ public void outputWindowedValue( outerContext.outputWindowedValue(tag, output, timestamp, windows, paneInfo); } + @Override + public void outputWindowedValue(WindowedValue windowedValue) { + outerContext.outputWindowedValue(windowedValue); + } + + @Override + public void outputWindowedValue(TupleTag tag, WindowedValue windowedValue) { + outerContext.outputWindowedValue(tag, windowedValue); + } + @Override public InputT element() { return element; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java index 30c06c5f0d9a..0d8b2f7515e8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java @@ -123,6 +123,10 @@ public void encode(ValueInSingleWindow windowedElem, OutputStream outStream, BeamFnApi.Elements.ElementMetadata.Builder builder = BeamFnApi.Elements.ElementMetadata.newBuilder(); // todo #33176 specify additional metadata in the future + builder.setDrain( + windowedElem.getCausedByDrain() == CausedByDrain.CAUSED_BY_DRAIN + ? BeamFnApi.Elements.DrainMode.Enum.DRAINING + : BeamFnApi.Elements.DrainMode.Enum.NOT_DRAINING); BeamFnApi.Elements.ElementMetadata metadata = builder.build(); ByteArrayCoder.of().encode(metadata.toByteArray(), outStream); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java index ebe26be91c95..31ec7a724068 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java @@ -77,7 +77,10 @@ public static Builder builder(WindowedValue template) { .setValue(template.getValue()) .setTimestamp(template.getTimestamp()) .setWindows(template.getWindows()) - .setPaneInfo(template.getPaneInfo()); + .setPaneInfo(template.getPaneInfo()) + .setRecordOffset(template.getRecordOffset()) + .setRecordId(template.getRecordId()) + .setCausedByDrain(template.causedByDrain()); } public static class Builder implements OutputBuilder { @@ -271,7 +274,14 @@ public static WindowedValue of( checkArgument(windows.size() > 0, "WindowedValue requires windows, but there were none"); if (windows.size() == 1) { - return of(value, timestamp, windows.iterator().next(), paneInfo, causedByDrain); + return of( + value, + timestamp, + windows.iterator().next(), + paneInfo, + currentRecordId, + currentRecordOffset, + causedByDrain); } else { return new TimestampedValueInMultipleWindows<>( value, timestamp, windows, paneInfo, currentRecordId, currentRecordOffset, causedByDrain); @@ -287,7 +297,7 @@ static WindowedValue createWithoutValidation( PaneInfo paneInfo, CausedByDrain causedByDrain) { if (windows.size() == 1) { - return of(value, timestamp, windows.iterator().next(), paneInfo, causedByDrain); + return of(value, timestamp, windows.iterator().next(), paneInfo, null, null, causedByDrain); } else { return new TimestampedValueInMultipleWindows<>( value, timestamp, windows, paneInfo, null, null, causedByDrain); @@ -299,7 +309,7 @@ public static WindowedValue of( T value, Instant timestamp, BoundedWindow window, PaneInfo paneInfo) { checkArgument(paneInfo != null, "WindowedValue requires PaneInfo, but it was null"); - return of(value, timestamp, window, paneInfo, CausedByDrain.NORMAL); + return of(value, timestamp, window, paneInfo, null, null, CausedByDrain.NORMAL); } /** Returns a {@code WindowedValue} with the given value, timestamp, and window. */ @@ -308,18 +318,21 @@ public static WindowedValue of( Instant timestamp, BoundedWindow window, PaneInfo paneInfo, + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset, CausedByDrain causedByDrain) { checkArgument(paneInfo != null, "WindowedValue requires PaneInfo, but it was null"); boolean isGlobal = GlobalWindow.INSTANCE.equals(window); if (isGlobal && BoundedWindow.TIMESTAMP_MIN_VALUE.equals(timestamp)) { - return new ValueInGlobalWindow<>(value, paneInfo, null, null, causedByDrain); + return new ValueInGlobalWindow<>( + value, paneInfo, currentRecordId, currentRecordOffset, causedByDrain); } else if (isGlobal) { return new TimestampedValueInGlobalWindow<>( - value, timestamp, paneInfo, null, null, causedByDrain); + value, timestamp, paneInfo, currentRecordId, currentRecordOffset, causedByDrain); } else { return new TimestampedValueInSingleWindow<>( - value, timestamp, window, paneInfo, null, null, causedByDrain); + value, timestamp, window, paneInfo, currentRecordId, currentRecordOffset, causedByDrain); } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MetadataPropagationTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MetadataPropagationTest.java new file mode 100644 index 000000000000..d2f0e247eb5b --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MetadataPropagationTest.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms; + +import org.apache.beam.sdk.testing.NeedsRunner; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.values.CausedByDrain; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.WindowedValues; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +public class MetadataPropagationTest { + private static final Integer[] EMPTY = new Integer[] {}; + private static final Integer[] DATA = new Integer[] {1, 2, 3, 4, 5}; + private static final Integer[] REPEATED_DATA = new Integer[] {1, 1, 2, 2, 3, 3, 4, 4, 5, 5}; + + @RunWith(JUnit4.class) + public static class MiscTest { + + /** Tests for metadata propagation. */ + @Rule public final transient TestPipeline pipeline = TestPipeline.create(); + + static class CausedByDrainSettingDoFn extends DoFn { + @ProcessElement + public void process(OutputReceiver r) { + r.builder("value").setCausedByDrain(CausedByDrain.CAUSED_BY_DRAIN).output(); + } + } + + static class CausedByDrainExtractingDoFn extends DoFn { + @ProcessElement + public void process(ProcessContext pc, OutputReceiver r) { + r.output(pc.causedByDrain().toString()); + } + } + + @Test + @Category(NeedsRunner.class) + public void testMetadataPropagationAcrossShuffleParameter() { + WindowedValues.WindowedValueCoder.setMetadataSupported(); + PCollection results = + pipeline + .apply(Create.of(1)) + .apply(ParDo.of(new CausedByDrainSettingDoFn())) + .apply(Redistribute.arbitrarily()) + .apply(ParDo.of(new CausedByDrainExtractingDoFn())); + + PAssert.that(results).containsInAnyOrder("CAUSED_BY_DRAIN"); + + pipeline.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testMetadataPropagationParameter() { + PCollection results = + pipeline + .apply(Create.of(1)) + .apply(ParDo.of(new CausedByDrainSettingDoFn())) + .apply(ParDo.of(new CausedByDrainExtractingDoFn())); + + PAssert.that(results).containsInAnyOrder("CAUSED_BY_DRAIN"); + + pipeline.run(); + } + } +} diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java index de69f49ecc3c..1a49c31a42d8 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java @@ -1787,6 +1787,16 @@ public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp WindowedValues.of(output, timestamp, currentWindow, currentElement.getPaneInfo())); } + @Override + public void outputWindowedValue(WindowedValue windowedValue) { + outputTo(mainOutputConsumer, windowedValue); + } + + @Override + public void outputWindowedValue(TupleTag tag, WindowedValue windowedValue) { + outputTo((FnDataReceiver) localNameToConsumer.get(tag.getId()), windowedValue); + } + @Override public void outputWindowedValue( TupleTag tag, @@ -1937,6 +1947,16 @@ public void outputWindowedValue( outputTo(consumer, WindowedValues.of(output, timestamp, windows, paneInfo)); } + @Override + public void outputWindowedValue(WindowedValue windowedValue) { + outputTo(mainOutputConsumer, windowedValue); + } + + @Override + public void outputWindowedValue(TupleTag tag, WindowedValue windowedValue) { + outputTo((FnDataReceiver) localNameToConsumer.get(tag.getId()), windowedValue); + } + @Override public CausedByDrain causedByDrain() { return currentElement.causedByDrain(); @@ -2354,6 +2374,16 @@ public void outputWindowedValue( outputTo(consumer, WindowedValues.of(output, timestamp, windows, paneInfo)); } + @Override + public void outputWindowedValue(WindowedValue windowedValue) { + outputTo(mainOutputConsumer, windowedValue); + } + + @Override + public void outputWindowedValue(TupleTag tag, WindowedValue windowedValue) { + outputTo((FnDataReceiver) localNameToConsumer.get(tag.getId()), windowedValue); + } + @SuppressWarnings( "deprecation") // Allowed Skew is deprecated for users, but must be respected private void checkOnWindowExpirationTimestamp(Instant timestamp) { @@ -2644,6 +2674,16 @@ public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp WindowedValues.of(output, timestamp, currentWindow, currentTimer.getPaneInfo())); } + @Override + public void outputWindowedValue(WindowedValue windowedValue) { + outputTo(mainOutputConsumer, windowedValue); + } + + @Override + public void outputWindowedValue(TupleTag tag, WindowedValue windowedValue) { + outputTo((FnDataReceiver) localNameToConsumer.get(tag.getId()), windowedValue); + } + @Override public void outputWindowedValue( TupleTag tag, From 36d2b4832f37ac839e570e1ba7a93d0802023380 Mon Sep 17 00:00:00 2001 From: Radek Stankiewicz Date: Wed, 18 Mar 2026 10:31:02 +0100 Subject: [PATCH 2/4] fix npe in ReduceRunner.. --- .../beam/runners/core/ReduceFnRunner.java | 2 +- .../beam/runners/core/SimpleDoFnRunner.java | 6 ++ .../beam/sdk/testing/TestOutputReceiver.java | 2 + .../sdk/transforms/DoFnOutputReceivers.java | 12 ++-- .../SplittableParDoNaiveBounded.java | 8 +-- .../beam/sdk/values/WindowedValues.java | 3 +- .../transforms/reflect/DoFnInvokersTest.java | 3 + .../beam/fn/harness/FnApiDoFnRunner.java | 60 +++++-------------- 8 files changed, 34 insertions(+), 62 deletions(-) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java index 85cf9cefde15..0721ddc4685e 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java @@ -376,7 +376,7 @@ public void processElements(Iterable> values) throws Excep emit( contextFactory.base(window, StateStyle.DIRECT), contextFactory.base(window, StateStyle.RENAMED), - null); + CausedByDrain.NORMAL); } // We're all done with merging and emitting elements so can compress the activeWindow state. diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index cbd58f51e45f..4081746bde21 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -445,11 +445,13 @@ public void output(TupleTag tag, T output) { @Override public void outputWindowedValue(WindowedValue windowedValue) { + checkTimestamp(elem.getTimestamp(), windowedValue.getTimestamp()); SimpleDoFnRunner.this.outputWindowedValue(mainOutputTag, windowedValue); } @Override public void outputWindowedValue(TupleTag tag, WindowedValue windowedValue) { + checkTimestamp(elem.getTimestamp(), windowedValue.getTimestamp()); SimpleDoFnRunner.this.outputWindowedValue(tag, windowedValue); } @@ -1039,11 +1041,13 @@ public void outputWindowedValue( @Override public void outputWindowedValue(WindowedValue windowedValue) { + checkTimestamp(timestamp(), windowedValue.getTimestamp()); SimpleDoFnRunner.this.outputWindowedValue(mainOutputTag, windowedValue); } @Override public void outputWindowedValue(TupleTag tag, WindowedValue windowedValue) { + checkTimestamp(timestamp(), windowedValue.getTimestamp()); SimpleDoFnRunner.this.outputWindowedValue(tag, windowedValue); } @@ -1308,11 +1312,13 @@ public void outputWindowedValue( @Override public void outputWindowedValue(WindowedValue windowedValue) { + checkTimestamp(this.timestamp, windowedValue.getTimestamp()); SimpleDoFnRunner.this.outputWindowedValue(mainOutputTag, windowedValue); } @Override public void outputWindowedValue(TupleTag tag, WindowedValue windowedValue) { + checkTimestamp(this.timestamp, windowedValue.getTimestamp()); SimpleDoFnRunner.this.outputWindowedValue(tag, windowedValue); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestOutputReceiver.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestOutputReceiver.java index 83d2af7b66bb..2f53bf2e9bc4 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestOutputReceiver.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestOutputReceiver.java @@ -23,6 +23,7 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.OutputBuilder; import org.apache.beam.sdk.values.WindowedValues; import org.joda.time.Instant; @@ -54,6 +55,7 @@ public OutputBuilder builder(T value) { .setWindow(fakeWindow) .setPaneInfo(PaneInfo.NO_FIRING) .setTimestamp(BoundedWindow.TIMESTAMP_MIN_VALUE) + .setCausedByDrain(CausedByDrain.NORMAL) .setReceiver(windowedValue -> records.add(windowedValue.getValue())); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnOutputReceivers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnOutputReceivers.java index 6873c1792de5..37850a9fcf84 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnOutputReceivers.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnOutputReceivers.java @@ -70,10 +70,8 @@ public OutputBuilder builder(Row value) { rowWithMetadata -> { ((DoFn.WindowedContext) context) .outputWindowedValue( - schemaCoder.getFromRowFunction().apply(rowWithMetadata.getValue()), - rowWithMetadata.getTimestamp(), - rowWithMetadata.getWindows(), - rowWithMetadata.getPaneInfo()); + rowWithMetadata.withValue( + schemaCoder.getFromRowFunction().apply(rowWithMetadata.getValue()))); }); } else { @@ -84,10 +82,8 @@ public OutputBuilder builder(Row value) { rowWithMetadata -> { context.outputWindowedValue( tag, - schemaCoder.getFromRowFunction().apply(rowWithMetadata.getValue()), - rowWithMetadata.getTimestamp(), - rowWithMetadata.getWindows(), - rowWithMetadata.getPaneInfo()); + rowWithMetadata.withValue( + schemaCoder.getFromRowFunction().apply(rowWithMetadata.getValue()))); }); } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java index 201040d25f16..5adf3d32a1a9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java @@ -577,13 +577,7 @@ public OutputBuilder builder(T value) { return outputBuilderSupplier .builder(value) .setReceiver( - windowedValue -> - outerContext.outputWindowedValue( - tag, - windowedValue.getValue(), - windowedValue.getTimestamp(), - windowedValue.getWindows(), - windowedValue.getPaneInfo())); + windowedValue -> outerContext.outputWindowedValue(tag, windowedValue)); } }; } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java index 31ec7a724068..9ccbd0ecf070 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java @@ -272,7 +272,7 @@ public static WindowedValue of( CausedByDrain causedByDrain) { checkArgument(paneInfo != null, "WindowedValue requires PaneInfo, but it was null"); checkArgument(windows.size() > 0, "WindowedValue requires windows, but there were none"); - + checkArgument(causedByDrain != null, "WindowedValue requires CausedByDrain, but it was null"); if (windows.size() == 1) { return of( value, @@ -322,6 +322,7 @@ public static WindowedValue of( @Nullable Long currentRecordOffset, CausedByDrain causedByDrain) { checkArgument(paneInfo != null, "WindowedValue requires PaneInfo, but it was null"); + checkArgument(causedByDrain != null, "WindowedValue requires CausedByDrain, but it was null"); boolean isGlobal = GlobalWindow.INSTANCE.equals(window); if (isGlobal && BoundedWindow.TIMESTAMP_MIN_VALUE.equals(timestamp)) { diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java index 186d58e33189..e9269deb18ea 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java @@ -77,6 +77,7 @@ import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.UserCodeException; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.OutputBuilder; import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.sdk.values.TypeDescriptors; @@ -561,6 +562,7 @@ public OutputBuilder builder(SomeRestriction value) { .setTimestamp(mockTimestamp) .setWindow(mockWindow) .setPaneInfo(PaneInfo.NO_FIRING) + .setCausedByDrain(CausedByDrain.NORMAL) .setReceiver(windowedValue -> outputs.add(windowedValue.getValue())); } }; @@ -801,6 +803,7 @@ public OutputBuilder builder(String value) { .setTimestamp(mockTimestamp) .setWindow(mockWindow) .setPaneInfo(PaneInfo.NO_FIRING) + .setCausedByDrain(CausedByDrain.NORMAL) .setReceiver( windowedValue -> { assertFalse(invoked); diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java index 1a49c31a42d8..8cb932030bcc 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java @@ -2098,10 +2098,8 @@ public OutputBuilder builder(Row value) { .setReceiver( windowedRow -> ProcessBundleContextBase.this.outputWindowedValue( - fromRowFunction.apply(windowedRow.getValue()), - windowedRow.getTimestamp(), - windowedRow.getWindows(), - windowedRow.getPaneInfo())); + windowedRow.withValue( + fromRowFunction.apply(windowedRow.getValue())))); } }; @@ -2134,12 +2132,7 @@ public OutputBuilder builder(T value) { .withValue(value) .setReceiver( windowedValue -> - ProcessBundleContextBase.this.outputWindowedValue( - tag, - windowedValue.getValue(), - windowedValue.getTimestamp(), - windowedValue.getWindows(), - windowedValue.getPaneInfo())); + ProcessBundleContextBase.this.outputWindowedValue(tag, windowedValue)); } }; } @@ -2174,10 +2167,8 @@ public OutputBuilder builder(Row value) { windowedRow -> ProcessBundleContextBase.this.outputWindowedValue( tag, - fromRowFunction.apply(windowedRow.getValue()), - windowedRow.getTimestamp(), - windowedRow.getWindows(), - windowedRow.getPaneInfo())); + windowedRow.withValue( + fromRowFunction.apply(windowedRow.getValue())))); } }; } @@ -2456,10 +2447,8 @@ public OutputBuilder builder(Row value) { .setReceiver( windowedValue -> context.outputWindowedValue( - fromRowFunction.apply(windowedValue.getValue()), - windowedValue.getTimestamp(), - windowedValue.getWindows(), - windowedValue.getPaneInfo())); + windowedValue.withValue( + fromRowFunction.apply(windowedValue.getValue())))); } }; @@ -2490,14 +2479,7 @@ public OutputBuilder builder(T value) { .setTimestamp(currentTimer.getHoldTimestamp()) .setWindow(currentWindow) .setCausedByDrain(causedByDrain) - .setReceiver( - windowedValue -> - context.outputWindowedValue( - tag, - windowedValue.getValue(), - windowedValue.getTimestamp(), - windowedValue.getWindows(), - windowedValue.getPaneInfo())); + .setReceiver(windowedValue -> context.outputWindowedValue(tag, windowedValue)); } }; } @@ -2532,10 +2514,8 @@ public OutputBuilder builder(Row value) { windowedValue -> context.outputWindowedValue( tag, - fromRowFunction.apply(windowedValue.getValue()), - windowedValue.getTimestamp(), - windowedValue.getWindows(), - windowedValue.getPaneInfo())); + windowedValue.withValue( + fromRowFunction.apply(windowedValue.getValue())))); } }; } @@ -2775,10 +2755,8 @@ public OutputBuilder builder(Row value) { .setReceiver( windowedValue -> context.outputWindowedValue( - fromRowFunction.apply(windowedValue.getValue()), - windowedValue.getTimestamp(), - windowedValue.getWindows(), - windowedValue.getPaneInfo())); + windowedValue.withValue( + fromRowFunction.apply(windowedValue.getValue())))); } }; @@ -2810,13 +2788,7 @@ public OutputBuilder builder(T value) { .setWindow(currentWindow) .setCausedByDrain(causedByDrain) .setPaneInfo(currentTimer.getPaneInfo()) - .setReceiver( - windowedValue -> - context.outputWindowedValue( - windowedValue.getValue(), - windowedValue.getTimestamp(), - windowedValue.getWindows(), - windowedValue.getPaneInfo())); + .setReceiver(windowedValue -> context.outputWindowedValue(windowedValue)); } }; } @@ -2851,10 +2823,8 @@ public OutputBuilder builder(Row value) { .setReceiver( windowedValue -> context.outputWindowedValue( - fromRowFunction.apply(windowedValue.getValue()), - windowedValue.getTimestamp(), - windowedValue.getWindows(), - windowedValue.getPaneInfo())); + windowedValue.withValue( + fromRowFunction.apply(windowedValue.getValue())))); } }; } From 80c7a49e74397e7a27ecad1f9e22bfd70dc722f4 Mon Sep 17 00:00:00 2001 From: Radek Stankiewicz Date: Wed, 18 Mar 2026 18:59:35 +0100 Subject: [PATCH 3/4] find npe --- .../beam/sdk/util/construction/Timer.java | 2 +- .../beam/sdk/values/WindowedValues.java | 2 ++ .../beam/fn/harness/FnApiDoFnRunner.java | 22 ++++++++++++++----- 3 files changed, 20 insertions(+), 6 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/Timer.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/Timer.java index 6e5bb3303c39..bd6e0d22a719 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/Timer.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/Timer.java @@ -126,7 +126,7 @@ public static Timer cleared( */ public abstract @Nullable PaneInfo getPaneInfo(); - public abstract @Nullable CausedByDrain causedByDrain(); + public abstract CausedByDrain causedByDrain(); @Override public final boolean equals(@Nullable Object other) { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java index 9ccbd0ecf070..79f4b4a62ac2 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java @@ -148,6 +148,7 @@ public Builder setRecordOffset(@Nullable Long recordOffset) { @Override public Builder setCausedByDrain(CausedByDrain causedByDrain) { + checkStateNotNull(causedByDrain, "CausedByDrain is null"); this.causedByDrain = causedByDrain; return this; } @@ -202,6 +203,7 @@ public PaneInfo getPaneInfo() { @Override public CausedByDrain causedByDrain() { + checkStateNotNull(causedByDrain, "CausedByDrain not set"); return causedByDrain; } diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java index 8cb932030bcc..2eb5816a864f 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java @@ -624,15 +624,18 @@ private void startBundle() { private void processElementForParDo(WindowedValue elem) { currentElement = elem; + causedByDrain = currentElement.causedByDrain(); try { doFnInvoker.invokeProcessElement(processContext); } finally { currentElement = null; + causedByDrain = null; } } private void processElementForWindowObservingParDo(WindowedValue elem) { currentElement = elem; + causedByDrain = currentElement.causedByDrain(); try { Iterator windowIterator = (Iterator) elem.getWindows().iterator(); @@ -643,12 +646,14 @@ private void processElementForWindowObservingParDo(WindowedValue elem) { } finally { currentElement = null; currentWindow = null; + causedByDrain = null; } } private void processElementForWindowObservingSizedElementAndRestriction( WindowedValue>, Double>> elem) { currentElement = elem.withValue(elem.getValue().getKey().getKey()); + causedByDrain = elem.causedByDrain(); windowCurrentIndex = -1; windowStopIndex = currentElement.getWindows().size(); currentWindows = ImmutableList.copyOf(currentElement.getWindows()); @@ -660,6 +665,7 @@ private void processElementForWindowObservingSizedElementAndRestriction( windowCurrentIndex = -1; windowStopIndex = 0; currentElement = null; + causedByDrain = null; currentWindows = null; currentRestriction = null; currentWatermarkEstimatorState = null; @@ -1202,7 +1208,8 @@ private void processTimer( checkNotNull(timerBundleTracker); try { currentKey = timer.getUserKey(); - causedByDrain = timer.causedByDrain(); + causedByDrain = Preconditions.checkNotNull(timer.causedByDrain()); + // add drain Iterator windowIterator = (Iterator) timer.getWindows().iterator(); @@ -1286,6 +1293,7 @@ private void processOnWindowExpiration(Timer timer) { try { currentKey = timer.getUserKey(); currentTimer = timer; + causedByDrain = timer.causedByDrain(); Iterator windowIterator = (Iterator) timer.getWindows().iterator(); while (windowIterator.hasNext()) { @@ -1296,6 +1304,7 @@ private void processOnWindowExpiration(Timer timer) { currentKey = null; currentTimer = null; currentWindow = null; + causedByDrain = null; } } @@ -2315,7 +2324,7 @@ public OutputBuilder builder(OutputT value) { .setWindow(currentWindow) .setTimestamp(currentTimer.getHoldTimestamp()) .setPaneInfo(currentTimer.getPaneInfo()) - .setCausedByDrain(causedByDrain) + .setCausedByDrain(currentTimer.causedByDrain()) .setReceiver( windowedValue -> { checkOnWindowExpirationTimestamp(windowedValue.getTimestamp()); @@ -2336,7 +2345,10 @@ public void output(TupleTag tag, T output) { output, currentTimer.getHoldTimestamp(), currentWindow, - currentTimer.getPaneInfo())); + currentTimer.getPaneInfo(), + null, + null, + currentTimer.causedByDrain())); } @Override @@ -2786,7 +2798,7 @@ public OutputBuilder builder(T value) { .setValue(value) .setTimestamp(currentTimer.getHoldTimestamp()) .setWindow(currentWindow) - .setCausedByDrain(causedByDrain) + .setCausedByDrain(currentTimer.causedByDrain()) .setPaneInfo(currentTimer.getPaneInfo()) .setReceiver(windowedValue -> context.outputWindowedValue(windowedValue)); } @@ -2819,7 +2831,7 @@ public OutputBuilder builder(Row value) { .setTimestamp(currentTimer.getHoldTimestamp()) .setWindow(currentWindow) .setPaneInfo(currentTimer.getPaneInfo()) - .setCausedByDrain(causedByDrain) + .setCausedByDrain(currentTimer.causedByDrain()) .setReceiver( windowedValue -> context.outputWindowedValue( From 7a5c982231835ced473fbb3ab4152fcfe0e3f6e3 Mon Sep 17 00:00:00 2001 From: Radek Stankiewicz Date: Fri, 20 Mar 2026 16:49:46 +0100 Subject: [PATCH 4/4] review feedback - remove duplicated code and unused variables --- ...dTimeBoundedSplittableProcessElementInvoker.java | 7 +------ .../apache/beam/runners/core/SimpleDoFnRunner.java | 9 +++------ .../org/apache/beam/sdk/transforms/DoFnTester.java | 13 +------------ .../sdk/transforms/MetadataPropagationTest.java | 3 --- 4 files changed, 5 insertions(+), 27 deletions(-) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java index 445bd723f9ed..07e4756885dd 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java @@ -479,12 +479,7 @@ public void outputWindowedValue( @Override public void outputWindowedValue(WindowedValue windowedValue) { - noteOutput(); - if (watermarkEstimator instanceof TimestampObservingWatermarkEstimator) { - ((TimestampObservingWatermarkEstimator) watermarkEstimator) - .observeTimestamp(windowedValue.getTimestamp()); - } - outputReceiver.output(mainOutputTag, windowedValue); + outputWindowedValue(mainOutputTag, windowedValue); } @Override diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index 4081746bde21..46c14bf6dee6 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -445,8 +445,7 @@ public void output(TupleTag tag, T output) { @Override public void outputWindowedValue(WindowedValue windowedValue) { - checkTimestamp(elem.getTimestamp(), windowedValue.getTimestamp()); - SimpleDoFnRunner.this.outputWindowedValue(mainOutputTag, windowedValue); + outputWindowedValue(mainOutputTag, windowedValue); } @Override @@ -1041,8 +1040,7 @@ public void outputWindowedValue( @Override public void outputWindowedValue(WindowedValue windowedValue) { - checkTimestamp(timestamp(), windowedValue.getTimestamp()); - SimpleDoFnRunner.this.outputWindowedValue(mainOutputTag, windowedValue); + outputWindowedValue(mainOutputTag, windowedValue); } @Override @@ -1312,8 +1310,7 @@ public void outputWindowedValue( @Override public void outputWindowedValue(WindowedValue windowedValue) { - checkTimestamp(this.timestamp, windowedValue.getTimestamp()); - SimpleDoFnRunner.this.outputWindowedValue(mainOutputTag, windowedValue); + outputWindowedValue(mainOutputTag, windowedValue); } @Override diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java index 5eccffe94234..4de2c3d2c9c0 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java @@ -651,18 +651,7 @@ public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp @Override public void outputWindowedValue(WindowedValue windowedValue) { - for (BoundedWindow w : windowedValue.getWindows()) { - getMutableOutput(mainOutputTag) - .add( - ValueInSingleWindow.of( - windowedValue.getValue(), - windowedValue.getTimestamp(), - w, - windowedValue.getPaneInfo(), - windowedValue.getRecordId(), - windowedValue.getRecordOffset(), - windowedValue.causedByDrain())); - } + outputWindowedValue(mainOutputTag, windowedValue); } @Override diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MetadataPropagationTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MetadataPropagationTest.java index d2f0e247eb5b..5cfc3d3a1eb6 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MetadataPropagationTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MetadataPropagationTest.java @@ -30,9 +30,6 @@ import org.junit.runners.JUnit4; public class MetadataPropagationTest { - private static final Integer[] EMPTY = new Integer[] {}; - private static final Integer[] DATA = new Integer[] {1, 2, 3, 4, 5}; - private static final Integer[] REPEATED_DATA = new Integer[] {1, 1, 2, 2, 3, 3, 4, 4, 5, 5}; @RunWith(JUnit4.class) public static class MiscTest {