Skip to content

Commit 7a837aa

Browse files
refactor: redesign skip_duplicates='match' for correctness
Address several edge cases in the 'match' mode implementation:

- Batch PK fetch: replace row-by-row SELECT with a single batch query for all incoming PKs. Reduces O(n) queries to O(1).
- Better error messages: report the specific unique index and column values that differ, not just "values differ".
- Non-PK unique conflicts: when a row's PK is new but it collides on a secondary unique index with a *different* existing row, the DB-level DuplicateError is now caught and re-raised with context explaining the likely cause (secondary unique or race condition).
- Race conditions: if a concurrent insert creates a PK that passed the pre-check, the DB-level error is caught and reported clearly.
- Columns not supplied: if a unique index column is not in the incoming row dict (the DB will use its default), skip comparison for that column rather than false-flagging a mismatch.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent ba40a9c commit 7a837aa

File tree

1 file changed

+67
-32
lines changed

1 file changed

+67
-32
lines changed

src/datajoint/table.py

Lines changed: 67 additions & 32 deletions
Original file line number · Diff line number · Diff line change
@@ -840,9 +840,16 @@ def _filter_match_duplicates(self, rows):
840840
"""
841841
Filter rows for skip_duplicates='match'.
842842
843-
For each row: if a row with the same primary key already exists and all
844-
secondary unique index values also match, skip the row silently.
845-
If the primary key exists but unique index values differ, raise DuplicateError.
843+
For each row whose primary key already exists: skip silently if all
844+
secondary unique index values also match the existing row; raise
845+
DuplicateError if the primary key exists but any unique index value
846+
differs.
847+
848+
Rows whose primary key is not yet in the table are kept for insert.
849+
If a kept row subsequently violates a *non-PK* unique constraint at
850+
insert time (e.g. a different row already owns that unique value, or a
851+
concurrent insert created a conflict), the caller is responsible for
852+
catching the resulting DuplicateError.
846853
847854
Parameters
848855
----------
@@ -852,50 +859,70 @@ def _filter_match_duplicates(self, rows):
852859
Returns
853860
-------
854861
list
855-
Rows that should be inserted.
862+
Rows that should be inserted (PK-duplicate matches removed).
856863
"""
857864
unique_col_sets = [list(cols) for cols, info in self.heading.indexes.items() if info["unique"]]
858865

859-
result = []
866+
# --- normalise every row to a dict so we can inspect values ---
867+
row_dicts = []
860868
for row in rows:
861-
# Normalize row to dict
862869
if isinstance(row, np.void):
863-
row_dict = {name: row[name] for name in row.dtype.fields}
870+
row_dicts.append({name: row[name] for name in row.dtype.fields})
864871
elif isinstance(row, collections.abc.Mapping):
865-
row_dict = dict(row)
872+
row_dicts.append(dict(row))
866873
else:
867-
row_dict = dict(zip(self.heading.names, row))
868-
869-
# Build PK restriction
870-
pk_dict = {pk: row_dict[pk] for pk in self.primary_key if pk in row_dict}
871-
if len(pk_dict) < len(self.primary_key):
874+
row_dicts.append(dict(zip(self.heading.names, row)))
875+
876+
# --- batch-fetch existing rows whose PK matches any incoming row ---
877+
pk_lookups = []
878+
for rd in row_dicts:
879+
pk = {k: rd[k] for k in self.primary_key if k in rd}
880+
if len(pk) == len(self.primary_key):
881+
pk_lookups.append(pk)
882+
883+
existing_by_pk: dict[tuple, dict] = {}
884+
if pk_lookups:
885+
existing_rows = (self & pk_lookups).fetch(as_dict=True)
886+
for er in existing_rows:
887+
pk_tuple = tuple(er[k] for k in self.primary_key)
888+
existing_by_pk[pk_tuple] = er
889+
890+
# --- decide per row: insert, skip, or raise ---
891+
result = []
892+
for row, rd in zip(rows, row_dicts):
893+
pk = {k: rd[k] for k in self.primary_key if k in rd}
894+
if len(pk) < len(self.primary_key):
895+
# incomplete PK — let the DB raise the real error
872896
result.append(row)
873897
continue
874898

875-
existing = (self & pk_dict).fetch(limit=1, as_dict=True)
876-
if not existing:
899+
pk_tuple = tuple(pk[k] for k in self.primary_key)
900+
existing_row = existing_by_pk.get(pk_tuple)
901+
if existing_row is None:
902+
# PK not yet in table — include for insert.
903+
# If this row collides on a secondary unique index with a
904+
# *different* existing row, the DB will raise at insert time.
877905
result.append(row)
878906
continue
879907

880-
existing_row = existing[0]
881-
882-
# Check all unique index columns for a match
883-
all_match = True
908+
# PK exists — check every secondary unique index
884909
for cols in unique_col_sets:
885910
for col in cols:
886-
if col in row_dict and col in existing_row:
887-
if row_dict[col] != existing_row[col]:
888-
all_match = False
889-
break
890-
if not all_match:
891-
break
911+
if col not in rd:
912+
# Column not supplied by the caller; the DB will use
913+
# its default. We cannot compare, so skip this column.
914+
continue
915+
new_val = rd[col]
916+
old_val = existing_row.get(col)
917+
if new_val != old_val:
918+
raise DuplicateError(
919+
f"Unique index conflict in {self.table_name}: "
920+
f"primary key {pk} exists but unique index "
921+
f"({', '.join(cols)}) differs — "
922+
f"existing {col}={old_val!r}, new {col}={new_val!r}."
923+
)
892924

893-
if not all_match:
894-
raise DuplicateError(
895-
f"Unique index conflict in {self.table_name}: "
896-
f"a row with the same primary key exists but unique index values differ."
897-
)
898-
# else: silently skip — existing row is an exact match
925+
# All unique index values match (or there are none) — skip row.
899926

900927
return result
901928

@@ -914,7 +941,8 @@ def _insert_rows(self, rows, replace, skip_duplicates, ignore_extra_fields):
914941
ignore_extra_fields : bool
915942
If True, ignore unknown fields.
916943
"""
917-
if skip_duplicates == "match":
944+
match_mode = skip_duplicates == "match"
945+
if match_mode:
918946
rows = self._filter_match_duplicates(list(rows))
919947
skip_duplicates = False
920948

@@ -945,6 +973,13 @@ def _insert_rows(self, rows, replace, skip_duplicates, ignore_extra_fields):
945973
except UnknownAttributeError as err:
946974
raise err.suggest("To ignore extra fields in insert, set ignore_extra_fields=True")
947975
except DuplicateError as err:
976+
if match_mode:
977+
raise DuplicateError(
978+
f"Duplicate entry during skip_duplicates='match' insert into "
979+
f"{self.table_name}. A row with a new primary key may conflict on "
980+
f"a secondary unique index with an existing row, or a concurrent "
981+
f"insert created a conflict. Original error: {err}"
982+
)
948983
raise err.suggest("To ignore duplicate entries in insert, set skip_duplicates=True")
949984

950985
def insert_dataframe(self, df, index_as_pk=None, **insert_kwargs):

0 commit comments

Comments (0)