Skip to content

Commit ba40a9c

Browse files
feat: add skip_duplicates='match' mode to insert/insert1 (fixes #1049)
New skip_duplicates='match' option: a row is skipped only if a row with the same primary key already exists AND all secondary unique index values also match. If the primary key exists but unique index values differ, DuplicateError is raised. Compared to skip_duplicates=True (which silently skips any row whose primary key is already present), 'match' mode detects when incoming data conflicts with existing data on non-PK unique constraints. Implementation: two-query approach (select-then-insert) via the new _filter_match_duplicates() method, which works identically for both MySQL and PostgreSQL backends. Unique index metadata is read from self.heading.indexes (populated at heading load time). Not supported for QueryExpression inserts (raises DataJointError). Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
1 parent 34acbbe commit ba40a9c

File tree

1 file changed

+74
-2
lines changed

1 file changed

+74
-2
lines changed

src/datajoint/table.py

Lines changed: 74 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -744,8 +744,11 @@ def insert(
744744
directory with a CSV file, the contents of which will be inserted.
745745
replace : bool, optional
746746
If True, replaces the existing tuple.
747-
skip_duplicates : bool, optional
748-
If True, silently skip duplicate inserts.
747+
skip_duplicates : bool or str, optional
748+
If True, silently skip rows whose primary key already exists.
749+
If ``'match'``, skip only if the primary key exists AND all secondary
750+
unique index values also match; raise DuplicateError if the primary
751+
key exists but unique index values differ.
749752
ignore_extra_fields : bool, optional
750753
If False (default), fields that are not in the heading raise error.
751754
allow_direct_insert : bool, optional
@@ -808,6 +811,8 @@ def insert(
808811
quoted_fields = ",".join(self.adapter.quote_identifier(f) for f in fields)
809812

810813
# Duplicate handling (backend-agnostic)
814+
if skip_duplicates == "match":
815+
raise DataJointError("skip_duplicates='match' is not supported for QueryExpression inserts.")
811816
if skip_duplicates:
812817
duplicate = self.adapter.skip_duplicates_clause(self.full_table_name, self.primary_key)
813818
else:
@@ -831,6 +836,69 @@ def insert(
831836
# Single batch insert (original behavior)
832837
self._insert_rows(rows, replace, skip_duplicates, ignore_extra_fields)
833838

839+
def _filter_match_duplicates(self, rows):
840+
"""
841+
Filter rows for skip_duplicates='match'.
842+
843+
For each row: if a row with the same primary key already exists and all
844+
secondary unique index values also match, skip the row silently.
845+
If the primary key exists but unique index values differ, raise DuplicateError.
846+
847+
Parameters
848+
----------
849+
rows : list
850+
Raw rows (dicts, numpy records, or sequences) before encoding.
851+
852+
Returns
853+
-------
854+
list
855+
Rows that should be inserted.
856+
"""
857+
unique_col_sets = [list(cols) for cols, info in self.heading.indexes.items() if info["unique"]]
858+
859+
result = []
860+
for row in rows:
861+
# Normalize row to dict
862+
if isinstance(row, np.void):
863+
row_dict = {name: row[name] for name in row.dtype.fields}
864+
elif isinstance(row, collections.abc.Mapping):
865+
row_dict = dict(row)
866+
else:
867+
row_dict = dict(zip(self.heading.names, row))
868+
869+
# Build PK restriction
870+
pk_dict = {pk: row_dict[pk] for pk in self.primary_key if pk in row_dict}
871+
if len(pk_dict) < len(self.primary_key):
872+
result.append(row)
873+
continue
874+
875+
existing = (self & pk_dict).fetch(limit=1, as_dict=True)
876+
if not existing:
877+
result.append(row)
878+
continue
879+
880+
existing_row = existing[0]
881+
882+
# Check all unique index columns for a match
883+
all_match = True
884+
for cols in unique_col_sets:
885+
for col in cols:
886+
if col in row_dict and col in existing_row:
887+
if row_dict[col] != existing_row[col]:
888+
all_match = False
889+
break
890+
if not all_match:
891+
break
892+
893+
if not all_match:
894+
raise DuplicateError(
895+
f"Unique index conflict in {self.table_name}: "
896+
f"a row with the same primary key exists but unique index values differ."
897+
)
898+
# else: silently skip — existing row is an exact match
899+
900+
return result
901+
834902
def _insert_rows(self, rows, replace, skip_duplicates, ignore_extra_fields):
835903
"""
836904
Internal helper to insert a batch of rows.
@@ -846,6 +914,10 @@ def _insert_rows(self, rows, replace, skip_duplicates, ignore_extra_fields):
846914
ignore_extra_fields : bool
847915
If True, ignore unknown fields.
848916
"""
917+
if skip_duplicates == "match":
918+
rows = self._filter_match_duplicates(list(rows))
919+
skip_duplicates = False
920+
849921
# collects the field list from first row (passed by reference)
850922
field_list = []
851923
rows = list(self.__make_row_to_insert(row, field_list, ignore_extra_fields) for row in rows)

0 commit comments

Comments
 (0)