diff --git a/ocp/worker/geyser/backup.go b/ocp/worker/geyser/backup.go index 3fc867e..695dad0 100644 --- a/ocp/worker/geyser/backup.go +++ b/ocp/worker/geyser/backup.go @@ -73,8 +73,16 @@ func (p *runtime) backupTimelockStateWorker(runtimeCtx context.Context, state ti return } + reprocessDelay := p.conf.backupTimelockWorkerReprocessDelay.Get(runtimeCtx) + var wg sync.WaitGroup for _, timelockRecord := range timelockRecords { + if lastProcessed, ok := p.backupTimelockProcessedCache.Load(timelockRecord.Address); ok { + if time.Since(lastProcessed.(time.Time)) < reprocessDelay { + continue + } + } + wg.Add(1) go func(timelockRecord *timelock.Record) { @@ -85,7 +93,10 @@ func (p *runtime) backupTimelockStateWorker(runtimeCtx context.Context, state ti err := updateTimelockAccountRecord(tracedCtx, p.data, timelockRecord) if err != nil { log.With(zap.Error(err)).Warn("failed to update timelock account") + return } + + p.backupTimelockProcessedCache.Store(timelockRecord.Address, time.Now()) }(timelockRecord) } diff --git a/ocp/worker/geyser/config.go b/ocp/worker/geyser/config.go index 8aedea5..542922e 100644 --- a/ocp/worker/geyser/config.go +++ b/ocp/worker/geyser/config.go @@ -28,6 +28,9 @@ const ( BackupTimelockWorkerBatchSizeConfigEnvName = envConfigPrefix + "BACKUP_TIMELOCK_WORKER_BATCH_SIZE" defaultBackupTimelockWorkerBatchSize = 100 + BackupTimelockWorkerReprocessDelayConfigEnvName = envConfigPrefix + "BACKUP_TIMELOCK_WORKER_REPROCESS_DELAY" + defaultBackupTimelockWorkerReprocessDelay = time.Hour + BackupExternalDepositWorkerIntervalConfigEnvName = envConfigPrefix + "BACKUP_EXTERNAL_DEPOSIT_WORKER_INTERVAL" defaultBackupExternalDepositWorkerInterval = time.Second @@ -45,8 +48,9 @@ type conf struct { backupExternalDepositWorkerInterval config.Duration backupExternalDepositWorkerBatchSize config.Uint64 - backupTimelockWorkerInterval config.Duration - backupTimelockWorkerBatchSize config.Uint64 + backupTimelockWorkerInterval config.Duration + backupTimelockWorkerBatchSize config.Uint64 + backupTimelockWorkerReprocessDelay config.Duration } // ConfigProvider defines how config values are pulled @@ -65,8 +69,9 @@ func WithEnvConfigs() ConfigProvider { backupExternalDepositWorkerInterval: env.NewDurationConfig(BackupExternalDepositWorkerIntervalConfigEnvName, defaultBackupExternalDepositWorkerInterval), backupExternalDepositWorkerBatchSize: env.NewUint64Config(BackupExternalDepositWorkerBatchSizeConfigEnvName, defaultBackupExternalDepositWorkerBatchSize), - backupTimelockWorkerInterval: env.NewDurationConfig(BackupTimelockWorkerIntervalConfigEnvName, defaultBackupTimelockWorkerInterval), - backupTimelockWorkerBatchSize: env.NewUint64Config(BackupTimelockWorkerBatchSizeConfigEnvName, defaultBackupTimelockWorkerBatchSize), + backupTimelockWorkerInterval: env.NewDurationConfig(BackupTimelockWorkerIntervalConfigEnvName, defaultBackupTimelockWorkerInterval), + backupTimelockWorkerBatchSize: env.NewUint64Config(BackupTimelockWorkerBatchSizeConfigEnvName, defaultBackupTimelockWorkerBatchSize), + backupTimelockWorkerReprocessDelay: env.NewDurationConfig(BackupTimelockWorkerReprocessDelayConfigEnvName, defaultBackupTimelockWorkerReprocessDelay), } } } diff --git a/ocp/worker/geyser/runtime.go b/ocp/worker/geyser/runtime.go index 3eb21c0..becb363 100644 --- a/ocp/worker/geyser/runtime.go +++ b/ocp/worker/geyser/runtime.go @@ -40,6 +40,7 @@ type runtime struct { backupTimelockStateWorkerDuration *time.Duration backupTimelockStateWorkerStatus bool + backupTimelockProcessedCache sync.Map // address -> time.Time backupExternalDepositWorkerStatus bool }