Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion graph/examples/append_row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,12 @@ pub fn main() -> anyhow::Result<()> {
};
mods.push(md);
}
let mut group = RowGroup::new(THING_TYPE.clone(), false);
let mut group = RowGroup::new(
THING_TYPE.clone(),
false,
false,
slog::Logger::root(slog::Discard, slog::o!()),
);

let start = Instant::now();
for md in mods {
Expand Down
151 changes: 142 additions & 9 deletions graph/src/components/store/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ use std::{
sync::Arc,
};

use slog::{warn, Logger};

use crate::{
blockchain::{block_stream::FirehoseCursor, BlockPtr, BlockTime},
cheap_clone::CheapClone,
Expand Down Expand Up @@ -344,18 +346,27 @@ pub struct RowGroup {
rows: Vec<EntityModification>,

immutable: bool,
skip_duplicates: bool,
logger: Logger,

/// Map the `key.entity_id` of all entries in `rows` to the index with
/// the most recent entry for that id to speed up lookups.
last_mod: LastMod,
}

impl RowGroup {
pub fn new(entity_type: EntityType, immutable: bool) -> Self {
pub fn new(
entity_type: EntityType,
immutable: bool,
skip_duplicates: bool,
logger: Logger,
) -> Self {
Self {
entity_type,
rows: Vec::new(),
immutable,
skip_duplicates,
logger,
last_mod: LastMod::new(),
}
}
Expand Down Expand Up @@ -488,6 +499,13 @@ impl RowGroup {
.and_then(|&idx| self.rows.get(idx))
{
Some(prev) if prev.block() != row.block() => {
if self.skip_duplicates {
warn!(self.logger, "Skipping duplicate insert for immutable entity";
"entity" => row.key().to_string(),
"block" => row.block(),
"previous_block" => prev.block());
return Ok(());
}
return Err(StoreError::Input(
format!("entity {} is immutable; inserting it at block {} is not possible as it was already inserted at block {}",
row.key(), row.block(), prev.block())));
Expand All @@ -498,6 +516,12 @@ impl RowGroup {
self.push_row(row);
}
EntityModification::Overwrite { .. } | EntityModification::Remove { .. } => {
if self.skip_duplicates {
warn!(self.logger, "Skipping unsupported operation for immutable entity";
"entity_type" => self.entity_type.to_string(),
"operation" => format!("{:?}", row));
return Ok(());
}
return Err(internal_error!(
"immutable entity type {} only allows inserts, not {:?}",
self.entity_type,
Expand Down Expand Up @@ -604,8 +628,18 @@ impl RowGroup {
pub struct RowGroupForPerfTest(RowGroup);

impl RowGroupForPerfTest {
pub fn new(entity_type: EntityType, immutable: bool) -> Self {
Self(RowGroup::new(entity_type, immutable))
pub fn new(
entity_type: EntityType,
immutable: bool,
skip_duplicates: bool,
logger: Logger,
) -> Self {
Self(RowGroup::new(
entity_type,
immutable,
skip_duplicates,
logger,
))
}

pub fn push(&mut self, emod: EntityModification, block: BlockNumber) -> Result<(), StoreError> {
Expand Down Expand Up @@ -661,11 +695,15 @@ impl<'a> Iterator for ClampsByBlockIterator<'a> {
#[derive(Debug, CacheWeight)]
pub struct RowGroups {
pub groups: Vec<RowGroup>,
logger: Logger,
}

impl RowGroups {
fn new() -> Self {
Self { groups: Vec::new() }
fn new(logger: Logger) -> Self {
Self {
groups: Vec::new(),
logger,
}
}

fn group(&self, entity_type: &EntityType) -> Option<&RowGroup> {
Expand All @@ -685,8 +723,13 @@ impl RowGroups {
Some(pos) => &mut self.groups[pos],
None => {
let immutable = entity_type.is_immutable();
self.groups
.push(RowGroup::new(entity_type.clone(), immutable));
let skip_duplicates = entity_type.skip_duplicates();
self.groups.push(RowGroup::new(
entity_type.clone(),
immutable,
skip_duplicates,
self.logger.clone(),
));
// unwrap: we just pushed an entry
self.groups.last_mut().unwrap()
}
Expand Down Expand Up @@ -784,6 +827,7 @@ impl Batch {
deterministic_errors: Vec<SubgraphError>,
offchain_to_remove: Vec<StoredDynamicDataSource>,
is_non_fatal_errors_active: bool,
logger: Logger,
) -> Result<Self, StoreError> {
let block = block_ptr.number;

Expand All @@ -797,7 +841,7 @@ impl Batch {
EntityModification::Remove { .. } => 0,
});

let mut mods = RowGroups::new();
let mut mods = RowGroups::new(logger);

for m in raw_mods {
mods.group_entry(&m.key().entity_type).push(m, block)?;
Expand Down Expand Up @@ -1048,6 +1092,8 @@ mod test {
};
use lazy_static::lazy_static;

use slog::Logger;

use super::{LastMod, RowGroup};

#[track_caller]
Expand Down Expand Up @@ -1080,6 +1126,8 @@ mod test {
entity_type: ENTRY_TYPE.clone(),
rows,
immutable: false,
skip_duplicates: false,
logger: Logger::root(slog::Discard, slog::o!()),
last_mod,
};
let act = group
Expand Down Expand Up @@ -1120,6 +1168,8 @@ mod test {
type Thing @entity { id: ID!, count: Int! }
type RowGroup @entity { id: ID! }
type Entry @entity { id: ID! }
type ImmThing @entity(immutable: true) { id: ID!, count: Int! }
type SkipDupThing @entity(immutable: true, skipDuplicates: true) { id: ID!, count: Int! }
"#;
lazy_static! {
static ref DEPLOYMENT: DeploymentHash = DeploymentHash::new("batchAppend").unwrap();
Expand All @@ -1128,6 +1178,8 @@ mod test {
static ref THING_TYPE: EntityType = SCHEMA.entity_type("Thing").unwrap();
static ref ROW_GROUP_TYPE: EntityType = SCHEMA.entity_type("RowGroup").unwrap();
static ref ENTRY_TYPE: EntityType = SCHEMA.entity_type("Entry").unwrap();
static ref IMM_THING_TYPE: EntityType = SCHEMA.entity_type("ImmThing").unwrap();
static ref SKIP_DUP_THING_TYPE: EntityType = SCHEMA.entity_type("SkipDupThing").unwrap();
}

/// Convenient notation for changes to a fixed entity
Expand Down Expand Up @@ -1187,7 +1239,12 @@ mod test {
impl Group {
fn new() -> Self {
Self {
group: RowGroup::new(THING_TYPE.clone(), false),
group: RowGroup::new(
THING_TYPE.clone(),
false,
false,
Logger::root(slog::Discard, slog::o!()),
),
}
}

Expand Down Expand Up @@ -1292,4 +1349,80 @@ mod test {
let op = group.last_op(&key, 0);
assert_eq!(None, op);
}

fn make_insert(entity_type: &EntityType, id: &str, block: BlockNumber) -> EntityModification {
EntityModification::Insert {
key: entity_type.parse_key(id).unwrap(),
data: Arc::new(entity! { SCHEMA => id: id, count: block }),
block,
end: None,
}
}

fn make_overwrite(
entity_type: &EntityType,
id: &str,
block: BlockNumber,
) -> EntityModification {
EntityModification::Overwrite {
key: entity_type.parse_key(id).unwrap(),
data: Arc::new(entity! { SCHEMA => id: id, count: block }),
block,
end: None,
}
}

fn make_remove(entity_type: &EntityType, id: &str, block: BlockNumber) -> EntityModification {
EntityModification::Remove {
key: entity_type.parse_key(id).unwrap(),
block,
}
}

fn discard_logger() -> Logger {
Logger::root(slog::Discard, slog::o!())
}

#[test]
fn append_row_immutable_default_rejects_cross_block_duplicate() {
let mut group = RowGroup::new(IMM_THING_TYPE.clone(), true, false, discard_logger());
let res = group.push(make_insert(&IMM_THING_TYPE, "one", 1), 1);
assert!(res.is_ok());
let res = group.push(make_insert(&IMM_THING_TYPE, "one", 2), 2);
assert!(res.is_err());
}

#[test]
fn append_row_skip_duplicates_allows_cross_block_duplicate() {
let mut group = RowGroup::new(SKIP_DUP_THING_TYPE.clone(), true, true, discard_logger());
let res = group.push(make_insert(&SKIP_DUP_THING_TYPE, "one", 1), 1);
assert!(res.is_ok());
let res = group.push(make_insert(&SKIP_DUP_THING_TYPE, "one", 2), 2);
assert!(res.is_ok());
assert_eq!(group.row_count(), 1);
}

#[test]
fn append_row_skip_duplicates_allows_overwrite() {
let mut group = RowGroup::new(SKIP_DUP_THING_TYPE.clone(), true, true, discard_logger());
let res = group.append_row(make_overwrite(&SKIP_DUP_THING_TYPE, "one", 1));
assert!(res.is_ok());
}

#[test]
fn append_row_skip_duplicates_allows_remove() {
let mut group = RowGroup::new(SKIP_DUP_THING_TYPE.clone(), true, true, discard_logger());
let res = group.append_row(make_remove(&SKIP_DUP_THING_TYPE, "one", 1));
assert!(res.is_ok());
}

#[test]
fn append_row_skip_duplicates_same_block_still_pushes() {
let mut group = RowGroup::new(SKIP_DUP_THING_TYPE.clone(), true, true, discard_logger());
let res = group.push(make_insert(&SKIP_DUP_THING_TYPE, "one", 1), 1);
assert!(res.is_ok());
let res = group.push(make_insert(&SKIP_DUP_THING_TYPE, "one", 1), 1);
assert!(res.is_ok());
assert_eq!(group.row_count(), 2);
}
}
4 changes: 4 additions & 0 deletions graph/src/schema/entity_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ impl EntityType {
self.schema.is_immutable(self.atom)
}

pub fn skip_duplicates(&self) -> bool {
self.schema.skip_duplicates(self.atom)
}

pub fn id_type(&self) -> Result<IdType, Error> {
self.schema.id_type(self.atom)
}
Expand Down
Loading
Loading