Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions chains/txmgr/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,10 @@ func (eb *Broadcaster[CID, HEAD, ADDR, THASH, BHASH, SEQ, FEE]) closeInternal()
return nil
}

// SetEnabledAddresses replaces the broadcaster's enabled address list with addrs.
// The slice is stored as-is (not copied), so it stays shared with the caller.
// No lock is taken here.
// NOTE(review): Txm.setEnabled documents that it must only run before starting
// or after closing; presumably the same restriction applies to direct callers — confirm.
func (eb *Broadcaster[CID, HEAD, ADDR, THASH, BHASH, SEQ, FEE]) SetEnabledAddresses(addrs []ADDR) {
eb.enabledAddresses = addrs
}

// SetResumeCallback stores callback as the broadcaster's resume callback,
// overwriting any previously stored one. No lock is taken here.
// NOTE(review): when the callback is invoked is not visible in this section.
func (eb *Broadcaster[CID, HEAD, ADDR, THASH, BHASH, SEQ, FEE]) SetResumeCallback(callback ResumeCallback) {
eb.resumeCallback = callback
}
Expand All @@ -240,6 +244,10 @@ func (eb *Broadcaster[CID, HEAD, ADDR, THASH, BHASH, SEQ, FEE]) HealthReport() m
return map[string]error{eb.Name(): eb.Healthy()}
}

// Ready reports the broadcaster's readiness by delegating to
// eb.StateMachine.Ready and returning its result unchanged.
func (eb *Broadcaster[CID, HEAD, ADDR, THASH, BHASH, SEQ, FEE]) Ready() error {
return eb.StateMachine.Ready()
}

// Trigger forces the monitor for a particular address to recheck for new txes.
// It logs an error and does nothing if the address was not registered on startup.
func (eb *Broadcaster[CID, HEAD, ADDR, THASH, BHASH, SEQ, FEE]) Trigger(addr ADDR) {
Expand Down
12 changes: 12 additions & 0 deletions chains/txmgr/confirmer.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,14 @@ func (ec *Confirmer[CID, HEAD, ADDR, THASH, BHASH, R, SEQ, FEE]) closeInternal()
return nil
}

// Deliver hands head to the confirmer by forwarding it to ec.mb.Deliver.
// Txm.runLoop calls this for every head it receives.
// NOTE(review): ec.mb is presumably a mailbox consumed by the confirmer's run
// loop — its delivery semantics are not visible here.
func (ec *Confirmer[CID, HEAD, ADDR, THASH, BHASH, R, SEQ, FEE]) Deliver(head HEAD) {
ec.mb.Deliver(head)
}

// SetEnabledAddresses replaces the confirmer's enabled address list with addrs.
// The slice is stored as-is (not copied), so it stays shared with the caller.
// No lock is taken here.
// NOTE(review): Txm.setEnabled documents that it must only run before starting
// or after closing; presumably the same restriction applies to direct callers — confirm.
func (ec *Confirmer[CID, HEAD, ADDR, THASH, BHASH, R, SEQ, FEE]) SetEnabledAddresses(addrs []ADDR) {
ec.enabledAddresses = addrs
}

// SetResumeCallback stores callback as the confirmer's resume callback,
// overwriting any previously stored one. No lock is taken here.
// NOTE(review): when the callback is invoked is not visible in this section.
func (ec *Confirmer[CID, HEAD, ADDR, THASH, BHASH, R, SEQ, FEE]) SetResumeCallback(callback ResumeCallback) {
ec.resumeCallback = callback
}
Expand All @@ -181,6 +189,10 @@ func (ec *Confirmer[CID, HEAD, ADDR, THASH, BHASH, R, SEQ, FEE]) HealthReport()
return map[string]error{ec.Name(): ec.Healthy()}
}

// Ready reports the confirmer's readiness by delegating to
// ec.StateMachine.Ready and returning its result unchanged.
func (ec *Confirmer[CID, HEAD, ADDR, THASH, BHASH, R, SEQ, FEE]) Ready() error {
return ec.StateMachine.Ready()
}

