From 01e8240dce85c7f377e86ac38ddfce9467658046 Mon Sep 17 00:00:00 2001 From: Greg Felice Date: Thu, 26 Feb 2026 00:30:32 -0500 Subject: [PATCH 1/2] Fix chained MERGE not seeing sibling MERGE's changes (#1446) When multiple MERGEs are chained (e.g. MATCH ... MERGE ... MERGE ...), the non-terminal (first) MERGE returned rows one at a time to the parent plan node. The parent MERGE's lateral join would materialize its hash table on the first row, before the child MERGE had finished all its iterations. This caused the second MERGE to not see entities created by the first MERGE, leading to duplicate nodes. Fix by making non-terminal MERGE eager: it processes ALL input rows and buffers the projected results before returning any to the parent. This ensures all entity creations are committed before any parent plan node scans the tables. Co-Authored-By: Claude Opus 4.6 --- regress/expected/cypher_merge.out | 135 +++++++++++++++++++++ regress/sql/cypher_merge.sql | 57 +++++++++ src/backend/executor/cypher_merge.c | 177 ++++++++++++++++++++-------- src/include/executor/cypher_utils.h | 3 + 4 files changed, 321 insertions(+), 51 deletions(-) diff --git a/regress/expected/cypher_merge.out b/regress/expected/cypher_merge.out index 56a23f513..10c28573a 100644 --- a/regress/expected/cypher_merge.out +++ b/regress/expected/cypher_merge.out @@ -1717,6 +1717,116 @@ SELECT * FROM cypher('issue_1907', $$ MATCH ()-[r]->() RETURN r $$) AS (r agtype {"id": 1125899906842626, "label": "RELATED_TO", "end_id": 281474976710660, "start_id": 281474976710659, "properties": {"property1": "something", "property2": "else"}}::edge (1 row) +-- +-- Fix issue 1446: First MERGE does not see the second MERGE's changes +-- +-- When chained MERGEs appear (MATCH ... MERGE ... MERGE ...), the first +-- (non-terminal) MERGE returned rows one at a time, so the second MERGE's +-- lateral join was materialized before the first finished all iterations. +-- This caused duplicate nodes. 
The fix makes non-terminal MERGE eager: +-- it processes ALL input rows before returning any. +-- +SELECT * FROM create_graph('issue_1446'); +NOTICE: graph "issue_1446" has been created + create_graph +-------------- + +(1 row) + +-- Reporter's exact setup: two initial nodes +SELECT * FROM cypher('issue_1446', $$ CREATE (:A), (:C) $$) AS (a agtype); + a +--- +(0 rows) + +-- Reporter's exact reproduction case: two chained MERGEs +-- Without fix: C is created multiple times (once per MATCH row) because the +-- second MERGE's lateral join materializes before the first MERGE finishes. +-- With fix: returns 2 rows, C is found and reused by the second MERGE. +SELECT * FROM cypher('issue_1446', $$ + MATCH (x) + MERGE (x)-[:r]->(:t) + MERGE (:C)-[:r]->(:t) + RETURN x +$$) AS (a agtype); + a +------------------------------------------------------------------ + {"id": 844424930131969, "label": "A", "properties": {}}::vertex + {"id": 1125899906842625, "label": "C", "properties": {}}::vertex +(2 rows) + +-- Verify: A(1), C(1), t(2) = 4 nodes, 2 edges +SELECT * FROM cypher('issue_1446', $$ + MATCH (n) + RETURN labels(n) AS label, count(*) AS cnt + ORDER BY label +$$) AS (label agtype, cnt agtype); + label | cnt +-------+----- + ["A"] | 1 + ["C"] | 1 + ["t"] | 2 +(3 rows) + +SELECT * FROM cypher('issue_1446', $$ + MATCH ()-[e]->() + RETURN count(*) AS edge_count +$$) AS (edge_count agtype); + edge_count +------------ + 2 +(1 row) + +-- Test with 3 initial nodes: ensures eager buffering works for larger sets +SELECT * FROM cypher('issue_1446', $$ MATCH (n) DETACH DELETE n $$) AS (a agtype); + a +--- +(0 rows) + +SELECT * FROM cypher('issue_1446', $$ CREATE (:X), (:Y), (:Z) $$) AS (a agtype); + a +--- +(0 rows) + +SELECT * FROM cypher('issue_1446', $$ + MATCH (n) + MERGE (n)-[:link]->(:shared) + MERGE (:hub)-[:link]->(:shared) + RETURN n +$$) AS (a agtype); + a +------------------------------------------------------------------ + {"id": 1970324836974593, "label": "X", 
"properties": {}}::vertex + {"id": 2251799813685249, "label": "Y", "properties": {}}::vertex + {"id": 2533274790395905, "label": "Z", "properties": {}}::vertex +(3 rows) + +-- Without fix: hub is created 3 times (once per MATCH row). +-- With fix: hub(1), shared(4), X(1), Y(1), Z(1) = 8 nodes, 4 edges +-- (3 n->shared edges + 1 hub->shared edge; hub reused for rows 2 & 3) +SELECT * FROM cypher('issue_1446', $$ + MATCH (n) + RETURN labels(n) AS label, count(*) AS cnt + ORDER BY label +$$) AS (label agtype, cnt agtype); + label | cnt +------------+----- + ["X"] | 1 + ["Y"] | 1 + ["Z"] | 1 + ["hub"] | 1 + ["shared"] | 4 +(5 rows) + +SELECT * FROM cypher('issue_1446', $$ + MATCH ()-[e]->() + RETURN count(*) AS edge_count +$$) AS (edge_count agtype); + edge_count +------------ + 4 +(1 row) + -- -- clean up graphs -- @@ -1735,6 +1845,11 @@ SELECT * FROM cypher('issue_1709', $$ MATCH (n) DETACH DELETE n $$) AS (a agtype --- (0 rows) +SELECT * FROM cypher('issue_1446', $$ MATCH (n) DETACH DELETE n $$) AS (a agtype); + a +--- +(0 rows) + -- -- delete graphs -- @@ -1812,6 +1927,26 @@ NOTICE: graph "issue_1709" has been dropped (1 row) +SELECT drop_graph('issue_1446', true); +NOTICE: drop cascades to 12 other objects +DETAIL: drop cascades to table issue_1446._ag_label_vertex +drop cascades to table issue_1446._ag_label_edge +drop cascades to table issue_1446."A" +drop cascades to table issue_1446."C" +drop cascades to table issue_1446.r +drop cascades to table issue_1446.t +drop cascades to table issue_1446."X" +drop cascades to table issue_1446."Y" +drop cascades to table issue_1446."Z" +drop cascades to table issue_1446.link +drop cascades to table issue_1446.shared +drop cascades to table issue_1446.hub +NOTICE: graph "issue_1446" has been dropped + drop_graph +------------ + +(1 row) + -- -- End -- diff --git a/regress/sql/cypher_merge.sql b/regress/sql/cypher_merge.sql index 02c9d21c2..6e270e5a4 100644 --- a/regress/sql/cypher_merge.sql +++ 
b/regress/sql/cypher_merge.sql @@ -785,12 +785,68 @@ SELECT * FROM cypher('issue_1907', $$ MERGE (a {name: 'Test Node A'})-[r:RELATED -- should return properties added SELECT * FROM cypher('issue_1907', $$ MATCH ()-[r]->() RETURN r $$) AS (r agtype); +-- +-- Fix issue 1446: First MERGE does not see the second MERGE's changes +-- +-- When chained MERGEs appear (MATCH ... MERGE ... MERGE ...), the first +-- (non-terminal) MERGE returned rows one at a time, so the second MERGE's +-- lateral join was materialized before the first finished all iterations. +-- This caused duplicate nodes. The fix makes non-terminal MERGE eager: +-- it processes ALL input rows before returning any. +-- +SELECT * FROM create_graph('issue_1446'); +-- Reporter's exact setup: two initial nodes +SELECT * FROM cypher('issue_1446', $$ CREATE (:A), (:C) $$) AS (a agtype); +-- Reporter's exact reproduction case: two chained MERGEs +-- Without fix: C is created multiple times (once per MATCH row) because the +-- second MERGE's lateral join materializes before the first MERGE finishes. +-- With fix: returns 2 rows, C is found and reused by the second MERGE. 
+SELECT * FROM cypher('issue_1446', $$ + MATCH (x) + MERGE (x)-[:r]->(:t) + MERGE (:C)-[:r]->(:t) + RETURN x +$$) AS (a agtype); +-- Verify: A(1), C(1), t(2) = 4 nodes, 2 edges +SELECT * FROM cypher('issue_1446', $$ + MATCH (n) + RETURN labels(n) AS label, count(*) AS cnt + ORDER BY label +$$) AS (label agtype, cnt agtype); +SELECT * FROM cypher('issue_1446', $$ + MATCH ()-[e]->() + RETURN count(*) AS edge_count +$$) AS (edge_count agtype); + +-- Test with 3 initial nodes: ensures eager buffering works for larger sets +SELECT * FROM cypher('issue_1446', $$ MATCH (n) DETACH DELETE n $$) AS (a agtype); +SELECT * FROM cypher('issue_1446', $$ CREATE (:X), (:Y), (:Z) $$) AS (a agtype); +SELECT * FROM cypher('issue_1446', $$ + MATCH (n) + MERGE (n)-[:link]->(:shared) + MERGE (:hub)-[:link]->(:shared) + RETURN n +$$) AS (a agtype); +-- Without fix: hub is created 3 times (once per MATCH row). +-- With fix: hub(1), shared(4), X(1), Y(1), Z(1) = 8 nodes, 4 edges +-- (3 n->shared edges + 1 hub->shared edge; hub reused for rows 2 & 3) +SELECT * FROM cypher('issue_1446', $$ + MATCH (n) + RETURN labels(n) AS label, count(*) AS cnt + ORDER BY label +$$) AS (label agtype, cnt agtype); +SELECT * FROM cypher('issue_1446', $$ + MATCH ()-[e]->() + RETURN count(*) AS edge_count +$$) AS (edge_count agtype); + -- -- clean up graphs -- SELECT * FROM cypher('cypher_merge', $$ MATCH (n) DETACH DELETE n $$) AS (a agtype); SELECT * FROM cypher('issue_1630', $$ MATCH (n) DETACH DELETE n $$) AS (a agtype); SELECT * FROM cypher('issue_1709', $$ MATCH (n) DETACH DELETE n $$) AS (a agtype); +SELECT * FROM cypher('issue_1446', $$ MATCH (n) DETACH DELETE n $$) AS (a agtype); -- -- delete graphs @@ -800,6 +856,7 @@ SELECT drop_graph('cypher_merge', true); SELECT drop_graph('issue_1630', true); SELECT drop_graph('issue_1691', true); SELECT drop_graph('issue_1709', true); +SELECT drop_graph('issue_1446', true); -- -- End diff --git a/src/backend/executor/cypher_merge.c 
b/src/backend/executor/cypher_merge.c index a1bb4686c..8b8fad44e 100644 --- a/src/backend/executor/cypher_merge.c +++ b/src/backend/executor/cypher_merge.c @@ -596,23 +596,126 @@ static TupleTableSlot *exec_cypher_merge(CustomScanState *node) * it did, we don't need to create the pattern. If the lateral join did * not find the whole path, create the whole path. * - * If this is a terminal clause, process all tuples. If not, pass the - * tuple to up the execution tree. + * For non-terminal MERGE, we eagerly process ALL input rows before + * returning any results. This ensures that all entities created by + * this MERGE have already been inserted before any parent plan + * node (such as another MERGE's lateral join) scans the tables. + * Without eager processing, chained MERGEs cannot see each other's + * changes because the parent's join scan is materialized before the + * child MERGE finishes all its iterations. + * + * For terminal MERGE, all tuples are processed in a single pass + * and NULL is returned. + */ + + /* + * Non-terminal: eagerly process all rows and buffer the results. + */ + if (!terminal && !css->eager_buffer_filled) + { + css->eager_tuples = NIL; + css->eager_tuples_index = 0; + + while (true) + { + TupleTableSlot *projected; + HeapTuple htup; + + /* Process the subtree first */ + Decrement_Estate_CommandId(estate) + slot = ExecProcNode(node->ss.ps.lefttree); + Increment_Estate_CommandId(estate) + + if (TupIsNull(slot)) + { + break; + } + + /* setup the scantuple that the process_path needs */ + econtext->ecxt_scantuple = + node->ss.ps.lefttree->ps_ProjInfo->pi_exprContext->ecxt_scantuple; + + /* + * Check the subtree to see if the lateral join + * representing the MERGE path found results. If not, + * we need to create the path. 
+ */ + if (check_path(css, econtext->ecxt_scantuple)) + { + path_entry **prebuilt_path_array = NULL; + path_entry **found_path_array = NULL; + int path_length = + list_length(css->path->target_nodes); + + prebuilt_path_array = prebuild_path(node); + + found_path_array = + find_duplicate_path(node, prebuilt_path_array); + + if (found_path_array) + { + free_path_entry_array(prebuilt_path_array, + path_length); + process_path(css, found_path_array, false); + } + else + { + created_path *new_path = + palloc0(sizeof(created_path)); + + new_path->next = css->created_paths_list; + new_path->entry = prebuilt_path_array; + css->created_paths_list = new_path; + + process_path(css, prebuilt_path_array, true); + } + } + + /* Project the result and save a copy */ + econtext->ecxt_scantuple = + ExecProject(node->ss.ps.lefttree->ps_ProjInfo); + projected = ExecProject(node->ss.ps.ps_ProjInfo); + + htup = ExecCopySlotHeapTuple(projected); + css->eager_tuples = + lappend(css->eager_tuples, htup); + } + + css->eager_buffer_filled = true; + } + + /* Non-terminal: return the next buffered row */ + if (!terminal && css->eager_tuples != NIL) + { + if (css->eager_tuples_index < list_length(css->eager_tuples)) + { + HeapTuple htup; + TupleTableSlot *result_slot = + node->ss.ps.ps_ResultTupleSlot; + + htup = (HeapTuple) + list_nth(css->eager_tuples, css->eager_tuples_index); + css->eager_tuples_index++; + + ExecForceStoreHeapTuple(htup, result_slot, false); + return result_slot; + } + + return NULL; + } + + /* + * Terminal: process all tuples and return NULL. */ do { - /*Process the subtree first */ + /* Process the subtree first */ Decrement_Estate_CommandId(estate) slot = ExecProcNode(node->ss.ps.lefttree); Increment_Estate_CommandId(estate) - /* - * We are done processing the subtree, mark as terminal - * so the function returns NULL. 
- */ if (TupIsNull(slot)) { - terminal = true; break; } @@ -622,7 +725,7 @@ static TupleTableSlot *exec_cypher_merge(CustomScanState *node) /* * Check the subtree to see if the lateral join representing the - * MERGE path found results. If not, we need to create the path + * MERGE path found results. If not, we need to create the path. */ if (check_path(css, econtext->ecxt_scantuple)) { @@ -630,71 +733,31 @@ static TupleTableSlot *exec_cypher_merge(CustomScanState *node) path_entry **found_path_array = NULL; int path_length = list_length(css->path->target_nodes); - /* - * Prebuild our path and verify that it wasn't already created. - * - * Note: This is currently only needed when there is a previous - * clause. This is due to the fact that MERGE can't see - * what it has just created. This isn't due to transaction - * or command ids, it's due to the join's scan not being - * able to add in the newly inserted tuples and rescan - * with these tuples. - * - * Note: The prebuilt path is purposely generic as it needs to - * only match a path. The more specific items will be - * added by merge_vertex and merge_edge if it is inserted. - * - * Note: The IDs are purposely not created here because we may - * need to throw them away if a path was previously - * created. Remember, the IDs are automatically - * incremented when fetched. 
- */ prebuilt_path_array = prebuild_path(node); found_path_array = find_duplicate_path(node, prebuilt_path_array); - /* if found we don't need to insert anything, just reuse it */ if (found_path_array) { - /* we don't need our prebuilt path anymore */ free_path_entry_array(prebuilt_path_array, path_length); - - /* as this path exists, we don't need to insert it */ process_path(css, found_path_array, false); } - /* otherwise, we need to insert the new, prebuilt, path */ else { created_path *new_path = palloc0(sizeof(created_path)); - /* build the next linked list entry for our created_paths */ - new_path = palloc0(sizeof(created_path)); new_path->next = css->created_paths_list; new_path->entry = prebuilt_path_array; - - /* we need to push our prebuilt path onto the list */ css->created_paths_list = new_path; - /* - * We need to pass in the prebuilt path so that it can get - * filled in with more specific information - */ process_path(css, prebuilt_path_array, true); } } - } while (terminal); - - /* if this was a terminal MERGE just return NULL */ - if (terminal) - { - return NULL; - } - - econtext->ecxt_scantuple = ExecProject(node->ss.ps.lefttree->ps_ProjInfo); + } while (true); - return ExecProject(node->ss.ps.ps_ProjInfo); + return NULL; } else if (terminal) @@ -910,6 +973,18 @@ static void end_cypher_merge(CustomScanState *node) css->created_paths_list = next; } + /* free the eager buffer if it was used */ + if (css->eager_tuples != NIL) + { + ListCell *lc2; + + foreach(lc2, css->eager_tuples) + { + heap_freetuple((HeapTuple) lfirst(lc2)); + } + list_free(css->eager_tuples); + css->eager_tuples = NIL; + } } /* diff --git a/src/include/executor/cypher_utils.h b/src/include/executor/cypher_utils.h index fc4067455..278094e07 100644 --- a/src/include/executor/cypher_utils.h +++ b/src/include/executor/cypher_utils.h @@ -108,6 +108,9 @@ typedef struct cypher_merge_custom_scan_state bool found_a_path; CommandId base_currentCommandId; struct created_path 
*created_paths_list; + List *eager_tuples; + int eager_tuples_index; + bool eager_buffer_filled; } cypher_merge_custom_scan_state; TupleTableSlot *populate_vertex_tts(TupleTableSlot *elemTupleSlot, From 2eee605da069c223ca43cdcd916b01e2fa63d798 Mon Sep 17 00:00:00 2001 From: Greg Felice Date: Thu, 26 Feb 2026 22:05:50 -0500 Subject: [PATCH 2/2] Fix non-terminal MERGE empty-buffer fallthrough and add test When a non-terminal MERGE receives no input rows from its predecessor (e.g., MATCH returns 0 rows), the eager buffer is filled but empty. The condition at line 688 checked `css->eager_tuples != NIL`, which evaluated to false for an empty buffer, causing execution to fall through to the terminal MERGE code path. This could incorrectly create entities when none should be created. Fix by checking `css->eager_buffer_filled` instead, which correctly distinguishes "buffer not yet filled" from "buffer filled but empty". Add regression test for chained MERGE with empty MATCH result. Co-Authored-By: Claude Opus 4.6 --- regress/expected/cypher_merge.out | 38 +++++++++++++++++++++++++++++ regress/sql/cypher_merge.sql | 20 +++++++++++++++ src/backend/executor/cypher_merge.c | 4 +-- 3 files changed, 60 insertions(+), 2 deletions(-) diff --git a/regress/expected/cypher_merge.out b/regress/expected/cypher_merge.out index 10c28573a..8c37dc2de 100644 --- a/regress/expected/cypher_merge.out +++ b/regress/expected/cypher_merge.out @@ -1827,6 +1827,44 @@ $$) AS (edge_count agtype); 4 (1 row) +-- Test chained MERGE with empty MATCH result (empty buffer scenario) +-- Without fix: non-terminal MERGE falls through to terminal logic, +-- incorrectly creating entities when MATCH returns 0 rows. 
+SELECT * FROM cypher('issue_1446', $$ MATCH (n) DETACH DELETE n $$) AS (a agtype); + a +--- +(0 rows) + +SELECT * FROM cypher('issue_1446', $$ + MATCH (x:NonExistent) + MERGE (x)-[:r]->(:t) + MERGE (:C)-[:r]->(:t) + RETURN count(*) AS cnt +$$) AS (cnt agtype); + cnt +----- + 0 +(1 row) + +-- Verify no nodes or edges were created +SELECT * FROM cypher('issue_1446', $$ + MATCH (n) + RETURN count(*) AS node_count +$$) AS (node_count agtype); + node_count +------------ + 0 +(1 row) + +SELECT * FROM cypher('issue_1446', $$ + MATCH ()-[e]->() + RETURN count(*) AS edge_count +$$) AS (edge_count agtype); + edge_count +------------ + 0 +(1 row) + -- -- clean up graphs -- diff --git a/regress/sql/cypher_merge.sql b/regress/sql/cypher_merge.sql index 6e270e5a4..cc900e73d 100644 --- a/regress/sql/cypher_merge.sql +++ b/regress/sql/cypher_merge.sql @@ -840,6 +840,26 @@ SELECT * FROM cypher('issue_1446', $$ RETURN count(*) AS edge_count $$) AS (edge_count agtype); +-- Test chained MERGE with empty MATCH result (empty buffer scenario) +-- Without fix: non-terminal MERGE falls through to terminal logic, +-- incorrectly creating entities when MATCH returns 0 rows. 
+SELECT * FROM cypher('issue_1446', $$ MATCH (n) DETACH DELETE n $$) AS (a agtype); +SELECT * FROM cypher('issue_1446', $$ + MATCH (x:NonExistent) + MERGE (x)-[:r]->(:t) + MERGE (:C)-[:r]->(:t) + RETURN count(*) AS cnt +$$) AS (cnt agtype); +-- Verify no nodes or edges were created +SELECT * FROM cypher('issue_1446', $$ + MATCH (n) + RETURN count(*) AS node_count +$$) AS (node_count agtype); +SELECT * FROM cypher('issue_1446', $$ + MATCH ()-[e]->() + RETURN count(*) AS edge_count +$$) AS (edge_count agtype); + -- -- clean up graphs -- diff --git a/src/backend/executor/cypher_merge.c b/src/backend/executor/cypher_merge.c index 8b8fad44e..1edfc812d 100644 --- a/src/backend/executor/cypher_merge.c +++ b/src/backend/executor/cypher_merge.c @@ -684,8 +684,8 @@ static TupleTableSlot *exec_cypher_merge(CustomScanState *node) css->eager_buffer_filled = true; } - /* Non-terminal: return the next buffered row */ - if (!terminal && css->eager_tuples != NIL) + /* Non-terminal: return the next buffered row (or NULL if empty) */ + if (!terminal && css->eager_buffer_filled) { if (css->eager_tuples_index < list_length(css->eager_tuples)) {