diff --git a/.changeset/nine-eagles-fetch.md b/.changeset/nine-eagles-fetch.md new file mode 100644 index 0000000000..e4102c04fb --- /dev/null +++ b/.changeset/nine-eagles-fetch.md @@ -0,0 +1,5 @@ +--- +'@core/sync-service': patch +--- + +Fix Materializer crash that was happening when processing changes with PK updates. diff --git a/packages/sync-service/lib/electric/shapes/consumer/materializer.ex b/packages/sync-service/lib/electric/shapes/consumer/materializer.ex index c668f17745..df6baf2f5e 100644 --- a/packages/sync-service/lib/electric/shapes/consumer/materializer.ex +++ b/packages/sync-service/lib/electric/shapes/consumer/materializer.ex @@ -377,11 +377,15 @@ defmodule Electric.Shapes.Consumer.Materializer do %Changes.UpdatedRecord{ key: key, + old_key: old_key, record: record, move_tags: move_tags, removed_move_tags: removed_move_tags }, {{index, tag_indices}, counts_and_events} -> + # When the primary key doesn't change, old_key may be nil; default to key + old_key = old_key || key + # TODO: this is written as if it supports multiple selected columns, but it doesn't for now columns_present = Enum.any?(state.columns, &is_map_key(record, &1)) has_tag_updates = removed_move_tags != [] @@ -389,12 +393,12 @@ defmodule Electric.Shapes.Consumer.Materializer do if columns_present or has_tag_updates do tag_indices = tag_indices - |> remove_row_from_tag_indices(key, removed_move_tags) + |> remove_row_from_tag_indices(old_key, removed_move_tags) |> add_row_to_tag_indices(key, move_tags) if columns_present do {value, original_string} = cast!(record, state) - old_value = Map.fetch!(index, key) + {old_value, index} = Map.pop!(index, old_key) index = Map.put(index, key, value) # Skip decrement/increment dance if value hasn't changed to avoid diff --git a/packages/sync-service/test/electric/shapes/consumer/materializer_test.exs b/packages/sync-service/test/electric/shapes/consumer/materializer_test.exs index 5f4df78afc..213f57d90a 100644 --- a/packages/sync-service/test/electric/shapes/consumer/materializer_test.exs +++ b/packages/sync-service/test/electric/shapes/consumer/materializer_test.exs @@ -350,6 +350,60 @@ defmodule Electric.Shapes.Consumer.MaterializerTest do end) end + @tag snapshot_data: { + [%Changes.NewRecord{record: %{"id" => "1", "value" => "10"}}], + [pk_cols: ["id"]] + } + test "update that changes the primary key is handled correctly", ctx do + ctx = with_materializer(ctx) + + assert Materializer.get_link_values(ctx) == MapSet.new([10]) + + # Update where the PK changes from "1" to "2" + Materializer.new_changes( + ctx, + [ + %Changes.UpdatedRecord{ + record: %{"id" => "2", "value" => "20"}, + old_record: %{"id" => "1", "value" => "10"} + } + ] + |> prep_changes() + ) + + assert Materializer.get_link_values(ctx) == MapSet.new([20]) + + assert_receive {:materializer_changes, _, %{move_out: [{10, "10"}], move_in: [{20, "20"}]}} + end + + @tag snapshot_data: { + [%Changes.NewRecord{record: %{"id" => "1", "value" => "10"}}], + [pk_cols: ["id"]] + } + test "update that changes the primary key but keeps the same value", ctx do + ctx = with_materializer(ctx) + + assert Materializer.get_link_values(ctx) == MapSet.new([10]) + + # Update where the PK changes but tracked value stays the same + Materializer.new_changes( + ctx, + [ + %Changes.UpdatedRecord{ + record: %{"id" => "2", "value" => "10"}, + old_record: %{"id" => "1", "value" => "10"} + } + ] + |> prep_changes() + ) + + # Value should still be present + assert Materializer.get_link_values(ctx) == MapSet.new([10]) + + # No events since the tracked value didn't change + refute_received {:materializer_changes, _, _} + end + test "events are accumulated across uncommitted fragments", ctx do ctx = with_materializer(ctx)