func (ec *Confirmer[CID, HEAD, ADDR, THASH, BHASH, R, SEQ, FEE]) runLoop() {
defer ec.wg.Done()
ctx, cancel := ec.stopCh.NewCtx()
Expand Down
20 changes: 16 additions & 4 deletions chains/txmgr/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,14 @@ func (tr *Tracker[CID, ADDR, THASH, BHASH, R, SEQ, FEE]) Close() error {
})
}

// Name returns the tracker's service name, which is the name of its logger
// (tr.lggr.Name()). HealthReport uses it as the report key.
func (tr *Tracker[CID, ADDR, THASH, BHASH, R, SEQ, FEE]) Name() string {
return tr.lggr.Name()
}

// HealthReport returns a single-entry health report for the tracker: the key
// is tr.Name() and the value is whatever tr.Healthy() currently returns.
func (tr *Tracker[CID, ADDR, THASH, BHASH, R, SEQ, FEE]) HealthReport() map[string]error {
	report := make(map[string]error, 1)
	report[tr.Name()] = tr.Healthy()
	return report
}

func (tr *Tracker[CID, ADDR, THASH, BHASH, R, SEQ, FEE]) closeInternal() error {
tr.initSync.Lock()
defer tr.initSync.Unlock()
Expand Down Expand Up @@ -163,6 +171,10 @@ func (tr *Tracker[CID, ADDR, THASH, BHASH, R, SEQ, FEE]) runLoop(ctx context.Con
}
}

// Deliver hands num to the tracker by forwarding it to tr.mb.Deliver.
// Txm.runLoop calls this with head.BlockNumber() for every head it receives.
// NOTE(review): tr.mb is presumably a mailbox consumed by the tracker's run
// loop — its delivery semantics are not visible here.
func (tr *Tracker[CID, ADDR, THASH, BHASH, R, SEQ, FEE]) Deliver(num int64) {
tr.mb.Deliver(num)
}

