diff --git a/regress/expected/cypher_merge.out b/regress/expected/cypher_merge.out index 56a23f513..8c37dc2de 100644 --- a/regress/expected/cypher_merge.out +++ b/regress/expected/cypher_merge.out @@ -1717,6 +1717,154 @@ SELECT * FROM cypher('issue_1907', $$ MATCH ()-[r]->() RETURN r $$) AS (r agtype {"id": 1125899906842626, "label": "RELATED_TO", "end_id": 281474976710660, "start_id": 281474976710659, "properties": {"property1": "something", "property2": "else"}}::edge (1 row) +-- +-- Fix issue 1446: First MERGE does not see the second MERGE's changes +-- +-- When chained MERGEs appear (MATCH ... MERGE ... MERGE ...), the first +-- (non-terminal) MERGE returned rows one at a time, so the second MERGE's +-- lateral join was materialized before the first finished all iterations. +-- This caused duplicate nodes. The fix makes non-terminal MERGE eager: +-- it processes ALL input rows before returning any. +-- +SELECT * FROM create_graph('issue_1446'); +NOTICE: graph "issue_1446" has been created + create_graph +-------------- + +(1 row) + +-- Reporter's exact setup: two initial nodes +SELECT * FROM cypher('issue_1446', $$ CREATE (:A), (:C) $$) AS (a agtype); + a +--- +(0 rows) + +-- Reporter's exact reproduction case: two chained MERGEs +-- Without fix: C is created multiple times (once per MATCH row) because the +-- second MERGE's lateral join materializes before the first MERGE finishes. +-- With fix: returns 2 rows, C is found and reused by the second MERGE. +SELECT * FROM cypher('issue_1446', $$ + MATCH (x) + MERGE (x)-[:r]->(:t) + MERGE (:C)-[:r]->(:t) + RETURN x +$$) AS (a agtype); + a +------------------------------------------------------------------ + {"id": 844424930131969, "label": "A", "properties": {}}::vertex + {"id": 1125899906842625, "label": "C", "properties": {}}::vertex +(2 rows) + +-- Verify: A(1), C(1), t(2) = 4 nodes, 2 edges +SELECT * FROM cypher('issue_1446', $$ + MATCH (n) + RETURN labels(n) AS label, count(*) AS cnt + ORDER BY label +$$) AS (label agtype, cnt agtype); + label | cnt +-------+----- + ["A"] | 1 + ["C"] | 1 + ["t"] | 2 +(3 rows) + +SELECT * FROM cypher('issue_1446', $$ + MATCH ()-[e]->() + RETURN count(*) AS edge_count +$$) AS (edge_count agtype); + edge_count +------------ + 2 +(1 row) + +-- Test with 3 initial nodes: ensures eager buffering works for larger sets +SELECT * FROM cypher('issue_1446', $$ MATCH (n) DETACH DELETE n $$) AS (a agtype); + a +--- +(0 rows) + +SELECT * FROM cypher('issue_1446', $$ CREATE (:X), (:Y), (:Z) $$) AS (a agtype); + a +--- +(0 rows) + +SELECT * FROM cypher('issue_1446', $$ + MATCH (n) + MERGE (n)-[:link]->(:shared) + MERGE (:hub)-[:link]->(:shared) + RETURN n +$$) AS (a agtype); + a +------------------------------------------------------------------ + {"id": 1970324836974593, "label": "X", "properties": {}}::vertex + {"id": 2251799813685249, "label": "Y", "properties": {}}::vertex + {"id": 2533274790395905, "label": "Z", "properties": {}}::vertex +(3 rows) + +-- Without fix: hub is created 3 times (once per MATCH row). +-- With fix: hub(1), shared(4), X(1), Y(1), Z(1) = 8 nodes, 4 edges +-- (3 n->shared edges + 1 hub->shared edge; hub reused for rows 2 & 3) +SELECT * FROM cypher('issue_1446', $$ + MATCH (n) + RETURN labels(n) AS label, count(*) AS cnt + ORDER BY label +$$) AS (label agtype, cnt agtype); + label | cnt +------------+----- + ["X"] | 1 + ["Y"] | 1 + ["Z"] | 1 + ["hub"] | 1 + ["shared"] | 4 +(5 rows) + +SELECT * FROM cypher('issue_1446', $$ + MATCH ()-[e]->() + RETURN count(*) AS edge_count +$$) AS (edge_count agtype); + edge_count +------------ + 4 +(1 row) + +-- Test chained MERGE with empty MATCH result (empty buffer scenario) +-- Without fix: non-terminal MERGE falls through to terminal logic, +-- incorrectly creating entities when MATCH returns 0 rows. +SELECT * FROM cypher('issue_1446', $$ MATCH (n) DETACH DELETE n $$) AS (a agtype); + a +--- +(0 rows) + +SELECT * FROM cypher('issue_1446', $$ + MATCH (x:NonExistent) + MERGE (x)-[:r]->(:t) + MERGE (:C)-[:r]->(:t) + RETURN count(*) AS cnt +$$) AS (cnt agtype); + cnt +----- + 0 +(1 row) + +-- Verify no nodes or edges were created +SELECT * FROM cypher('issue_1446', $$ + MATCH (n) + RETURN count(*) AS node_count +$$) AS (node_count agtype); + node_count +------------ + 0 +(1 row) + +SELECT * FROM cypher('issue_1446', $$ + MATCH ()-[e]->() + RETURN count(*) AS edge_count +$$) AS (edge_count agtype); + edge_count +------------ + 0 +(1 row) + -- -- clean up graphs -- @@ -1735,6 +1883,11 @@ SELECT * FROM cypher('issue_1709', $$ MATCH (n) DETACH DELETE n $$) AS (a agtype --- (0 rows) +SELECT * FROM cypher('issue_1446', $$ MATCH (n) DETACH DELETE n $$) AS (a agtype); + a +--- +(0 rows) + -- -- delete graphs -- @@ -1812,6 +1965,26 @@ NOTICE: graph "issue_1709" has been dropped (1 row) +SELECT drop_graph('issue_1446', true); +NOTICE: drop cascades to 12 other objects +DETAIL: drop cascades to table issue_1446._ag_label_vertex +drop cascades to table issue_1446._ag_label_edge +drop cascades to table issue_1446."A" +drop cascades to table issue_1446."C" +drop cascades to table issue_1446.r +drop cascades to table issue_1446.t +drop cascades to table issue_1446."X" +drop cascades to table issue_1446."Y" +drop cascades to table issue_1446."Z" +drop cascades to table issue_1446.link +drop cascades to table issue_1446.shared +drop cascades to table issue_1446.hub +NOTICE: graph "issue_1446" has been dropped + drop_graph +------------ + +(1 row) + -- -- End -- diff --git a/regress/sql/cypher_merge.sql b/regress/sql/cypher_merge.sql index 02c9d21c2..cc900e73d 100644 --- a/regress/sql/cypher_merge.sql +++ b/regress/sql/cypher_merge.sql @@ -785,12 +785,88 @@ SELECT * FROM cypher('issue_1907', $$ MERGE (a {name: 'Test Node A'})-[r:RELATED -- should return properties added SELECT * FROM cypher('issue_1907', $$ MATCH ()-[r]->() RETURN r $$) AS (r agtype); +-- +-- Fix issue 1446: First MERGE does not see the second MERGE's changes +-- +-- When chained MERGEs appear (MATCH ... MERGE ... MERGE ...), the first +-- (non-terminal) MERGE returned rows one at a time, so the second MERGE's +-- lateral join was materialized before the first finished all iterations. +-- This caused duplicate nodes. The fix makes non-terminal MERGE eager: +-- it processes ALL input rows before returning any. +-- +SELECT * FROM create_graph('issue_1446'); +-- Reporter's exact setup: two initial nodes +SELECT * FROM cypher('issue_1446', $$ CREATE (:A), (:C) $$) AS (a agtype); +-- Reporter's exact reproduction case: two chained MERGEs +-- Without fix: C is created multiple times (once per MATCH row) because the +-- second MERGE's lateral join materializes before the first MERGE finishes. +-- With fix: returns 2 rows, C is found and reused by the second MERGE. +SELECT * FROM cypher('issue_1446', $$ + MATCH (x) + MERGE (x)-[:r]->(:t) + MERGE (:C)-[:r]->(:t) + RETURN x +$$) AS (a agtype); +-- Verify: A(1), C(1), t(2) = 4 nodes, 2 edges +SELECT * FROM cypher('issue_1446', $$ + MATCH (n) + RETURN labels(n) AS label, count(*) AS cnt + ORDER BY label +$$) AS (label agtype, cnt agtype); +SELECT * FROM cypher('issue_1446', $$ + MATCH ()-[e]->() + RETURN count(*) AS edge_count +$$) AS (edge_count agtype); + +-- Test with 3 initial nodes: ensures eager buffering works for larger sets +SELECT * FROM cypher('issue_1446', $$ MATCH (n) DETACH DELETE n $$) AS (a agtype); +SELECT * FROM cypher('issue_1446', $$ CREATE (:X), (:Y), (:Z) $$) AS (a agtype); +SELECT * FROM cypher('issue_1446', $$ + MATCH (n) + MERGE (n)-[:link]->(:shared) + MERGE (:hub)-[:link]->(:shared) + RETURN n +$$) AS (a agtype); +-- Without fix: hub is created 3 times (once per MATCH row). +-- With fix: hub(1), shared(4), X(1), Y(1), Z(1) = 8 nodes, 4 edges +-- (3 n->shared edges + 1 hub->shared edge; hub reused for rows 2 & 3) +SELECT * FROM cypher('issue_1446', $$ + MATCH (n) + RETURN labels(n) AS label, count(*) AS cnt + ORDER BY label +$$) AS (label agtype, cnt agtype); +SELECT * FROM cypher('issue_1446', $$ + MATCH ()-[e]->() + RETURN count(*) AS edge_count +$$) AS (edge_count agtype); + +-- Test chained MERGE with empty MATCH result (empty buffer scenario) +-- Without fix: non-terminal MERGE falls through to terminal logic, +-- incorrectly creating entities when MATCH returns 0 rows. +SELECT * FROM cypher('issue_1446', $$ MATCH (n) DETACH DELETE n $$) AS (a agtype); +SELECT * FROM cypher('issue_1446', $$ + MATCH (x:NonExistent) + MERGE (x)-[:r]->(:t) + MERGE (:C)-[:r]->(:t) + RETURN count(*) AS cnt +$$) AS (cnt agtype); +-- Verify no nodes or edges were created +SELECT * FROM cypher('issue_1446', $$ + MATCH (n) + RETURN count(*) AS node_count +$$) AS (node_count agtype); +SELECT * FROM cypher('issue_1446', $$ + MATCH ()-[e]->() + RETURN count(*) AS edge_count +$$) AS (edge_count agtype); + -- -- clean up graphs -- SELECT * FROM cypher('cypher_merge', $$ MATCH (n) DETACH DELETE n $$) AS (a agtype); SELECT * FROM cypher('issue_1630', $$ MATCH (n) DETACH DELETE n $$) AS (a agtype); SELECT * FROM cypher('issue_1709', $$ MATCH (n) DETACH DELETE n $$) AS (a agtype); +SELECT * FROM cypher('issue_1446', $$ MATCH (n) DETACH DELETE n $$) AS (a agtype); -- -- delete graphs @@ -800,6 +876,7 @@ SELECT drop_graph('cypher_merge', true); SELECT drop_graph('issue_1630', true); SELECT drop_graph('issue_1691', true); SELECT drop_graph('issue_1709', true); +SELECT drop_graph('issue_1446', true); -- -- End diff --git a/src/backend/executor/cypher_merge.c b/src/backend/executor/cypher_merge.c index a1bb4686c..1edfc812d 100644 --- a/src/backend/executor/cypher_merge.c +++ b/src/backend/executor/cypher_merge.c @@ -596,23 +596,126 @@ static TupleTableSlot *exec_cypher_merge(CustomScanState *node) * it did, we don't need to create the pattern. If the lateral join did * not find the whole path, create the whole path. * - * If this is a terminal clause, process all tuples. If not, pass the - * tuple to up the execution tree. + * For non-terminal MERGE, we eagerly process ALL input rows before + * returning any results. This ensures that all entities created by + * this MERGE are committed to the database before any parent plan + * node (such as another MERGE's lateral join) scans the tables. + * Without eager processing, chained MERGEs cannot see each other's + * changes because the parent's join scan is materialized before the + * child MERGE finishes all its iterations. + * + * For terminal MERGE, all tuples are processed in a single pass + * and NULL is returned. + */ + + /* + * Non-terminal: eagerly process all rows and buffer the results. + */ + if (!terminal && !css->eager_buffer_filled) + { + css->eager_tuples = NIL; + css->eager_tuples_index = 0; + + while (true) + { + TupleTableSlot *projected; + HeapTuple htup; + + /* Process the subtree first */ + Decrement_Estate_CommandId(estate) + slot = ExecProcNode(node->ss.ps.lefttree); + Increment_Estate_CommandId(estate) + + if (TupIsNull(slot)) + { + break; + } + + /* setup the scantuple that the process_path needs */ + econtext->ecxt_scantuple = + node->ss.ps.lefttree->ps_ProjInfo->pi_exprContext->ecxt_scantuple; + + /* + * Check the subtree to see if the lateral join + * representing the MERGE path found results. If not, + * we need to create the path. + */ + if (check_path(css, econtext->ecxt_scantuple)) + { + path_entry **prebuilt_path_array = NULL; + path_entry **found_path_array = NULL; + int path_length = + list_length(css->path->target_nodes); + + prebuilt_path_array = prebuild_path(node); + + found_path_array = + find_duplicate_path(node, prebuilt_path_array); + + if (found_path_array) + { + free_path_entry_array(prebuilt_path_array, + path_length); + process_path(css, found_path_array, false); + } + else + { + created_path *new_path = + palloc0(sizeof(created_path)); + + new_path->next = css->created_paths_list; + new_path->entry = prebuilt_path_array; + css->created_paths_list = new_path; + + process_path(css, prebuilt_path_array, true); + } + } + + /* Project the result and save a copy */ + econtext->ecxt_scantuple = + ExecProject(node->ss.ps.lefttree->ps_ProjInfo); + projected = ExecProject(node->ss.ps.ps_ProjInfo); + + htup = ExecCopySlotHeapTuple(projected); + css->eager_tuples = + lappend(css->eager_tuples, htup); + } + + css->eager_buffer_filled = true; + } + + /* Non-terminal: return the next buffered row (or NULL if empty) */ + if (!terminal && css->eager_buffer_filled) + { + if (css->eager_tuples_index < list_length(css->eager_tuples)) + { + HeapTuple htup; + TupleTableSlot *result_slot = + node->ss.ps.ps_ResultTupleSlot; + + htup = (HeapTuple) + list_nth(css->eager_tuples, css->eager_tuples_index); + css->eager_tuples_index++; + + ExecForceStoreHeapTuple(htup, result_slot, false); + return result_slot; + } + + return NULL; + } + + /* + * Terminal: process all tuples and return NULL. */ do { - /*Process the subtree first */ + /* Process the subtree first */ Decrement_Estate_CommandId(estate) slot = ExecProcNode(node->ss.ps.lefttree); Increment_Estate_CommandId(estate) - /* - * We are done processing the subtree, mark as terminal - * so the function returns NULL. - */ if (TupIsNull(slot)) { - terminal = true; break; } @@ -622,7 +725,7 @@ static TupleTableSlot *exec_cypher_merge(CustomScanState *node) /* * Check the subtree to see if the lateral join representing the - * MERGE path found results. If not, we need to create the path + * MERGE path found results. If not, we need to create the path. */ if (check_path(css, econtext->ecxt_scantuple)) { @@ -630,71 +733,31 @@ static TupleTableSlot *exec_cypher_merge(CustomScanState *node) path_entry **found_path_array = NULL; int path_length = list_length(css->path->target_nodes); - /* - * Prebuild our path and verify that it wasn't already created. - * - * Note: This is currently only needed when there is a previous - * clause. This is due to the fact that MERGE can't see - * what it has just created. This isn't due to transaction - * or command ids, it's due to the join's scan not being - * able to add in the newly inserted tuples and rescan - * with these tuples. - * - * Note: The prebuilt path is purposely generic as it needs to - * only match a path. The more specific items will be - * added by merge_vertex and merge_edge if it is inserted. - * - * Note: The IDs are purposely not created here because we may - * need to throw them away if a path was previously - * created. Remember, the IDs are automatically - * incremented when fetched. - */ prebuilt_path_array = prebuild_path(node); found_path_array = find_duplicate_path(node, prebuilt_path_array); - /* if found we don't need to insert anything, just reuse it */ if (found_path_array) { - /* we don't need our prebuilt path anymore */ free_path_entry_array(prebuilt_path_array, path_length); - - /* as this path exists, we don't need to insert it */ process_path(css, found_path_array, false); } - /* otherwise, we need to insert the new, prebuilt, path */ else { created_path *new_path = palloc0(sizeof(created_path)); - /* build the next linked list entry for our created_paths */ - new_path = palloc0(sizeof(created_path)); new_path->next = css->created_paths_list; new_path->entry = prebuilt_path_array; - - /* we need to push our prebuilt path onto the list */ css->created_paths_list = new_path; - /* - * We need to pass in the prebuilt path so that it can get - * filled in with more specific information - */ process_path(css, prebuilt_path_array, true); } } - } while (terminal); - - /* if this was a terminal MERGE just return NULL */ - if (terminal) - { - return NULL; - } - - econtext->ecxt_scantuple = ExecProject(node->ss.ps.lefttree->ps_ProjInfo); + } while (true); - return ExecProject(node->ss.ps.ps_ProjInfo); + return NULL; } else if (terminal) @@ -910,6 +973,18 @@ static void end_cypher_merge(CustomScanState *node) css->created_paths_list = next; } + /* free the eager buffer if it was used */ + if (css->eager_tuples != NIL) + { + ListCell *lc2; + + foreach(lc2, css->eager_tuples) + { + heap_freetuple((HeapTuple) lfirst(lc2)); + } + list_free(css->eager_tuples); + css->eager_tuples = NIL; + } } /* diff --git a/src/include/executor/cypher_utils.h b/src/include/executor/cypher_utils.h index fc4067455..278094e07 100644 --- a/src/include/executor/cypher_utils.h +++ b/src/include/executor/cypher_utils.h @@ -108,6 +108,9 @@ typedef struct cypher_merge_custom_scan_state bool found_a_path; CommandId base_currentCommandId; struct created_path *created_paths_list; + List *eager_tuples; + int eager_tuples_index; + bool eager_buffer_filled; } cypher_merge_custom_scan_state; TupleTableSlot *populate_vertex_tts(TupleTableSlot *elemTupleSlot,