From 577c4ab718b4ff8ee2eff97249e04ff37bf17e6f Mon Sep 17 00:00:00 2001 From: jeffyanta Date: Thu, 5 Mar 2026 13:42:17 -0500 Subject: [PATCH 1/7] Support live and historical reserve states at the DB level --- ocp/data/currency/memory/store.go | 94 ++++++++++++--- ocp/data/currency/model.go | 3 + ocp/data/currency/postgres/model.go | 95 +++++++++++++-- ocp/data/currency/postgres/store.go | 45 ++++++- ocp/data/currency/postgres/store_test.go | 9 ++ ocp/data/currency/store.go | 24 +++- ocp/data/currency/tests/tests.go | 144 ++++++++++++++++++++++- ocp/data/internal.go | 18 ++- ocp/worker/currency/reserve/runtime.go | 2 +- 9 files changed, 394 insertions(+), 40 deletions(-) diff --git a/ocp/data/currency/memory/store.go b/ocp/data/currency/memory/store.go index 6abecc9..8bd9c7a 100644 --- a/ocp/data/currency/memory/store.go +++ b/ocp/data/currency/memory/store.go @@ -16,13 +16,15 @@ const ( ) type store struct { - mu sync.Mutex - exchangeRateRecords []*currency.ExchangeRateRecord - lastExchangeRateIndex uint64 - metadataRecords []*currency.MetadataRecord - lastMetadataIndex uint64 - reserveRecords []*currency.ReserveRecord - lastReserveIndex uint64 + mu sync.Mutex + exchangeRateRecords []*currency.ExchangeRateRecord + lastExchangeRateIndex uint64 + metadataRecords []*currency.MetadataRecord + lastMetadataIndex uint64 + historicalReserveRecords []*currency.ReserveRecord + lastHistoricalReserveIndex uint64 + liveReserveRecords map[string]*currency.ReserveRecord + lastLiveReserveIndex uint64 } type RateByTime []*currency.ExchangeRateRecord @@ -47,6 +49,8 @@ func New() currency.Store { return &store{ exchangeRateRecords: make([]*currency.ExchangeRateRecord, 0), lastExchangeRateIndex: 1, + liveReserveRecords: make(map[string]*currency.ReserveRecord), + lastLiveReserveIndex: 1, } } @@ -56,8 +60,10 @@ func (s *store) reset() { s.lastExchangeRateIndex = 1 s.metadataRecords = make([]*currency.MetadataRecord, 0) s.lastMetadataIndex = 1 - s.reserveRecords = make([]*currency.ReserveRecord, 0) - s.lastReserveIndex = 1 + s.historicalReserveRecords = make([]*currency.ReserveRecord, 0) + s.lastHistoricalReserveIndex = 1 + s.liveReserveRecords = make(map[string]*currency.ReserveRecord) + s.lastLiveReserveIndex = 1 s.mu.Unlock() } @@ -306,7 +312,7 @@ func (s *store) CountMints(ctx context.Context) (uint64, error) { return uint64(len(s.metadataRecords)), nil } -func (s *store) PutReserveRecord(ctx context.Context, data *currency.ReserveRecord) error { +func (s *store) PutHistoricalReserveRecord(ctx context.Context, data *currency.ReserveRecord) error { if err := data.Validate(); err != nil { return err } @@ -315,15 +321,15 @@ func (s *store) PutReserveRecord(ctx context.Context, data *currency.ReserveReco defer s.mu.Unlock() // Not ideal but fine for testing the currency store - for _, item := range s.reserveRecords { + for _, item := range s.historicalReserveRecords { if item.Mint == data.Mint && item.Time.Unix() == data.Time.Unix() { return currency.ErrExists } } - data.Id = s.lastReserveIndex - s.reserveRecords = append(s.reserveRecords, data.Clone()) - s.lastReserveIndex = s.lastReserveIndex + 1 + data.Id = s.lastHistoricalReserveIndex + s.historicalReserveRecords = append(s.historicalReserveRecords, data.Clone()) + s.lastHistoricalReserveIndex = s.lastHistoricalReserveIndex + 1 return nil } @@ -334,7 +340,7 @@ func (s *store) GetReserveAtTime(ctx context.Context, mint string, t time.Time) // Not ideal but fine for testing the currency store var results []*currency.ReserveRecord - for _, item := range s.reserveRecords { + for _, item := range s.historicalReserveRecords { if item.Mint == mint && item.Time.Unix() <= t.Unix() && item.Time.Format(dateFormat) == t.Format(dateFormat) { results = append(results, item) } @@ -353,11 +359,11 @@ func (s *store) GetReservesInRange(ctx context.Context, mint string, interval qu s.mu.Lock() defer s.mu.Unlock() - sort.Sort(ReserveByTime(s.reserveRecords)) + sort.Sort(ReserveByTime(s.historicalReserveRecords)) // Not ideal but fine for testing the currency store var all []*currency.ReserveRecord - for _, item := range s.reserveRecords { + for _, item := range s.historicalReserveRecords { if item.Mint == mint && item.Time.Unix() >= start.Unix() && item.Time.Unix() <= end.Unix() { all = append(all, item.Clone()) } @@ -377,3 +383,57 @@ func (s *store) GetReservesInRange(ctx context.Context, mint string, interval qu return all, nil } + +func (s *store) PutLiveReserveRecord(ctx context.Context, data *currency.ReserveRecord) error { + if err := data.Validate(); err != nil { + return err + } + + s.mu.Lock() + defer s.mu.Unlock() + + if existing, ok := s.liveReserveRecords[data.Mint]; ok { + if data.Slot <= existing.Slot { + return currency.ErrStaleReserveState + } + + cloned := data.Clone() + cloned.Id = existing.Id + s.liveReserveRecords[data.Mint] = cloned + cloned.CopyTo(data) + return nil + } + + data.Id = s.lastLiveReserveIndex + s.liveReserveRecords[data.Mint] = data.Clone() + s.lastLiveReserveIndex++ + + return nil +} + +func (s *store) GetLiveReserve(ctx context.Context, mint string) (*currency.ReserveRecord, error) { + s.mu.Lock() + defer s.mu.Unlock() + + record, ok := s.liveReserveRecords[mint] + if !ok { + return nil, currency.ErrNotFound + } + + return record.Clone(), nil +} + +func (s *store) GetAllLiveReserves(ctx context.Context) (map[string]*currency.ReserveRecord, error) { + s.mu.Lock() + defer s.mu.Unlock() + + if len(s.liveReserveRecords) == 0 { + return nil, currency.ErrNotFound + } + + res := make(map[string]*currency.ReserveRecord, len(s.liveReserveRecords)) + for mint, record := range s.liveReserveRecords { + res[mint] = record.Clone() + } + return res, nil +} diff --git a/ocp/data/currency/model.go b/ocp/data/currency/model.go index acc8a7e..97fa5e8 100644 --- a/ocp/data/currency/model.go +++ b/ocp/data/currency/model.go @@ -266,6 +266,7 @@ type ReserveRecord struct { Id uint64 Mint string SupplyFromBonding uint64 + Slot uint64 // Not available for historical records Time time.Time } @@ -286,6 +287,7 @@ func (m *ReserveRecord) Clone() *ReserveRecord { Id: m.Id, Mint: m.Mint, SupplyFromBonding: m.SupplyFromBonding, + Slot: m.Slot, Time: m.Time, } } @@ -294,6 +296,7 @@ func (m *ReserveRecord) CopyTo(dst *ReserveRecord) { dst.Id = m.Id dst.Mint = m.Mint dst.SupplyFromBonding = m.SupplyFromBonding + dst.Slot = m.Slot dst.Time = m.Time } diff --git a/ocp/data/currency/postgres/model.go b/ocp/data/currency/postgres/model.go index 7866af5..a96fae0 100644 --- a/ocp/data/currency/postgres/model.go +++ b/ocp/data/currency/postgres/model.go @@ -19,6 +19,7 @@ const ( exchangeRateTableName = "ocp__core_exchangerate" metadataTableName = "ocp__core_currencymetadata" reserveTableName = "ocp__core_currencyreserve" + liveReserveTableName = "ocp__core_currencyreserve2" dateFormat = "2006-01-02" ) @@ -186,7 +187,7 @@ func fromMetadataModel(obj *metadataModel) *currency.MetadataRecord { } } -type reserveModel struct { +type historicalReserveModel struct { Id sql.NullInt64 `db:"id"` ForDate string `db:"for_date"` ForTimestamp time.Time `db:"for_timestamp"` @@ -194,12 +195,12 @@ type reserveModel struct { SupplyFromBonding uint64 `db:"supply_from_bonding"` } -func toReserveModel(obj *currency.ReserveRecord) (*reserveModel, error) { +func toHistoricalReserveModel(obj *currency.ReserveRecord) (*historicalReserveModel, error) { if err := obj.Validate(); err != nil { return nil, err } - return &reserveModel{ + return &historicalReserveModel{ Id: sql.NullInt64{Int64: int64(obj.Id), Valid: obj.Id > 0}, ForDate: obj.Time.UTC().Format(dateFormat), ForTimestamp: obj.Time.UTC(), @@ -208,7 +209,7 @@ func toReserveModel(obj *currency.ReserveRecord) (*reserveModel, error) { }, nil } -func fromReserveModel(obj *reserveModel) *currency.ReserveRecord { +func fromHistoricalReserveModel(obj *historicalReserveModel) *currency.ReserveRecord { return ¤cy.ReserveRecord{ Id: uint64(obj.Id.Int64), Time: obj.ForTimestamp.UTC(), @@ -217,6 +218,34 @@ func fromReserveModel(obj *reserveModel) *currency.ReserveRecord { } } +type liveReserveModel struct { + Id sql.NullInt64 `db:"id"` + Mint string `db:"mint"` + SupplyFromBonding uint64 `db:"supply_from_bonding"` + Slot uint64 `db:"slot"` + LastUpdatedAt time.Time `db:"last_updated_at"` +} + +func toLiveReserveModel(obj *currency.ReserveRecord) *liveReserveModel { + return &liveReserveModel{ + Id: sql.NullInt64{Int64: int64(obj.Id), Valid: obj.Id > 0}, + Mint: obj.Mint, + SupplyFromBonding: obj.SupplyFromBonding, + Slot: obj.Slot, + LastUpdatedAt: obj.Time.UTC(), + } +} + +func fromLiveReserveModel(obj *liveReserveModel) *currency.ReserveRecord { + return ¤cy.ReserveRecord{ + Id: uint64(obj.Id.Int64), + Mint: obj.Mint, + SupplyFromBonding: obj.SupplyFromBonding, + Slot: obj.Slot, + Time: obj.LastUpdatedAt.UTC(), + } +} + func marshalSocialLinks(links []currency.SocialLink) string { if len(links) == 0 { return "[]" @@ -327,7 +356,7 @@ func (m *metadataModel) dbSave(ctx context.Context, db *sqlx.DB) error { }) } -func (m *reserveModel) dbSave(ctx context.Context, db *sqlx.DB) error { +func (m *historicalReserveModel) dbSave(ctx context.Context, db *sqlx.DB) error { return pgutil.ExecuteInTx(ctx, db, sql.LevelDefault, func(tx *sqlx.Tx) error { err := tx.QueryRowxContext(ctx, `INSERT INTO `+reserveTableName+` @@ -462,8 +491,8 @@ func dbCountMetadataByState(ctx context.Context, db *sqlx.DB, state currency.Met return res, nil } -func dbGetReserveByMintAndTime(ctx context.Context, db *sqlx.DB, mint string, t time.Time, ordering q.Ordering) (*reserveModel, error) { - res := &reserveModel{} +func dbGetReserveByMintAndTime(ctx context.Context, db *sqlx.DB, mint string, t time.Time, ordering q.Ordering) (*historicalReserveModel, error) { + res := &historicalReserveModel{} err := db.GetContext(ctx, res, makeTimeBasedGetQuery(reserveTableName, "mint = $1 AND for_date = $2 AND for_timestamp <= $3", ordering), mint, @@ -473,8 +502,8 @@ func dbGetReserveByMintAndTime(ctx context.Context, db *sqlx.DB, mint string, t return res, pgutil.CheckNoRows(err, currency.ErrNotFound) } -func dbGetAllReservesForRange(ctx context.Context, db *sqlx.DB, mint string, interval q.Interval, start time.Time, end time.Time, ordering q.Ordering) ([]*reserveModel, error) { - res := []*reserveModel{} +func dbGetAllReservesForRange(ctx context.Context, db *sqlx.DB, mint string, interval q.Interval, start time.Time, end time.Time, ordering q.Ordering) ([]*historicalReserveModel, error) { + res := []*historicalReserveModel{} err := db.SelectContext(ctx, &res, makeTimeBasedRangeQuery(reserveTableName, "mint = $1 AND for_timestamp >= $2 AND for_timestamp <= $3", ordering, interval), mint, start.UTC(), end.UTC(), @@ -489,3 +518,51 @@ func dbGetAllReservesForRange(ctx context.Context, db *sqlx.DB, mint string, int return res, nil } + +func (m *liveReserveModel) dbSave(ctx context.Context, db *sqlx.DB) error { + return pgutil.ExecuteInTx(ctx, db, sql.LevelDefault, func(tx *sqlx.Tx) error { + err := tx.QueryRowxContext(ctx, + `INSERT INTO `+liveReserveTableName+` + (mint, supply_from_bonding, slot, last_updated_at) + VALUES ($1, $2, $3, $4) + + ON CONFLICT (mint) + DO UPDATE SET supply_from_bonding = $2, slot = $3, last_updated_at = $4 + WHERE `+liveReserveTableName+`.slot < $3 + + RETURNING id, mint, supply_from_bonding, slot, last_updated_at`, + m.Mint, + m.SupplyFromBonding, + m.Slot, + m.LastUpdatedAt, + ).StructScan(m) + + return pgutil.CheckNoRows(err, currency.ErrStaleReserveState) + }) +} + +func dbGetLiveReserveByMint(ctx context.Context, db *sqlx.DB, mint string) (*liveReserveModel, error) { + res := &liveReserveModel{} + err := db.GetContext(ctx, res, + `SELECT id, mint, supply_from_bonding, slot, last_updated_at + FROM `+liveReserveTableName+` + WHERE mint = $1`, + mint, + ) + return res, pgutil.CheckNoRows(err, currency.ErrNotFound) +} + +func dbGetAllLiveReserves(ctx context.Context, db *sqlx.DB) ([]*liveReserveModel, error) { + var res []*liveReserveModel + err := db.SelectContext(ctx, &res, + `SELECT id, mint, supply_from_bonding, slot, last_updated_at + FROM `+liveReserveTableName, + ) + if err != nil { + return nil, pgutil.CheckNoRows(err, currency.ErrNotFound) + } + if len(res) == 0 { + return nil, currency.ErrNotFound + } + return res, nil +} diff --git a/ocp/data/currency/postgres/store.go b/ocp/data/currency/postgres/store.go index c8fe266..ba80fe2 100644 --- a/ocp/data/currency/postgres/store.go +++ b/ocp/data/currency/postgres/store.go @@ -149,8 +149,8 @@ func (s *store) CountMetadataByState(ctx context.Context, state currency.Metadat return dbCountMetadataByState(ctx, s.db, state) } -func (s *store) PutReserveRecord(ctx context.Context, record *currency.ReserveRecord) error { - model, err := toReserveModel(record) +func (s *store) PutHistoricalReserveRecord(ctx context.Context, record *currency.ReserveRecord) error { + model, err := toHistoricalReserveModel(record) if err != nil { return err } @@ -160,7 +160,7 @@ func (s *store) PutReserveRecord(ctx context.Context, record *currency.ReserveRe return err } - fromReserveModel(model).CopyTo(record) + fromHistoricalReserveModel(model).CopyTo(record) return nil } @@ -170,7 +170,7 @@ func (s *store) GetReserveAtTime(ctx context.Context, mint string, t time.Time) if err != nil { return nil, err } - return fromReserveModel(model), nil + return fromHistoricalReserveModel(model), nil } func (s *store) GetReservesInRange(ctx context.Context, mint string, interval query.Interval, start time.Time, end time.Time, ordering query.Ordering) ([]*currency.ReserveRecord, error) { @@ -198,8 +198,43 @@ func (s *store) GetReservesInRange(ctx context.Context, mint string, interval qu res := []*currency.ReserveRecord{} for _, item := range list { - res = append(res, fromReserveModel(item)) + res = append(res, fromHistoricalReserveModel(item)) } return res, nil } + +func (s *store) PutLiveReserveRecord(ctx context.Context, record *currency.ReserveRecord) error { + model := toLiveReserveModel(record) + + err := model.dbSave(ctx, s.db) + if err != nil { + return err + } + + fromLiveReserveModel(model).CopyTo(record) + + return nil +} + +func (s *store) GetLiveReserve(ctx context.Context, mint string) (*currency.ReserveRecord, error) { + model, err := dbGetLiveReserveByMint(ctx, s.db, mint) + if err != nil { + return nil, err + } + return fromLiveReserveModel(model), nil +} + +func (s *store) GetAllLiveReserves(ctx context.Context) (map[string]*currency.ReserveRecord, error) { + models, err := dbGetAllLiveReserves(ctx, s.db) + if err != nil { + return nil, err + } + + res := make(map[string]*currency.ReserveRecord, len(models)) + for _, model := range models { + record := fromLiveReserveModel(model) + res[record.Mint] = record + } + return res, nil +} diff --git a/ocp/data/currency/postgres/store_test.go b/ocp/data/currency/postgres/store_test.go index 1cb7012..437f10e 100644 --- a/ocp/data/currency/postgres/store_test.go +++ b/ocp/data/currency/postgres/store_test.go @@ -81,6 +81,14 @@ const ( CONSTRAINT ocp__core_currencyreserve__uniq__timestamp__and__mint UNIQUE (for_timestamp, mint) ); + CREATE TABLE ocp__core_currencyreserve2 ( + id serial NOT NULL PRIMARY KEY, + + mint TEXT UNIQUE NOT NULL, + supply_from_bonding BIGINT NOT NULL, + slot BIGINT NOT NULL, + last_updated_at TIMESTAMP WITH TIME ZONE NOT NULL + ); ` // Used for testing ONLY, the table and migrations are external to this repository @@ -88,6 +96,7 @@ const ( DROP TABLE ocp__core_exchangerate; DROP TABLE ocp__core_currencymetadata; DROP TABLE ocp__core_currencyreserve; + DROP TABLE ocp__core_currencyreserve2; ` ) diff --git a/ocp/data/currency/store.go b/ocp/data/currency/store.go index dbbf35a..394620f 100644 --- a/ocp/data/currency/store.go +++ b/ocp/data/currency/store.go @@ -14,6 +14,7 @@ var ( ErrInvalidInterval = errors.New("the provided interval is not valid") ErrExists = errors.New("record exists") ErrStaleMetadataVersion = errors.New("metadata version is stale") + ErrStaleReserveState = errors.New("reserve state is stale") ErrDuplicateCurrency = errors.New("duplicate currency detected") ) @@ -70,8 +71,8 @@ type Store interface { // CountMints returns the total number of currency creator mints CountMints(ctx context.Context) (uint64, error) - // PutReserveRecord puts a currency creator mint reserve records into the store. - PutReserveRecord(ctx context.Context, record *ReserveRecord) error + // PutHistoricalReserveRecord puts a currency creator mint reserve records into the store. + PutHistoricalReserveRecord(ctx context.Context, record *ReserveRecord) error // GetReserveAtTime gets reserve state for a given currency creator mint at a point // in time. If the exact time is not available, the most recent data prior to the @@ -88,4 +89,23 @@ type Store interface { // ErrInvalidRange is returned if the range is not valid // ErrInvalidInterval is returned if the interval is not valid GetReservesInRange(ctx context.Context, mint string, interval query.Interval, start time.Time, end time.Time, ordering query.Ordering) ([]*ReserveRecord, error) + + // PutLiveReserveRecord upserts the latest reserve record for a currency creator + // mint. An upsert is only performed if the provided slot is greater than the slot + // currently stored. + // + // ErrStaleReserveState is returned if the provided slot is not greater than the + // stored slot. + PutLiveReserveRecord(ctx context.Context, record *ReserveRecord) error + + // GetLiveReserve gets the latest live reserve record for a currency creator mint. + // + // ErrNotFound is returned if no live reserve record exists for the provided mint. + GetLiveReserve(ctx context.Context, mint string) (*ReserveRecord, error) + + // GetAllLiveReserves gets the latest live reserve records for all currency + // creator mints. + // + // ErrNotFound is returned if no live reserve records exist. + GetAllLiveReserves(ctx context.Context) (map[string]*ReserveRecord, error) } diff --git a/ocp/data/currency/tests/tests.go b/ocp/data/currency/tests/tests.go index b3d5855..0910e34 100644 --- a/ocp/data/currency/tests/tests.go +++ b/ocp/data/currency/tests/tests.go @@ -26,6 +26,8 @@ func RunTests(t *testing.T, s currency.Store, teardown func()) { testCountMetadataByState, testReserveRoundTrip, testGetReservesInRange, + testLiveReserveRoundTrip, + testGetAllLiveReserves, } { tf(t, s) teardown() @@ -636,9 +638,9 @@ func testReserveRoundTrip(t *testing.T, s currency.Store) { SupplyFromBonding: 1, Time: now, } - require.NoError(t, s.PutReserveRecord(context.Background(), expected)) + require.NoError(t, s.PutHistoricalReserveRecord(context.Background(), expected)) - assert.Equal(t, currency.ErrExists, s.PutReserveRecord(context.Background(), expected)) + assert.Equal(t, currency.ErrExists, s.PutHistoricalReserveRecord(context.Background(), expected)) actual, err := s.GetReserveAtTime(context.Background(), "mint", now) require.NoError(t, err) @@ -677,7 +679,7 @@ func testGetReservesInRange(t *testing.T, s currency.Store) { for _, item := range reserves { itemCopy := item - require.NoError(t, s.PutReserveRecord(context.Background(), &itemCopy)) + require.NoError(t, s.PutHistoricalReserveRecord(context.Background(), &itemCopy)) } result, err := s.GetReservesInRange(context.Background(), mint, query.IntervalRaw, reserves[0].Time, reserves[99].Time, query.Ascending) @@ -827,6 +829,142 @@ func testMetadataSaveWithVersioning(t *testing.T, s currency.Store) { assert.Equal(t, "updatedalt1111111111111111111111111111111111111", actual.Alt) } +func testLiveReserveRoundTrip(t *testing.T, s currency.Store) { + ctx := context.Background() + mint := "live-reserve-mint" + + // No record should exist initially + _, err := s.GetLiveReserve(ctx, mint) + assert.Equal(t, currency.ErrNotFound, err) + + // Insert the first live reserve record + record := ¤cy.ReserveRecord{ + Mint: mint, + SupplyFromBonding: 1000, + Slot: 100, + Time: time.Now(), + } + require.NoError(t, s.PutLiveReserveRecord(ctx, record)) + + // Verify the record was stored + actual, err := s.GetLiveReserve(ctx, mint) + require.NoError(t, err) + assert.Equal(t, mint, actual.Mint) + assert.EqualValues(t, 1000, actual.SupplyFromBonding) + assert.EqualValues(t, 100, actual.Slot) + + // Update with a higher slot should succeed + record = ¤cy.ReserveRecord{ + Mint: mint, + SupplyFromBonding: 2000, + Slot: 200, + Time: time.Now(), + } + require.NoError(t, s.PutLiveReserveRecord(ctx, record)) + + actual, err = s.GetLiveReserve(ctx, mint) + require.NoError(t, err) + assert.EqualValues(t, 2000, actual.SupplyFromBonding) + assert.EqualValues(t, 200, actual.Slot) + + // Update with same slot should return stale error + record = ¤cy.ReserveRecord{ + Mint: mint, + SupplyFromBonding: 3000, + Slot: 200, + Time: time.Now(), + } + assert.Equal(t, currency.ErrStaleReserveState, s.PutLiveReserveRecord(ctx, record)) + + // Update with lower slot should return stale error + record = ¤cy.ReserveRecord{ + Mint: mint, + SupplyFromBonding: 3000, + Slot: 50, + Time: time.Now(), + } + assert.Equal(t, currency.ErrStaleReserveState, s.PutLiveReserveRecord(ctx, record)) + + // Verify original record unchanged after stale attempts + actual, err = s.GetLiveReserve(ctx, mint) + require.NoError(t, err) + assert.EqualValues(t, 2000, actual.SupplyFromBonding) + assert.EqualValues(t, 200, actual.Slot) + + // Different mint should work independently + otherMint := "other-live-mint" + record = ¤cy.ReserveRecord{ + Mint: otherMint, + SupplyFromBonding: 5000, + Slot: 50, + Time: time.Now(), + } + require.NoError(t, s.PutLiveReserveRecord(ctx, record)) + + actual, err = s.GetLiveReserve(ctx, otherMint) + require.NoError(t, err) + assert.EqualValues(t, 5000, actual.SupplyFromBonding) + assert.EqualValues(t, 50, actual.Slot) +} + +func testGetAllLiveReserves(t *testing.T, s currency.Store) { + ctx := context.Background() + + // No records should exist initially + _, err := s.GetAllLiveReserves(ctx) + assert.Equal(t, currency.ErrNotFound, err) + + // Insert live reserves for two mints + record1 := ¤cy.ReserveRecord{ + Mint: "mint-all-live-1", + SupplyFromBonding: 1000, + Slot: 100, + Time: time.Now(), + } + require.NoError(t, s.PutLiveReserveRecord(ctx, record1)) + + // Should return one record + reserves, err := s.GetAllLiveReserves(ctx) + require.NoError(t, err) + assert.Len(t, reserves, 1) + assert.EqualValues(t, 1000, reserves["mint-all-live-1"].SupplyFromBonding) + assert.EqualValues(t, 100, reserves["mint-all-live-1"].Slot) + + record2 := ¤cy.ReserveRecord{ + Mint: "mint-all-live-2", + SupplyFromBonding: 2000, + Slot: 200, + Time: time.Now(), + } + require.NoError(t, s.PutLiveReserveRecord(ctx, record2)) + + // Should return both records + reserves, err = s.GetAllLiveReserves(ctx) + require.NoError(t, err) + assert.Len(t, reserves, 2) + assert.EqualValues(t, 1000, reserves["mint-all-live-1"].SupplyFromBonding) + assert.EqualValues(t, 100, reserves["mint-all-live-1"].Slot) + assert.EqualValues(t, 2000, reserves["mint-all-live-2"].SupplyFromBonding) + assert.EqualValues(t, 200, reserves["mint-all-live-2"].Slot) + + // Update one mint and verify the change is reflected + record1Updated := ¤cy.ReserveRecord{ + Mint: "mint-all-live-1", + SupplyFromBonding: 1500, + Slot: 150, + Time: time.Now(), + } + require.NoError(t, s.PutLiveReserveRecord(ctx, record1Updated)) + + reserves, err = s.GetAllLiveReserves(ctx) + require.NoError(t, err) + assert.Len(t, reserves, 2) + assert.EqualValues(t, 1500, reserves["mint-all-live-1"].SupplyFromBonding) + assert.EqualValues(t, 150, reserves["mint-all-live-1"].Slot) + assert.EqualValues(t, 2000, reserves["mint-all-live-2"].SupplyFromBonding) + assert.EqualValues(t, 200, reserves["mint-all-live-2"].Slot) +} + func assertEquivalentMetadataRecords(t *testing.T, obj1, obj2 *currency.MetadataRecord) { assert.Equal(t, obj1.Name, obj2.Name) assert.Equal(t, obj1.Symbol, obj2.Symbol) diff --git a/ocp/data/internal.go b/ocp/data/internal.go index 4f5809f..a44dfd4 100644 --- a/ocp/data/internal.go +++ b/ocp/data/internal.go @@ -141,9 +141,12 @@ type DatabaseData interface { GetCurrencyMetadataCountByState(ctx context.Context, state currency.MetadataState) (uint64, error) GetAllCurrencyMints(ctx context.Context) ([]string, error) CountCurrencyMints(ctx context.Context) (uint64, error) - PutCurrencyReserve(ctx context.Context, record *currency.ReserveRecord) error + PutHistoricalCurrencyReserve(ctx context.Context, record *currency.ReserveRecord) error GetCurrencyReserveAtTime(ctx context.Context, mint string, t time.Time) (*currency.ReserveRecord, error) GetCurrencyReserveHistory(ctx context.Context, mint string, opts ...query.Option) ([]*currency.ReserveRecord, error) + PutLiveCurrencyReserve(ctx context.Context, record *currency.ReserveRecord) error + GetLiveCurrencyReserve(ctx context.Context, mint string) (*currency.ReserveRecord, error) + GetAllLiveCurrencyReserves(ctx context.Context) (map[string]*currency.ReserveRecord, error) // Deposits // -------------------------------------------------------------------------------- @@ -544,8 +547,8 @@ func (dp *DatabaseProvider) GetAllCurrencyMints(ctx context.Context) ([]string, func (dp *DatabaseProvider) CountCurrencyMints(ctx context.Context) (uint64, error) { return dp.currencies.CountMints(ctx) } -func (dp *DatabaseProvider) PutCurrencyReserve(ctx context.Context, record *currency.ReserveRecord) error { - return dp.currencies.PutReserveRecord(ctx, record) +func (dp *DatabaseProvider) PutHistoricalCurrencyReserve(ctx context.Context, record *currency.ReserveRecord) error { + return dp.currencies.PutHistoricalReserveRecord(ctx, record) } func (dp *DatabaseProvider) GetCurrencyReserveAtTime(ctx context.Context, mint string, t time.Time) (*currency.ReserveRecord, error) { return dp.currencies.GetReserveAtTime(ctx, mint, t) @@ -568,6 +571,15 @@ func (dp *DatabaseProvider) GetCurrencyReserveHistory(ctx context.Context, mint return dp.currencies.GetReservesInRange(ctx, mint, req.Interval, req.Start, req.End, req.SortBy) } +func (dp *DatabaseProvider) PutLiveCurrencyReserve(ctx context.Context, record *currency.ReserveRecord) error { + return dp.currencies.PutLiveReserveRecord(ctx, record) +} +func (dp *DatabaseProvider) GetLiveCurrencyReserve(ctx context.Context, mint string) (*currency.ReserveRecord, error) { + return dp.currencies.GetLiveReserve(ctx, mint) +} +func (dp *DatabaseProvider) GetAllLiveCurrencyReserves(ctx context.Context) (map[string]*currency.ReserveRecord, error) { + return dp.currencies.GetAllLiveReserves(ctx) +} // Deposits // -------------------------------------------------------------------------------- diff --git a/ocp/worker/currency/reserve/runtime.go b/ocp/worker/currency/reserve/runtime.go index 7f92e29..09ddf98 100644 --- a/ocp/worker/currency/reserve/runtime.go +++ b/ocp/worker/currency/reserve/runtime.go @@ -117,7 +117,7 @@ func (p *reserveRuntime) UpdateAllLaunchpadCurrencyReserves(ctx context.Context) continue } - err = p.data.PutCurrencyReserve(ctx, ¤cy.ReserveRecord{ + err = p.data.PutHistoricalCurrencyReserve(ctx, ¤cy.ReserveRecord{ Mint: mint.PublicKey().ToBase58(), SupplyFromBonding: circulatingSupply, Time: ts, From 7e8ac70f2dba7b742f0ea494e082cdeb360883cb Mon Sep 17 00:00:00 2001 From: jeffyanta Date: Thu, 5 Mar 2026 13:53:00 -0500 Subject: [PATCH 2/7] Currency reserve worker no longer polls blockchain --- ocp/worker/currency/reserve/metrics.go | 4 +- ocp/worker/currency/reserve/runtime.go | 78 +++++--------------------- 2 files changed, 15 insertions(+), 67 deletions(-) diff --git a/ocp/worker/currency/reserve/metrics.go b/ocp/worker/currency/reserve/metrics.go index 9db9427..c608c71 100644 --- a/ocp/worker/currency/reserve/metrics.go +++ b/ocp/worker/currency/reserve/metrics.go @@ -12,7 +12,7 @@ const ( reserveStateEventName = "CurrencyReserveStateObserved" ) -func recordReserveStateEvent(ctx context.Context, mint *common.Account, supply uint64) { +func recordReserveStateEvent(ctx context.Context, mint string, supply uint64) { if !common.IsCoreMintUsdStableCoin() { return } @@ -20,7 +20,7 @@ func recordReserveStateEvent(ctx context.Context, mint *common.Account, supply u price, _ := currencycreator.EstimateCurrentPrice(supply).Float64() usdMarketCap := price * (float64(supply) / float64(currencycreator.DefaultMintQuarksPerUnit)) metrics.RecordEvent(ctx, reserveStateEventName, map[string]interface{}{ - "mint": mint.PublicKey().ToBase58(), + "mint": mint, "supply": supply, "usd_market_cap": usdMarketCap, }) diff --git a/ocp/worker/currency/reserve/runtime.go b/ocp/worker/currency/reserve/runtime.go index 09ddf98..347024a 100644 --- a/ocp/worker/currency/reserve/runtime.go +++ b/ocp/worker/currency/reserve/runtime.go @@ -9,7 +9,6 @@ import ( "github.com/code-payments/ocp-server/metrics" "github.com/code-payments/ocp-server/ocp/common" - currency_util "github.com/code-payments/ocp-server/ocp/currency" ocp_data "github.com/code-payments/ocp-server/ocp/data" "github.com/code-payments/ocp-server/ocp/data/currency" "github.com/code-payments/ocp-server/ocp/worker" @@ -31,21 +30,18 @@ func New(log *zap.Logger, data ocp_data.Provider) worker.Runtime { } func (p *reserveRuntime) Start(runtimeCtx context.Context, interval time.Duration) error { - p.refreshMints(runtimeCtx) - go p.pollMints(runtimeCtx, interval/3) - for { start := time.Now() func() { - p.log.Debug("updating reserves") + p.log.Debug("updating historical reserves") provider := runtimeCtx.Value(metrics.ProviderContextKey).(metrics.Provider) trace := provider.StartTrace("currency_reserve_runtime") defer trace.End() tracedCtx := metrics.NewContext(runtimeCtx, trace) - p.UpdateAllLaunchpadCurrencyReserves(tracedCtx) + p.UpdateAllHistoricalLaunchpadCurrencyReserves(tracedCtx) }() delay := max(interval-time.Since(start), 0) @@ -57,76 +53,28 @@ func (p *reserveRuntime) Start(runtimeCtx context.Context, interval time.Duratio } } -func (p *reserveRuntime) pollMints(ctx context.Context, interval time.Duration) { - // Initial fetch before the first reserve update - p.refreshMints(ctx) - - for { - select { - case <-ctx.Done(): - return - case <-time.After(interval): - p.refreshMints(ctx) - } - } -} +func (p *reserveRuntime) UpdateAllHistoricalLaunchpadCurrencyReserves(ctx context.Context) { + now := time.Now() -func (p *reserveRuntime) refreshMints(ctx context.Context) { - mintStrings, err := p.data.GetAllCurrencyMints(ctx) + liveReserveStatesByMint, err := p.data.GetAllLiveCurrencyReserves(ctx) if err != nil { - p.log.With(zap.Error(err)).Warn("failed to refresh currency mints") + p.log.With(zap.Error(err)).Warn("failed getting all live reserve states") return } - var mints []*common.Account - for _, mint := range mintStrings { - account, err := common.NewAccountFromPublicKeyString(mint) - if err != nil { - p.log.With(zap.Error(err), zap.String("mint", mint)).Warn("invalid mint public key") - continue - } - - if common.IsCoreMint(account) { - continue - } - - mints = append(mints, account) - } - - p.mintsMu.Lock() - p.mints = mints - p.mintsMu.Unlock() -} - -func (p *reserveRuntime) getMints() []*common.Account { - p.mintsMu.RLock() - defer p.mintsMu.RUnlock() - - return p.mints -} - -func (p *reserveRuntime) UpdateAllLaunchpadCurrencyReserves(ctx context.Context) { - mints := p.getMints() - - for _, mint := range mints { - log := p.log.With(zap.String("mint", mint.PublicKey().ToBase58())) - - circulatingSupply, ts, err := currency_util.GetLaunchpadCurrencyCirculatingSupply(ctx, p.data, mint) - if err != nil { - log.With(zap.Error(err)).Warn("failed to get circulating supply") - continue - } + for mint, reserveRecord := range liveReserveStatesByMint { + log := p.log.With(zap.String("mint", mint)) err = p.data.PutHistoricalCurrencyReserve(ctx, ¤cy.ReserveRecord{ - Mint: mint.PublicKey().ToBase58(), - SupplyFromBonding: circulatingSupply, - Time: ts, + Mint: mint, + SupplyFromBonding: reserveRecord.SupplyFromBonding, + Time: now, }) if err != nil { - log.With(zap.Error(err)).Warn("failed to put currency reserve") + log.With(zap.Error(err)).Warn("failed to put historical currency reserve") continue } - recordReserveStateEvent(ctx, mint, circulatingSupply) + recordReserveStateEvent(ctx, mint, reserveRecord.SupplyFromBonding) } } From 3cad94c9bd41377f7b10c2310fb56e11cc22b7c8 Mon Sep 17 00:00:00 2001 From: jeffyanta Date: Thu, 5 Mar 2026 14:17:34 -0500 Subject: [PATCH 3/7] Geyser now updates live currency reserve state --- ocp/worker/geyser/currency_reserve.go | 50 +++++++++++++++++++++++++++ ocp/worker/geyser/handler.go | 15 +++++--- 2 files changed, 60 insertions(+), 5 deletions(-) create mode 100644 ocp/worker/geyser/currency_reserve.go diff --git a/ocp/worker/geyser/currency_reserve.go b/ocp/worker/geyser/currency_reserve.go new file mode 100644 index 0000000..6cbcf54 --- /dev/null +++ b/ocp/worker/geyser/currency_reserve.go @@ -0,0 +1,50 @@ +package geyser + +import ( + "context" + "time" + + "github.com/code-payments/ocp-server/cache" + "github.com/code-payments/ocp-server/ocp/common" + ocp_data "github.com/code-payments/ocp-server/ocp/data" + "github.com/code-payments/ocp-server/ocp/data/currency" + "github.com/code-payments/ocp-server/solana/currencycreator" +) + +var ( + liquidityPoolVaultCache = cache.NewCache(10_000) +) + +func processPotentialCirculatingSupplyUpdate(ctx context.Context, data ocp_data.Provider, tokenAccount, mintAccount *common.Account, amount uint64, slot uint64) error { + cachedVault, ok := liquidityPoolVaultCache.Retrieve(mintAccount.PublicKey().ToBase58()) + if !ok { + metadataRecord, err := data.GetCurrencyMetadata(ctx, mintAccount.PublicKey().ToBase58()) + if err == currency.ErrNotFound { + return nil + } else if err != nil { + return err + } + + cachedVault = metadataRecord.VaultMint + liquidityPoolVaultCache.Insert(mintAccount.PublicKey().ToBase58(), cachedVault, 1) + } + + if cachedVault != tokenAccount.PublicKey().ToBase58() { + return nil + } + + return updateLiveReserveState(ctx, data, mintAccount.PublicKey().ToBase58(), amount, slot) +} + +func updateLiveReserveState(ctx context.Context, data ocp_data.Provider, mint string, vaultAmount uint64, slot uint64) error { + err := data.PutLiveCurrencyReserve(ctx, ¤cy.ReserveRecord{ + Mint: mint, + SupplyFromBonding: currencycreator.DefaultMintMaxQuarkSupply - vaultAmount, + Slot: slot, + Time: time.Now(), + }) + if err == currency.ErrStaleReserveState { + return nil + } + return err +} diff --git a/ocp/worker/geyser/handler.go b/ocp/worker/geyser/handler.go index 1c77fed..96dabda 100644 --- a/ocp/worker/geyser/handler.go +++ b/ocp/worker/geyser/handler.go @@ -75,11 +75,6 @@ func (h *TokenProgramAccountHandler) Handle(ctx context.Context, update *geyserp return errors.Wrap(err, "invalid owner account") } - // Not an ATA, so filter it out. It cannot be a VM deposit ATA - if bytes.Equal(tokenAccount.PublicKey().ToBytes(), ownerAccount.PublicKey().ToBytes()) { - return nil - } - mintAccount, err := common.NewAccountFromPublicKeyBytes(unmarshalled.Mint) if err != nil { return errors.Wrap(err, "invalid mint account") @@ -93,6 +88,16 @@ func (h *TokenProgramAccountHandler) Handle(ctx context.Context, update *geyserp return nil } + err = processPotentialCirculatingSupplyUpdate(ctx, h.data, tokenAccount, mintAccount, unmarshalled.Amount, update.Slot) + if err != nil { + return errors.Wrap(err, "error processing potential currency circulating supply update") + } + + // Not an ATA, so filter it out. It cannot be a VM deposit ATA + if bytes.Equal(tokenAccount.PublicKey().ToBytes(), ownerAccount.PublicKey().ToBytes()) { + return nil + } + exists, userAuthorityAccount, err := testForKnownUserAuthorityFromDepositPda(ctx, h.data, ownerAccount) if err != nil { return errors.Wrap(err, "error testing for user authority from deposit pda") From 94cbaf312f919ccf4af17336563044f1d61224f9 Mon Sep 17 00:00:00 2001 From: jeffyanta Date: Fri, 6 Mar 2026 08:32:52 -0500 Subject: [PATCH 4/7] Swap worker updates live reserve state --- ocp/transaction/token_balances.go | 13 +++++++ ocp/worker/swap/util.go | 64 ++++++++++++++++++++++++++++--- ocp/worker/swap/worker.go | 12 +++++- 3 files changed, 82 insertions(+), 7 deletions(-) diff --git a/ocp/transaction/token_balances.go b/ocp/transaction/token_balances.go index 8aae347..fa199b4 100644 --- a/ocp/transaction/token_balances.go +++ b/ocp/transaction/token_balances.go @@ -33,3 +33,16 @@ func GetDeltaQuarksFromTokenBalances(tokenAccount *common.Account, tokenBalances return postQuarkBalance - preQuarkBalance, nil } + +func GetPostQuarksFromTokenBalances(tokenAccount *common.Account, tokenBalances *solana.TransactionTokenBalances) (uint64, bool, error) { + for _, tokenBalance := range tokenBalances.PostTokenBalances { + if tokenBalances.Accounts[tokenBalance.AccountIndex] == tokenAccount.PublicKey().ToBase58() { + balance, err := strconv.ParseUint(tokenBalance.TokenAmount.Amount, 10, 64) + if err != nil { + return 0, false, errors.Wrap(err, "error parsing post token balance") + } + return balance, true, nil + } + } + return 0, false, nil +} diff --git a/ocp/worker/swap/util.go b/ocp/worker/swap/util.go index 61f213c..784e44d 100644 --- a/ocp/worker/swap/util.go +++ b/ocp/worker/swap/util.go @@ -15,6 +15,7 @@ import ( currency_lib "github.com/code-payments/ocp-server/currency" "github.com/code-payments/ocp-server/ocp/common" currency_util "github.com/code-payments/ocp-server/ocp/currency" + "github.com/code-payments/ocp-server/ocp/data/currency" "github.com/code-payments/ocp-server/ocp/data/deposit" "github.com/code-payments/ocp-server/ocp/data/intent" "github.com/code-payments/ocp-server/ocp/data/nonce" @@ -23,6 +24,7 @@ import ( transaction_util "github.com/code-payments/ocp-server/ocp/transaction" vm_util "github.com/code-payments/ocp-server/ocp/vm" "github.com/code-payments/ocp-server/solana" + "github.com/code-payments/ocp-server/solana/currencycreator" ) func (p *runtime) validateSwapState(record *swap.Record, states ...swap.State) error { @@ -132,7 +134,7 @@ func (p *runtime) submitTransaction(ctx context.Context, record *swap.Record) er return nil } -func (p *runtime) updateBalancesForFinalizedSwap(ctx context.Context, swapRecord *swap.Record) (uint64, error) { +func (p *runtime) updateBalancesForFinalizedSwap(ctx context.Context, swapRecord *swap.Record, tokenBalances *solana.TransactionTokenBalances) (uint64, error) { owner, err := common.NewAccountFromPublicKeyString(swapRecord.Owner) if err != nil { return 0, err @@ -165,11 +167,6 @@ func (p *runtime) updateBalancesForFinalizedSwap(ctx context.Context, swapRecord return 0, err } - tokenBalances, err := p.data.GetBlockchainTransactionTokenBalances(ctx, swapRecord.TransactionSignature) - if err != nil { - return 0, err - } - deltaQuarksIntoOmnibus, err := transaction_util.GetDeltaQuarksFromTokenBalances(destinationVmConfig.Omnibus, tokenBalances) if err != nil { return 0, err @@ -526,6 +523,61 @@ func (p *runtime) ensureSwapDestinationIsInitialized(ctx context.Context, record return vm_util.EnsureVirtualTimelockAccountIsInitialized(ctx, p.data, destinationTimelockVault, true) } +func (p *runtime) updateLiveReserveStateForFinalizedSwap(ctx context.Context, swapRecord *swap.Record, tokenBalances *solana.TransactionTokenBalances) error { + fromMint, err := common.NewAccountFromPublicKeyString(swapRecord.FromMint) + if err != nil { + return err + } + + toMint, err := common.NewAccountFromPublicKeyString(swapRecord.ToMint) + if err != nil { + return err + } + + var currencyMints []*common.Account + if !common.IsCoreMint(fromMint) { + currencyMints = append(currencyMints, fromMint) + } + if !common.IsCoreMint(toMint) { + currencyMints = append(currencyMints, toMint) + } + + for _, mint := range currencyMints { + metadataRecord, err := p.data.GetCurrencyMetadata(ctx, mint.PublicKey().ToBase58()) + if err != nil { + return err + } + + vaultMint, err := common.NewAccountFromPublicKeyString(metadataRecord.VaultMint) + if err != nil { + return err + } + + postBalance, ok, err := transaction_util.GetPostQuarksFromTokenBalances(vaultMint, tokenBalances) + if err != nil { + return err + } + if !ok { + continue + } + + err = p.data.PutLiveCurrencyReserve(ctx, ¤cy.ReserveRecord{ + Mint: mint.PublicKey().ToBase58(), + SupplyFromBonding: currencycreator.DefaultMintMaxQuarkSupply - postBalance, + Slot: tokenBalances.Slot, + Time: time.Now(), + }) + if err == currency.ErrStaleReserveState { + continue + } + if err != nil { + return err + } + } + + return nil +} + func getSwapDepositIntentID(signature string, destination *common.Account) string { combined := fmt.Sprintf("%s-%s", signature, destination.PublicKey().ToBase58()) hashed := sha256.Sum256([]byte(combined)) diff --git a/ocp/worker/swap/worker.go b/ocp/worker/swap/worker.go index 50f055b..8f0a01b 100644 --- a/ocp/worker/swap/worker.go +++ b/ocp/worker/swap/worker.go @@ -213,7 +213,17 @@ func (p *runtime) handleStateSubmitting(ctx context.Context, record *swap.Record // todo: Recovery flow to put back source funds into the source VM return p.markSwapFailed(ctx, record) } else { - quarksBought, err := p.updateBalancesForFinalizedSwap(ctx, record) + tokenBalances, err := p.data.GetBlockchainTransactionTokenBalances(ctx, record.TransactionSignature) + if err != nil { + return errors.Wrap(err, "error getting transaction token balances") + } + + err = p.updateLiveReserveStateForFinalizedSwap(ctx, record, tokenBalances) + if err != nil { + return errors.Wrap(err, "error updating live reserve state") + } + + quarksBought, err := p.updateBalancesForFinalizedSwap(ctx, record, tokenBalances) if err != nil { return errors.Wrap(err, "error updating balances") } From 593aba58914ebb12079bb9539ac5681f11465465 Mon Sep 17 00:00:00 2001 From: jeffyanta Date: Fri, 6 Mar 2026 09:10:55 -0500 Subject: [PATCH 5/7] Currency reserve worker now updates live state if it is stale --- ocp/currency/reserve.go | 14 +++---- ocp/rpc/currency/worker.go | 2 +- ocp/worker/currency/reserve/config.go | 31 ++++++++++++++ ocp/worker/currency/reserve/runtime.go | 58 ++++++++++++++++++++------ 4 files changed, 85 insertions(+), 20 deletions(-) create mode 100644 ocp/worker/currency/reserve/config.go diff --git a/ocp/currency/reserve.go b/ocp/currency/reserve.go index 8f90553..d59e3a9 100644 --- a/ocp/currency/reserve.go +++ b/ocp/currency/reserve.go @@ -14,26 +14,26 @@ import ( // GetLaunchpadCurrencyCirculatingSupply gets the current circulating supply in // quarks for a launchpad currency directly from the blockchain -func GetLaunchpadCurrencyCirculatingSupply(ctx context.Context, data ocp_data.Provider, mint *common.Account) (uint64, time.Time, error) { +func GetLaunchpadCurrencyCirculatingSupply(ctx context.Context, data ocp_data.Provider, mint *common.Account) (uint64, uint64, time.Time, error) { metadataRecord, err := data.GetCurrencyMetadata(ctx, mint.PublicKey().ToBase58()) if err != nil { - return 0, time.Time{}, err + return 0, 0, time.Time{}, err } accounts, err := common.GetLaunchpadCurrencyAccounts(metadataRecord) if err != nil { - return 0, time.Time{}, err + return 0, 0, time.Time{}, err } - ai, _, err := data.GetBlockchainAccountInfo(ctx, accounts.VaultMint.PublicKey().ToBase58(), solana.CommitmentFinalized) + ai, slot, err := data.GetBlockchainAccountInfo(ctx, accounts.VaultMint.PublicKey().ToBase58(), solana.CommitmentFinalized) if err != nil { - return 0, time.Time{}, err + return 0, 0, time.Time{}, err } var tokenAccount token.Account if !tokenAccount.Unmarshal(ai.Data) { - return 0, time.Time{}, errors.New("invalid token account state") + return 0, 0, time.Time{}, errors.New("invalid token account state") } - return currencycreator.DefaultMintMaxQuarkSupply - tokenAccount.Amount, time.Now(), nil + return currencycreator.DefaultMintMaxQuarkSupply - tokenAccount.Amount, slot, time.Now(), nil } diff --git a/ocp/rpc/currency/worker.go b/ocp/rpc/currency/worker.go index 3c1d82d..83e42a6 100644 --- a/ocp/rpc/currency/worker.go +++ b/ocp/rpc/currency/worker.go @@ -349,7 +349,7 @@ func (m *liveMintStateWorker) pollReserveState(ctx context.Context) { func (m *liveMintStateWorker) fetchAndUpdateReserveState(ctx context.Context, mint *common.Account) *liveReserveStateData { mintAddr := mint.PublicKey().ToBase58() - supply, ts, err := currency_util.GetLaunchpadCurrencyCirculatingSupply(ctx, m.data, mint) + supply, _, ts, err := currency_util.GetLaunchpadCurrencyCirculatingSupply(ctx, m.data, mint) if err != nil { m.log.With( zap.Error(err), diff --git a/ocp/worker/currency/reserve/config.go b/ocp/worker/currency/reserve/config.go new file mode 100644 index 0000000..0b2baf7 --- /dev/null +++ b/ocp/worker/currency/reserve/config.go @@ -0,0 +1,31 @@ +package reserve + +import ( + "time" + + "github.com/code-payments/ocp-server/config" + "github.com/code-payments/ocp-server/config/env" +) + +const ( + envConfigPrefix = "CURRENCY_RESERVE_RUNTIME_" + + StaleThresholdConfigEnvName = envConfigPrefix + "STALE_THRESHOLD" + defaultStaleThreshold = 24 * time.Hour +) + +type conf struct { + staleThreshold config.Duration +} + +// ConfigProvider defines how config values are pulled +type ConfigProvider func() *conf + +// WithEnvConfigs returns configuration pulled from environment variables +func WithEnvConfigs() ConfigProvider { + return func() *conf { + return &conf{ + staleThreshold: env.NewDurationConfig(StaleThresholdConfigEnvName, defaultStaleThreshold), + } + } +} diff --git a/ocp/worker/currency/reserve/runtime.go b/ocp/worker/currency/reserve/runtime.go index 347024a..7b4ef82 100644 --- a/ocp/worker/currency/reserve/runtime.go +++ b/ocp/worker/currency/reserve/runtime.go @@ -2,13 +2,13 @@ package reserve import ( "context" - "sync" "time" "go.uber.org/zap" "github.com/code-payments/ocp-server/metrics" "github.com/code-payments/ocp-server/ocp/common" + currency_util "github.com/code-payments/ocp-server/ocp/currency" ocp_data "github.com/code-payments/ocp-server/ocp/data" "github.com/code-payments/ocp-server/ocp/data/currency" "github.com/code-payments/ocp-server/ocp/worker" @@ -16,15 +16,14 @@ import ( type reserveRuntime struct { log *zap.Logger + conf *conf data ocp_data.Provider - - mintsMu sync.RWMutex - mints []*common.Account } -func New(log *zap.Logger, data ocp_data.Provider) worker.Runtime { +func New(log *zap.Logger, data ocp_data.Provider, configProvider ConfigProvider) worker.Runtime { return &reserveRuntime{ log: log, + conf: configProvider(), data: data, } } @@ -34,14 +33,14 @@ func (p *reserveRuntime) Start(runtimeCtx context.Context, interval time.Duratio start := time.Now() func() { - p.log.Debug("updating historical reserves") + p.log.Debug("updating reserves") provider := runtimeCtx.Value(metrics.ProviderContextKey).(metrics.Provider) trace := provider.StartTrace("currency_reserve_runtime") defer trace.End() tracedCtx := metrics.NewContext(runtimeCtx, trace) - p.UpdateAllHistoricalLaunchpadCurrencyReserves(tracedCtx) + p.UpdateAllLaunchpadCurrencyReserves(tracedCtx) }() delay := max(interval-time.Since(start), 0) @@ -53,8 +52,8 @@ func (p *reserveRuntime) Start(runtimeCtx context.Context, interval time.Duratio } } -func (p *reserveRuntime) UpdateAllHistoricalLaunchpadCurrencyReserves(ctx context.Context) { - now := time.Now() +func (p *reserveRuntime) UpdateAllLaunchpadCurrencyReserves(ctx context.Context) { + staleThreshold := p.conf.staleThreshold.Get(ctx) liveReserveStatesByMint, err := p.data.GetAllLiveCurrencyReserves(ctx) if err != nil { @@ -62,12 +61,20 @@ func (p *reserveRuntime) UpdateAllHistoricalLaunchpadCurrencyReserves(ctx contex return } - for mint, reserveRecord := range liveReserveStatesByMint { + for mint, liveReserveRecord := range liveReserveStatesByMint { log := p.log.With(zap.String("mint", mint)) + now := time.Now() + if now.Sub(liveReserveRecord.Time) >= staleThreshold { + newReserveRecord, err := p.refreshLiveReserveState(ctx, log, mint) + if err == nil { + liveReserveRecord = newReserveRecord + } + } + err = p.data.PutHistoricalCurrencyReserve(ctx, ¤cy.ReserveRecord{ Mint: mint, - SupplyFromBonding: reserveRecord.SupplyFromBonding, + SupplyFromBonding: liveReserveRecord.SupplyFromBonding, Time: now, }) if err != nil { @@ -75,6 +82,33 @@ func (p *reserveRuntime) UpdateAllHistoricalLaunchpadCurrencyReserves(ctx contex continue } - recordReserveStateEvent(ctx, mint, reserveRecord.SupplyFromBonding) + recordReserveStateEvent(ctx, mint, liveReserveRecord.SupplyFromBonding) + } +} + +func (p *reserveRuntime) refreshLiveReserveState(ctx context.Context, log *zap.Logger, mint string) (*currency.ReserveRecord, error) { + mintAccount, err := common.NewAccountFromPublicKeyString(mint) + if err != nil { + log.With(zap.Error(err)).Warn("invalid mint public key") + return nil, err + } + + circulatingSupply, slot, _, err := currency_util.GetLaunchpadCurrencyCirculatingSupply(ctx, p.data, mintAccount) + if err != nil { + log.With(zap.Error(err)).Warn("failed to get circulating supply from blockchain") + return nil, err + } + + record := ¤cy.ReserveRecord{ + Mint: mint, + SupplyFromBonding: circulatingSupply, + Slot: slot, + Time: time.Now(), + } + err = p.data.PutLiveCurrencyReserve(ctx, record) + if err != nil && err != currency.ErrStaleReserveState { + log.With(zap.Error(err)).Warn("failed to update live reserve state") + return nil, err } + return record, nil } From fd7f5365ea7514b97667fdd576aa72389b00984c Mon Sep 17 00:00:00 2001 From: jeffyanta Date: Fri, 6 Mar 2026 10:00:18 -0500 Subject: [PATCH 6/7] Initial reserve states are populated on currency launch --- ocp/worker/currency/launcher/util.go | 23 +++++++++++++++++++++++ ocp/worker/currency/launcher/worker.go | 8 +++++++- 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/ocp/worker/currency/launcher/util.go b/ocp/worker/currency/launcher/util.go index 5858c39..1292863 100644 --- a/ocp/worker/currency/launcher/util.go +++ b/ocp/worker/currency/launcher/util.go @@ -108,6 +108,29 @@ func (p *runtime) markCurrencyMetadataAvailable(ctx context.Context, record *cur return p.data.SaveCurrencyMetadata(ctx, record) } +func (p *runtime) putInitialReserveState(ctx context.Context, record *currency.MetadataRecord) error { + err := p.data.PutLiveCurrencyReserve(ctx, ¤cy.ReserveRecord{ + Mint: record.Mint, + SupplyFromBonding: 0, + Slot: 0, + Time: record.CreatedAt, + }) + if err != nil { + return errors.Wrap(err, "error putting initial live reserve state") + } + + err = p.data.PutHistoricalCurrencyReserve(ctx, ¤cy.ReserveRecord{ + Mint: record.Mint, + SupplyFromBonding: 0, + Time: record.CreatedAt, + }) + if err != nil { + return errors.Wrap(err, "error putting initial historical reserve state") + } + + return nil +} + func (p *runtime) validateVmMetadataState(record *vm_metadata.Record, states ...vm_metadata.State) error { if slices.Contains(states, record.State) { return nil diff --git a/ocp/worker/currency/launcher/worker.go b/ocp/worker/currency/launcher/worker.go index ea2f754..565d155 100644 --- a/ocp/worker/currency/launcher/worker.go +++ b/ocp/worker/currency/launcher/worker.go @@ -345,6 +345,12 @@ func (p *runtime) handleStateFinalValidation(ctx context.Context, currencyMetada if err != nil { return err } - return p.markCurrencyMetadataAvailable(ctx, currencyMetadataRecord) + + err = p.markCurrencyMetadataAvailable(ctx, currencyMetadataRecord) + if err != nil { + return err + } + + return p.putInitialReserveState(ctx, currencyMetadataRecord) }) } From cdf4c6515acc73309d971b09b1221add6d894a1f Mon Sep 17 00:00:00 2001 From: jeffyanta Date: Fri, 6 Mar 2026 13:53:48 -0500 Subject: [PATCH 7/7] Update currency RPC worker to poll against the live data in the currency store --- ocp/rpc/currency/live_data.go | 21 ++-- ocp/rpc/currency/metadata.go | 19 +--- ocp/rpc/currency/stream.go | 18 ++-- ocp/rpc/currency/worker.go | 196 +++++++++++----------------------- 4 files changed, 85 insertions(+), 169 deletions(-) diff --git a/ocp/rpc/currency/live_data.go b/ocp/rpc/currency/live_data.go index b1e8ca3..3f2473b 100644 --- a/ocp/rpc/currency/live_data.go +++ b/ocp/rpc/currency/live_data.go @@ -55,12 +55,6 @@ func (s *currencyServer) StreamLiveMintData( requestedMints = append(requestedMints, mint) } - // Track requested mints so the worker polls their reserve state - if err := s.liveMintStateWorker.trackMints(ctx, requestedMints); err != nil { - log.With(zap.Error(err)).Warn("failed to track requested mints") - return status.Error(codes.Internal, "") - } - // Generate unique stream ID streamID := uuid.New().String() log = log.With(zap.String("stream_id", streamID)) @@ -97,10 +91,19 @@ func (s *currencyServer) StreamLiveMintData( continue } - err := s.liveMintStateWorker.waitForReserveState(ctx, mint) + isSupported, err := common.IsSupportedMint(ctx, s.data, mint) if err != nil { - log.With(zap.Error(err)).Debug("context cancelled while waiting for reserve state") - return status.Error(codes.Canceled, "") + log.With(zap.Error(err)).Warn("failed to validate mint") + return status.Error(codes.Internal, "") + } + if !isSupported { + continue + } + + err = s.liveMintStateWorker.waitForReserveState(ctx, mint, 2*s.conf.reserveStatePollInterval.Get(ctx)) + if err != nil { + log.With(zap.Error(err)).Debug("failed to wait for live mint reserve state") + return status.Error(codes.Internal, "") } state, err := s.liveMintStateWorker.getReserveState(mint) diff --git a/ocp/rpc/currency/metadata.go b/ocp/rpc/currency/metadata.go index daead13..9ec17a5 100644 --- a/ocp/rpc/currency/metadata.go +++ b/ocp/rpc/currency/metadata.go @@ -29,23 +29,6 @@ func (s *currencyServer) GetMints(ctx context.Context, req *currencypb.GetMintsR log := s.log.With(zap.String("method", "GetMints")) log = client.InjectLoggingMetadata(ctx, log) - // Track all requested mints so the worker polls their reserve state - var requestedMints []*common.Account - for _, protoMintAddress := range req.Addresses { - mintAccount, err := common.NewAccountFromProto(protoMintAddress) - if err != nil { - continue - } - requestedMints = append(requestedMints, mintAccount) - } - if err := s.liveMintStateWorker.trackMints(ctx, requestedMints); err != nil { - if err == errMintNotSupported { - return ¤cypb.GetMintsResponse{Result: currencypb.GetMintsResponse_NOT_FOUND}, nil - } - log.With(zap.Error(err)).Warn("failed to track requested mints") - return nil, status.Error(codes.Internal, "") - } - resp := ¤cypb.GetMintsResponse{ MetadataByAddress: make(map[string]*currencypb.Mint), } @@ -153,7 +136,7 @@ func (s *currencyServer) GetMints(ctx context.Context, req *currencypb.GetMintsR return nil, status.Error(codes.Internal, "") } - err = s.liveMintStateWorker.waitForReserveState(ctx, mintAccount) + err = s.liveMintStateWorker.waitForReserveState(ctx, mintAccount, 2*s.conf.reserveStatePollInterval.Get(ctx)) if err != nil { log.With(zap.Error(err)).Warn("failed to wait for live mint reserve state") return nil, status.Error(codes.Internal, "") diff --git a/ocp/rpc/currency/stream.go b/ocp/rpc/currency/stream.go index d3afdc4..3de70ca 100644 --- a/ocp/rpc/currency/stream.go +++ b/ocp/rpc/currency/stream.go @@ -2,7 +2,6 @@ package currency import ( "sync" - "time" "github.com/pkg/errors" @@ -12,8 +11,7 @@ import ( ) const ( - streamNotifyTimeout = 10 * time.Second - streamBufferSize = 100 + streamBufferSize = 100 ) // streamUpdate represents an update to send to streams (pre-signed) @@ -44,17 +42,17 @@ func newLiveMintDataStream(id string, mints []*common.Account, bufferSize int) * } } -func (s *liveMintDataStream) notifyExchangeRates(data *liveExchangeRateData, timeout time.Duration) error { +func (s *liveMintDataStream) notifyExchangeRates(data *liveExchangeRateData) error { if data.SignedResponse == nil { return errors.New("exchange rates missing pre-signed response") } update := &streamUpdate{ response: data.SignedResponse, } - return s.notify(update, timeout) + return s.notify(update) } -func (s *liveMintDataStream) notifyReserveStates(states []*liveReserveStateData, timeout time.Duration) error { +func (s *liveMintDataStream) notifyReserveStates(states []*liveReserveStateData) error { // Filter reserve states based on subscribed mints var filtered []*currencypb.VerifiedLaunchpadCurrencyReserveState for _, state := range states { @@ -87,10 +85,10 @@ func (s *liveMintDataStream) notifyReserveStates(states []*liveReserveStateData, update := &streamUpdate{ response: response, } - return s.notify(update, timeout) + return s.notify(update) } -func (s *liveMintDataStream) notify(update *streamUpdate, timeout time.Duration) error { +func (s *liveMintDataStream) notify(update *streamUpdate) error { s.Lock() if s.closed { @@ -100,10 +98,10 @@ func (s *liveMintDataStream) notify(update *streamUpdate, timeout time.Duration) select { case s.streamCh <- update: - case <-time.After(timeout): + default: s.Unlock() s.close() - return errors.New("timed out sending data to streamCh") + return errors.New("streamCh is full; closing stream that's falling behind") } s.Unlock() diff --git a/ocp/rpc/currency/worker.go b/ocp/rpc/currency/worker.go index 83e42a6..866c834 100644 --- a/ocp/rpc/currency/worker.go +++ b/ocp/rpc/currency/worker.go @@ -1,7 +1,6 @@ package currency import ( - "bytes" "context" "crypto/ed25519" "sync" @@ -16,20 +15,13 @@ import ( "github.com/code-payments/ocp-server/ocp/auth" "github.com/code-payments/ocp-server/ocp/common" - currency_util "github.com/code-payments/ocp-server/ocp/currency" ocp_data "github.com/code-payments/ocp-server/ocp/data" "github.com/code-payments/ocp-server/ocp/data/currency" ) -var ( - errMintNotTracked = errors.New("mint is not being tracked") - errMintNotSupported = errors.New("mint is not supported") -) - // liveExchangeRateData represents live exchange rate data with its pre-signed response type liveExchangeRateData struct { Rates map[string]float64 - Timestamp time.Time SignedResponse *currencypb.StreamLiveMintDataResponse } @@ -37,7 +29,6 @@ type liveExchangeRateData struct { type liveReserveStateData struct { Mint *common.Account SupplyFromBonding uint64 - Timestamp time.Time SignedState *currencypb.VerifiedLaunchpadCurrencyReserveState } @@ -46,9 +37,6 @@ type liveMintStateWorker struct { conf *conf data ocp_data.Provider - mintsMu sync.RWMutex - trackedMints map[string]*common.Account - stateMu sync.RWMutex exchangeRates *liveExchangeRateData launchpadReserves map[string]*liveReserveStateData @@ -63,6 +51,8 @@ type liveMintStateWorker struct { reserveReadyMu sync.Mutex reserveReadyChans map[string]chan struct{} + reservePollTrigger chan struct{} + ctx context.Context cancel context.CancelFunc } @@ -73,11 +63,11 @@ func newLiveMintStateWorker(log *zap.Logger, data ocp_data.Provider, conf *conf) log: log, conf: conf, data: data, - trackedMints: make(map[string]*common.Account), launchpadReserves: make(map[string]*liveReserveStateData), streams: make(map[string]*liveMintDataStream), exchangeRatesReady: make(chan struct{}), reserveReadyChans: make(map[string]chan struct{}), + reservePollTrigger: make(chan struct{}, 1), ctx: ctx, cancel: cancel, } @@ -106,54 +96,14 @@ func (m *liveMintStateWorker) stop() { m.streams = make(map[string]*liveMintDataStream) } -// getTrackedMints returns the current set of dynamically tracked mints -func (m *liveMintStateWorker) getTrackedMints() map[string]*common.Account { - m.mintsMu.RLock() - defer m.mintsMu.RUnlock() - - result := make(map[string]*common.Account, len(m.trackedMints)) - for k, v := range m.trackedMints { - result[k] = v - } - return result -} - -// trackMints validates and adds mints to the tracked set. Only mints that -// pass IsSupportedMint validation are added. Core mint is excluded. Returns -// an error if any non-core mint is unsupported or cannot be validated. -func (m *liveMintStateWorker) trackMints(ctx context.Context, mints []*common.Account) error { - for _, mint := range mints { - if common.IsCoreMint(mint) { - continue - } - - mintAddr := mint.PublicKey().ToBase58() - - m.mintsMu.RLock() - _, alreadyTracked := m.trackedMints[mintAddr] - m.mintsMu.RUnlock() - - if alreadyTracked { - continue - } - - isSupported, err := common.IsSupportedMint(ctx, m.data, mint) - if err != nil { - return errors.Wrapf(err, "failed to validate mint %s", mintAddr) - } - if !isSupported { - return errMintNotSupported - } - - m.mintsMu.Lock() - m.trackedMints[mintAddr] = mint - m.mintsMu.Unlock() - - m.log.With(zap.String("mint", mintAddr)).Debug("tracking new mint from client request") - - go m.fetchAndUpdateReserveState(ctx, mint) +// triggerReservePoll sends a non-blocking signal to the reserve poll loop +// to run immediately. +func (m *liveMintStateWorker) triggerReservePoll() { + select { + case m.reservePollTrigger <- struct{}{}: + default: + // Already triggered, no need to queue another } - return nil } // registerStream creates and registers a new stream for the given mints. @@ -197,30 +147,28 @@ func (m *liveMintStateWorker) waitForExchangeRates(ctx context.Context) error { } // waitForReserveState blocks until reserve state data for a specific mint is -// available or context is cancelled. Returns ErrMintNotTracked immediately if -// the mint is not in the tracked set. -func (m *liveMintStateWorker) waitForReserveState(ctx context.Context, mint *common.Account) error { - if !m.isTrackedMint(mint) { - return errMintNotTracked +// available, the context is cancelled, or the timeout is exceeded. Triggers +// an immediate poll if the mint isn't cached yet. +func (m *liveMintStateWorker) waitForReserveState(ctx context.Context, mint *common.Account, timeout time.Duration) error { + ch := m.getOrCreateReserveReadyChan(mint) + + select { + case <-ch: + return nil + default: + m.triggerReservePoll() } - ch := m.getOrCreateReserveReadyChan(mint) select { case <-ch: return nil + case <-time.After(timeout): + return errors.New("timed out waiting for reserve state") case <-ctx.Done(): return ctx.Err() } } -func (m *liveMintStateWorker) isTrackedMint(mint *common.Account) bool { - m.mintsMu.RLock() - defer m.mintsMu.RUnlock() - - _, ok := m.trackedMints[mint.PublicKey().ToBase58()] - return ok -} - func (m *liveMintStateWorker) getOrCreateReserveReadyChan(mint *common.Account) chan struct{} { m.reserveReadyMu.Lock() defer m.reserveReadyMu.Unlock() @@ -246,12 +194,11 @@ func (m *liveMintStateWorker) getReserveState(mint *common.Account) (*liveReserv m.stateMu.RLock() defer m.stateMu.RUnlock() - for _, data := range m.launchpadReserves { - if bytes.Equal(mint.PublicKey().ToBytes(), data.Mint.PublicKey().ToBytes()) { - return data, nil - } + data, ok := m.launchpadReserves[mint.PublicKey().ToBase58()] + if !ok { + return nil, errors.New("not found") } - return nil, errors.New("not found") + return data, nil } func (m *liveMintStateWorker) markExchangeRatesReady() { @@ -316,7 +263,6 @@ func (m *liveMintStateWorker) fetchAndUpdateExchangeRates(ctx context.Context, l m.stateMu.Lock() m.exchangeRates = &liveExchangeRateData{ Rates: rates.Rates, - Timestamp: time.Now(), SignedResponse: signedResponse, } m.stateMu.Unlock() @@ -340,69 +286,55 @@ func (m *liveMintStateWorker) pollReserveState(ctx context.Context) { return case <-ticker.C: m.fetchAndUpdateReserveStates(ctx) + case <-m.reservePollTrigger: + m.fetchAndUpdateReserveStates(ctx) } } } -// fetchAndUpdateReserveState fetches and updates the reserve state for a single mint. -// Returns the updated state data, or nil if the fetch failed. -func (m *liveMintStateWorker) fetchAndUpdateReserveState(ctx context.Context, mint *common.Account) *liveReserveStateData { - mintAddr := mint.PublicKey().ToBase58() - - supply, _, ts, err := currency_util.GetLaunchpadCurrencyCirculatingSupply(ctx, m.data, mint) - if err != nil { - m.log.With( - zap.Error(err), - zap.String("mint", mintAddr), - ).Warn("failed to fetch launchpad currency circulating supply") - return nil +func (m *liveMintStateWorker) fetchAndUpdateReserveStates(ctx context.Context) { + liveReserves, err := m.data.GetAllLiveCurrencyReserves(ctx) + if err == currency.ErrNotFound { + return } - - signedState, err := m.signReserveState(mint, supply, ts) if err != nil { - m.log.With( - zap.Error(err), - zap.String("mint", mintAddr), - ).Warn("failed to sign reserve state") - return nil - } - - m.markReserveStateReady(mint) - - stateData := &liveReserveStateData{ - Mint: mint, - SupplyFromBonding: supply, - Timestamp: ts, - SignedState: signedState, + m.log.With(zap.Error(err)).Warn("failed to fetch all live currency reserves") + return } - m.stateMu.Lock() - m.launchpadReserves[mintAddr] = stateData - m.stateMu.Unlock() - - return stateData -} + var updatedStates []*liveReserveStateData + for mintAddr, record := range liveReserves { + mint, err := common.NewAccountFromPublicKeyString(mintAddr) + if err != nil { + m.log.With( + zap.Error(err), + zap.String("mint", mintAddr), + ).Warn("failed to parse mint address") + continue + } -func (m *liveMintStateWorker) fetchAndUpdateReserveStates(ctx context.Context) { - trackedMints := m.getTrackedMints() + signedState, err := m.signReserveState(mint, record.SupplyFromBonding, time.Now()) + if err != nil { + m.log.With( + zap.Error(err), + zap.String("mint", mintAddr), + ).Warn("failed to sign reserve state") + continue + } - var mu sync.Mutex - var updatedStates []*liveReserveStateData + stateData := &liveReserveStateData{ + Mint: mint, + SupplyFromBonding: record.SupplyFromBonding, + SignedState: signedState, + } - var wg sync.WaitGroup - wg.Add(len(trackedMints)) - for _, mint := range trackedMints { - go func(mint *common.Account) { - defer wg.Done() + m.stateMu.Lock() + m.launchpadReserves[mintAddr] = stateData + m.stateMu.Unlock() - if stateData := m.fetchAndUpdateReserveState(ctx, mint); stateData != nil { - mu.Lock() - updatedStates = append(updatedStates, stateData) - mu.Unlock() - } - }(mint) + m.markReserveStateReady(mint) + updatedStates = append(updatedStates, stateData) } - wg.Wait() if len(updatedStates) > 0 { m.notifyReserveStates(updatedStates) @@ -427,7 +359,7 @@ func (m *liveMintStateWorker) notifyExchangeRates() { for _, stream := range streams { if stream.wantsExchangeRates() { - if err := stream.notifyExchangeRates(data, streamNotifyTimeout); err != nil { + if err := stream.notifyExchangeRates(data); err != nil { m.log.With( zap.Error(err), zap.String("stream_id", stream.id), @@ -446,7 +378,7 @@ func (m *liveMintStateWorker) notifyReserveStates(states []*liveReserveStateData m.streamsMu.RUnlock() for _, stream := range streams { - if err := stream.notifyReserveStates(states, streamNotifyTimeout); err != nil { + if err := stream.notifyReserveStates(states); err != nil { m.log.With( zap.Error(err), zap.String("stream_id", stream.id),