diff --git a/graph/examples/append_row.rs b/graph/examples/append_row.rs index f272c07cf82..ac7ff12ec93 100644 --- a/graph/examples/append_row.rs +++ b/graph/examples/append_row.rs @@ -104,7 +104,12 @@ pub fn main() -> anyhow::Result<()> { }; mods.push(md); } - let mut group = RowGroup::new(THING_TYPE.clone(), false); + let mut group = RowGroup::new( + THING_TYPE.clone(), + false, + false, + slog::Logger::root(slog::Discard, slog::o!()), + ); let start = Instant::now(); for md in mods { diff --git a/graph/src/components/store/write.rs b/graph/src/components/store/write.rs index 8c9671cb347..95244cde1fe 100644 --- a/graph/src/components/store/write.rs +++ b/graph/src/components/store/write.rs @@ -4,6 +4,8 @@ use std::{ sync::Arc, }; +use slog::{warn, Logger}; + use crate::{ blockchain::{block_stream::FirehoseCursor, BlockPtr, BlockTime}, cheap_clone::CheapClone, @@ -344,6 +346,8 @@ pub struct RowGroup { rows: Vec, immutable: bool, + skip_duplicates: bool, + logger: Logger, /// Map the `key.entity_id` of all entries in `rows` to the index with /// the most recent entry for that id to speed up lookups. @@ -351,11 +355,18 @@ pub struct RowGroup { } impl RowGroup { - pub fn new(entity_type: EntityType, immutable: bool) -> Self { + pub fn new( + entity_type: EntityType, + immutable: bool, + skip_duplicates: bool, + logger: Logger, + ) -> Self { Self { entity_type, rows: Vec::new(), immutable, + skip_duplicates, + logger, last_mod: LastMod::new(), } } @@ -488,6 +499,13 @@ impl RowGroup { .and_then(|&idx| self.rows.get(idx)) { Some(prev) if prev.block() != row.block() => { + if self.skip_duplicates { + warn!(self.logger, "Skipping duplicate insert for immutable entity"; + "entity" => row.key().to_string(), + "block" => row.block(), + "previous_block" => prev.block()); + return Ok(()); + } return Err(StoreError::Input( format!("entity {} is immutable; inserting it at block {} is not possible as it was already inserted at block {}", row.key(), row.block(), prev.block()))); @@ -498,6 +516,12 @@ impl RowGroup { self.push_row(row); } EntityModification::Overwrite { .. } | EntityModification::Remove { .. } => { + if self.skip_duplicates { + warn!(self.logger, "Skipping unsupported operation for immutable entity"; + "entity_type" => self.entity_type.to_string(), + "operation" => format!("{:?}", row)); + return Ok(()); + } return Err(internal_error!( "immutable entity type {} only allows inserts, not {:?}", self.entity_type, @@ -604,8 +628,18 @@ impl RowGroup { pub struct RowGroupForPerfTest(RowGroup); impl RowGroupForPerfTest { - pub fn new(entity_type: EntityType, immutable: bool) -> Self { - Self(RowGroup::new(entity_type, immutable)) + pub fn new( + entity_type: EntityType, + immutable: bool, + skip_duplicates: bool, + logger: Logger, + ) -> Self { + Self(RowGroup::new( + entity_type, + immutable, + skip_duplicates, + logger, + )) } pub fn push(&mut self, emod: EntityModification, block: BlockNumber) -> Result<(), StoreError> { @@ -661,11 +695,15 @@ impl<'a> Iterator for ClampsByBlockIterator<'a> { #[derive(Debug, CacheWeight)] pub struct RowGroups { pub groups: Vec, + logger: Logger, } impl RowGroups { - fn new() -> Self { - Self { groups: Vec::new() } + fn new(logger: Logger) -> Self { + Self { + groups: Vec::new(), + logger, + } } fn group(&self, entity_type: &EntityType) -> Option<&RowGroup> { @@ -685,8 +723,13 @@ impl RowGroups { Some(pos) => &mut self.groups[pos], None => { let immutable = entity_type.is_immutable(); - self.groups - .push(RowGroup::new(entity_type.clone(), immutable)); + let skip_duplicates = entity_type.skip_duplicates(); + self.groups.push(RowGroup::new( + entity_type.clone(), + immutable, + skip_duplicates, + self.logger.clone(), + )); // unwrap: we just pushed an entry self.groups.last_mut().unwrap() } @@ -784,6 +827,7 @@ impl Batch { deterministic_errors: Vec, offchain_to_remove: Vec, is_non_fatal_errors_active: bool, + logger: Logger, ) -> Result { let block = block_ptr.number; @@ -797,7 +841,7 @@ impl Batch { EntityModification::Remove { .. } => 0, }); - let mut mods = RowGroups::new(); + let mut mods = RowGroups::new(logger); for m in raw_mods { mods.group_entry(&m.key().entity_type).push(m, block)?; @@ -1048,6 +1092,8 @@ mod test { }; use lazy_static::lazy_static; + use slog::Logger; + use super::{LastMod, RowGroup}; #[track_caller] @@ -1080,6 +1126,8 @@ mod test { entity_type: ENTRY_TYPE.clone(), rows, immutable: false, + skip_duplicates: false, + logger: Logger::root(slog::Discard, slog::o!()), last_mod, }; let act = group @@ -1120,6 +1168,8 @@ mod test { type Thing @entity { id: ID!, count: Int! } type RowGroup @entity { id: ID! } type Entry @entity { id: ID! } + type ImmThing @entity(immutable: true) { id: ID!, count: Int! } + type SkipDupThing @entity(immutable: true, skipDuplicates: true) { id: ID!, count: Int! } "#; lazy_static! { static ref DEPLOYMENT: DeploymentHash = DeploymentHash::new("batchAppend").unwrap(); @@ -1128,6 +1178,8 @@ mod test { static ref THING_TYPE: EntityType = SCHEMA.entity_type("Thing").unwrap(); static ref ROW_GROUP_TYPE: EntityType = SCHEMA.entity_type("RowGroup").unwrap(); static ref ENTRY_TYPE: EntityType = SCHEMA.entity_type("Entry").unwrap(); + static ref IMM_THING_TYPE: EntityType = SCHEMA.entity_type("ImmThing").unwrap(); + static ref SKIP_DUP_THING_TYPE: EntityType = SCHEMA.entity_type("SkipDupThing").unwrap(); } /// Convenient notation for changes to a fixed entity @@ -1187,7 +1239,12 @@ mod test { impl Group { fn new() -> Self { Self { - group: RowGroup::new(THING_TYPE.clone(), false), + group: RowGroup::new( + THING_TYPE.clone(), + false, + false, + Logger::root(slog::Discard, slog::o!()), + ), } } @@ -1292,4 +1349,80 @@ mod test { let op = group.last_op(&key, 0); assert_eq!(None, op); } + + fn make_insert(entity_type: &EntityType, id: &str, block: BlockNumber) -> EntityModification { + EntityModification::Insert { + key: entity_type.parse_key(id).unwrap(), + data: Arc::new(entity! { SCHEMA => id: id, count: block }), + block, + end: None, + } + } + + fn make_overwrite( + entity_type: &EntityType, + id: &str, + block: BlockNumber, + ) -> EntityModification { + EntityModification::Overwrite { + key: entity_type.parse_key(id).unwrap(), + data: Arc::new(entity! { SCHEMA => id: id, count: block }), + block, + end: None, + } + } + + fn make_remove(entity_type: &EntityType, id: &str, block: BlockNumber) -> EntityModification { + EntityModification::Remove { + key: entity_type.parse_key(id).unwrap(), + block, + } + } + + fn discard_logger() -> Logger { + Logger::root(slog::Discard, slog::o!()) + } + + #[test] + fn append_row_immutable_default_rejects_cross_block_duplicate() { + let mut group = RowGroup::new(IMM_THING_TYPE.clone(), true, false, discard_logger()); + let res = group.push(make_insert(&IMM_THING_TYPE, "one", 1), 1); + assert!(res.is_ok()); + let res = group.push(make_insert(&IMM_THING_TYPE, "one", 2), 2); + assert!(res.is_err()); + } + + #[test] + fn append_row_skip_duplicates_allows_cross_block_duplicate() { + let mut group = RowGroup::new(SKIP_DUP_THING_TYPE.clone(), true, true, discard_logger()); + let res = group.push(make_insert(&SKIP_DUP_THING_TYPE, "one", 1), 1); + assert!(res.is_ok()); + let res = group.push(make_insert(&SKIP_DUP_THING_TYPE, "one", 2), 2); + assert!(res.is_ok()); + assert_eq!(group.row_count(), 1); + } + + #[test] + fn append_row_skip_duplicates_allows_overwrite() { + let mut group = RowGroup::new(SKIP_DUP_THING_TYPE.clone(), true, true, discard_logger()); + let res = group.append_row(make_overwrite(&SKIP_DUP_THING_TYPE, "one", 1)); + assert!(res.is_ok()); + } + + #[test] + fn append_row_skip_duplicates_allows_remove() { + let mut group = RowGroup::new(SKIP_DUP_THING_TYPE.clone(), true, true, discard_logger()); + let res = group.append_row(make_remove(&SKIP_DUP_THING_TYPE, "one", 1)); + assert!(res.is_ok()); + } + + #[test] + fn append_row_skip_duplicates_same_block_still_pushes() { + let mut group = RowGroup::new(SKIP_DUP_THING_TYPE.clone(), true, true, discard_logger()); + let res = group.push(make_insert(&SKIP_DUP_THING_TYPE, "one", 1), 1); + assert!(res.is_ok()); + let res = group.push(make_insert(&SKIP_DUP_THING_TYPE, "one", 1), 1); + assert!(res.is_ok()); + assert_eq!(group.row_count(), 2); + } } diff --git a/graph/src/schema/entity_type.rs b/graph/src/schema/entity_type.rs index deb3cb3d8ef..c46f68681c9 100644 --- a/graph/src/schema/entity_type.rs +++ b/graph/src/schema/entity_type.rs @@ -68,6 +68,10 @@ impl EntityType { self.schema.is_immutable(self.atom) } + pub fn skip_duplicates(&self) -> bool { + self.schema.skip_duplicates(self.atom) + } + pub fn id_type(&self) -> Result { self.schema.id_type(self.atom) } diff --git a/graph/src/schema/input/mod.rs b/graph/src/schema/input/mod.rs index 2ec52fe1762..e8c8791dd82 100644 --- a/graph/src/schema/input/mod.rs +++ b/graph/src/schema/input/mod.rs @@ -42,6 +42,7 @@ pub mod kw { pub const ENTITY: &str = "entity"; pub const IMMUTABLE: &str = "immutable"; pub const TIMESERIES: &str = "timeseries"; + pub const SKIP_DUPLICATES: &str = "skipDuplicates"; pub const TIMESTAMP: &str = "timestamp"; pub const AGGREGATE: &str = "aggregate"; pub const AGGREGATION: &str = "aggregation"; @@ -131,6 +132,14 @@ impl TypeInfo { } } + fn skip_duplicates(&self) -> bool { + match self { + TypeInfo::Object(obj_type) => obj_type.immutable && obj_type.skip_duplicates, + TypeInfo::Interface(_) => false, + TypeInfo::Aggregation(_) => false, + } + } + fn kind(&self) -> TypeKind { match self { TypeInfo::Object(_) => TypeKind::Object, @@ -414,6 +423,7 @@ pub struct ObjectType { /// is part of an aggregation aggregation: Option, pub timeseries: bool, + pub skip_duplicates: bool, interfaces: Box<[Word]>, shared_interfaces: Box<[Atom]>, } @@ -453,6 +463,10 @@ impl ObjectType { None => timeseries, _ => unreachable!("validations ensure we don't get here"), }; + let skip_duplicates = match dir.argument(kw::SKIP_DUPLICATES) { + Some(Value::Boolean(sd)) => *sd, + _ => false, + }; Self { name, fields, @@ -460,6 +474,7 @@ impl ObjectType { immutable, aggregation: None, timeseries, + skip_duplicates, interfaces, shared_interfaces, } @@ -491,6 +506,7 @@ impl ObjectType { immutable: false, aggregation: None, timeseries: false, + skip_duplicates: false, fields, shared_interfaces: Box::new([]), } @@ -889,6 +905,7 @@ impl Aggregation { immutable: true, aggregation: Some(name), timeseries: false, + skip_duplicates: false, interfaces: Box::new([]), shared_interfaces: Box::new([]), } @@ -1203,6 +1220,13 @@ impl InputSchema { .unwrap_or(false) } + pub(in crate::schema) fn skip_duplicates(&self, entity_type: Atom) -> bool { + self.type_info(entity_type) + .ok() + .map(|ti| ti.skip_duplicates()) + .unwrap_or(false) + } + /// Return true if `type_name` is the name of an object or interface type pub fn is_reference(&self, type_name: &str) -> bool { self.inner @@ -2050,6 +2074,15 @@ mod validations { Ok(b) => b.unwrap_or(timeseries), Err(e) => return Some(e), }; + let skip_duplicates = match bool_arg(dir, kw::SKIP_DUPLICATES) { + Ok(b) => b.unwrap_or(false), + Err(e) => return Some(e), + }; + if skip_duplicates && !immutable { + return Some(SchemaValidationError::SkipDuplicatesRequiresImmutable( + object_type.name.clone(), + )); + } if timeseries { if !immutable { Some(SchemaValidationError::MutableTimeseries( @@ -3154,6 +3187,43 @@ type Gravatar @entity { } } } + + #[test] + fn validate_entity_directives_skip_duplicates_non_boolean() { + let schema = + parse("type Foo @entity(immutable: true, skipDuplicates: \"yes\") { id: ID! }"); + let errors = validate(&schema).unwrap_err(); + assert!( + errors.contains(&SchemaValidationError::EntityDirectiveNonBooleanArgValue( + "skipDuplicates".to_string() + )), + "expected EntityDirectiveNonBooleanArgValue for non-boolean skipDuplicates, got: {errors:?}" + ); + } + + #[test] + fn validate_entity_directives_skip_duplicates_requires_immutable() { + let schema = parse("type Foo @entity(skipDuplicates: true) { id: ID! }"); + let errors = validate(&schema).unwrap_err(); + assert!( + errors.contains(&SchemaValidationError::SkipDuplicatesRequiresImmutable( + "Foo".to_string() + )), + "expected SkipDuplicatesRequiresImmutable for mutable entity, got: {errors:?}" + ); + } + + #[test] + fn validate_entity_directives_timeseries_skip_duplicates_valid() { + let schema = parse( + "type Foo @entity(timeseries: true, skipDuplicates: true) { id: Int8! timestamp: Timestamp! }", + ); + let result = validate(&schema); + assert!( + result.is_ok(), + "expected timeseries + skipDuplicates to pass validation, got: {result:?}" + ); + } } } diff --git a/graph/src/schema/mod.rs b/graph/src/schema/mod.rs index 1e40299df63..52a440fc04f 100644 --- a/graph/src/schema/mod.rs +++ b/graph/src/schema/mod.rs @@ -148,6 +148,8 @@ pub enum SchemaValidationError { AggregationDerivedField(String, String), #[error("Timeseries {0} is marked as mutable, it must be immutable")] MutableTimeseries(String), + #[error("Entity type `{0}` has skipDuplicates: true but is not immutable; skipDuplicates requires immutable: true")] + SkipDuplicatesRequiresImmutable(String), #[error("Timeseries {0} is missing a `timestamp` field")] TimeseriesMissingTimestamp(String), #[error("Type {0} has a `timestamp` field of type {1}, but it must be of type Timestamp")] diff --git a/graph/src/util/cache_weight.rs b/graph/src/util/cache_weight.rs index 077db9a51ce..b560906fdc0 100644 --- a/graph/src/util/cache_weight.rs +++ b/graph/src/util/cache_weight.rs @@ -188,6 +188,12 @@ impl CacheWeight for EntityType { } } +impl CacheWeight for slog::Logger { + fn indirect_weight(&self) -> usize { + 0 + } +} + impl CacheWeight for [u8; 32] { fn indirect_weight(&self) -> usize { 0 diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index d361bbe9c56..3f82149eca7 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -258,6 +258,15 @@ importers: specifier: 0.31.0 version: 0.31.0 + tests/runner-tests/skip-duplicates: + devDependencies: + '@graphprotocol/graph-cli': + specifier: 0.60.0 + version: 0.60.0(@types/node@24.3.0)(bufferutil@4.0.9)(encoding@0.1.13)(node-fetch@2.7.0(encoding@0.1.13))(typescript@5.9.2)(utf-8-validate@5.0.10) + '@graphprotocol/graph-ts': + specifier: 0.31.0 + version: 0.31.0 + tests/runner-tests/typename: devDependencies: '@graphprotocol/graph-cli': @@ -2678,11 +2687,6 @@ packages: engines: {node: '>=10'} hasBin: true - semver@7.6.3: - resolution: {integrity: sha512-oVekP1cKtI+CTDvHWYFUcMtsK/00wmAEfyqKfNdARm8u1wNVhSgaX7A8d4UuIlUI5e84iEwOhs7ZPYRmzU9U6A==} - engines: {node: '>=10'} - hasBin: true - semver@7.7.3: resolution: {integrity: sha512-SdsKMrI9TdgjdweUSR9MweHA4EJ8YxHn8DFaDisvhVlUOe4BF1tLD7GAj0lIqWVl+dPb/rExr0Btby5loQm20Q==} engines: {node: '>=10'} @@ -3739,7 +3743,7 @@ snapshots: chalk: 4.1.2 clean-stack: 3.0.1 cli-progress: 3.12.0 - debug: 4.4.1(supports-color@8.1.1) + debug: 4.4.3(supports-color@8.1.1) ejs: 3.1.10 get-package-type: 0.1.0 globby: 11.1.0 @@ -3812,7 +3816,7 @@ snapshots: chalk: 4.1.2 clean-stack: 3.0.1 cli-progress: 3.12.0 - debug: 4.4.1(supports-color@8.1.1) + debug: 4.4.3(supports-color@8.1.1) ejs: 3.1.10 fs-extra: 9.1.0 get-package-type: 0.1.0 @@ -3824,7 +3828,7 @@ snapshots: natural-orderby: 2.0.3 object-treeify: 1.1.33 password-prompt: 1.1.3 - semver: 7.6.3 + semver: 7.7.4 string-width: 4.2.3 strip-ansi: 6.0.1 supports-color: 8.1.1 @@ -3886,7 +3890,7 @@ snapshots: dependencies: '@oclif/core': 2.16.0(@types/node@24.3.0)(typescript@5.9.2) chalk: 4.1.2 - debug: 4.4.1(supports-color@8.1.1) + debug: 4.4.3(supports-color@8.1.1) transitivePeerDependencies: - '@swc/core' - '@swc/wasm' @@ -4583,11 +4587,9 @@ snapshots: optionalDependencies: supports-color: 8.1.1 - debug@4.4.1(supports-color@8.1.1): + debug@4.4.1: dependencies: ms: 2.1.3 - optionalDependencies: - supports-color: 8.1.1 debug@4.4.3(supports-color@8.1.1): dependencies: @@ -4664,7 +4666,7 @@ snapshots: dns-over-http-resolver@1.2.3(node-fetch@2.7.0(encoding@0.1.13)): dependencies: - debug: 4.4.1(supports-color@8.1.1) + debug: 4.4.3(supports-color@8.1.1) native-fetch: 3.0.0(node-fetch@2.7.0(encoding@0.1.13)) receptacle: 1.3.2 transitivePeerDependencies: @@ -5272,7 +5274,7 @@ snapshots: any-signal: 2.1.2 blob-to-it: 1.0.4 browser-readablestream-to-it: 1.0.3 - debug: 4.4.1(supports-color@8.1.1) + debug: 4.4.3(supports-color@8.1.1) err-code: 3.0.1 ipfs-core-types: 0.9.0(node-fetch@2.7.0(encoding@0.1.13)) ipfs-unixfs: 6.0.9 @@ -5301,7 +5303,7 @@ snapshots: '@ipld/dag-pb': 2.1.18 abort-controller: 3.0.0 any-signal: 2.1.2 - debug: 4.4.1(supports-color@8.1.1) + debug: 4.4.1 err-code: 3.0.1 ipfs-core-types: 0.9.0(node-fetch@2.7.0(encoding@0.1.13)) ipfs-core-utils: 0.13.0(encoding@0.1.13)(node-fetch@2.7.0(encoding@0.1.13)) @@ -6170,8 +6172,6 @@ snapshots: dependencies: lru-cache: 6.0.0 - semver@7.6.3: {} - semver@7.7.3: {} semver@7.7.4: {} diff --git a/store/postgres/src/deployment_store.rs b/store/postgres/src/deployment_store.rs index 4d8df5c3a28..620732a4bae 100644 --- a/store/postgres/src/deployment_store.rs +++ b/store/postgres/src/deployment_store.rs @@ -360,7 +360,7 @@ impl DeploymentStore { // Clamp entities before inserting them to avoid having versions // with overlapping block ranges let section = stopwatch.start_section("apply_entity_modifications_delete"); - layout.delete(conn, group, stopwatch).await?; + layout.delete(logger, conn, group, stopwatch).await?; section.end(); let section = stopwatch.start_section("check_interface_entity_uniqueness"); diff --git a/store/postgres/src/relational.rs b/store/postgres/src/relational.rs index efdfe6a5d8d..28c1f78c27c 100644 --- a/store/postgres/src/relational.rs +++ b/store/postgres/src/relational.rs @@ -373,6 +373,7 @@ impl Layout { position: position as u32, is_account_like: false, immutable: false, + skip_duplicates: false, has_causality_region: false, } } @@ -769,44 +770,60 @@ impl Layout { // We insert the entities in chunks to make sure each operation does // not exceed the maximum number of bindings allowed in queries let chunk_size = InsertQuery::chunk_size(table); + let mut affected_rows: usize = 0; + let mut expected_rows: usize = 0; for chunk in group.write_chunks(chunk_size) { // Empty chunks would lead to invalid SQL if !chunk.is_empty() { - if let Err(e) = InsertQuery::new(table, &chunk)?.execute(conn).await { - // We occasionally get these errors but it's entirely - // unclear what causes them. We work around that by - // switching to row-by-row inserts until we can figure - // out what the underlying cause is - let err_msg = e.to_string(); - if !err_msg.contains("value too large to transmit") { - let (block, msg) = chunk_details(&chunk); - return Err(StoreError::write_failure( - e, - table.object.as_str(), - block, - msg, - )); + expected_rows += chunk.len(); + match InsertQuery::new(table, &chunk)?.execute(conn).await { + Ok(count) => { + affected_rows += count; } - let (block, msg) = chunk_details(&chunk); - warn!(logger, "Insert of entire chunk failed. Trying row by row insert."; - "table" => table.object.as_str(), - "block" => block, - "error" => err_msg, - "details" => msg - ); - for single_chunk in chunk.as_single_writes() { - InsertQuery::new(table, &single_chunk)? - .execute(conn) - .await - .map_err(|e| { - let (block, msg) = chunk_details(&single_chunk); - let msg = format!("{}: offending row {:?}", msg, single_chunk); - StoreError::write_failure(e, table.object.as_str(), block, msg) - })?; + Err(e) => { + // We occasionally get these errors but it's entirely + // unclear what causes them. We work around that by + // switching to row-by-row inserts until we can figure + // out what the underlying cause is + let err_msg = e.to_string(); + if !err_msg.contains("value too large to transmit") { + let (block, msg) = chunk_details(&chunk); + return Err(StoreError::write_failure( + e, + table.object.as_str(), + block, + msg, + )); + } + let (block, msg) = chunk_details(&chunk); + warn!(logger, "Insert of entire chunk failed. Trying row by row insert."; + "table" => table.object.as_str(), + "block" => block, + "error" => err_msg, + "details" => msg + ); + for single_chunk in chunk.as_single_writes() { + let count = InsertQuery::new(table, &single_chunk)? + .execute(conn) + .await + .map_err(|e| { + let (block, msg) = chunk_details(&single_chunk); + let msg = format!("{}: offending row {:?}", msg, single_chunk); + StoreError::write_failure(e, table.object.as_str(), block, msg) + })?; + affected_rows += count; + } } } } } + if affected_rows < expected_rows && table.immutable && table.skip_duplicates { + warn!(logger, "Cross-batch duplicate inserts skipped by ON CONFLICT DO NOTHING"; + "entity_type" => table.object.as_str(), + "expected_rows" => expected_rows, + "affected_rows" => affected_rows, + "skipped" => expected_rows - affected_rows); + } Ok(()) } @@ -943,12 +960,20 @@ impl Layout { pub async fn update<'a>( &'a self, + logger: &Logger, conn: &mut AsyncPgConnection, group: &'a RowGroup, stopwatch: &StopwatchMetrics, ) -> Result { let table = self.table_for_entity(&group.entity_type)?; if table.immutable && group.has_clamps() { + if table.skip_duplicates { + let ids = group.ids().join(", "); + warn!(logger, "Skipping immutable entity update in store layer"; + "entity_type" => group.entity_type.to_string(), + "ids" => ids); + return Ok(0); + } let ids = group .ids() .map(|id| id.to_string()) @@ -990,6 +1015,7 @@ impl Layout { pub async fn delete( &self, + logger: &Logger, conn: &mut AsyncPgConnection, group: &RowGroup, stopwatch: &StopwatchMetrics, @@ -1014,6 +1040,13 @@ impl Layout { let table = self.table_for_entity(&group.entity_type)?; if table.immutable { + if table.skip_duplicates { + let ids = group.ids().join(", "); + warn!(logger, "Skipping immutable entity delete in store layer"; + "entity_type" => group.entity_type.to_string(), + "ids" => ids); + return Ok(0); + } return Err(internal_error!( "entities of type `{}` can not be deleted since they are immutable. Entity ids are [{}]", table.object, group.ids().join(", ") @@ -1662,6 +1695,8 @@ pub struct Table { /// deleted pub(crate) immutable: bool, + pub(crate) skip_duplicates: bool, + /// Whether this table has an explicit `causality_region` column. If `false`, then the column is /// not present and the causality region for all rows is implicitly `0` (equivalent to CasualityRegion::ONCHAIN). pub(crate) has_causality_region: bool, @@ -1692,6 +1727,7 @@ impl Table { .collect::, StoreError>>()?; let qualified_name = SqlName::qualified_name(&catalog.site.namespace, &table_name); let immutable = defn.is_immutable(); + let skip_duplicates = defn.skip_duplicates(); let nsp = catalog.site.namespace.clone(); let table = Table { object: defn.cheap_clone(), @@ -1705,6 +1741,7 @@ impl Table { columns, position, immutable, + skip_duplicates, has_causality_region, }; Ok(table) @@ -1722,6 +1759,7 @@ impl Table { is_account_like: self.is_account_like, position: self.position, immutable: self.immutable, + skip_duplicates: self.skip_duplicates, has_causality_region: self.has_causality_region, }; diff --git a/store/postgres/src/relational/query_tests.rs b/store/postgres/src/relational/query_tests.rs index 1b68ae5d0cc..eb3159df590 100644 --- a/store/postgres/src/relational/query_tests.rs +++ b/store/postgres/src/relational/query_tests.rs @@ -2,16 +2,19 @@ use std::{collections::BTreeSet, sync::Arc}; use diesel::{debug_query, pg::Pg}; use graph::{ + components::store::write::RowGroup, data_source::CausalityRegion, - prelude::{r, serde_json as json, DeploymentHash, EntityFilter}, + entity, + prelude::{r, serde_json as json, DeploymentHash, EntityFilter, Logger}, schema::InputSchema, + slog, }; use crate::{ block_range::BoundSide, layout_for_tests::{make_dummy_site, Namespace}, relational::{Catalog, ColumnType, Layout}, - relational_queries::{FindRangeQuery, FromColumnValue}, + relational_queries::{FindRangeQuery, FromColumnValue, InsertQuery}, }; use crate::relational_queries::Filter; @@ -197,3 +200,64 @@ fn test_id_type_casting(table: &crate::relational::Table, expected_cast: &str, t sql ); } + +fn insert_sql_for_schema(gql: &str, entity_type_name: &str) -> String { + use graph::components::store::write::EntityModification; + + let layout = test_layout(gql); + let schema = &layout.input_schema; + let et = schema.entity_type(entity_type_name).unwrap(); + let table = layout.table_for_entity(&et).unwrap(); + + let mut entity = entity! { schema => id: "test1" }; + entity.set_vid(1).unwrap(); + let key = et.key(graph::data::store::Id::String("test1".into())); + let emod = EntityModification::Insert { + key, + data: Arc::new(entity), + block: 1, + end: None, + }; + + let logger = Logger::root(slog::Discard, slog::o!()); + let mut group = RowGroup::new(et, table.immutable, table.skip_duplicates, logger); + group.push(emod, 1).unwrap(); + + let chunks: Vec<_> = group.write_chunks(100).collect(); + let chunk = &chunks[0]; + let query = InsertQuery::new(table.as_ref(), chunk).unwrap(); + debug_query::(&query).to_string() +} + +#[test] +fn skip_duplicates_insert_generates_on_conflict() { + let schema = " + type Thing @entity(immutable: true, skipDuplicates: true) { + id: String! + }"; + let sql = insert_sql_for_schema(schema, "Thing"); + assert!( + sql.contains("ON CONFLICT"), + "Expected ON CONFLICT in SQL for skip_duplicates immutable table, got: {}", + sql + ); + assert!( + sql.contains("DO NOTHING"), + "Expected DO NOTHING in SQL for skip_duplicates immutable table, got: {}", + sql + ); +} + +#[test] +fn default_immutable_insert_has_no_on_conflict_skip_duplicates() { + let schema = " + type Thing @entity(immutable: true) { + id: String! + }"; + let sql = insert_sql_for_schema(schema, "Thing"); + assert!( + !sql.contains("ON CONFLICT"), + "Default immutable table should NOT have ON CONFLICT, got: {}", + sql + ); +} diff --git a/store/postgres/src/relational_queries.rs b/store/postgres/src/relational_queries.rs index 1c746f1338e..eee8b6073f1 100644 --- a/store/postgres/src/relational_queries.rs +++ b/store/postgres/src/relational_queries.rs @@ -2584,6 +2584,12 @@ impl<'a> QueryFragment for InsertQuery<'a> { out.push_sql(")"); } + if self.table.immutable && self.table.skip_duplicates { + out.push_sql("\n ON CONFLICT ("); + out.push_identifier(self.table.primary_key().name.as_str())?; + out.push_sql(") DO NOTHING"); + } + Ok(()) } } diff --git a/store/postgres/src/writable.rs b/store/postgres/src/writable.rs index 958ba2def76..680a217c8be 100644 --- a/store/postgres/src/writable.rs +++ b/store/postgres/src/writable.rs @@ -1839,6 +1839,7 @@ impl WritableStoreTrait for WritableStore { deterministic_errors, processed_data_sources, is_non_fatal_errors_active, + self.store.logger.clone(), )?; self.writer.write(batch, stopwatch).await?; diff --git a/store/test-store/tests/postgres/relational.rs b/store/test-store/tests/postgres/relational.rs index 4a029709408..7c586259ffe 100644 --- a/store/test-store/tests/postgres/relational.rs +++ b/store/test-store/tests/postgres/relational.rs @@ -128,6 +128,11 @@ const THINGS_GQL: &str = r#" order: Int, } + type SkipDupMink @entity(immutable: true, skipDuplicates: true) { + id: ID!, + order: Int, + } + type User @entity { id: ID!, name: String!, @@ -228,6 +233,7 @@ lazy_static! { static ref CAT_TYPE: EntityType = THINGS_SCHEMA.entity_type("Cat").unwrap(); static ref FERRET_TYPE: EntityType = THINGS_SCHEMA.entity_type("Ferret").unwrap(); static ref MINK_TYPE: EntityType = THINGS_SCHEMA.entity_type("Mink").unwrap(); + static ref SKIP_DUP_MINK_TYPE: EntityType = THINGS_SCHEMA.entity_type("SkipDupMink").unwrap(); static ref CHAIR_TYPE: EntityType = THINGS_SCHEMA.entity_type("Chair").unwrap(); static ref NULLABLE_STRINGS_TYPE: EntityType = THINGS_SCHEMA.entity_type("NullableStrings").unwrap(); @@ -315,7 +321,7 @@ async fn update_entity_at( ); let group = row_group_update(entity_type, block, entities_with_keys_owned.clone()); let updated = layout - .update(conn, &group, &MOCK_STOPWATCH) + .update(&LOGGER, conn, &group, &MOCK_STOPWATCH) .await .expect(&errmsg); assert_eq!(updated, entities_with_keys_owned.len()); @@ -643,7 +649,7 @@ async fn update() { let entities = vec![(key, entity.clone())]; let group = row_group_update(&entity_type, 0, entities); layout - .update(conn, &group, &MOCK_STOPWATCH) + .update(&LOGGER, conn, &group, &MOCK_STOPWATCH) .await .expect("Failed to update"); @@ -708,7 +714,7 @@ async fn update_many() { let entities: Vec<_> = keys.into_iter().zip(entities_vec.into_iter()).collect(); let group = row_group_update(&entity_type, 0, entities); layout - .update(conn, &group, &MOCK_STOPWATCH) + .update(&LOGGER, conn, &group, &MOCK_STOPWATCH) .await .expect("Failed to update"); @@ -780,7 +786,7 @@ async fn serialize_bigdecimal() { let entities = vec![(key, entity.clone())]; let group = row_group_update(&entity_type, 0, entities); layout - .update(conn, &group, &MOCK_STOPWATCH) + .update(&LOGGER, conn, &group, &MOCK_STOPWATCH) .await .expect("Failed to update"); @@ -870,7 +876,7 @@ async fn delete() { let mut entity_keys = vec![key]; let group = row_group_delete(&entity_type, 1, entity_keys.clone()); let count = layout - .delete(conn, &group, &MOCK_STOPWATCH) + .delete(&LOGGER, conn, &group, &MOCK_STOPWATCH) .await .expect("Failed to delete"); assert_eq!(0, count); @@ -884,7 +890,7 @@ async fn delete() { let group = row_group_delete(&entity_type, 1, entity_keys); let count = layout - .delete(conn, &group, &MOCK_STOPWATCH) + .delete(&LOGGER, conn, &group, &MOCK_STOPWATCH) .await .expect("Failed to delete"); assert_eq!(1, count); @@ -915,7 +921,7 @@ async fn insert_many_and_delete_many() { .collect(); let group = row_group_delete(&SCALAR_TYPE, 1, entity_keys); let num_removed = layout - .delete(conn, &group, &MOCK_STOPWATCH) + .delete(&LOGGER, conn, &group, &MOCK_STOPWATCH) .await .expect("Failed to delete"); assert_eq!(2, num_removed); @@ -996,7 +1002,12 @@ async fn conflicting_entity() { let fred = entity! { layout.input_schema => id: id.clone(), name: id.clone() }; let fred = Arc::new(fred); let types: Vec<_> = types.into_iter().cloned().collect(); - let mut group = RowGroup::new(entity_type.clone(), false); + let mut group = RowGroup::new( + entity_type.clone(), + false, + false, + slog::Logger::root(slog::Discard, slog::o!()), + ); group .push( EntityModification::Insert { @@ -2126,3 +2137,103 @@ async fn check_filters() { }) .await; } + +/// Create a RowGroup with Overwrite modifications for testing store-layer +/// immutability enforcement. Uses immutable=false on the RowGroup to bypass +/// write-batch validation (the store layer checks the Table's immutability). +fn store_layer_row_group_update( + entity_type: &EntityType, + block: BlockNumber, + data: impl IntoIterator, +) -> RowGroup { + let mut group = RowGroup::new( + entity_type.clone(), + false, + false, + slog::Logger::root(slog::Discard, slog::o!()), + ); + for (key, data) in data { + group + .push(EntityModification::overwrite(key, data, block), block) + .unwrap(); + } + group +} + +/// Create a RowGroup with Remove modifications for testing store-layer +/// immutability enforcement. Uses immutable=false on the RowGroup to bypass +/// write-batch validation (the store layer checks the Table's immutability). +fn store_layer_row_group_delete( + entity_type: &EntityType, + block: BlockNumber, + data: impl IntoIterator, +) -> RowGroup { + let mut group = RowGroup::new( + entity_type.clone(), + false, + false, + slog::Logger::root(slog::Discard, slog::o!()), + ); + for key in data { + group + .push(EntityModification::remove(key, block), block) + .unwrap(); + } + group +} + +#[graph::test] +async fn skip_duplicates_update_returns_ok() { + run_test(async |conn, layout| { + let entity = entity! { layout.input_schema => + id: "sd1", + order: 1, + vid: 0i64 + }; + let key = SKIP_DUP_MINK_TYPE.key(entity.id()); + let entities = vec![(key, entity)]; + let group = store_layer_row_group_update(&SKIP_DUP_MINK_TYPE, 1, entities); + let result = layout.update(&LOGGER, conn, &group, &MOCK_STOPWATCH).await; + assert_eq!(result.unwrap(), 0); + }) + .await; +} + +#[graph::test] +async fn skip_duplicates_delete_returns_ok() { + run_test(async |conn, layout| { + let key = SKIP_DUP_MINK_TYPE.parse_key("sd1").unwrap(); + let group = store_layer_row_group_delete(&SKIP_DUP_MINK_TYPE, 1, vec![key]); + let result = layout.delete(&LOGGER, conn, &group, &MOCK_STOPWATCH).await; + assert_eq!(result.unwrap(), 0); + }) + .await; +} + +#[graph::test] +async fn default_immutable_update_still_errors() { + run_test(async |conn, layout| { + let entity = entity! { layout.input_schema => + id: "m1", + order: 1, + vid: 0i64 + }; + let key = MINK_TYPE.key(entity.id()); + let entities = vec![(key, entity)]; + let group = store_layer_row_group_update(&MINK_TYPE, 1, entities); + let result = layout.update(&LOGGER, conn, &group, &MOCK_STOPWATCH).await; + assert!(result.is_err()); + }) + .await; +} + +#[graph::test] +async fn default_immutable_delete_still_errors() { + run_test(async |conn, layout| { + let key = MINK_TYPE.parse_key("m1").unwrap(); + let group = store_layer_row_group_delete(&MINK_TYPE, 1, vec![key]); + let result = layout.delete(&LOGGER, conn, &group, &MOCK_STOPWATCH).await; + assert!(result.is_err()); + }) + .await; +} diff --git a/store/test-store/tests/postgres/relational_bytes.rs b/store/test-store/tests/postgres/relational_bytes.rs index 470d4e17412..70c0b1e0dc3 100644 --- a/store/test-store/tests/postgres/relational_bytes.rs +++ b/store/test-store/tests/postgres/relational_bytes.rs @@ -85,7 +85,12 @@ pub fn row_group_update( block: BlockNumber, data: impl IntoIterator, ) -> RowGroup { - let mut group = RowGroup::new(entity_type.clone(), false); + let mut group = RowGroup::new( + entity_type.clone(), + false, + false, + slog::Logger::root(slog::Discard, slog::o!()), + ); for (key, data) in data { group .push(EntityModification::overwrite(key, data, block), block) @@ -99,7 +104,12 @@ pub fn row_group_insert( block: BlockNumber, data: impl IntoIterator, ) -> RowGroup { - let mut group = RowGroup::new(entity_type.clone(), false); + let mut group = RowGroup::new( + entity_type.clone(), + false, + false, + slog::Logger::root(slog::Discard, slog::o!()), + ); for (key, data) in data { group .push(EntityModification::insert(key, data, block), block) @@ -113,7 +123,12 @@ pub fn row_group_delete( block: BlockNumber, data: impl IntoIterator, ) -> RowGroup { - let mut group = RowGroup::new(entity_type.clone(), false); + let mut group = RowGroup::new( + entity_type.clone(), + false, + false, + slog::Logger::root(slog::Discard, slog::o!()), + ); for key in data { group .push(EntityModification::remove(key, block), block) @@ -350,7 +365,7 @@ async fn update() { let entities = vec![(key, entity.clone())]; let group = row_group_update(&entity_type, 1, entities); layout - .update(conn, &group, &MOCK_STOPWATCH) + .update(&LOGGER, conn, &group, &MOCK_STOPWATCH) .await .expect("Failed to update"); @@ -382,7 +397,7 @@ async fn delete() { let mut entity_keys = vec![key.clone()]; let group = row_group_delete(&entity_type, 1, entity_keys.clone()); let count = layout - .delete(conn, &group, &MOCK_STOPWATCH) + .delete(&LOGGER, conn, &group, &MOCK_STOPWATCH) .await .expect("Failed to delete"); assert_eq!(0, count); @@ -394,7 +409,7 @@ async fn delete() { .expect("Failed to update entity types"); let group = row_group_delete(&entity_type, 1, entity_keys); let count = layout - .delete(conn, &group, &MOCK_STOPWATCH) + .delete(&LOGGER, conn, &group, &MOCK_STOPWATCH) .await .expect("Failed to delete"); assert_eq!(1, count); diff --git a/tests/runner-tests/skip-duplicates/abis/Contract.abi b/tests/runner-tests/skip-duplicates/abis/Contract.abi new file mode 100644 index 00000000000..9d9f56b9263 --- /dev/null +++ b/tests/runner-tests/skip-duplicates/abis/Contract.abi @@ -0,0 +1,15 @@ +[ + { + "anonymous": false, + "inputs": [ + { + "indexed": false, + "internalType": "string", + "name": "testCommand", + "type": "string" + } + ], + "name": "TestEvent", + "type": "event" + } +] diff --git a/tests/runner-tests/skip-duplicates/package.json b/tests/runner-tests/skip-duplicates/package.json new file mode 100644 index 00000000000..da0ba3e3e87 --- /dev/null +++ b/tests/runner-tests/skip-duplicates/package.json @@ -0,0 +1,13 @@ +{ + "name": "skip-duplicates", + "version": "0.0.0", + "private": true, + "scripts": { + "codegen": "graph codegen --skip-migrations", + "deploy:test": "graph deploy test/skip-duplicates --version-label v0.0.1 --ipfs $IPFS_URI --node $GRAPH_NODE_ADMIN_URI" + }, + "devDependencies": { + "@graphprotocol/graph-cli": "0.60.0", + "@graphprotocol/graph-ts": "0.31.0" + } +} diff --git a/tests/runner-tests/skip-duplicates/schema.graphql b/tests/runner-tests/skip-duplicates/schema.graphql new file mode 100644 index 00000000000..94c5a2862d0 --- /dev/null +++ b/tests/runner-tests/skip-duplicates/schema.graphql @@ -0,0 +1,4 @@ +type Ping @entity(immutable: true, skipDuplicates: true) { + id: ID! + value: String! +} diff --git a/tests/runner-tests/skip-duplicates/src/mapping.ts b/tests/runner-tests/skip-duplicates/src/mapping.ts new file mode 100644 index 00000000000..bf756595b5d --- /dev/null +++ b/tests/runner-tests/skip-duplicates/src/mapping.ts @@ -0,0 +1,8 @@ +import { ethereum } from "@graphprotocol/graph-ts"; +import { Ping } from "../generated/schema"; + +export function handleBlock(block: ethereum.Block): void { + let entity = new Ping("duplicate-entity"); + entity.value = "ping"; + entity.save(); +} diff --git a/tests/runner-tests/skip-duplicates/subgraph.yaml b/tests/runner-tests/skip-duplicates/subgraph.yaml new file mode 100644 index 00000000000..068301105c2 --- /dev/null +++ b/tests/runner-tests/skip-duplicates/subgraph.yaml @@ -0,0 +1,25 @@ +specVersion: 0.0.8 +schema: + file: ./schema.graphql +dataSources: + - kind: ethereum/contract + name: Contract + network: test + source: + address: "0x0000000000000000000000000000000000000000" + abi: Contract + mapping: + kind: ethereum/events + apiVersion: 0.0.7 + language: wasm/assemblyscript + entities: + - Ping + abis: + - name: Contract + file: ./abis/Contract.abi + blockHandlers: + - handler: handleBlock + filter: + kind: polling + every: 1 + file: ./src/mapping.ts diff --git a/tests/tests/runner_tests.rs b/tests/tests/runner_tests.rs index 27ffe9422f6..a215d270a5d 100644 --- a/tests/tests/runner_tests.rs +++ b/tests/tests/runner_tests.rs @@ -1381,3 +1381,40 @@ async fn aggregation_current_bucket() { }) ); } + +#[graph::test] +async fn skip_duplicates() { + let RunnerTestRecipe { stores, test_info } = + RunnerTestRecipe::new("skip_duplicates", "skip-duplicates").await; + + let blocks = { + let block_0 = genesis(); + let mut block_1 = empty_block(block_0.ptr(), test_ptr(1)); + push_test_polling_trigger(&mut block_1); + let mut block_2 = empty_block(block_1.ptr(), test_ptr(2)); + push_test_polling_trigger(&mut block_2); + let mut block_3 = empty_block(block_2.ptr(), test_ptr(3)); + push_test_polling_trigger(&mut block_3); + let mut block_4 = empty_block(block_3.ptr(), test_ptr(4)); + push_test_polling_trigger(&mut block_4); + vec![block_0, block_1, block_2, block_3, block_4] + }; + + let stop_block = blocks.last().unwrap().block.ptr(); + let chain = chain(&test_info.test_name, blocks, &stores, None).await; + let ctx = fixture::setup(&test_info, &stores, &chain, None, None).await; + + ctx.start_and_sync_to(stop_block).await; + + let query_res = ctx + .query(r#"{ ping(id: "duplicate-entity") { id, value } }"#) + .await + .unwrap(); + + assert_eq!( + query_res, + Some(object! { + ping: object! { id: "duplicate-entity", value: "ping" } + }) + ); +}