Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 34 additions & 59 deletions ocp/worker/currency/reserve/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ import (
ocp_data "github.com/code-payments/ocp-server/ocp/data"
"github.com/code-payments/ocp-server/ocp/data/currency"
"github.com/code-payments/ocp-server/ocp/worker"
"github.com/code-payments/ocp-server/retry"
"github.com/code-payments/ocp-server/retry/backoff"
)

type reserveRuntime struct {
Expand All @@ -33,42 +31,28 @@ func New(log *zap.Logger, data ocp_data.Provider) worker.Runtime {
}

func (p *reserveRuntime) Start(runtimeCtx context.Context, interval time.Duration) error {
p.refreshMints(runtimeCtx)
go p.pollMints(runtimeCtx, interval/3)

for {
_, err := retry.Retry(
func() error {
p.log.Debug("updating reserves")

provider := runtimeCtx.Value(metrics.ProviderContextKey).(metrics.Provider)
trace := provider.StartTrace("currency_reserve_runtime")
defer trace.End()
tracedCtx := metrics.NewContext(runtimeCtx, trace)

err := p.UpdateAllLaunchpadCurrencyReserves(tracedCtx)
if err != nil {
trace.OnError(err)
p.log.With(zap.Error(err)).Warn("failed to process current reserve data")
}

return err
},
retry.NonRetriableErrors(context.Canceled),
retry.BackoffWithJitter(backoff.BinaryExponential(time.Second), interval, 0.1),
)
if err != nil {
if err != context.Canceled {
// Should not happen since only non-retriable error is context.Canceled
p.log.With(zap.Error(err)).Warn("unexpected error when processing current reserve data")
}
start := time.Now()

return err
}
func() {
p.log.Debug("updating reserves")

provider := runtimeCtx.Value(metrics.ProviderContextKey).(metrics.Provider)
trace := provider.StartTrace("currency_reserve_runtime")
defer trace.End()
tracedCtx := metrics.NewContext(runtimeCtx, trace)

p.UpdateAllLaunchpadCurrencyReserves(tracedCtx)
}()

delay := max(interval-time.Since(start), 0)
select {
case <-runtimeCtx.Done():
return runtimeCtx.Err()
case <-time.After(interval):
case <-time.After(delay):
}
}
}
Expand Down Expand Up @@ -121,37 +105,28 @@ func (p *reserveRuntime) getMints() []*common.Account {
return p.mints
}

func (p *reserveRuntime) UpdateAllLaunchpadCurrencyReserves(ctx context.Context) error {
func (p *reserveRuntime) UpdateAllLaunchpadCurrencyReserves(ctx context.Context) {
mints := p.getMints()

var wg sync.WaitGroup
wg.Add(len(mints))
for _, mint := range mints {
go func(mint *common.Account) {
defer wg.Done()

log := p.log.With(zap.String("mint", mint.PublicKey().ToBase58()))

circulatingSupply, ts, err := currency_util.GetLaunchpadCurrencyCirculatingSupply(ctx, p.data, mint)
if err != nil {
log.With(zap.Error(err)).Warn("failed to get circulating supply")
return
}

err = p.data.PutCurrencyReserve(ctx, &currency.ReserveRecord{
Mint: mint.PublicKey().ToBase58(),
SupplyFromBonding: circulatingSupply,
Time: ts,
})
if err != nil {
log.With(zap.Error(err)).Warn("failed to put currency reserve")
return
}

recordReserveStateEvent(ctx, mint, circulatingSupply)
}(mint)
}
wg.Wait()
log := p.log.With(zap.String("mint", mint.PublicKey().ToBase58()))

return nil
circulatingSupply, ts, err := currency_util.GetLaunchpadCurrencyCirculatingSupply(ctx, p.data, mint)
if err != nil {
log.With(zap.Error(err)).Warn("failed to get circulating supply")
continue
}

err = p.data.PutCurrencyReserve(ctx, &currency.ReserveRecord{
Mint: mint.PublicKey().ToBase58(),
SupplyFromBonding: circulatingSupply,
Time: ts,
})
if err != nil {
log.With(zap.Error(err)).Warn("failed to put currency reserve")
continue
}

recordReserveStateEvent(ctx, mint, circulatingSupply)
}
}
4 changes: 2 additions & 2 deletions ocp/worker/geyser/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (p *runtime) backupTimelockStateWorker(runtimeCtx context.Context, state ti
state,
query.WithDirection(query.Ascending),
query.WithCursor(cursor),
query.WithLimit(256),
query.WithLimit(p.conf.backupTimelockWorkerBatchSize.Get(runtimeCtx)),
)
if err == timelock.ErrTimelockNotFound {
p.metricStatusLock.Lock()
Expand Down Expand Up @@ -125,7 +125,7 @@ func (p *runtime) backupExternalDepositWorker(runtimeCtx context.Context, interv
defer trace.End()
tracedCtx := metrics.NewContext(runtimeCtx, trace)

accountInfoRecords, err := p.data.GetPrioritizedAccountInfosRequiringDepositSync(tracedCtx, 256)
accountInfoRecords, err := p.data.GetPrioritizedAccountInfosRequiringDepositSync(tracedCtx, p.conf.backupExternalDepositWorkerBatchSize.Get(runtimeCtx))
if err == account.ErrAccountInfoNotFound {
return
} else if err != nil {
Expand Down
18 changes: 14 additions & 4 deletions ocp/worker/geyser/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,14 @@ const (
BackupTimelockWorkerIntervalConfigEnvName = envConfigPrefix + "BACKUP_TIMELOCK_WORKER_INTERVAL"
defaultBackupTimelockWorkerInterval = time.Second

BackupTimelockWorkerBatchSizeConfigEnvName = envConfigPrefix + "BACKUP_TIMELOCK_WORKER_BATCH_SIZE"
defaultBackupTimelockWorkerBatchSize = 100

BackupExternalDepositWorkerIntervalConfigEnvName = envConfigPrefix + "BACKUP_EXTERNAL_DEPOSIT_WORKER_INTERVAL"
defaultBackupExternalDepositWorkerInterval = time.Second

BackupExternalDepositWorkerBatchSizeConfigEnvName = envConfigPrefix + "BACKUP_EXTERNAL_DEPOSIT_WORKER_BATCH_SIZE"
defaultBackupExternalDepositWorkerBatchSize = 100
)

type conf struct {
Expand All @@ -36,9 +42,11 @@ type conf struct {
programUpdateWorkerCount config.Uint64
programUpdateQueueSize config.Uint64

backupExternalDepositWorkerInterval config.Duration
backupExternalDepositWorkerInterval config.Duration
backupExternalDepositWorkerBatchSize config.Uint64

backupTimelockWorkerInterval config.Duration
backupTimelockWorkerInterval config.Duration
backupTimelockWorkerBatchSize config.Uint64
}

// ConfigProvider defines how config values are pulled
Expand All @@ -54,9 +62,11 @@ func WithEnvConfigs() ConfigProvider {
programUpdateWorkerCount: env.NewUint64Config(ProgramUpdateWorkerCountConfigEnvName, defaultProgramUpdateWorkerCount),
programUpdateQueueSize: env.NewUint64Config(ProgramUpdateQueueSizeConfigEnvName, defaultProgramUpdateQueueSize),

backupExternalDepositWorkerInterval: env.NewDurationConfig(BackupExternalDepositWorkerIntervalConfigEnvName, defaultBackupExternalDepositWorkerInterval),
backupExternalDepositWorkerInterval: env.NewDurationConfig(BackupExternalDepositWorkerIntervalConfigEnvName, defaultBackupExternalDepositWorkerInterval),
backupExternalDepositWorkerBatchSize: env.NewUint64Config(BackupExternalDepositWorkerBatchSizeConfigEnvName, defaultBackupExternalDepositWorkerBatchSize),

backupTimelockWorkerInterval: env.NewDurationConfig(BackupTimelockWorkerIntervalConfigEnvName, defaultBackupTimelockWorkerInterval),
backupTimelockWorkerInterval: env.NewDurationConfig(BackupTimelockWorkerIntervalConfigEnvName, defaultBackupTimelockWorkerInterval),
backupTimelockWorkerBatchSize: env.NewUint64Config(BackupTimelockWorkerBatchSizeConfigEnvName, defaultBackupTimelockWorkerBatchSize),
}
}
}
5 changes: 5 additions & 0 deletions ocp/worker/nonce/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ import (
const (
envConfigPrefix = "NONCE_RUNTIME_"

BatchSizeConfigEnvName = envConfigPrefix + "WORKER_BATCH_SIZE"
defaultBatchSize = 100

solanaMainnetNoncePubkeyPrefixConfigEnvName = envConfigPrefix + "SOLANA_MAINNET_NONCE_PUBKEY_PREFIX"
defaultSolanaMainnetNoncePubkeyPrefix = "non"

Expand All @@ -19,6 +22,7 @@ const (
)

type conf struct {
batchSize config.Uint64
solanaMainnetNoncePubkeyPrefix config.String
onDemandTransactionNoncePoolSize config.Uint64
clientSwapNoncePoolSize config.Uint64
Expand All @@ -31,6 +35,7 @@ type ConfigProvider func() *conf
func WithEnvConfigs() ConfigProvider {
return func() *conf {
return &conf{
batchSize: env.NewUint64Config(BatchSizeConfigEnvName, defaultBatchSize),
solanaMainnetNoncePubkeyPrefix: env.NewStringConfig(solanaMainnetNoncePubkeyPrefixConfigEnvName, defaultSolanaMainnetNoncePubkeyPrefix),
onDemandTransactionNoncePoolSize: env.NewUint64Config(onDemandTransactiontNoncePoolSizeConfigEnvName, defaultOnDemandTransactionNoncePoolSize),
clientSwapNoncePoolSize: env.NewUint64Config(clientSwapNoncePoolSizeConfigEnvName, defaultClientSwapNoncePoolSize),
Expand Down
6 changes: 1 addition & 5 deletions ocp/worker/nonce/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,6 @@ import (

// todo: We can generalize nonce handling by environment using an interface

const (
nonceBatchSize = 100
)

func (p *runtime) worker(runtimeCtx context.Context, env nonce.Environment, state nonce.State, interval time.Duration) error {
var cursor query.Cursor
delay := interval
Expand All @@ -39,7 +35,7 @@ func (p *runtime) worker(runtimeCtx context.Context, env nonce.Environment, stat
tracedCtx,
env,
state,
query.WithLimit(nonceBatchSize),
query.WithLimit(p.conf.batchSize.Get(runtimeCtx)),
query.WithCursor(cursor),
)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions ocp/worker/swap/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import (
const (
envConfigPrefix = "SWAP_RUNTIME_"

BatchSizeConfigEnvName = envConfigPrefix + "WORKER_BATCH_SIZE"
defaultFulfillmentBatchSize = 100
BatchSizeConfigEnvName = envConfigPrefix + "WORKER_BATCH_SIZE"
defaultBatchSize = 100

ClientTimeoutToFundConfigEnvName = envConfigPrefix + "CLIENT_TIMEOUT_TO_FUND"
defaultClientTimeoutToFund = 30 * time.Second
Expand All @@ -33,7 +33,7 @@ type ConfigProvider func() *conf
func WithEnvConfigs() ConfigProvider {
return func() *conf {
return &conf{
batchSize: env.NewUint64Config(BatchSizeConfigEnvName, defaultFulfillmentBatchSize),
batchSize: env.NewUint64Config(BatchSizeConfigEnvName, defaultBatchSize),
clientTimeoutToFund: env.NewDurationConfig(ClientTimeoutToFundConfigEnvName, defaultClientTimeoutToFund),
externalWalletFinalizationTimeout: env.NewDurationConfig(ExternalWalletFinalizationTimeoutConfigEnvName, defaultExternalWalletFinalizationTimeout),
}
Expand Down