From f725ce04a1e0a4e12c35a9480874b3c27be975c2 Mon Sep 17 00:00:00 2001 From: Jordan Krage Date: Mon, 23 Mar 2026 18:37:50 -0500 Subject: [PATCH] chains/txmgr: support interfaces for broadcaster, confirmer, tracker --- chains/txmgr/broadcaster.go | 8 ++ chains/txmgr/confirmer.go | 12 +++ chains/txmgr/tracker.go | 20 +++- chains/txmgr/txmgr.go | 180 ++++++++++++++++++++++-------------- 4 files changed, 146 insertions(+), 74 deletions(-) diff --git a/chains/txmgr/broadcaster.go b/chains/txmgr/broadcaster.go index 0d06612..705379f 100644 --- a/chains/txmgr/broadcaster.go +++ b/chains/txmgr/broadcaster.go @@ -228,6 +228,10 @@ func (eb *Broadcaster[CID, HEAD, ADDR, THASH, BHASH, SEQ, FEE]) closeInternal() return nil } +func (eb *Broadcaster[CID, HEAD, ADDR, THASH, BHASH, SEQ, FEE]) SetEnabledAddresses(addrs []ADDR) { + eb.enabledAddresses = addrs +} + func (eb *Broadcaster[CID, HEAD, ADDR, THASH, BHASH, SEQ, FEE]) SetResumeCallback(callback ResumeCallback) { eb.resumeCallback = callback } @@ -240,6 +244,10 @@ func (eb *Broadcaster[CID, HEAD, ADDR, THASH, BHASH, SEQ, FEE]) HealthReport() m return map[string]error{eb.Name(): eb.Healthy()} } +func (eb *Broadcaster[CID, HEAD, ADDR, THASH, BHASH, SEQ, FEE]) Ready() error { + return eb.StateMachine.Ready() +} + // Trigger forces the monitor for a particular address to recheck for new txes // Logs error and does nothing if address was not registered on startup func (eb *Broadcaster[CID, HEAD, ADDR, THASH, BHASH, SEQ, FEE]) Trigger(addr ADDR) { diff --git a/chains/txmgr/confirmer.go b/chains/txmgr/confirmer.go index 39b6edd..e8d7448 100644 --- a/chains/txmgr/confirmer.go +++ b/chains/txmgr/confirmer.go @@ -169,6 +169,14 @@ func (ec *Confirmer[CID, HEAD, ADDR, THASH, BHASH, R, SEQ, FEE]) closeInternal() return nil } +func (ec *Confirmer[CID, HEAD, ADDR, THASH, BHASH, R, SEQ, FEE]) Deliver(head HEAD) { + ec.mb.Deliver(head) +} + +func (ec *Confirmer[CID, HEAD, ADDR, THASH, BHASH, R, SEQ, FEE]) SetEnabledAddresses(addrs []ADDR) { + ec.enabledAddresses = addrs +} + func (ec *Confirmer[CID, HEAD, ADDR, THASH, BHASH, R, SEQ, FEE]) SetResumeCallback(callback ResumeCallback) { ec.resumeCallback = callback } @@ -181,6 +189,10 @@ func (ec *Confirmer[CID, HEAD, ADDR, THASH, BHASH, R, SEQ, FEE]) HealthReport() return map[string]error{ec.Name(): ec.Healthy()} } +func (ec *Confirmer[CID, HEAD, ADDR, THASH, BHASH, R, SEQ, FEE]) Ready() error { + return ec.StateMachine.Ready() +} + func (ec *Confirmer[CID, HEAD, ADDR, THASH, BHASH, R, SEQ, FEE]) runLoop() { defer ec.wg.Done() ctx, cancel := ec.stopCh.NewCtx() diff --git a/chains/txmgr/tracker.go b/chains/txmgr/tracker.go index 7e963e5..7e24129 100644 --- a/chains/txmgr/tracker.go +++ b/chains/txmgr/tracker.go @@ -100,6 +100,14 @@ func (tr *Tracker[CID, ADDR, THASH, BHASH, R, SEQ, FEE]) Close() error { }) } +func (tr *Tracker[CID, ADDR, THASH, BHASH, R, SEQ, FEE]) Name() string { + return tr.lggr.Name() +} + +func (tr *Tracker[CID, ADDR, THASH, BHASH, R, SEQ, FEE]) HealthReport() map[string]error { + return map[string]error{tr.Name(): tr.Healthy()} +} + func (tr *Tracker[CID, ADDR, THASH, BHASH, R, SEQ, FEE]) closeInternal() error { tr.initSync.Lock() defer tr.initSync.Unlock() @@ -163,6 +171,10 @@ func (tr *Tracker[CID, ADDR, THASH, BHASH, R, SEQ, FEE]) runLoop(ctx context.Con } } +func (tr *Tracker[CID, ADDR, THASH, BHASH, R, SEQ, FEE]) Deliver(num int64) { + tr.mb.Deliver(num) +} + func (tr *Tracker[CID, ADDR, THASH, BHASH, R, SEQ, FEE]) GetAbandonedAddresses() []ADDR { tr.lock.Lock() defer tr.lock.Unlock() @@ -187,7 +199,7 @@ func (tr *Tracker[CID, ADDR, THASH, BHASH, R, SEQ, FEE]) IsStarted() bool { return tr.isStarted } -func (tr *Tracker[CID, ADDR, THASH, BHASH, R, SEQ, FEE]) getEnabledAddresses() []ADDR { +func (tr *Tracker[CID, ADDR, THASH, BHASH, R, SEQ, FEE]) GetEnabledAddresses() []ADDR { tr.lock.RLock() defer tr.lock.RUnlock() return slices.Collect(maps.Keys(tr.enabledAddrs)) @@ -206,12 +218,12 @@ func (tr *Tracker[CID, ADDR, THASH, BHASH, R, SEQ, FEE]) ensureEnabledAddresses( if err != nil { return fmt.Errorf("failed to get enabled addresses for chain: %w", err) } - tr.setEnabledAddresses(enabledAddrs) + tr.SetEnabledAddresses(enabledAddrs) return nil } -// setEnabledAddresses sets enabled addresses. Caller must hold tr.lock, or the Tracker must be unstarted (pre-startInternal, or post-closeInternal). -func (tr *Tracker[CID, ADDR, THASH, BHASH, R, SEQ, FEE]) setEnabledAddresses(enabledAddrs []ADDR) { +// SetEnabledAddresses sets enabled addresses. Caller must hold tr.lock, or the Tracker must be unstarted (pre-startInternal, or post-closeInternal). +func (tr *Tracker[CID, ADDR, THASH, BHASH, R, SEQ, FEE]) SetEnabledAddresses(enabledAddrs []ADDR) { if len(enabledAddrs) == 0 { tr.lggr.Warnf("enabled address list is empty") } diff --git a/chains/txmgr/txmgr.go b/chains/txmgr/txmgr.go index 1c3301e..906afed 100644 --- a/chains/txmgr/txmgr.go +++ b/chains/txmgr/txmgr.go @@ -83,6 +83,32 @@ type reset struct { done chan error } +type BroadcasterI[ADDR chains.Hashable] interface { + services.Service + SetEnabledAddresses([]ADDR) + SetResumeCallback(ResumeCallback) + Trigger(ADDR) +} + +type ConfirmerI[HEAD chains.Head[BHASH], ADDR chains.Hashable, BHASH chains.Hashable] interface { + services.Service + Deliver(HEAD) + SetEnabledAddresses([]ADDR) + SetResumeCallback(ResumeCallback) +} + +type TrackerI[ADDR chains.Hashable] interface { + services.Service + Deliver(int64) + GetEnabledAddresses() []ADDR + SetEnabledAddresses([]ADDR) +} + +type resetableService interface { + startInternal(ctx context.Context) error + closeInternal() error +} + type Txm[CID chains.ID, HEAD chains.Head[BHASH], ADDR chains.Hashable, THASH chains.Hashable, BHASH chains.Hashable, R txmgrtypes.ChainReceipt[THASH, BHASH], SEQ chains.Sequence, FEE fees.Fee] struct { services.StateMachine logger logger.SugaredLogger @@ -105,9 +131,9 @@ type Txm[CID chains.ID, HEAD chains.Head[BHASH], ADDR chains.Hashable, THASH cha reaper *Reaper[CID] resender *Resender[CID, ADDR, THASH, BHASH, R, SEQ, FEE] - broadcaster *Broadcaster[CID, HEAD, ADDR, THASH, BHASH, SEQ, FEE] - confirmer *Confirmer[CID, HEAD, ADDR, THASH, BHASH, R, SEQ, FEE] - tracker *Tracker[CID, ADDR, THASH, BHASH, R, SEQ, FEE] + broadcaster BroadcasterI[ADDR] + confirmer ConfirmerI[HEAD, ADDR, BHASH] + tracker TrackerI[ADDR] finalizer txmgrtypes.Finalizer[BHASH, HEAD] fwdMgr txmgrtypes.ForwarderManager[ADDR] txAttemptBuilder txmgrtypes.TxAttemptBuilder[CID, HEAD, ADDR, THASH, BHASH, SEQ, FEE] @@ -136,10 +162,10 @@ func NewTxm[CID chains.ID, HEAD chains.Head[BHASH], ADDR chains.Hashable, THASH fwdMgr txmgrtypes.ForwarderManager[ADDR], txAttemptBuilder txmgrtypes.TxAttemptBuilder[CID, HEAD, ADDR, THASH, BHASH, SEQ, FEE], txStore txmgrtypes.TxStore[ADDR, CID, THASH, BHASH, R, SEQ, FEE], - broadcaster *Broadcaster[CID, HEAD, ADDR, THASH, BHASH, SEQ, FEE], - confirmer *Confirmer[CID, HEAD, ADDR, THASH, BHASH, R, SEQ, FEE], + broadcaster BroadcasterI[ADDR], + confirmer ConfirmerI[HEAD, ADDR, BHASH], resender *Resender[CID, ADDR, THASH, BHASH, R, SEQ, FEE], - tracker *Tracker[CID, ADDR, THASH, BHASH, R, SEQ, FEE], + tracker TrackerI[ADDR], finalizer txmgrtypes.Finalizer[BHASH, HEAD], newErrorClassifierFunc NewErrorClassifier, txmv2wrapper TxmV2Wrapper[CID, HEAD, ADDR, THASH, BHASH, SEQ, FEE], @@ -201,7 +227,7 @@ func (b *Txm[CID, HEAD, ADDR, THASH, BHASH, R, SEQ, FEE]) Start(ctx context.Cont if err := ms.Start(ctx, b.tracker); err != nil { return fmt.Errorf("Txm: Tracker failed to start: %w", err) } - b.enabledAddrs = b.tracker.getEnabledAddresses() + b.enabledAddrs = b.tracker.GetEnabledAddresses() slices.SortFunc(b.enabledAddrs, func(a, b ADDR) int { return strings.Compare(a.String(), b.String()) }) if err := ms.Start(ctx, b.finalizer); err != nil { @@ -325,9 +351,9 @@ func (b *Txm[CID, HEAD, ADDR, THASH, BHASH, R, SEQ, FEE]) HealthReport() map[str // setEnabled sets enabled addresses in the broadcaster, tracker, and confirmer. // Must only be called before starting, or after closing (during a reset). func (b *Txm[CID, HEAD, ADDR, THASH, BHASH, R, SEQ, FEE]) setEnabled(addrs []ADDR) { - b.broadcaster.enabledAddresses = addrs - b.tracker.setEnabledAddresses(addrs) - b.confirmer.enabledAddresses = addrs + b.broadcaster.SetEnabledAddresses(addrs) + b.tracker.SetEnabledAddresses(addrs) + b.confirmer.SetEnabledAddresses(addrs) } func (b *Txm[CID, HEAD, ADDR, THASH, BHASH, R, SEQ, FEE]) runLoop() { @@ -349,14 +375,20 @@ func (b *Txm[CID, HEAD, ADDR, THASH, BHASH, R, SEQ, FEE]) runLoop() { // These should always close successfully, since it should be logically // impossible to enter this code path with ec/eb in a state other than // "Started" - if err := b.broadcaster.closeInternal(); err != nil { - b.logger.Panicw(fmt.Sprintf("Failed to Close Broadcaster: %v", err), "err", err) + if r, ok := b.broadcaster.(resetableService); ok { + if err := r.closeInternal(); err != nil { + b.logger.Panicw(fmt.Sprintf("Failed to Close Broadcaster: %v", err), "err", err) + } } - if err := b.tracker.closeInternal(); err != nil { - b.logger.Panicw(fmt.Sprintf("Failed to Close Tracker: %v", err), "err", err) + if r, ok := b.tracker.(resetableService); ok { + if err := r.closeInternal(); err != nil { + b.logger.Panicw(fmt.Sprintf("Failed to Close Tracker: %v", err), "err", err) + } } - if err := b.confirmer.closeInternal(); err != nil { - b.logger.Panicw(fmt.Sprintf("Failed to Close Confirmer: %v", err), "err", err) + if r, ok := b.confirmer.(resetableService); ok { + if err := r.closeInternal(); err != nil { + b.logger.Panicw(fmt.Sprintf("Failed to Close Confirmer: %v", err), "err", err) + } } if f != nil { f() @@ -372,64 +404,72 @@ func (b *Txm[CID, HEAD, ADDR, THASH, BHASH, R, SEQ, FEE]) runLoop() { // execReset will not return until either: // 1. Broadcaster, Confirmer, and Tracker all started successfully // 2. chStop was closed (txmgr exit) - wg.Add(3) - go func() { - defer wg.Done() - // Retry indefinitely on failure - backoff := newRedialBackoff() - for { - select { - case <-time.After(backoff.Duration()): - if err := b.broadcaster.startInternal(ctx); err != nil { - b.logger.Criticalw("Failed to start Broadcaster", "err", err) - b.SvcErrBuffer.Append(err) - continue + if r, ok := b.broadcaster.(resetableService); ok { + wg.Add(1) + go func() { + defer wg.Done() + // Retry indefinitely on failure + backoff := newRedialBackoff() + for { + select { + case <-time.After(backoff.Duration()): + if err := r.startInternal(ctx); err != nil { + b.logger.Criticalw("Failed to start Broadcaster", "err", err) + b.SvcErrBuffer.Append(err) + continue + } + return + case <-b.chStop: + stopOnce.Do(func() { stopped = true }) + return } - return - case <-b.chStop: - stopOnce.Do(func() { stopped = true }) - return } - } - }() - go func() { - defer wg.Done() - // Retry indefinitely on failure - backoff := newRedialBackoff() - for { - select { - case <-time.After(backoff.Duration()): - if err := b.tracker.startInternal(ctx); err != nil { - b.logger.Criticalw("Failed to start Tracker", "err", err) - b.SvcErrBuffer.Append(err) - continue + }() + } + if r, ok := b.tracker.(resetableService); ok { + wg.Add(1) + go func() { + defer wg.Done() + // Retry indefinitely on failure + backoff := newRedialBackoff() + for { + select { + case <-time.After(backoff.Duration()): + if err := r.startInternal(ctx); err != nil { + b.logger.Criticalw("Failed to start Tracker", "err", err) + b.SvcErrBuffer.Append(err) + continue + } + return + case <-b.chStop: + stopOnce.Do(func() { stopped = true }) + return } - return - case <-b.chStop: - stopOnce.Do(func() { stopped = true }) - return } - } - }() - go func() { - defer wg.Done() - // Retry indefinitely on failure - backoff := newRedialBackoff() - for { - select { - case <-time.After(backoff.Duration()): - if err := b.confirmer.startInternal(ctx); err != nil { - b.logger.Criticalw("Failed to start Confirmer", "err", err) - b.SvcErrBuffer.Append(err) - continue + }() + } + if r, ok := b.confirmer.(resetableService); ok { + wg.Add(1) + go func() { + defer wg.Done() + // Retry indefinitely on failure + backoff := newRedialBackoff() + for { + select { + case <-time.After(backoff.Duration()): + if err := r.startInternal(ctx); err != nil { + b.logger.Criticalw("Failed to start Confirmer", "err", err) + b.SvcErrBuffer.Append(err) + continue + } + return + case <-b.chStop: + stopOnce.Do(func() { stopped = true }) + return } - return - case <-b.chStop: - stopOnce.Do(func() { stopped = true }) - return } - } - }() + }() + } wg.Wait() } @@ -442,8 +482,8 @@ func (b *Txm[CID, HEAD, ADDR, THASH, BHASH, R, SEQ, FEE]) runLoop() { case address := <-b.trigger: b.broadcaster.Trigger(address) case head := <-b.chHeads: - b.confirmer.mb.Deliver(head) - b.tracker.mb.Deliver(head.BlockNumber()) + b.confirmer.Deliver(head) + b.tracker.Deliver(head.BlockNumber()) b.finalizer.DeliverLatestHead(head) case reset := <-b.reset: // This check prevents the weird edge-case where you can select