diff --git a/docs/rpc_observability.md b/docs/rpc_observability.md new file mode 100644 index 0000000..ab981c5 --- /dev/null +++ b/docs/rpc_observability.md @@ -0,0 +1,54 @@ +# RPC Observability (Beholder) + +RPC client metrics are published to Beholder and surface in Prometheus/Grafana when `RPCClientBase` is constructed with a non-nil `metrics.RPCClientMetrics`. + +## Metrics + +| Metric | Type | Description | +|--------|------|-------------| +| `rpc_request_latency_ms` | Histogram | RPC call latency in milliseconds (per call) | +| `rpc_request_errors_total` | Counter | Total number of failed RPC requests | + +Labels: `env`, `network`, `chain_id`, `rpc_provider`, `call` (e.g. `latest_block`, `latest_finalized_block`). + +## Example Prometheus / Grafana Queries + +### Latency over time + +- **p99 latency by env and chain:** + ```promql + histogram_quantile(0.99, sum(rate(rpc_request_latency_ms_bucket[5m])) by (le, env, network, chain_id)) + ``` +- **p50 latency for a given environment:** + ```promql + histogram_quantile(0.5, sum(rate(rpc_request_latency_ms_bucket{env="staging"}[5m])) by (le, network, chain_id)) + ``` + +### Error rate over time + +- **Errors per second by env and chain:** + ```promql + sum(rate(rpc_request_errors_total[5m])) by (env, network, chain_id, rpc_provider) + ``` +- **Error rate for a specific RPC provider:** + ```promql + sum(rate(rpc_request_errors_total{rpc_provider="primary"}[5m])) by (env, network, chain_id) + ``` + +### Request rate + +- **Requests per second by call type:** + ```promql + sum(rate(rpc_request_latency_ms_count[5m])) by (call, env, network) + ``` + +## Enabling metrics + +Create `RPCClientMetrics` with `metrics.NewRPCClientMetrics(metrics.RPCClientMetricsConfig{...})` and pass it as the last argument to `multinode.NewRPCClientBase(...)`. The follow-up interface refactor will make it easier for multinode/chain integrations to supply `env`, `network`, `chain_id`, and `rpc_provider`. 
+ +## Follow-up: multinode integration (PR 2) + +This change already wires the metrics into multinode: `RPCClientBase` and `NewRPCClientBase` take an optional `rpcMetrics metrics.RPCClientMetrics`, and `LatestBlock` and `LatestFinalizedBlock` call `RecordRequest` (with latency and error recording). Until a metrics release that includes `RPCClientMetrics` is tagged, `multinode/go.mod` resolves the metrics module through a local `replace ... => ../metrics` directive. After the metrics module changes are merged, a second PR will: + +1. Update `multinode/go.mod`: bump `github.com/smartcontractkit/chainlink-framework/metrics` to the new version that includes `RPCClientMetrics`. +2. Remove the local `replace` directive from `multinode/go.mod` and restore the matching `go.sum` entries; Go ignores `replace` directives in dependency modules, so consumers of multinode only build once the `require` line itself points at a version that contains `RPCClientMetrics`. diff --git a/metrics/multinode.go b/metrics/multinode.go index c278913..5bc1ff5 100644 --- a/metrics/multinode.go +++ b/metrics/multinode.go @@ -135,29 +135,29 @@ type GenericMultiNodeMetrics interface { var _ GenericMultiNodeMetrics = &multiNodeMetrics{} type multiNodeMetrics struct { - network string - chainID string - nodeStates metric.Int64Gauge - nodeClientVersion metric.Int64Gauge - nodeVerifies metric.Int64Counter - nodeVerifiesFailed metric.Int64Counter - nodeVerifiesSuccess metric.Int64Counter - nodeTransitionsToAlive metric.Int64Counter - nodeTransitionsToInSync metric.Int64Counter - nodeTransitionsToOutOfSync metric.Int64Counter - nodeTransitionsToUnreachable metric.Int64Counter - nodeTransitionsToInvalidChainID metric.Int64Counter - nodeTransitionsToUnusable metric.Int64Counter - nodeTransitionsToSyncing metric.Int64Counter - highestSeenBlock metric.Int64Gauge - highestFinalizedBlock metric.Int64Gauge - seenBlocks metric.Int64Counter - polls metric.Int64Counter - pollsFailed metric.Int64Counter - pollsSuccess metric.Int64Counter - finalizedStateFailed metric.Int64Counter - nodeTransitionsToFinalizedStateNotAvailable metric.Int64Counter - invariantViolations metric.Int64Counter + network string + chainID string + nodeStates metric.Int64Gauge + nodeClientVersion metric.Int64Gauge + nodeVerifies metric.Int64Counter + nodeVerifiesFailed metric.Int64Counter + nodeVerifiesSuccess metric.Int64Counter + nodeTransitionsToAlive metric.Int64Counter +
nodeTransitionsToInSync metric.Int64Counter + nodeTransitionsToOutOfSync metric.Int64Counter + nodeTransitionsToUnreachable metric.Int64Counter + nodeTransitionsToInvalidChainID metric.Int64Counter + nodeTransitionsToUnusable metric.Int64Counter + nodeTransitionsToSyncing metric.Int64Counter + highestSeenBlock metric.Int64Gauge + highestFinalizedBlock metric.Int64Gauge + seenBlocks metric.Int64Counter + polls metric.Int64Counter + pollsFailed metric.Int64Counter + pollsSuccess metric.Int64Counter + finalizedStateFailed metric.Int64Counter + nodeTransitionsToFinalizedStateNotAvailable metric.Int64Counter + invariantViolations metric.Int64Counter } func NewGenericMultiNodeMetrics(network string, chainID string) (GenericMultiNodeMetrics, error) { @@ -289,7 +289,7 @@ func NewGenericMultiNodeMetrics(network string, chainID string) (GenericMultiNod pollsSuccess: pollsSuccess, finalizedStateFailed: finalizedStateFailed, nodeTransitionsToFinalizedStateNotAvailable: nodeTransitionsToFinalizedStateNotAvailable, - invariantViolations: invariantViolations, + invariantViolations: invariantViolations, }, nil } diff --git a/metrics/rpc_client.go b/metrics/rpc_client.go new file mode 100644 index 0000000..b803ad3 --- /dev/null +++ b/metrics/rpc_client.go @@ -0,0 +1,125 @@ +// RPC client observability using Beholder. +// +// This file defines rpc_request_latency_ms and rpc_request_errors_total, emitted +// from the RPC client when RPCClientBase is constructed with a non-nil RPCClientMetrics. +// Metrics are queryable in Prometheus/Grafana by env, network, chain_id, and rpc_provider. +// +// Example Prometheus/Grafana queries: +// +// - Latency over time (e.g. 
p99 by env and chain): +// histogram_quantile(0.99, sum(rate(rpc_request_latency_ms_bucket[5m])) by (le, env, network, chain_id)) +// +// - Error rate over time (errors per second by env and chain): +// sum(rate(rpc_request_errors_total[5m])) by (env, network, chain_id, rpc_provider) +// +// - Request rate by call type: +// sum(rate(rpc_request_latency_ms_count[5m])) by (call, env, network) +package metrics + +import ( + "context" + "fmt" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + + "github.com/smartcontractkit/chainlink-common/pkg/beholder" +) + +const ( + // RPCRequestLatencyMs is the Beholder/Prometheus metric name for RPC call latency in milliseconds. + RPCRequestLatencyMs = "rpc_request_latency_ms" + // RPCRequestErrorsTotal is the Beholder/Prometheus metric name for total RPC call errors. + RPCRequestErrorsTotal = "rpc_request_errors_total" +) + +var ( + rpcRequestLatencyBuckets = []float64{ + 5, 10, 25, 50, 100, 250, 500, 1000, 2500, 5000, 10000, 30000, + } + promRPCRequestLatency = promauto.NewHistogramVec(prometheus.HistogramOpts{ + Name: RPCRequestLatencyMs, + Help: "RPC request latency in milliseconds (per call)", + Buckets: rpcRequestLatencyBuckets, + }, []string{"env", "network", "chain_id", "rpc_provider", "call"}) + promRPCRequestErrors = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: RPCRequestErrorsTotal, + Help: "Total number of failed RPC requests", + }, []string{"env", "network", "chain_id", "rpc_provider", "call"}) +) + +// RPCClientMetrics records RPC latency and error metrics for observability via Beholder/Prometheus. +// Metrics are queryable by environment, network, chain_id, and rpc_provider in Grafana. +type RPCClientMetrics interface { + // RecordRequest records latency for an RPC call. If err is non-nil, also increments the error counter. + // callName identifies the operation (e.g. 
"latest_block", "latest_finalized_block"). + RecordRequest(ctx context.Context, callName string, latencyMs float64, err error) +} + +var _ RPCClientMetrics = (*rpcClientMetrics)(nil) + +type rpcClientMetrics struct { + env string + network string + chainID string + rpcProvider string + latency metric.Float64Histogram + errorsTotal metric.Int64Counter +} + +// RPCClientMetricsConfig holds labels for RPC client metrics. +// Empty strings are allowed; they will still be emitted as labels for filtering. +type RPCClientMetricsConfig struct { + Env string // e.g. "staging", "production" + Network string // chain/network name + ChainID string // chain ID + RPCProvider string // RPC provider or node name (optional) +} + +// NewRPCClientMetrics creates RPC client metrics that publish to Beholder and Prometheus. +// Callers (e.g. chain-specific RPC clients or multinode) should pass env, network, chainID, and optionally rpcProvider +// so metrics can be queried in Grafana by environment, chain/network, and RPC provider. 
+func NewRPCClientMetrics(cfg RPCClientMetricsConfig) (RPCClientMetrics, error) { + latency, err := beholder.GetMeter().Float64Histogram(RPCRequestLatencyMs) + if err != nil { + return nil, fmt.Errorf("failed to register RPC request latency metric: %w", err) + } + errorsTotal, err := beholder.GetMeter().Int64Counter(RPCRequestErrorsTotal) + if err != nil { + return nil, fmt.Errorf("failed to register RPC request errors metric: %w", err) + } + return &rpcClientMetrics{ + env: cfg.Env, + network: cfg.Network, + chainID: cfg.ChainID, + rpcProvider: cfg.RPCProvider, + latency: latency, + errorsTotal: errorsTotal, + }, nil +} + +func (m *rpcClientMetrics) RecordRequest(ctx context.Context, callName string, latencyMs float64, err error) { + attrs := metric.WithAttributes( + attribute.String("env", m.env), + attribute.String("network", m.network), + attribute.String("chain_id", m.chainID), + attribute.String("rpc_provider", m.rpcProvider), + attribute.String("call", callName), + ) + promRPCRequestLatency.WithLabelValues(m.env, m.network, m.chainID, m.rpcProvider, callName).Observe(latencyMs) + m.latency.Record(ctx, latencyMs, attrs) + if err != nil { + promRPCRequestErrors.WithLabelValues(m.env, m.network, m.chainID, m.rpcProvider, callName).Inc() + m.errorsTotal.Add(ctx, 1, attrs) + } +} + +// NoopRPCClientMetrics is a no-op implementation for when metrics are disabled. +type NoopRPCClientMetrics struct{} + +func (NoopRPCClientMetrics) RecordRequest(context.Context, string, float64, error) {} + +// Ensure NoopRPCClientMetrics implements RPCClientMetrics. 
+var _ RPCClientMetrics = NoopRPCClientMetrics{} diff --git a/metrics/rpc_client_test.go b/metrics/rpc_client_test.go new file mode 100644 index 0000000..df16e59 --- /dev/null +++ b/metrics/rpc_client_test.go @@ -0,0 +1,32 @@ +package metrics + +import ( + "context" + "errors" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestNewRPCClientMetrics(t *testing.T) { + m, err := NewRPCClientMetrics(RPCClientMetricsConfig{ + Env: "staging", + Network: "ethereum", + ChainID: "1", + RPCProvider: "primary", + }) + require.NoError(t, err) + require.NotNil(t, m) + + ctx := context.Background() + m.RecordRequest(ctx, "latest_block", 100.0, nil) + m.RecordRequest(ctx, "latest_block", 50.0, errors.New("rpc error")) +} + +func TestNoopRPCClientMetrics_RecordRequest(t *testing.T) { + var m NoopRPCClientMetrics + ctx := context.Background() + m.RecordRequest(ctx, "latest_block", 100.0, nil) + m.RecordRequest(ctx, "latest_block", 50.0, errors.New("rpc error")) + // Noop should not panic +} diff --git a/multinode/go.mod b/multinode/go.mod index c18efc1..519b4b6 100644 --- a/multinode/go.mod +++ b/multinode/go.mod @@ -77,3 +77,5 @@ require ( google.golang.org/protobuf v1.36.11 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) + +replace github.com/smartcontractkit/chainlink-framework/metrics => ../metrics diff --git a/multinode/go.sum b/multinode/go.sum index 459485f..9ec295e 100644 --- a/multinode/go.sum +++ b/multinode/go.sum @@ -80,8 +80,6 @@ github.com/smartcontractkit/chainlink-common v0.10.1-0.20260305114348-b8bbac30bf github.com/smartcontractkit/chainlink-common v0.10.1-0.20260305114348-b8bbac30bfc7/go.mod h1:0ghbAr7tRO0tT5ZqBXhOyzgUO37tNNe33Yn0hskauVM= github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.10 h1:FJAFgXS9oqASnkS03RE1HQwYQQxrO4l46O5JSzxqLgg= github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.10/go.mod h1:oiDa54M0FwxevWwyAX773lwdWvFYYlYHHQV1LQ5HpWY= -github.com/smartcontractkit/chainlink-framework/metrics 
v0.0.0-20250717121125-2350c82883e2 h1:ysZjKH+BpWlQhF93kr/Lc668UlCvT9NjfcsGdZT19I8= -github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20250717121125-2350c82883e2/go.mod h1:jo+cUqNcHwN8IF7SInQNXDZ8qzBsyMpnLdYbDswviFc= github.com/smartcontractkit/freeport v0.1.3-0.20250716200817-cb5dfd0e369e h1:Hv9Mww35LrufCdM9wtS9yVi/rEWGI1UnjHbcKKU0nVY= github.com/smartcontractkit/freeport v0.1.3-0.20250716200817-cb5dfd0e369e/go.mod h1:T4zH9R8R8lVWKfU7tUvYz2o2jMv1OpGCdpY2j2QZXzU= github.com/smartcontractkit/libocr v0.0.0-20250912173940-f3ab0246e23d h1:LokA9PoCNb8mm8mDT52c3RECPMRsGz1eCQORq+J3n74= diff --git a/multinode/rpc_client_base.go b/multinode/rpc_client_base.go index b4a886c..ce4f827 100644 --- a/multinode/rpc_client_base.go +++ b/multinode/rpc_client_base.go @@ -9,6 +9,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink-framework/metrics" ) type RPCClientBaseConfig interface { @@ -33,6 +34,9 @@ type RPCClientBase[HEAD Head] struct { latestBlock func(ctx context.Context) (HEAD, error) latestFinalizedBlock func(ctx context.Context) (HEAD, error) + // rpcMetrics is optional; when non-nil, LatestBlock and LatestFinalizedBlock record latency and errors. + rpcMetrics metrics.RPCClientMetrics + // lifeCycleCh can be closed to immediately cancel all in-flight requests on // this RPC. Closing and replacing should be serialized through // lifeCycleMu since it can happen on state transitions as well as RPCClientBase Close. 
@@ -52,6 +56,7 @@ func NewRPCClientBase[HEAD Head](
 	cfg RPCClientBaseConfig, ctxTimeout time.Duration, log logger.Logger,
 	latestBlock func(ctx context.Context) (HEAD, error),
 	latestFinalizedBlock func(ctx context.Context) (HEAD, error),
+	rpcMetrics metrics.RPCClientMetrics,
 ) *RPCClientBase[HEAD] {
 	return &RPCClientBase[HEAD]{
 		cfg:                  cfg,
@@ -59,6 +64,7 @@ func NewRPCClientBase[HEAD Head](
 		ctxTimeout:           ctxTimeout,
 		latestBlock:          latestBlock,
 		latestFinalizedBlock: latestFinalizedBlock,
+		rpcMetrics:           rpcMetrics,
 		subs:                 make(map[Subscription]struct{}),
 		lifeCycleCh:          make(chan struct{}),
 	}
@@ -156,15 +162,21 @@ func (m *RPCClientBase[HEAD]) LatestBlock(ctx context.Context) (HEAD, error) {
 	ctx, cancel, lifeCycleCh := m.AcquireQueryCtx(ctx, m.ctxTimeout)
 	defer cancel()
 
+	start := time.Now()
 	head, err := m.latestBlock(ctx)
+	// Divide the Duration directly: Duration.Milliseconds() truncates to whole milliseconds.
+	latencyMs := float64(time.Since(start)) / float64(time.Millisecond)
+	// An invalid head is returned to the caller as an error, so it is recorded as a failed request too.
+	if err == nil && !head.IsValid() {
+		err = errors.New("invalid head")
+	}
+	if m.rpcMetrics != nil {
+		m.rpcMetrics.RecordRequest(ctx, "latest_block", latencyMs, err)
+	}
 	if err != nil {
 		return head, err
 	}
 
-	if !head.IsValid() {
-		return head, errors.New("invalid head")
-	}
-
 	m.OnNewHead(ctx, lifeCycleCh, head)
 	return head, nil
 }
@@ -173,15 +185,21 @@ func (m *RPCClientBase[HEAD]) LatestFinalizedBlock(ctx context.Context) (HEAD, e
 	ctx, cancel, lifeCycleCh := m.AcquireQueryCtx(ctx, m.ctxTimeout)
 	defer cancel()
 
+	start := time.Now()
 	head, err := m.latestFinalizedBlock(ctx)
+	// Divide the Duration directly: Duration.Milliseconds() truncates to whole milliseconds.
+	latencyMs := float64(time.Since(start)) / float64(time.Millisecond)
+	// An invalid head is returned to the caller as an error, so it is recorded as a failed request too.
+	if err == nil && !head.IsValid() {
+		err = errors.New("invalid head")
+	}
+	if m.rpcMetrics != nil {
+		m.rpcMetrics.RecordRequest(ctx, "latest_finalized_block", latencyMs, err)
+	}
 	if err != nil {
 		return head, err
 	}
 
-	if !head.IsValid() {
-		return head, errors.New("invalid head")
-	}
-
 	m.OnNewFinalizedHead(ctx, lifeCycleCh, head)
 	return head, nil
 }
diff --git a/multinode/rpc_client_base_test.go b/multinode/rpc_client_base_test.go
index 25afd4d..033791e 100644
--- a/multinode/rpc_client_base_test.go
+++ b/multinode/rpc_client_base_test.go
@@ -67,7 +67,7 @@ func newTestRPC(t *testing.T) *testRPC {
 	}
 
 	rpc := &testRPC{}
-	rpc.RPCClientBase = NewRPCClientBase[*testHead](cfg, requestTimeout, lggr, rpc.latestBlock, rpc.latestBlock)
+	rpc.RPCClientBase = NewRPCClientBase[*testHead](cfg, requestTimeout, lggr, rpc.latestBlock, rpc.latestBlock, nil)
 	t.Cleanup(rpc.Close)
 	return rpc
 }