func (tr *Tracker[CID, ADDR, THASH, BHASH, R, SEQ, FEE]) GetAbandonedAddresses() []ADDR {
tr.lock.Lock()
defer tr.lock.Unlock()
Expand All @@ -187,7 +199,7 @@ func (tr *Tracker[CID, ADDR, THASH, BHASH, R, SEQ, FEE]) IsStarted() bool {
return tr.isStarted
}

func (tr *Tracker[CID, ADDR, THASH, BHASH, R, SEQ, FEE]) getEnabledAddresses() []ADDR {
func (tr *Tracker[CID, ADDR, THASH, BHASH, R, SEQ, FEE]) GetEnabledAddresses() []ADDR {
tr.lock.RLock()
defer tr.lock.RUnlock()
return slices.Collect(maps.Keys(tr.enabledAddrs))
Expand All @@ -206,12 +218,12 @@ func (tr *Tracker[CID, ADDR, THASH, BHASH, R, SEQ, FEE]) ensureEnabledAddresses(
if err != nil {
return fmt.Errorf("failed to get enabled addresses for chain: %w", err)
}
tr.setEnabledAddresses(enabledAddrs)
tr.SetEnabledAddresses(enabledAddrs)
return nil
}

// setEnabledAddresses sets enabled addresses. Caller must hold tr.lock, or the Tracker must be unstarted (pre-startInternal, or post-closeInternal).
func (tr *Tracker[CID, ADDR, THASH, BHASH, R, SEQ, FEE]) setEnabledAddresses(enabledAddrs []ADDR) {
// SetEnabledAddresses sets enabled addresses. Caller must hold tr.lock, or the Tracker must be unstarted (pre-startInternal, or post-closeInternal).
func (tr *Tracker[CID, ADDR, THASH, BHASH, R, SEQ, FEE]) SetEnabledAddresses(enabledAddrs []ADDR) {
if len(enabledAddrs) == 0 {
tr.lggr.Warnf("enabled address list is empty")
}
Expand Down
180 changes: 110 additions & 70 deletions chains/txmgr/txmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,32 @@ type reset struct {
done chan error
}

// BroadcasterI is the broadcaster behaviour Txm depends on. Txm stores its
// broadcaster as this interface rather than as a concrete *Broadcaster.
type BroadcasterI[ADDR chains.Hashable] interface {
	services.Service

	// Trigger asks the broadcaster to act on addr; Txm.runLoop calls it for
	// every address received on its trigger channel.
	Trigger(addr ADDR)

	// SetResumeCallback installs the callback the broadcaster should use
	// for resuming.
	SetResumeCallback(callback ResumeCallback)

	// SetEnabledAddresses replaces the set of addresses the broadcaster
	// operates on. Txm.setEnabled calls it before start or during a reset.
	SetEnabledAddresses(addrs []ADDR)
}

// ConfirmerI is the confirmer behaviour Txm depends on. Txm stores its
// confirmer as this interface rather than as a concrete *Confirmer.
type ConfirmerI[HEAD chains.Head[BHASH], ADDR chains.Hashable, BHASH chains.Hashable] interface {
	services.Service

	// Deliver passes a new head to the confirmer; Txm.runLoop calls it for
	// every head it receives.
	Deliver(head HEAD)

	// SetResumeCallback installs the callback the confirmer should use for
	// resuming.
	SetResumeCallback(callback ResumeCallback)

	// SetEnabledAddresses replaces the set of addresses the confirmer
	// operates on. Txm.setEnabled calls it before start or during a reset.
	SetEnabledAddresses(addrs []ADDR)
}

// TrackerI is the tracker behaviour Txm depends on. Txm stores its tracker
// as this interface rather than as a concrete *Tracker.
type TrackerI[ADDR chains.Hashable] interface {
	services.Service

	// Deliver passes a block number to the tracker; Txm.runLoop calls it
	// with head.BlockNumber() for every head it receives.
	Deliver(blockNum int64)

	// SetEnabledAddresses replaces the set of addresses the tracker
	// operates on. Txm.setEnabled calls it before start or during a reset.
	SetEnabledAddresses(addrs []ADDR)

	// GetEnabledAddresses returns the tracker's enabled addresses; Txm.Start
	// reads them once the tracker has started.
	GetEnabledAddresses() []ADDR
}

// resetableService is the optional capability Txm probes for, via a type
// assertion, when it resets its broadcaster, tracker and confirmer:
// closeInternal is called first, then startInternal is retried until it
// succeeds or Txm is stopped.
//
// Both methods are unexported, so only types declared in this package can
// satisfy the interface. A component that does not implement it is skipped
// by the reset path without any error or log.
type resetableService interface {
	closeInternal() error
	startInternal(ctx context.Context) error
}

type Txm[CID chains.ID, HEAD chains.Head[BHASH], ADDR chains.Hashable, THASH chains.Hashable, BHASH chains.Hashable, R txmgrtypes.ChainReceipt[THASH, BHASH], SEQ chains.Sequence, FEE fees.Fee] struct {
services.StateMachine
logger logger.SugaredLogger
Expand All @@ -105,9 +131,9 @@ type Txm[CID chains.ID, HEAD chains.Head[BHASH], ADDR chains.Hashable, THASH cha

reaper *Reaper[CID]
resender *Resender[CID, ADDR, THASH, BHASH, R, SEQ, FEE]
broadcaster *Broadcaster[CID, HEAD, ADDR, THASH, BHASH, SEQ, FEE]
confirmer *Confirmer[CID, HEAD, ADDR, THASH, BHASH, R, SEQ, FEE]
tracker *Tracker[CID, ADDR, THASH, BHASH, R, SEQ, FEE]
broadcaster BroadcasterI[ADDR]
confirmer ConfirmerI[HEAD, ADDR, BHASH]
tracker TrackerI[ADDR]
finalizer txmgrtypes.Finalizer[BHASH, HEAD]
fwdMgr txmgrtypes.ForwarderManager[ADDR]
txAttemptBuilder txmgrtypes.TxAttemptBuilder[CID, HEAD, ADDR, THASH, BHASH, SEQ, FEE]
Expand Down Expand Up @@ -136,10 +162,10 @@ func NewTxm[CID chains.ID, HEAD chains.Head[BHASH], ADDR chains.Hashable, THASH
fwdMgr txmgrtypes.ForwarderManager[ADDR],
txAttemptBuilder txmgrtypes.TxAttemptBuilder[CID, HEAD, ADDR, THASH, BHASH, SEQ, FEE],
txStore txmgrtypes.TxStore[ADDR, CID, THASH, BHASH, R, SEQ, FEE],
broadcaster *Broadcaster[CID, HEAD, ADDR, THASH, BHASH, SEQ, FEE],
confirmer *Confirmer[CID, HEAD, ADDR, THASH, BHASH, R, SEQ, FEE],
broadcaster BroadcasterI[ADDR],
confirmer ConfirmerI[HEAD, ADDR, BHASH],
resender *Resender[CID, ADDR, THASH, BHASH, R, SEQ, FEE],
tracker *Tracker[CID, ADDR, THASH, BHASH, R, SEQ, FEE],
tracker TrackerI[ADDR],
finalizer txmgrtypes.Finalizer[BHASH, HEAD],
newErrorClassifierFunc NewErrorClassifier,
txmv2wrapper TxmV2Wrapper[CID, HEAD, ADDR, THASH, BHASH, SEQ, FEE],
Expand Down Expand Up @@ -201,7 +227,7 @@ func (b *Txm[CID, HEAD, ADDR, THASH, BHASH, R, SEQ, FEE]) Start(ctx context.Cont
if err := ms.Start(ctx, b.tracker); err != nil {
return fmt.Errorf("Txm: Tracker failed to start: %w", err)
}
b.enabledAddrs = b.tracker.getEnabledAddresses()
b.enabledAddrs = b.tracker.GetEnabledAddresses()
slices.SortFunc(b.enabledAddrs, func(a, b ADDR) int { return strings.Compare(a.String(), b.String()) })

if err := ms.Start(ctx, b.finalizer); err != nil {
Expand Down Expand Up @@ -325,9 +351,9 @@ func (b *Txm[CID, HEAD, ADDR, THASH, BHASH, R, SEQ, FEE]) HealthReport() map[str
// setEnabled sets enabled addresses in the broadcaster, tracker, and confirmer.
// Must only be called before starting, or after closing (during a reset).
func (b *Txm[CID, HEAD, ADDR, THASH, BHASH, R, SEQ, FEE]) setEnabled(addrs []ADDR) {
b.broadcaster.enabledAddresses = addrs
b.tracker.setEnabledAddresses(addrs)
b.confirmer.enabledAddresses = addrs
b.broadcaster.SetEnabledAddresses(addrs)
b.tracker.SetEnabledAddresses(addrs)
b.confirmer.SetEnabledAddresses(addrs)
}

func (b *Txm[CID, HEAD, ADDR, THASH, BHASH, R, SEQ, FEE]) runLoop() {
Expand All @@ -349,14 +375,20 @@ func (b *Txm[CID, HEAD, ADDR, THASH, BHASH, R, SEQ, FEE]) runLoop() {
// These should always close successfully, since it should be logically
// impossible to enter this code path with ec/eb in a state other than
// "Started"
if err := b.broadcaster.closeInternal(); err != nil {
b.logger.Panicw(fmt.Sprintf("Failed to Close Broadcaster: %v", err), "err", err)
if r, ok := b.broadcaster.(resetableService); ok {
if err := r.closeInternal(); err != nil {
b.logger.Panicw(fmt.Sprintf("Failed to Close Broadcaster: %v", err), "err", err)
}
}
if err := b.tracker.closeInternal(); err != nil {
b.logger.Panicw(fmt.Sprintf("Failed to Close Tracker: %v", err), "err", err)
if r, ok := b.tracker.(resetableService); ok {
if err := r.closeInternal(); err != nil {
b.logger.Panicw(fmt.Sprintf("Failed to Close Tracker: %v", err), "err", err)
}
}
if err := b.confirmer.closeInternal(); err != nil {
b.logger.Panicw(fmt.Sprintf("Failed to Close Confirmer: %v", err), "err", err)
if r, ok := b.confirmer.(resetableService); ok {
if err := r.closeInternal(); err != nil {
b.logger.Panicw(fmt.Sprintf("Failed to Close Confirmer: %v", err), "err", err)
}
}
if f != nil {
f()
Expand All @@ -372,64 +404,72 @@ func (b *Txm[CID, HEAD, ADDR, THASH, BHASH, R, SEQ, FEE]) runLoop() {
// execReset will not return until either:
// 1. Broadcaster, Confirmer, and Tracker all started successfully
// 2. chStop was closed (txmgr exit)
wg.Add(3)
go func() {
defer wg.Done()
// Retry indefinitely on failure
backoff := newRedialBackoff()
for {
select {
case <-time.After(backoff.Duration()):
if err := b.broadcaster.startInternal(ctx); err != nil {
b.logger.Criticalw("Failed to start Broadcaster", "err", err)
b.SvcErrBuffer.Append(err)
continue
if r, ok := b.broadcaster.(resetableService); ok {
wg.Add(1)
go func() {
defer wg.Done()
// Retry indefinitely on failure
backoff := newRedialBackoff()
for {
select {
case <-time.After(backoff.Duration()):
if err := r.startInternal(ctx); err != nil {
b.logger.Criticalw("Failed to start Broadcaster", "err", err)
b.SvcErrBuffer.Append(err)
continue
}
return
case <-b.chStop:
stopOnce.Do(func() { stopped = true })
return
}
return
case <-b.chStop:
stopOnce.Do(func() { stopped = true })
return
}
}
}()
go func() {
defer wg.Done()
// Retry indefinitely on failure
backoff := newRedialBackoff()
for {
select {
case <-time.After(backoff.Duration()):
if err := b.tracker.startInternal(ctx); err != nil {
b.logger.Criticalw("Failed to start Tracker", "err", err)
b.SvcErrBuffer.Append(err)
continue
}()
}
if r, ok := b.tracker.(resetableService); ok {
wg.Add(1)
go func() {
defer wg.Done()
// Retry indefinitely on failure
backoff := newRedialBackoff()
for {
select {
case <-time.After(backoff.Duration()):
if err := r.startInternal(ctx); err != nil {
b.logger.Criticalw("Failed to start Tracker", "err", err)
b.SvcErrBuffer.Append(err)
continue
}
return
case <-b.chStop:
stopOnce.Do(func() { stopped = true })
return
}
return
case <-b.chStop:
stopOnce.Do(func() { stopped = true })
return
}
}
}()
go func() {
defer wg.Done()
// Retry indefinitely on failure
backoff := newRedialBackoff()
for {
select {
case <-time.After(backoff.Duration()):
if err := b.confirmer.startInternal(ctx); err != nil {
b.logger.Criticalw("Failed to start Confirmer", "err", err)
b.SvcErrBuffer.Append(err)
continue
}()
}
if r, ok := b.confirmer.(resetableService); ok {
wg.Add(1)
go func() {
defer wg.Done()
// Retry indefinitely on failure
backoff := newRedialBackoff()
for {
select {
case <-time.After(backoff.Duration()):
if err := r.startInternal(ctx); err != nil {
b.logger.Criticalw("Failed to start Confirmer", "err", err)
b.SvcErrBuffer.Append(err)
continue
}
return
case <-b.chStop:
stopOnce.Do(func() { stopped = true })
return
}
return
case <-b.chStop:
stopOnce.Do(func() { stopped = true })
return
}
}
}()
}()
}

wg.Wait()
}
Expand All @@ -442,8 +482,8 @@ func (b *Txm[CID, HEAD, ADDR, THASH, BHASH, R, SEQ, FEE]) runLoop() {
case address := <-b.trigger:
b.broadcaster.Trigger(address)
case head := <-b.chHeads:
b.confirmer.mb.Deliver(head)
b.tracker.mb.Deliver(head.BlockNumber())
b.confirmer.Deliver(head)
b.tracker.Deliver(head.BlockNumber())
b.finalizer.DeliverLatestHead(head)
case reset := <-b.reset:
// This check prevents the weird edge-case where you can select
Expand Down
Loading