diff --git a/sdks/python/apache_beam/transforms/combiners_test.py b/sdks/python/apache_beam/transforms/combiners_test.py index ba9e21f85567..e9256e5f9f7a 100644 --- a/sdks/python/apache_beam/transforms/combiners_test.py +++ b/sdks/python/apache_beam/transforms/combiners_test.py @@ -584,26 +584,68 @@ def has_expected_values(actual): assert_that(result, has_expected_values) - def test_combining_with_sliding_windows_and_fanout_raises_error(self): + def test_combining_with_sliding_windows_and_fanout(self): + # SlidingWindows + fanout now works (fixed in 2.73.0). + # Previously this raised ValueError due to WindowInto re-evaluating + # window assignments. The fix sets windowing strategy metadata directly + # on the PCollection without re-assigning windows, mirroring Java's + # setWindowingStrategyInternal(). options = PipelineOptions() options.view_as(StandardOptions).streaming = True - with self.assertRaises(ValueError): - with TestPipeline(options=options) as p: - _ = ( - p - | beam.Create([ - window.TimestampedValue(0, Timestamp(seconds=1666707510)), - window.TimestampedValue(1, Timestamp(seconds=1666707511)), - window.TimestampedValue(2, Timestamp(seconds=1666707512)), - window.TimestampedValue(3, Timestamp(seconds=1666707513)), - window.TimestampedValue(5, Timestamp(seconds=1666707515)), - window.TimestampedValue(6, Timestamp(seconds=1666707516)), - window.TimestampedValue(7, Timestamp(seconds=1666707517)), - window.TimestampedValue(8, Timestamp(seconds=1666707518)) - ]) - | beam.WindowInto(window.SlidingWindows(10, 5)) - | beam.CombineGlobally(beam.combiners.ToListCombineFn()). - without_defaults().with_fanout(7)) + with TestPipeline(options=options) as p: + + def has_expected_values(actual): + from hamcrest.core import assert_that as hamcrest_assert + from hamcrest.library.collection import only_contains + ordered = sorted(actual) + hamcrest_assert( + ordered, + only_contains([0, 1, 2, 3], [0, 1, 2, 3, 5, 6, 7, 8], [5, 6, 7, 8])) + + result = ( + p + | beam.Create([ + window.TimestampedValue(0, Timestamp(seconds=1666707510)), + window.TimestampedValue(1, Timestamp(seconds=1666707511)), + window.TimestampedValue(2, Timestamp(seconds=1666707512)), + window.TimestampedValue(3, Timestamp(seconds=1666707513)), + window.TimestampedValue(5, Timestamp(seconds=1666707515)), + window.TimestampedValue(6, Timestamp(seconds=1666707516)), + window.TimestampedValue(7, Timestamp(seconds=1666707517)), + window.TimestampedValue(8, Timestamp(seconds=1666707518)) + ]) + | beam.WindowInto(window.SlidingWindows(10, 5)) + | beam.CombineGlobally(beam.combiners.ToListCombineFn()). + without_defaults().with_fanout(7)) + assert_that(result, has_expected_values) + + def test_combining_with_session_windows_and_fanout(self): + options = PipelineOptions() + options.view_as(StandardOptions).streaming = True + with TestPipeline(options=options) as p: + + def has_expected_values(actual): + from hamcrest.core import assert_that as hamcrest_assert + from hamcrest.library.collection import only_contains + ordered = sorted(actual) + hamcrest_assert(ordered, only_contains([0, 1, 2, 3], [5, 6, 7, 8])) + + result = ( + p + | beam.Create([ + window.TimestampedValue(0, Timestamp(seconds=1666707510)), + window.TimestampedValue(1, Timestamp(seconds=1666707511)), + window.TimestampedValue(2, Timestamp(seconds=1666707512)), + window.TimestampedValue(3, Timestamp(seconds=1666707513)), + window.TimestampedValue(5, Timestamp(seconds=1666707515)), + window.TimestampedValue(6, Timestamp(seconds=1666707516)), + window.TimestampedValue(7, Timestamp(seconds=1666707517)), + window.TimestampedValue(8, Timestamp(seconds=1666707518)) + ]) + | beam.WindowInto(window.Sessions(2)) + | beam.CombineGlobally(beam.combiners.ToListCombineFn()). + without_defaults().with_fanout(7)) + assert_that(result, has_expected_values) def test_MeanCombineFn_combine(self): with TestPipeline() as p: diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 6d2552a2a6a1..0aada0191750 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -3271,10 +3271,7 @@ def expand(self, pcoll): combine_fn = self._combine_fn fanout_fn = self._fanout_fn - if isinstance(pcoll.windowing.windowfn, SlidingWindows): - raise ValueError( - 'CombinePerKey.with_hot_key_fanout does not yet work properly with ' - 'SlidingWindows. See: https://github.com/apache/beam/issues/20528') + use_direct_windowing = isinstance(pcoll.windowing.windowfn, SlidingWindows) class SplitHotCold(DoFn): def start_bundle(self): @@ -3344,14 +3341,31 @@ def StripNonce(nonce_key_value): cold, hot = pcoll | ParDo(SplitHotCold()).with_outputs('hot', main='cold') cold.element_type = typehints.Any # No multi-output type hints. - precombined_hot = ( - hot - # Avoid double counting that may happen with stacked accumulating mode. - | 'WindowIntoDiscarding' >> WindowInto( - pcoll.windowing, accumulation_mode=AccumulationMode.DISCARDING) - | CombinePerKey(PreCombineFn()) - | Map(StripNonce) - | 'WindowIntoOriginal' >> WindowInto(pcoll.windowing)) + + if use_direct_windowing: + # For SlidingWindows, swap windowing strategy metadata directly on the + # PCollection without re-assigning windows. This mirrors Java's + # setWindowingStrategyInternal(). Using WindowInto would call + # windowfn.assign() which re-evaluates window assignments from + # timestamps — with SlidingWindows, this causes accumulators to leak + # into adjacent overlapping windows. + if pcoll.windowing.accumulation_mode == AccumulationMode.ACCUMULATING: + discarding_windowing = copy.copy(pcoll.windowing) + discarding_windowing.accumulation_mode = AccumulationMode.DISCARDING + hot._windowing = discarding_windowing + precombined_hot = (hot | CombinePerKey(PreCombineFn()) | Map(StripNonce)) + precombined_hot._windowing = pcoll.windowing + else: + precombined_hot = ( + hot + # Avoid double counting that may happen with stacked accumulating + # mode. + | 'WindowIntoDiscarding' >> WindowInto( + pcoll.windowing, accumulation_mode=AccumulationMode.DISCARDING) + | CombinePerKey(PreCombineFn()) + | Map(StripNonce) + | 'WindowIntoOriginal' >> WindowInto(pcoll.windowing)) + return ((cold, precombined_hot) | Flatten() | CombinePerKey(PostCombineFn()))