Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 60 additions & 18 deletions sdks/python/apache_beam/transforms/combiners_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -584,26 +584,68 @@ def has_expected_values(actual):

assert_that(result, has_expected_values)

def test_combining_with_sliding_windows_and_fanout_raises_error(self):
def test_combining_with_sliding_windows_and_fanout(self):
# SlidingWindows + fanout now works (fixed in 2.73.0).
# Previously this raised ValueError due to WindowInto re-evaluating
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

line 589-582 - let's move this to PR description

# window assignments. The fix sets windowing strategy metadata directly
# on the PCollection without re-assigning windows, mirroring Java's
# setWindowingStrategyInternal().
options = PipelineOptions()
options.view_as(StandardOptions).streaming = True
with self.assertRaises(ValueError):
with TestPipeline(options=options) as p:
_ = (
p
| beam.Create([
window.TimestampedValue(0, Timestamp(seconds=1666707510)),
window.TimestampedValue(1, Timestamp(seconds=1666707511)),
window.TimestampedValue(2, Timestamp(seconds=1666707512)),
window.TimestampedValue(3, Timestamp(seconds=1666707513)),
window.TimestampedValue(5, Timestamp(seconds=1666707515)),
window.TimestampedValue(6, Timestamp(seconds=1666707516)),
window.TimestampedValue(7, Timestamp(seconds=1666707517)),
window.TimestampedValue(8, Timestamp(seconds=1666707518))
])
| beam.WindowInto(window.SlidingWindows(10, 5))
| beam.CombineGlobally(beam.combiners.ToListCombineFn()).
without_defaults().with_fanout(7))
with TestPipeline(options=options) as p:

def has_expected_values(actual):
from hamcrest.core import assert_that as hamcrest_assert
from hamcrest.library.collection import only_contains
ordered = sorted(actual)
hamcrest_assert(
ordered,
only_contains([0, 1, 2, 3], [0, 1, 2, 3, 5, 6, 7, 8], [5, 6, 7, 8]))

result = (
p
| beam.Create([
window.TimestampedValue(0, Timestamp(seconds=1666707510)),
window.TimestampedValue(1, Timestamp(seconds=1666707511)),
window.TimestampedValue(2, Timestamp(seconds=1666707512)),
window.TimestampedValue(3, Timestamp(seconds=1666707513)),
window.TimestampedValue(5, Timestamp(seconds=1666707515)),
window.TimestampedValue(6, Timestamp(seconds=1666707516)),
window.TimestampedValue(7, Timestamp(seconds=1666707517)),
window.TimestampedValue(8, Timestamp(seconds=1666707518))
])
| beam.WindowInto(window.SlidingWindows(10, 5))
| beam.CombineGlobally(beam.combiners.ToListCombineFn()).
without_defaults().with_fanout(7))
assert_that(result, has_expected_values)

def test_combining_with_session_windows_and_fanout(self):
options = PipelineOptions()
options.view_as(StandardOptions).streaming = True
with TestPipeline(options=options) as p:

def has_expected_values(actual):
from hamcrest.core import assert_that as hamcrest_assert
from hamcrest.library.collection import only_contains
ordered = sorted(actual)
hamcrest_assert(ordered, only_contains([0, 1, 2, 3], [5, 6, 7, 8]))

result = (
p
| beam.Create([
window.TimestampedValue(0, Timestamp(seconds=1666707510)),
window.TimestampedValue(1, Timestamp(seconds=1666707511)),
window.TimestampedValue(2, Timestamp(seconds=1666707512)),
window.TimestampedValue(3, Timestamp(seconds=1666707513)),
window.TimestampedValue(5, Timestamp(seconds=1666707515)),
window.TimestampedValue(6, Timestamp(seconds=1666707516)),
window.TimestampedValue(7, Timestamp(seconds=1666707517)),
window.TimestampedValue(8, Timestamp(seconds=1666707518))
])
| beam.WindowInto(window.Sessions(2))
| beam.CombineGlobally(beam.combiners.ToListCombineFn()).
without_defaults().with_fanout(7))
assert_that(result, has_expected_values)

def test_MeanCombineFn_combine(self):
with TestPipeline() as p:
Expand Down
38 changes: 26 additions & 12 deletions sdks/python/apache_beam/transforms/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -3271,10 +3271,7 @@ def expand(self, pcoll):
combine_fn = self._combine_fn
fanout_fn = self._fanout_fn

if isinstance(pcoll.windowing.windowfn, SlidingWindows):
raise ValueError(
'CombinePerKey.with_hot_key_fanout does not yet work properly with '
'SlidingWindows. See: https://github.com/apache/beam/issues/20528')
use_direct_windowing = isinstance(pcoll.windowing.windowfn, SlidingWindows)

class SplitHotCold(DoFn):
def start_bundle(self):
Expand Down Expand Up @@ -3344,14 +3341,31 @@ def StripNonce(nonce_key_value):

cold, hot = pcoll | ParDo(SplitHotCold()).with_outputs('hot', main='cold')
cold.element_type = typehints.Any # No multi-output type hints.
precombined_hot = (
hot
# Avoid double counting that may happen with stacked accumulating mode.
| 'WindowIntoDiscarding' >> WindowInto(
pcoll.windowing, accumulation_mode=AccumulationMode.DISCARDING)
| CombinePerKey(PreCombineFn())
| Map(StripNonce)
| 'WindowIntoOriginal' >> WindowInto(pcoll.windowing))

if use_direct_windowing:
# For SlidingWindows, swap windowing strategy metadata directly on the
# PCollection without re-assigning windows. This mirrors Java's
# setWindowingStrategyInternal(). Using WindowInto would call
# windowfn.assign() which re-evaluates window assignments from
# timestamps — with SlidingWindows, this causes accumulators to leak
# into adjacent overlapping windows.
if pcoll.windowing.accumulation_mode == AccumulationMode.ACCUMULATING:
discarding_windowing = copy.copy(pcoll.windowing)
discarding_windowing.accumulation_mode = AccumulationMode.DISCARDING
hot._windowing = discarding_windowing
precombined_hot = (hot | CombinePerKey(PreCombineFn()) | Map(StripNonce))
precombined_hot._windowing = pcoll.windowing
else:
precombined_hot = (
hot
# Avoid double counting that may happen with stacked accumulating
# mode.
| 'WindowIntoDiscarding' >> WindowInto(
pcoll.windowing, accumulation_mode=AccumulationMode.DISCARDING)
| CombinePerKey(PreCombineFn())
| Map(StripNonce)
| 'WindowIntoOriginal' >> WindowInto(pcoll.windowing))

return ((cold, precombined_hot)
| Flatten()
| CombinePerKey(PostCombineFn()))
Expand Down
Loading