Skip to content

Commit b84a842

Browse files
prestwichclaude
andcommitted
test: add multi-block reorg and watermark min() coverage
Address review feedback: strengthen reorg integration tests with multi-block reverts and multiple-reorgs-between-polls scenarios. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 27b9baf commit b84a842

1 file changed

Lines changed: 167 additions & 3 deletions

File tree

crates/node-tests/tests/reorg.rs

Lines changed: 167 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,6 @@ async fn test_block_filter_reorg() {
109109
// Poll: should have 1 block hash.
110110
let hashes: Vec<B256> = ctx.alloy_provider.get_filter_changes(filter_id).await.unwrap();
111111
assert_eq!(hashes.len(), 1);
112-
let block2_hash = hashes[0];
113112

114113
// Process block 3 (increment), keep clone for revert.
115114
let block3 = process_increment(&ctx, *contract.address()).await;
@@ -128,8 +127,9 @@ async fn test_block_filter_reorg() {
128127
// Poll: should return the new block 3 hash.
129128
let hashes: Vec<B256> = ctx.alloy_provider.get_filter_changes(filter_id).await.unwrap();
130129
assert_eq!(hashes.len(), 1);
131-
// Verify it is NOT the old block 2 hash (it should be the new block 3).
132-
assert_ne!(hashes[0], block2_hash);
130+
let new_block3_hash =
131+
ctx.alloy_provider.get_block_by_number(3.into()).await.unwrap().unwrap().hash();
132+
assert_eq!(hashes[0], new_block3_hash);
133133

134134
ctx
135135
})
@@ -383,3 +383,167 @@ async fn test_no_regression_filters_and_subscriptions() {
383383
})
384384
.await;
385385
}
386+
387+
// ---------------------------------------------------------------------------
388+
// 8. Multi-block reorg with log filter
389+
// ---------------------------------------------------------------------------
390+
391+
#[serial]
392+
#[tokio::test]
393+
async fn test_multi_block_reorg_log_filter() {
394+
rpc_test(|ctx, contract| async move {
395+
let addr = *contract.address();
396+
397+
// Install a log filter on the Counter address.
398+
let filter_id = ctx.alloy_provider.new_filter(&Filter::new().address(addr)).await.unwrap();
399+
400+
// Process blocks 2, 3, 4 (increment each → count 1, 2, 3).
401+
let block2 = process_increment(&ctx, addr).await;
402+
let block3 = process_increment(&ctx, addr).await;
403+
let block4 = process_increment(&ctx, addr).await;
404+
405+
// Poll: expect 3 logs.
406+
let logs: Vec<Log<LogData>> =
407+
ctx.alloy_provider.get_filter_changes(filter_id).await.unwrap();
408+
assert_eq!(logs.len(), 3);
409+
for (i, log) in logs.iter().enumerate() {
410+
assert_eq!(log.inner.topics()[1], B256::with_last_byte(i as u8 + 1));
411+
}
412+
413+
// Revert blocks 4, 3, 2 (back to block 1).
414+
ctx.revert_block(block4).await.unwrap();
415+
ctx.revert_block(block3).await.unwrap();
416+
ctx.revert_block(block2).await.unwrap();
417+
418+
// Poll: empty (watermark rewinds start to 2, but latest=1).
419+
let logs: Vec<Log<LogData>> =
420+
ctx.alloy_provider.get_filter_changes(filter_id).await.unwrap();
421+
assert!(logs.is_empty());
422+
423+
// Rebuild blocks 2, 3 (increment each → count 1, 2).
424+
let _new_b2 = process_increment(&ctx, addr).await;
425+
let _new_b3 = process_increment(&ctx, addr).await;
426+
427+
// Poll: expect 2 logs with count 1, 2.
428+
let logs: Vec<Log<LogData>> =
429+
ctx.alloy_provider.get_filter_changes(filter_id).await.unwrap();
430+
assert_eq!(logs.len(), 2);
431+
assert_eq!(logs[0].inner.topics()[1], B256::with_last_byte(1));
432+
assert_eq!(logs[1].inner.topics()[1], B256::with_last_byte(2));
433+
434+
ctx
435+
})
436+
.await;
437+
}
438+
439+
// ---------------------------------------------------------------------------
440+
// 9. Multi-block reorg with log subscription
441+
// ---------------------------------------------------------------------------
442+
443+
#[serial]
444+
#[tokio::test]
445+
async fn test_multi_block_reorg_log_subscription() {
446+
rpc_test(|ctx, contract| async move {
447+
let addr = *contract.address();
448+
let mut sub =
449+
ctx.alloy_provider.subscribe_logs(&Filter::new().address(addr)).await.unwrap();
450+
451+
// Process blocks 2, 3 (increment each → count 1, 2).
452+
let block2 = process_increment(&ctx, addr).await;
453+
let block3 = process_increment(&ctx, addr).await;
454+
455+
// Receive 2 normal logs.
456+
for expected in [1u8, 2] {
457+
let log =
458+
tokio::time::timeout(Duration::from_secs(5), sub.recv()).await.unwrap().unwrap();
459+
assert!(!log.removed);
460+
assert_eq!(log.inner.topics()[1], B256::with_last_byte(expected));
461+
}
462+
463+
// Revert blocks 3, 2.
464+
ctx.revert_block(block3).await.unwrap();
465+
ctx.revert_block(block2).await.unwrap();
466+
467+
// Receive 2 removed logs (one per reverted block).
468+
for _ in 0..2 {
469+
let log =
470+
tokio::time::timeout(Duration::from_secs(5), sub.recv()).await.unwrap().unwrap();
471+
assert!(log.removed);
472+
assert_eq!(log.inner.address, addr);
473+
}
474+
475+
// Rebuild block 2 (increment → count 1).
476+
let _new_b2 = process_increment(&ctx, addr).await;
477+
478+
// Receive the new normal log.
479+
let log = tokio::time::timeout(Duration::from_secs(5), sub.recv()).await.unwrap().unwrap();
480+
assert!(!log.removed);
481+
assert_eq!(log.inner.topics()[1], B256::with_last_byte(1));
482+
483+
ctx
484+
})
485+
.await;
486+
}
487+
488+
// ---------------------------------------------------------------------------
489+
// 10. Multiple reorgs between polls (watermark min path)
490+
// ---------------------------------------------------------------------------
491+
492+
#[serial]
493+
#[tokio::test]
494+
async fn test_multiple_reorgs_between_polls() {
495+
rpc_test(|ctx, contract| async move {
496+
let addr = *contract.address();
497+
498+
let filter_id = ctx.alloy_provider.new_filter(&Filter::new().address(addr)).await.unwrap();
499+
500+
// Process blocks 2, 3, 4, 5 (count 1, 2, 3, 4).
501+
let _b2 = process_increment(&ctx, addr).await;
502+
let _b3 = process_increment(&ctx, addr).await;
503+
let b4 = process_increment(&ctx, addr).await;
504+
let b5 = process_increment(&ctx, addr).await;
505+
506+
// Poll to advance the filter cursor past block 5.
507+
let logs: Vec<Log<LogData>> =
508+
ctx.alloy_provider.get_filter_changes(filter_id).await.unwrap();
509+
assert_eq!(logs.len(), 4);
510+
511+
// --- Reorg 1: revert block 5 only (common_ancestor=4, watermark=4) ---
512+
ctx.revert_block(b5).await.unwrap();
513+
514+
// Rebuild block 5 (count=5).
515+
let new_b5 = process_increment(&ctx, addr).await;
516+
517+
// --- Reorg 2 (deeper): revert blocks 5 AND 4 ---
518+
// common_ancestor for block 5 revert = 4, for block 4 revert = 3
519+
// watermark = min(4, min(4, 3)) = 3
520+
ctx.revert_block(new_b5).await.unwrap();
521+
ctx.revert_block(b4).await.unwrap();
522+
523+
// DO NOT POLL between reorg 1 and reorg 2 — this is the key.
524+
// The filter now has watermark=3 from the min() of both reorgs.
525+
526+
// Poll: empty (watermark+1=4, but latest=3 → start > latest).
527+
let logs: Vec<Log<LogData>> =
528+
ctx.alloy_provider.get_filter_changes(filter_id).await.unwrap();
529+
assert!(logs.is_empty());
530+
531+
// Rebuild blocks 4, 5 (count 3, 4).
532+
let _new_b4 = process_increment(&ctx, addr).await;
533+
let _new_b5 = process_increment(&ctx, addr).await;
534+
535+
// Poll: 2 logs from blocks 4 and 5 (count 3, 4).
536+
// After reverting blocks 4+5, count was back to 2. Rebuilding
537+
// increments to 3 then 4. This proves the deeper watermark (3)
538+
// was kept — if only the shallow watermark (4) had been kept,
539+
// we'd see 1 log from block 5 only.
540+
let logs: Vec<Log<LogData>> =
541+
ctx.alloy_provider.get_filter_changes(filter_id).await.unwrap();
542+
assert_eq!(logs.len(), 2);
543+
assert_eq!(logs[0].inner.topics()[1], B256::with_last_byte(3));
544+
assert_eq!(logs[1].inner.topics()[1], B256::with_last_byte(4));
545+
546+
ctx
547+
})
548+
.await;
549+
}

0 commit comments

Comments
 (0)