From 7e83419c49c710816f0b5a2a492f8facfbfe394b Mon Sep 17 00:00:00 2001 From: jeffyanta Date: Thu, 5 Mar 2026 09:21:11 -0500 Subject: [PATCH] Improve configurability of workers --- ocp/worker/currency/reserve/runtime.go | 93 ++++++++++---------------- ocp/worker/geyser/backup.go | 4 +- ocp/worker/geyser/config.go | 18 +++-- ocp/worker/nonce/config.go | 5 ++ ocp/worker/nonce/pool.go | 6 +- ocp/worker/swap/config.go | 6 +- 6 files changed, 59 insertions(+), 73 deletions(-) diff --git a/ocp/worker/currency/reserve/runtime.go b/ocp/worker/currency/reserve/runtime.go index d21307c..7f92e29 100644 --- a/ocp/worker/currency/reserve/runtime.go +++ b/ocp/worker/currency/reserve/runtime.go @@ -13,8 +13,6 @@ import ( ocp_data "github.com/code-payments/ocp-server/ocp/data" "github.com/code-payments/ocp-server/ocp/data/currency" "github.com/code-payments/ocp-server/ocp/worker" - "github.com/code-payments/ocp-server/retry" - "github.com/code-payments/ocp-server/retry/backoff" ) type reserveRuntime struct { @@ -33,42 +31,28 @@ func New(log *zap.Logger, data ocp_data.Provider) worker.Runtime { } func (p *reserveRuntime) Start(runtimeCtx context.Context, interval time.Duration) error { + p.refreshMints(runtimeCtx) go p.pollMints(runtimeCtx, interval/3) for { - _, err := retry.Retry( - func() error { - p.log.Debug("updating reserves") - - provider := runtimeCtx.Value(metrics.ProviderContextKey).(metrics.Provider) - trace := provider.StartTrace("currency_reserve_runtime") - defer trace.End() - tracedCtx := metrics.NewContext(runtimeCtx, trace) - - err := p.UpdateAllLaunchpadCurrencyReserves(tracedCtx) - if err != nil { - trace.OnError(err) - p.log.With(zap.Error(err)).Warn("failed to process current reserve data") - } - - return err - }, - retry.NonRetriableErrors(context.Canceled), - retry.BackoffWithJitter(backoff.BinaryExponential(time.Second), interval, 0.1), - ) - if err != nil { - if err != context.Canceled { - // Should not happen since only non-retriable error is context.Canceled - p.log.With(zap.Error(err)).Warn("unexpected error when processing current reserve data") - } + start := time.Now() - return err - } + func() { + p.log.Debug("updating reserves") + + provider := runtimeCtx.Value(metrics.ProviderContextKey).(metrics.Provider) + trace := provider.StartTrace("currency_reserve_runtime") + defer trace.End() + tracedCtx := metrics.NewContext(runtimeCtx, trace) + + p.UpdateAllLaunchpadCurrencyReserves(tracedCtx) + }() + delay := max(interval-time.Since(start), 0) select { case <-runtimeCtx.Done(): return runtimeCtx.Err() - case <-time.After(interval): + case <-time.After(delay): } } } @@ -121,37 +105,28 @@ func (p *reserveRuntime) getMints() []*common.Account { return p.mints } -func (p *reserveRuntime) UpdateAllLaunchpadCurrencyReserves(ctx context.Context) error { +func (p *reserveRuntime) UpdateAllLaunchpadCurrencyReserves(ctx context.Context) { mints := p.getMints() - var wg sync.WaitGroup - wg.Add(len(mints)) for _, mint := range mints { - go func(mint *common.Account) { - defer wg.Done() - - log := p.log.With(zap.String("mint", mint.PublicKey().ToBase58())) - - circulatingSupply, ts, err := currency_util.GetLaunchpadCurrencyCirculatingSupply(ctx, p.data, mint) - if err != nil { - log.With(zap.Error(err)).Warn("failed to get circulating supply") - return - } - - err = p.data.PutCurrencyReserve(ctx, ¤cy.ReserveRecord{ - Mint: mint.PublicKey().ToBase58(), - SupplyFromBonding: circulatingSupply, - Time: ts, - }) - if err != nil { - log.With(zap.Error(err)).Warn("failed to put currency reserve") - return - } - - recordReserveStateEvent(ctx, mint, circulatingSupply) - }(mint) - } - wg.Wait() + log := p.log.With(zap.String("mint", mint.PublicKey().ToBase58())) - return nil + circulatingSupply, ts, err := currency_util.GetLaunchpadCurrencyCirculatingSupply(ctx, p.data, mint) + if err != nil { + log.With(zap.Error(err)).Warn("failed to get circulating supply") + continue + } + + err = p.data.PutCurrencyReserve(ctx, ¤cy.ReserveRecord{ + Mint: mint.PublicKey().ToBase58(), + SupplyFromBonding: circulatingSupply, + Time: ts, + }) + if err != nil { + log.With(zap.Error(err)).Warn("failed to put currency reserve") + continue + } + + recordReserveStateEvent(ctx, mint, circulatingSupply) + } } diff --git a/ocp/worker/geyser/backup.go b/ocp/worker/geyser/backup.go index 5310aed..3fc867e 100644 --- a/ocp/worker/geyser/backup.go +++ b/ocp/worker/geyser/backup.go @@ -55,7 +55,7 @@ func (p *runtime) backupTimelockStateWorker(runtimeCtx context.Context, state ti state, query.WithDirection(query.Ascending), query.WithCursor(cursor), - query.WithLimit(256), + query.WithLimit(p.conf.backupTimelockWorkerBatchSize.Get(runtimeCtx)), ) if err == timelock.ErrTimelockNotFound { p.metricStatusLock.Lock() @@ -125,7 +125,7 @@ func (p *runtime) backupExternalDepositWorker(runtimeCtx context.Context, interv defer trace.End() tracedCtx := metrics.NewContext(runtimeCtx, trace) - accountInfoRecords, err := p.data.GetPrioritizedAccountInfosRequiringDepositSync(tracedCtx, 256) + accountInfoRecords, err := p.data.GetPrioritizedAccountInfosRequiringDepositSync(tracedCtx, p.conf.backupExternalDepositWorkerBatchSize.Get(runtimeCtx)) if err == account.ErrAccountInfoNotFound { return } else if err != nil { diff --git a/ocp/worker/geyser/config.go b/ocp/worker/geyser/config.go index 6e0f9cb..8aedea5 100644 --- a/ocp/worker/geyser/config.go +++ b/ocp/worker/geyser/config.go @@ -25,8 +25,14 @@ const ( BackupTimelockWorkerIntervalConfigEnvName = envConfigPrefix + "BACKUP_TIMELOCK_WORKER_INTERVAL" defaultBackupTimelockWorkerInterval = time.Second + BackupTimelockWorkerBatchSizeConfigEnvName = envConfigPrefix + "BACKUP_TIMELOCK_WORKER_BATCH_SIZE" + defaultBackupTimelockWorkerBatchSize = 100 + BackupExternalDepositWorkerIntervalConfigEnvName = envConfigPrefix + "BACKUP_EXTERNAL_DEPOSIT_WORKER_INTERVAL" defaultBackupExternalDepositWorkerInterval = time.Second + + BackupExternalDepositWorkerBatchSizeConfigEnvName = envConfigPrefix + "BACKUP_EXTERNAL_DEPOSIT_WORKER_BATCH_SIZE" + defaultBackupExternalDepositWorkerBatchSize = 100 ) type conf struct { @@ -36,9 +42,11 @@ type conf struct { programUpdateWorkerCount config.Uint64 programUpdateQueueSize config.Uint64 - backupExternalDepositWorkerInterval config.Duration + backupExternalDepositWorkerInterval config.Duration + backupExternalDepositWorkerBatchSize config.Uint64 - backupTimelockWorkerInterval config.Duration + backupTimelockWorkerInterval config.Duration + backupTimelockWorkerBatchSize config.Uint64 } // ConfigProvider defines how config values are pulled @@ -54,9 +62,11 @@ func WithEnvConfigs() ConfigProvider { programUpdateWorkerCount: env.NewUint64Config(ProgramUpdateWorkerCountConfigEnvName, defaultProgramUpdateWorkerCount), programUpdateQueueSize: env.NewUint64Config(ProgramUpdateQueueSizeConfigEnvName, defaultProgramUpdateQueueSize), - backupExternalDepositWorkerInterval: env.NewDurationConfig(BackupExternalDepositWorkerIntervalConfigEnvName, defaultBackupExternalDepositWorkerInterval), + backupExternalDepositWorkerInterval: env.NewDurationConfig(BackupExternalDepositWorkerIntervalConfigEnvName, defaultBackupExternalDepositWorkerInterval), + backupExternalDepositWorkerBatchSize: env.NewUint64Config(BackupExternalDepositWorkerBatchSizeConfigEnvName, defaultBackupExternalDepositWorkerBatchSize), - backupTimelockWorkerInterval: env.NewDurationConfig(BackupTimelockWorkerIntervalConfigEnvName, defaultBackupTimelockWorkerInterval), + backupTimelockWorkerInterval: env.NewDurationConfig(BackupTimelockWorkerIntervalConfigEnvName, defaultBackupTimelockWorkerInterval), + backupTimelockWorkerBatchSize: env.NewUint64Config(BackupTimelockWorkerBatchSizeConfigEnvName, defaultBackupTimelockWorkerBatchSize), } } } diff --git a/ocp/worker/nonce/config.go b/ocp/worker/nonce/config.go index c1af709..c30b4ad 100644 --- a/ocp/worker/nonce/config.go +++ b/ocp/worker/nonce/config.go @@ -8,6 +8,9 @@ import ( const ( envConfigPrefix = "NONCE_RUNTIME_" + BatchSizeConfigEnvName = envConfigPrefix + "WORKER_BATCH_SIZE" + defaultBatchSize = 100 + solanaMainnetNoncePubkeyPrefixConfigEnvName = envConfigPrefix + "SOLANA_MAINNET_NONCE_PUBKEY_PREFIX" defaultSolanaMainnetNoncePubkeyPrefix = "non" @@ -19,6 +22,7 @@ const ( ) type conf struct { + batchSize config.Uint64 solanaMainnetNoncePubkeyPrefix config.String onDemandTransactionNoncePoolSize config.Uint64 clientSwapNoncePoolSize config.Uint64 @@ -31,6 +35,7 @@ type ConfigProvider func() *conf func WithEnvConfigs() ConfigProvider { return func() *conf { return &conf{ + batchSize: env.NewUint64Config(BatchSizeConfigEnvName, defaultBatchSize), solanaMainnetNoncePubkeyPrefix: env.NewStringConfig(solanaMainnetNoncePubkeyPrefixConfigEnvName, defaultSolanaMainnetNoncePubkeyPrefix), onDemandTransactionNoncePoolSize: env.NewUint64Config(onDemandTransactiontNoncePoolSizeConfigEnvName, defaultOnDemandTransactionNoncePoolSize), clientSwapNoncePoolSize: env.NewUint64Config(clientSwapNoncePoolSizeConfigEnvName, defaultClientSwapNoncePoolSize), diff --git a/ocp/worker/nonce/pool.go b/ocp/worker/nonce/pool.go index 1caf702..381db0f 100644 --- a/ocp/worker/nonce/pool.go +++ b/ocp/worker/nonce/pool.go @@ -18,10 +18,6 @@ import ( // todo: We can generalize nonce handling by environment using an interface -const ( - nonceBatchSize = 100 -) - func (p *runtime) worker(runtimeCtx context.Context, env nonce.Environment, state nonce.State, interval time.Duration) error { var cursor query.Cursor delay := interval @@ -39,7 +35,7 @@ func (p *runtime) worker(runtimeCtx context.Context, env nonce.Environment, stat tracedCtx, env, state, - query.WithLimit(nonceBatchSize), + query.WithLimit(p.conf.batchSize.Get(runtimeCtx)), query.WithCursor(cursor), ) if err != nil { diff --git a/ocp/worker/swap/config.go b/ocp/worker/swap/config.go index 417ccfd..9dcf2fd 100644 --- a/ocp/worker/swap/config.go +++ b/ocp/worker/swap/config.go @@ -10,8 +10,8 @@ import ( const ( envConfigPrefix = "SWAP_RUNTIME_" - BatchSizeConfigEnvName = envConfigPrefix + "WORKER_BATCH_SIZE" - defaultFulfillmentBatchSize = 100 + BatchSizeConfigEnvName = envConfigPrefix + "WORKER_BATCH_SIZE" + defaultBatchSize = 100 ClientTimeoutToFundConfigEnvName = envConfigPrefix + "CLIENT_TIMEOUT_TO_FUND" defaultClientTimeoutToFund = 30 * time.Second @@ -33,7 +33,7 @@ type ConfigProvider func() *conf func WithEnvConfigs() ConfigProvider { return func() *conf { return &conf{ - batchSize: env.NewUint64Config(BatchSizeConfigEnvName, defaultFulfillmentBatchSize), + batchSize: env.NewUint64Config(BatchSizeConfigEnvName, defaultBatchSize), clientTimeoutToFund: env.NewDurationConfig(ClientTimeoutToFundConfigEnvName, defaultClientTimeoutToFund), externalWalletFinalizationTimeout: env.NewDurationConfig(ExternalWalletFinalizationTimeoutConfigEnvName, defaultExternalWalletFinalizationTimeout), }