Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/node/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func InitChain(
pollingInterval time.Duration,
chainEnabled bool,
minimumGasTipCap uint64,
blockCacheTTLPercent uint64,
blockSyncInterval uint64,
) (transaction.Backend, common.Address, int64, transaction.Monitor, transaction.Service, error) {
backend := backendnoop.New(chainID)

Expand All @@ -73,7 +73,7 @@ func InitChain(

logger.Info("connected to blockchain backend", "version", versionString)

backend = wrapped.NewBackend(ethclient.NewClient(rpcClient), minimumGasTipCap, pollingInterval, blockCacheTTLPercent)
backend = wrapped.NewBackend(ethclient.NewClient(rpcClient), minimumGasTipCap, pollingInterval, blockSyncInterval)
}

backendChainID, err := backend.ChainID(ctx)
Expand Down
4 changes: 2 additions & 2 deletions pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ type Options struct {
BlockchainRpcEndpoint string
BlockProfile bool
BlockTime time.Duration
BlockCacheTTLPercent uint64
BlockSyncInterval uint64
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not assigned anywhere... We can remove it as option if we are going with hardcoded value of 10:

const defaultBlockSyncInterval = 10

or we to introduce addiional flag?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would add additional flag for flexibility

BootnodeMode bool
Bootnodes []string
CacheCapacity uint64
Expand Down Expand Up @@ -410,7 +410,7 @@ func NewBee(
o.BlockTime,
chainEnabled,
o.MinimumGasTipCap,
o.BlockCacheTTLPercent,
o.BlockSyncInterval,
)
if err != nil {
return nil, fmt.Errorf("init chain: %w", err)
Expand Down
34 changes: 22 additions & 12 deletions pkg/transaction/wrapped/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@ import (
)

type Loader[T any] func() (T, time.Time, error)
type ReuseEvaluator[T any] func(value T, expiresAt, now time.Time) (bool, time.Time)

type ExpiringSingleFlightCache[T any] struct {
mu sync.RWMutex
value T
valid bool
expiresAt time.Time

group singleflight.Group[string, any]
Expand All @@ -41,43 +44,50 @@ func (c *ExpiringSingleFlightCache[T]) Collectors() []prometheus.Collector {
}
}

func (c *ExpiringSingleFlightCache[T]) Get(now time.Time) (T, bool) {
func (c *ExpiringSingleFlightCache[T]) Peek() (T, time.Time, bool) {
c.mu.RLock()
defer c.mu.RUnlock()

if now.Before(c.expiresAt) {
return c.value, true
if c.valid {
return c.value, c.expiresAt, true
}

var zero T
return zero, false
return zero, time.Time{}, false
}

func (c *ExpiringSingleFlightCache[T]) Set(value T, expiresAt time.Time) {
c.mu.Lock()
defer c.mu.Unlock()

c.value = value
c.valid = true
c.expiresAt = expiresAt
}

func (c *ExpiringSingleFlightCache[T]) GetOrLoad(ctx context.Context, now time.Time, loader Loader[T]) (T, error) {
if v, ok := c.Get(now); ok {
c.metrics.Hits.Inc()
return v, nil
func (c *ExpiringSingleFlightCache[T]) PeekOrLoad(ctx context.Context, now time.Time, canReuse ReuseEvaluator[T], loader Loader[T]) (T, error) {
if value, expiresAt, ok := c.Peek(); ok {
reuse, newExpiresAt := canReuse(value, expiresAt, now)
if reuse {
c.metrics.Hits.Inc()
if !newExpiresAt.IsZero() && !newExpiresAt.Equal(expiresAt) {
c.Set(value, newExpiresAt)
}
return value, nil
}
}

c.metrics.Misses.Inc()

result, shared, err := c.group.Do(ctx, c.key, func(ctx context.Context) (any, error) {
c.metrics.Loads.Inc()
val, expiresAt, err := loader()
value, expiresAt, err := loader()
if err != nil {
c.metrics.LoadErrors.Inc()
return val, err
return value, err
}
c.Set(val, expiresAt)
return val, nil
c.Set(value, expiresAt)
return value, nil
})

if shared {
Expand Down
160 changes: 117 additions & 43 deletions pkg/transaction/wrapped/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,51 +27,62 @@ func newTestCache() *ExpiringSingleFlightCache[uint64] {
}
}

func TestSetGet(t *testing.T) {
func TestSetPeek(t *testing.T) {
t.Parallel()

now := time.Now()
expiresAt := now.Add(time.Second)
c := newTestCache()
c.Set(42, expiresAt)

val, ok := c.Get(now)
val, gotExpiresAt, ok := c.Peek()
assert.True(t, ok)
assert.Equal(t, uint64(42), val)
assert.Equal(t, expiresAt, gotExpiresAt)
}

func TestTTLExpiry(t *testing.T) {
func TestPeekReturnsExpiredValue(t *testing.T) {
t.Parallel()

now := time.Now()
c := newTestCache()
c.Set(42, now)

_, ok := c.Get(now.Add(time.Second + time.Millisecond))
assert.False(t, ok)
val, expiresAt, ok := c.Peek()
assert.True(t, ok)
assert.Equal(t, uint64(42), val)
assert.Equal(t, now, expiresAt)
}

func TestGetOrLoadMiss(t *testing.T) {
func TestPeekOrLoadMiss(t *testing.T) {
t.Parallel()

c := newTestCache()
var loadCount atomic.Int32

val, err := c.GetOrLoad(context.Background(), time.Now(), func() (uint64, time.Time, error) {
loadCount.Add(1)
return 99, time.Now().Add(time.Second), nil
})
val, err := c.PeekOrLoad(
context.Background(),
time.Now(),
func(value uint64, expiresAt, now time.Time) (bool, time.Time) {
return false, time.Time{}
},
func() (uint64, time.Time, error) {
loadCount.Add(1)
return 99, time.Now().Add(time.Second), nil
},
)

require.NoError(t, err)
assert.Equal(t, uint64(99), val)
assert.Equal(t, int32(1), loadCount.Load())

cached, ok := c.Get(time.Now())
cached, expiresAt, ok := c.Peek()
assert.True(t, ok)
assert.Equal(t, uint64(99), cached)
assert.True(t, expiresAt.After(time.Now()))
}

func TestGetOrLoadHit(t *testing.T) {
func TestPeekOrLoadHit(t *testing.T) {
t.Parallel()
const expectedVal = uint64(42)

Expand All @@ -81,34 +92,48 @@ func TestGetOrLoadHit(t *testing.T) {
c.Set(expectedVal, expiresAt)

var loadCount atomic.Int32
val, err := c.GetOrLoad(context.Background(), now, func() (uint64, time.Time, error) {
loadCount.Add(1)
return expectedVal, now.Add(time.Second), nil
})
val, err := c.PeekOrLoad(
context.Background(),
now,
func(value uint64, expiresAt, now time.Time) (bool, time.Time) {
return now.Before(expiresAt), expiresAt
},
func() (uint64, time.Time, error) {
loadCount.Add(1)
return expectedVal, now.Add(time.Second), nil
},
)

require.NoError(t, err)
assert.Equal(t, expectedVal, val)
assert.Equal(t, int32(0), loadCount.Load())
}

func TestGetOrLoadError(t *testing.T) {
func TestPeekOrLoadError(t *testing.T) {
t.Parallel()

c := newTestCache()
errLoad := errors.New("load failed")

val, err := c.GetOrLoad(context.Background(), time.Now(), func() (uint64, time.Time, error) {
return 0, time.Time{}, errLoad
})
val, err := c.PeekOrLoad(
context.Background(),
time.Now(),
func(value uint64, expiresAt, now time.Time) (bool, time.Time) {
return false, time.Time{}
},
func() (uint64, time.Time, error) {
return 0, time.Time{}, errLoad
},
)

assert.ErrorIs(t, err, errLoad)
assert.Equal(t, uint64(0), val)

_, ok := c.Get(time.Now())
_, _, ok := c.Peek()
assert.False(t, ok)
}

func TestGetOrLoadSingleflight(t *testing.T) {
func TestPeekOrLoadSingleflight(t *testing.T) {
const value = uint64(77)
synctest.Test(t, func(t *testing.T) {
c := newTestCache()
Expand All @@ -125,11 +150,18 @@ func TestGetOrLoadSingleflight(t *testing.T) {
go func(idx int) {
defer wg.Done()
now := time.Now()
results[idx], errs[idx] = c.GetOrLoad(context.Background(), now, func() (uint64, time.Time, error) {
loadCount.Add(1)
<-gate
return value, now.Add(time.Second), nil
})
results[idx], errs[idx] = c.PeekOrLoad(
context.Background(),
now,
func(value uint64, expiresAt, now time.Time) (bool, time.Time) {
return now.Before(expiresAt), expiresAt
},
func() (uint64, time.Time, error) {
loadCount.Add(1)
<-gate
return value, now.Add(time.Second), nil
},
)
}(i)
}

Expand All @@ -145,22 +177,29 @@ func TestGetOrLoadSingleflight(t *testing.T) {
})
}

func TestGetOrLoadReloadAfterExpiry(t *testing.T) {
func TestPeekOrLoadReloadAfterExpiry(t *testing.T) {
t.Parallel()

now := time.Now()
c := newTestCache()
c.Set(42, now)

val, err := c.GetOrLoad(context.Background(), now.Add(time.Second+time.Millisecond), func() (uint64, time.Time, error) {
return 100, now.Add(time.Second + time.Millisecond), nil
})
val, err := c.PeekOrLoad(
context.Background(),
now.Add(time.Second+time.Millisecond),
func(value uint64, expiresAt, now time.Time) (bool, time.Time) {
return now.Before(expiresAt), expiresAt
},
func() (uint64, time.Time, error) {
return 100, now.Add(time.Second + time.Millisecond), nil
},
)

require.NoError(t, err)
assert.Equal(t, uint64(100), val)
}

func TestGetOrLoadContextCancellation(t *testing.T) {
func TestPeekOrLoadContextCancellation(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
const expectedVal = 55
c := newTestCache()
Expand All @@ -176,18 +215,32 @@ func TestGetOrLoadContextCancellation(t *testing.T) {
wg.Add(2)
go func() {
defer wg.Done()
result1, err1 = c.GetOrLoad(ctx1, time.Now(), func() (uint64, time.Time, error) {
<-gate
return expectedVal, time.Now().Add(time.Second), nil
})
result1, err1 = c.PeekOrLoad(
ctx1,
time.Now(),
func(value uint64, expiresAt, now time.Time) (bool, time.Time) {
return now.Before(expiresAt), expiresAt
},
func() (uint64, time.Time, error) {
<-gate
return expectedVal, time.Now().Add(time.Second), nil
},
)
}()

go func() {
defer wg.Done()
result2, err2 = c.GetOrLoad(ctx2, time.Now(), func() (uint64, time.Time, error) {
<-gate
return expectedVal, time.Now().Add(time.Second), nil
})
result2, err2 = c.PeekOrLoad(
ctx2,
time.Now(),
func(value uint64, expiresAt, now time.Time) (bool, time.Time) {
return now.Before(expiresAt), expiresAt
},
func() (uint64, time.Time, error) {
<-gate
return expectedVal, time.Now().Add(time.Second), nil
},
)
}()

time.Sleep(50 * time.Millisecond)
Expand All @@ -214,12 +267,33 @@ func TestMetrics(t *testing.T) {
ctx := context.Background()

// miss + load error
_, _ = c.GetOrLoad(ctx, now, func() (uint64, time.Time, error) { return 0, time.Time{}, errors.New("fail") })
_, _ = c.PeekOrLoad(
ctx,
now,
func(value uint64, expiresAt, now time.Time) (bool, time.Time) {
return now.Before(expiresAt), expiresAt
},
func() (uint64, time.Time, error) { return 0, time.Time{}, errors.New("fail") },
)

// miss + load
_, _ = c.GetOrLoad(ctx, now, func() (uint64, time.Time, error) { return 42, expiresAt, nil })
_, _ = c.PeekOrLoad(
ctx,
now,
func(value uint64, expiresAt, now time.Time) (bool, time.Time) {
return now.Before(expiresAt), expiresAt
},
func() (uint64, time.Time, error) { return 42, expiresAt, nil },
)
// hit
_, _ = c.GetOrLoad(ctx, now, func() (uint64, time.Time, error) { return 0, expiresAt, nil })
_, _ = c.PeekOrLoad(
ctx,
now,
func(value uint64, expiresAt, now time.Time) (bool, time.Time) {
return now.Before(expiresAt), expiresAt
},
func() (uint64, time.Time, error) { return 0, expiresAt, nil },
)

assert.Equal(t, float64(1), testutil.ToFloat64(c.metrics.Hits))
assert.Equal(t, float64(2), testutil.ToFloat64(c.metrics.Misses))
Expand Down
Loading
Loading