From 9f9686de47820ab1636303b274343a85ca98acac Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 20 Mar 2026 14:16:12 -0400 Subject: [PATCH 1/2] init --- .../apache_beam/transforms/combiners_test.py | 67 ++++++++++++++++++- sdks/python/apache_beam/transforms/core.py | 45 ++++++++++--- 2 files changed, 102 insertions(+), 10 deletions(-) diff --git a/sdks/python/apache_beam/transforms/combiners_test.py b/sdks/python/apache_beam/transforms/combiners_test.py index ba9e21f85567..37748aec2889 100644 --- a/sdks/python/apache_beam/transforms/combiners_test.py +++ b/sdks/python/apache_beam/transforms/combiners_test.py @@ -584,9 +584,74 @@ def has_expected_values(actual): assert_that(result, has_expected_values) - def test_combining_with_sliding_windows_and_fanout_raises_error(self): + def test_combining_with_sliding_windows_and_fanout(self): + # SlidingWindows + fanout now works (fixed in 2.73.0). + # Previously this raised ValueError due to WindowInto re-evaluating + # window assignments. The fix sets windowing strategy metadata directly + # on the PCollection without re-assigning windows, mirroring Java's + # setWindowingStrategyInternal(). 
options = PipelineOptions() options.view_as(StandardOptions).streaming = True + with TestPipeline(options=options) as p: + + def has_expected_values(actual): + from hamcrest.core import assert_that as hamcrest_assert + from hamcrest.library.collection import only_contains + ordered = sorted(actual) + hamcrest_assert( + ordered, + only_contains([0, 1, 2, 3], [0, 1, 2, 3, 5, 6, 7, 8], [5, 6, 7, 8])) + + result = ( + p + | beam.Create([ + window.TimestampedValue(0, Timestamp(seconds=1666707510)), + window.TimestampedValue(1, Timestamp(seconds=1666707511)), + window.TimestampedValue(2, Timestamp(seconds=1666707512)), + window.TimestampedValue(3, Timestamp(seconds=1666707513)), + window.TimestampedValue(5, Timestamp(seconds=1666707515)), + window.TimestampedValue(6, Timestamp(seconds=1666707516)), + window.TimestampedValue(7, Timestamp(seconds=1666707517)), + window.TimestampedValue(8, Timestamp(seconds=1666707518)) + ]) + | beam.WindowInto(window.SlidingWindows(10, 5)) + | beam.CombineGlobally(beam.combiners.ToListCombineFn()). 
+ without_defaults().with_fanout(7)) + assert_that(result, has_expected_values) + + def test_combining_with_session_windows_and_fanout(self): + options = PipelineOptions() + options.view_as(StandardOptions).streaming = True + with TestPipeline(options=options) as p: + + def has_expected_values(actual): + from hamcrest.core import assert_that as hamcrest_assert + from hamcrest.library.collection import only_contains + ordered = sorted(actual) + hamcrest_assert(ordered, only_contains([0, 1, 2, 3], [5, 6, 7, 8])) + + result = ( + p + | beam.Create([ + window.TimestampedValue(0, Timestamp(seconds=1666707510)), + window.TimestampedValue(1, Timestamp(seconds=1666707511)), + window.TimestampedValue(2, Timestamp(seconds=1666707512)), + window.TimestampedValue(3, Timestamp(seconds=1666707513)), + window.TimestampedValue(5, Timestamp(seconds=1666707515)), + window.TimestampedValue(6, Timestamp(seconds=1666707516)), + window.TimestampedValue(7, Timestamp(seconds=1666707517)), + window.TimestampedValue(8, Timestamp(seconds=1666707518)) + ]) + | beam.WindowInto(window.Sessions(2)) + | beam.CombineGlobally(beam.combiners.ToListCombineFn()). + without_defaults().with_fanout(7)) + assert_that(result, has_expected_values) + + def test_combining_with_sliding_windows_and_fanout_raises_error_legacy(self): + # Legacy behavior: SlidingWindows + fanout raises ValueError when + # update_compatibility_version is prior to 2.73.0. 
+ options = PipelineOptions(update_compatibility_version='2.72.0') + options.view_as(StandardOptions).streaming = True with self.assertRaises(ValueError): with TestPipeline(options=options) as p: _ = ( diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 6d2552a2a6a1..6b76cd413f0e 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -3271,7 +3271,11 @@ def expand(self, pcoll): combine_fn = self._combine_fn fanout_fn = self._fanout_fn - if isinstance(pcoll.windowing.windowfn, SlidingWindows): + use_legacy_windowing = pcoll.pipeline.options.is_compat_version_prior_to( + "2.73.0") + + if use_legacy_windowing and isinstance(pcoll.windowing.windowfn, + SlidingWindows): raise ValueError( 'CombinePerKey.with_hot_key_fanout does not yet work properly with ' 'SlidingWindows. See: https://github.com/apache/beam/issues/20528') @@ -3344,14 +3348,37 @@ def StripNonce(nonce_key_value): cold, hot = pcoll | ParDo(SplitHotCold()).with_outputs('hot', main='cold') cold.element_type = typehints.Any # No multi-output type hints. - precombined_hot = ( - hot - # Avoid double counting that may happen with stacked accumulating mode. - | 'WindowIntoDiscarding' >> WindowInto( - pcoll.windowing, accumulation_mode=AccumulationMode.DISCARDING) - | CombinePerKey(PreCombineFn()) - | Map(StripNonce) - | 'WindowIntoOriginal' >> WindowInto(pcoll.windowing)) + + if use_legacy_windowing: + # Legacy behavior: use WindowInto to swap accumulation mode. + # This re-evaluates window assignments from timestamps, which is + # idempotent for FixedWindows but corrupts SlidingWindows (hence + # the guard above). Preserved for update compatibility with + # pipelines created before 2.73.0. 
+ precombined_hot = ( + hot + | 'WindowIntoDiscarding' >> WindowInto( + pcoll.windowing, accumulation_mode=AccumulationMode.DISCARDING) + | CombinePerKey(PreCombineFn()) + | Map(StripNonce) + | 'WindowIntoOriginal' >> WindowInto(pcoll.windowing)) + else: + # New behavior: swap windowing strategy metadata without re-assigning + # windows. This mirrors Java's setWindowingStrategyInternal() and + # works correctly with all window types including SlidingWindows. + # Setting _windowing directly on a PCollection changes what downstream + # transforms see as the windowing strategy, without calling + # windowfn.assign() on elements — elements keep their existing window + # assignments. Using WindowInto would re-evaluate assignments, which + # corrupts SlidingWindows by leaking accumulators into adjacent + # overlapping windows. + if pcoll.windowing.accumulation_mode == AccumulationMode.ACCUMULATING: + discarding_windowing = copy.copy(pcoll.windowing) + discarding_windowing.accumulation_mode = AccumulationMode.DISCARDING + hot._windowing = discarding_windowing + precombined_hot = (hot | CombinePerKey(PreCombineFn()) | Map(StripNonce)) + precombined_hot._windowing = pcoll.windowing + return ((cold, precombined_hot) | Flatten() | CombinePerKey(PostCombineFn())) From 0b2e98c9f1f4ed5a20f0a53f1dc68e808ef146c9 Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 20 Mar 2026 15:56:26 -0400 Subject: [PATCH 2/2] Don't assign windows in sliding windows case --- .../apache_beam/transforms/combiners_test.py | 23 --------- sdks/python/apache_beam/transforms/core.py | 47 +++++++------------ 2 files changed, 17 insertions(+), 53 deletions(-) diff --git a/sdks/python/apache_beam/transforms/combiners_test.py b/sdks/python/apache_beam/transforms/combiners_test.py index 37748aec2889..e9256e5f9f7a 100644 --- a/sdks/python/apache_beam/transforms/combiners_test.py +++ b/sdks/python/apache_beam/transforms/combiners_test.py @@ -647,29 +647,6 @@ def has_expected_values(actual):
without_defaults().with_fanout(7)) assert_that(result, has_expected_values) - def test_combining_with_sliding_windows_and_fanout_raises_error_legacy(self): - # Legacy behavior: SlidingWindows + fanout raises ValueError when - # update_compatibility_version is prior to 2.73.0. - options = PipelineOptions(update_compatibility_version='2.72.0') - options.view_as(StandardOptions).streaming = True - with self.assertRaises(ValueError): - with TestPipeline(options=options) as p: - _ = ( - p - | beam.Create([ - window.TimestampedValue(0, Timestamp(seconds=1666707510)), - window.TimestampedValue(1, Timestamp(seconds=1666707511)), - window.TimestampedValue(2, Timestamp(seconds=1666707512)), - window.TimestampedValue(3, Timestamp(seconds=1666707513)), - window.TimestampedValue(5, Timestamp(seconds=1666707515)), - window.TimestampedValue(6, Timestamp(seconds=1666707516)), - window.TimestampedValue(7, Timestamp(seconds=1666707517)), - window.TimestampedValue(8, Timestamp(seconds=1666707518)) - ]) - | beam.WindowInto(window.SlidingWindows(10, 5)) - | beam.CombineGlobally(beam.combiners.ToListCombineFn()). - without_defaults().with_fanout(7)) - def test_MeanCombineFn_combine(self): with TestPipeline() as p: input = ( diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 6b76cd413f0e..0aada0191750 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -3271,14 +3271,7 @@ def expand(self, pcoll): combine_fn = self._combine_fn fanout_fn = self._fanout_fn - use_legacy_windowing = pcoll.pipeline.options.is_compat_version_prior_to( - "2.73.0") - - if use_legacy_windowing and isinstance(pcoll.windowing.windowfn, - SlidingWindows): - raise ValueError( - 'CombinePerKey.with_hot_key_fanout does not yet work properly with ' - 'SlidingWindows. 
See: https://github.com/apache/beam/issues/20528') + use_direct_windowing = isinstance(pcoll.windowing.windowfn, SlidingWindows) class SplitHotCold(DoFn): def start_bundle(self): @@ -3349,35 +3342,29 @@ def StripNonce(nonce_key_value): cold, hot = pcoll | ParDo(SplitHotCold()).with_outputs('hot', main='cold') cold.element_type = typehints.Any # No multi-output type hints. - if use_legacy_windowing: - # Legacy behavior: use WindowInto to swap accumulation mode. - # This re-evaluates window assignments from timestamps, which is - # idempotent for FixedWindows but corrupts SlidingWindows (hence - # the guard above). Preserved for update compatibility with - # pipelines created before 2.73.0. + if use_direct_windowing: + # For SlidingWindows, swap windowing strategy metadata directly on the + # PCollection without re-assigning windows. This mirrors Java's + # setWindowingStrategyInternal(). Using WindowInto would call + # windowfn.assign() which re-evaluates window assignments from + # timestamps — with SlidingWindows, this causes accumulators to leak + # into adjacent overlapping windows. + if pcoll.windowing.accumulation_mode == AccumulationMode.ACCUMULATING: + discarding_windowing = copy.copy(pcoll.windowing) + discarding_windowing.accumulation_mode = AccumulationMode.DISCARDING + hot._windowing = discarding_windowing + precombined_hot = (hot | CombinePerKey(PreCombineFn()) | Map(StripNonce)) + precombined_hot._windowing = pcoll.windowing + else: precombined_hot = ( hot + # Avoid double counting that may happen with stacked accumulating + # mode. | 'WindowIntoDiscarding' >> WindowInto( pcoll.windowing, accumulation_mode=AccumulationMode.DISCARDING) | CombinePerKey(PreCombineFn()) | Map(StripNonce) | 'WindowIntoOriginal' >> WindowInto(pcoll.windowing)) - else: - # New behavior: swap windowing strategy metadata without re-assigning - # windows. This mirrors Java's setWindowingStrategyInternal() and - # works correctly with all window types including SlidingWindows. 
- # Setting _windowing directly on a PCollection changes what downstream - # transforms see as the windowing strategy, without calling - # windowfn.assign() on elements — elements keep their existing window - # assignments. Using WindowInto would re-evaluate assignments, which - # corrupts SlidingWindows by leaking accumulators into adjacent - # overlapping windows. - if pcoll.windowing.accumulation_mode == AccumulationMode.ACCUMULATING: - discarding_windowing = copy.copy(pcoll.windowing) - discarding_windowing.accumulation_mode = AccumulationMode.DISCARDING - hot._windowing = discarding_windowing - precombined_hot = (hot | CombinePerKey(PreCombineFn()) | Map(StripNonce)) - precombined_hot._windowing = pcoll.windowing return ((cold, precombined_hot) | Flatten()