diff --git a/CHANGELOG.md b/CHANGELOG.md index fe698c1ca..ec72f06f3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,11 +15,15 @@ All notable changes to this project will be documented in this file. - Telemetry - Add optional TLS support to state-ingest server via `--tls-cert-file` and `--tls-key-file` flags; when set, the server listens on both HTTP (`:8080`) and HTTPS (`:8443`) simultaneously - Remove `--additional-child-probes` CLI flag from telemetry-agent; child geoprobe discovery now relies entirely on the onchain Geolocation program + - Add BGP status submitter: on each tick, reads BGP socket state from the device namespace, maps each activated user to their tunnel peer IP, and submits `SetUserBGPStatus` onchain; supports a configurable down grace period and periodic keepalive refresh; enabled via `--bgp-status-enable` with `--bgp-status-interval`, `--bgp-status-refresh-interval`, and `--bgp-status-down-grace-period` flags - Monitor - Add ClickHouse as a telemetry backend for the global monitor alongside existing InfluxDB - E2E tests - Add `TestE2E_GeoprobeIcmpTargets` verifying end-to-end ICMP outbound offset delivery via onchain `outbound-icmp` targets - Refactor geoprobe E2E tests to use testcontainers entrypoints and onchain target discovery + - Add `TestE2E_UserBGPStatus` verifying that the telemetry BGP status submitter correctly reports onchain status transitions as clients connect and establish BGP sessions +- SDK + - Add `BGPStatus` type (Unknown/Up/Down) and `SetUserBGPStatus` executor instruction to the Go serviceability SDK - Smartcontract - Implement `SetUserBGPStatus` processor: validates metrics publisher authorization, updates `bgp_status`, `last_bgp_reported_at`, and `last_bgp_up_at` fields on the user account - Add human-readable error messages for serviceability program errors in the Go SDK, including program log extraction for enhanced debugging diff --git a/controlplane/telemetry/cmd/telemetry/main.go 
b/controlplane/telemetry/cmd/telemetry/main.go index 403a94a09..3132a866f 100644 --- a/controlplane/telemetry/cmd/telemetry/main.go +++ b/controlplane/telemetry/cmd/telemetry/main.go @@ -17,11 +17,13 @@ import ( "github.com/malbeclabs/doublezero/config" "github.com/malbeclabs/doublezero/controlplane/agent/pkg/arista" aristapb "github.com/malbeclabs/doublezero/controlplane/proto/arista/gen/pb-go/arista/EosSdkRpc" + "github.com/malbeclabs/doublezero/controlplane/telemetry/internal/bgpstatus" "github.com/malbeclabs/doublezero/controlplane/telemetry/internal/geoprobe" "github.com/malbeclabs/doublezero/controlplane/telemetry/internal/gnmitunnel" "github.com/malbeclabs/doublezero/controlplane/telemetry/internal/metrics" "github.com/malbeclabs/doublezero/controlplane/telemetry/internal/netns" "github.com/malbeclabs/doublezero/controlplane/telemetry/internal/netutil" + telemetrysvc "github.com/malbeclabs/doublezero/controlplane/telemetry/internal/serviceability" "github.com/malbeclabs/doublezero/controlplane/telemetry/internal/state" "github.com/malbeclabs/doublezero/controlplane/telemetry/internal/telemetry" telemetryconfig "github.com/malbeclabs/doublezero/controlplane/telemetry/pkg/config" @@ -49,6 +51,8 @@ const ( defaultLocalDevicePubkey = "" defaultSubmitterMaxConcurrency = 10 defaultStateCollectInterval = 60 * time.Second + defaultBGPStatusInterval = 60 * time.Second + defaultBGPStatusRefreshInterval = 6 * time.Hour waitForNamespaceTimeout = 30 * time.Second defaultStateIngestHTTPClientTimeout = 10 * time.Second @@ -88,6 +92,12 @@ var ( // geoprobe flags geolocationProgramID = flag.String("geolocation-program-id", "", "The ID of the geolocation program for onchain GeoProbe discovery. 
If env is provided, this flag is ignored.") + // bgp status submitter flags + bgpStatusEnable = flag.Bool("bgp-status-enable", false, "Enable onchain BGP status submission after each collection tick.") + bgpStatusInterval = flag.Duration("bgp-status-interval", defaultBGPStatusInterval, "Interval between BGP status collection ticks.") + bgpStatusRefreshInterval = flag.Duration("bgp-status-refresh-interval", defaultBGPStatusRefreshInterval, "Periodic re-submission interval to keep last_bgp_reported_at fresh even when status is unchanged.") + bgpStatusDownGracePeriod = flag.Duration("bgp-status-down-grace-period", 0, "Minimum duration a user must be absent before reporting Down status (0 = report immediately).") + // Set by LDFLAGS version = "dev" commit = "none" @@ -269,11 +279,15 @@ func main() { os.Exit(1) } localNet := netutil.NewLocalNet(log) + cachedSvcClient := telemetrysvc.NewCachingFetcher( + serviceability.New(rpcClient, serviceabilityProgramID), + telemetrysvc.DefaultCacheTTL, + ) peerDiscovery, err := telemetry.NewLedgerPeerDiscovery( &telemetry.LedgerPeerDiscoveryConfig{ Logger: log, LocalDevicePK: localDevicePK, - ProgramClient: serviceability.New(rpcClient, serviceabilityProgramID), + ProgramClient: cachedSvcClient, LocalNet: localNet, TWAMPPort: uint16(*twampListenPort), RefreshInterval: *peersRefreshInterval, @@ -313,7 +327,7 @@ func main() { TWAMPReflector: reflector, PeerDiscovery: peerDiscovery, TelemetryProgramClient: sdktelemetry.New(log, rpcClient, &keypair, telemetryProgramID), - ServiceabilityProgramClient: serviceability.New(rpcClient, serviceabilityProgramID), + ServiceabilityProgramClient: cachedSvcClient, RPCClient: rpcClient, Keypair: keypair, GetCurrentEpochFunc: func(ctx context.Context) (uint64, error) { @@ -333,7 +347,7 @@ func main() { os.Exit(1) } - errCh := make(chan error, 2) + errCh := make(chan error, 3) // Run the onchain device-link latency collector. 
go func() { @@ -354,6 +368,13 @@ func main() { gnmiTunnelClientErrCh = startGNMITunnelClient(ctx, cancel, log, localDevicePK) } + // Run BGP status submitter if enabled. + var bgpStatusErrCh <-chan error + if *bgpStatusEnable { + bgpStatusErrCh = startBGPStatusSubmitter(ctx, cancel, log, keypair, localDevicePK, + serviceabilityProgramID, localNet, *bgpNamespace, cachedSvcClient, rpcClient) + } + // Wait for the context to be done or an error to be returned. select { case <-ctx.Done(): @@ -370,7 +391,49 @@ func main() { log.Error("gnmi tunnel client exited with error", "error", err) cancel() os.Exit(1) + case err := <-bgpStatusErrCh: + log.Error("BGP status submitter exited with error", "error", err) + cancel() + os.Exit(1) + } +} + +func startBGPStatusSubmitter( + ctx context.Context, + cancel context.CancelFunc, + log *slog.Logger, + keypair solana.PrivateKey, + localDevicePK solana.PublicKey, + serviceabilityProgramID solana.PublicKey, + localNet netutil.LocalNet, + bgpNamespace string, + svcClient telemetrysvc.ProgramDataProvider, + rpcClient *solanarpc.Client, +) <-chan error { + executor := serviceability.NewExecutor(log, rpcClient, &keypair, serviceabilityProgramID) + + sub, err := bgpstatus.NewSubmitter(bgpstatus.Config{ + Log: log, + Executor: executor, + ServiceabilityClient: svcClient, + LocalNet: localNet, + LocalDevicePK: localDevicePK, + BGPNamespace: bgpNamespace, + Interval: *bgpStatusInterval, + PeriodicRefreshInterval: *bgpStatusRefreshInterval, + DownGracePeriod: *bgpStatusDownGracePeriod, + }) + if err != nil { + log.Error("failed to create BGP status submitter", "error", err) + os.Exit(1) } + + log.Info("Starting BGP status submitter", + "interval", *bgpStatusInterval, + "refreshInterval", *bgpStatusRefreshInterval, + "downGracePeriod", *bgpStatusDownGracePeriod, + ) + return sub.Start(ctx, cancel) } func startStateCollector(ctx context.Context, cancel context.CancelFunc, log *slog.Logger, keypair solana.PrivateKey, localDevicePK 
solana.PublicKey, bgpNamespace string) <-chan error { diff --git a/controlplane/telemetry/internal/bgpstatus/bgpstatus.go b/controlplane/telemetry/internal/bgpstatus/bgpstatus.go new file mode 100644 index 000000000..36fd01139 --- /dev/null +++ b/controlplane/telemetry/internal/bgpstatus/bgpstatus.go @@ -0,0 +1,213 @@ +package bgpstatus + +import ( + "context" + "errors" + "fmt" + "log/slog" + "net" + "sync" + "time" + + "github.com/gagliardetto/solana-go" + "github.com/jonboulle/clockwork" + "github.com/malbeclabs/doublezero/controlplane/telemetry/internal/netutil" + "github.com/malbeclabs/doublezero/smartcontract/sdk/go/serviceability" +) + +const ( + taskChannelCapacity = 256 + defaultInterval = 60 * time.Second + defaultRefreshInterval = 6 * time.Hour + submitMaxRetries = 3 + submitBaseBackoff = 100 * time.Millisecond +) + +// BGPStatusExecutor submits a SetUserBGPStatus instruction onchain. +type BGPStatusExecutor interface { + SetUserBGPStatus(ctx context.Context, u serviceability.UserBGPStatusUpdate) (solana.Signature, error) +} + +// ServiceabilityClient fetches the current program state from the ledger. +type ServiceabilityClient interface { + GetProgramData(ctx context.Context) (*serviceability.ProgramData, error) +} + +// Config holds all parameters for the BGP status submitter. 
+type Config struct { + Log *slog.Logger + Executor BGPStatusExecutor + ServiceabilityClient ServiceabilityClient + LocalNet netutil.LocalNet + LocalDevicePK solana.PublicKey + BGPNamespace string + Interval time.Duration // default: 60s + PeriodicRefreshInterval time.Duration // default: 6h + DownGracePeriod time.Duration // default: 0 + Clock clockwork.Clock +} + +func (c *Config) validate() error { + if c.Log == nil { + return errors.New("log is required") + } + if c.Executor == nil { + return errors.New("executor is required") + } + if c.ServiceabilityClient == nil { + return errors.New("serviceability client is required") + } + if c.LocalNet == nil { + return errors.New("local net is required") + } + if c.LocalDevicePK.IsZero() { + return errors.New("local device pubkey is required") + } + if c.BGPNamespace == "" { + return errors.New("bgp namespace is required") + } + if c.Interval <= 0 { + c.Interval = defaultInterval + } + if c.PeriodicRefreshInterval <= 0 { + c.PeriodicRefreshInterval = defaultRefreshInterval + } + if c.Clock == nil { + c.Clock = clockwork.NewRealClock() + } + return nil +} + +// userState tracks submission state for a single user. +type userState struct { + lastOnchainStatus serviceability.BGPStatus + lastWriteTime time.Time + lastUpObservedAt time.Time +} + +// submitTask is queued to the background worker for onchain submission. +type submitTask struct { + user serviceability.User + status serviceability.BGPStatus +} + +// Submitter collects BGP socket state on each tick, determines per-user BGP +// status, and submits SetUserBGPStatus onchain via a non-blocking worker. +type Submitter struct { + cfg Config + log *slog.Logger + userState map[string]*userState // keyed by user PubKey base58 + pending map[string]bool // users currently in-flight in the worker + mu sync.Mutex + taskCh chan submitTask +} + +// NewSubmitter creates a Submitter after validating the config. 
+func NewSubmitter(cfg Config) (*Submitter, error) { + if err := cfg.validate(); err != nil { + return nil, fmt.Errorf("invalid bgpstatus config: %w", err) + } + return &Submitter{ + cfg: cfg, + log: cfg.Log, + userState: make(map[string]*userState), + pending: make(map[string]bool), + taskCh: make(chan submitTask, taskChannelCapacity), + }, nil +} + +// Start launches the submitter in the background and returns a channel that +// receives a fatal error (or is closed on clean shutdown). It mirrors the +// state.Collector.Start pattern. +func (s *Submitter) Start(ctx context.Context, cancel context.CancelFunc) <-chan error { + errCh := make(chan error, 1) + go func() { + defer close(errCh) + defer cancel() + if err := s.run(ctx); err != nil { + s.log.Error("bgpstatus: submitter failed", "error", err) + errCh <- err + } + }() + return errCh +} + +// userStateFor returns or creates the per-user tracking entry (caller must hold s.mu). +// initialStatus is used only when creating a new entry; it seeds lastOnchainStatus so +// that a restarted submitter correctly handles users whose onchain state is already Up. +func (s *Submitter) userStateFor(key string, initialStatus serviceability.BGPStatus) *userState { + us, ok := s.userState[key] + if !ok { + us = &userState{lastOnchainStatus: initialStatus} + s.userState[key] = us + } + return us +} + +// bgpSocket is the minimal BGP socket representation used by the pure helpers. +// The Linux-specific submitter.go converts state.BGPSocketState to this type. +type bgpSocket struct { + RemoteIP string + State string +} + +// --- Pure helpers (no Linux syscalls; fully testable on all platforms) --- + +// buildEstablishedIPSet returns a set of remote IP strings for BGP sessions +// that are currently in the ESTABLISHED state. 
+func buildEstablishedIPSet(sockets []bgpSocket) map[string]struct{} { + m := make(map[string]struct{}, len(sockets)) + for _, sock := range sockets { + if sock.State == "ESTABLISHED" { + m[sock.RemoteIP] = struct{}{} + } + } + return m +} + +// tunnelNetToIPNet parses the onchain [5]byte tunnel-net encoding into a +// *net.IPNet. The format is [4 bytes IPv4 prefix | 1 byte CIDR length]. +func tunnelNetToIPNet(b [5]byte) *net.IPNet { + ip := net.IPv4(b[0], b[1], b[2], b[3]) + mask := net.CIDRMask(int(b[4]), 32) + return &net.IPNet{IP: ip.To4(), Mask: mask} +} + +// computeEffectiveStatus derives the BGP status to report, applying the down +// grace period: if observedUp is false but the user was last seen Up within +// gracePeriod, we still report Up to avoid transient flaps. +func computeEffectiveStatus( + observedUp bool, + us *userState, + now time.Time, + gracePeriod time.Duration, +) serviceability.BGPStatus { + if observedUp { + return serviceability.BGPStatusUp + } + if us.lastUpObservedAt.IsZero() { + return serviceability.BGPStatusDown + } + if gracePeriod > 0 && now.Sub(us.lastUpObservedAt) < gracePeriod { + return serviceability.BGPStatusUp + } + return serviceability.BGPStatusDown +} + +// shouldSubmit returns true when a submission is warranted: either the status +// has changed from what was last confirmed onchain, or it is time for a +// periodic keepalive write. 
+func shouldSubmit( + us *userState, + newStatus serviceability.BGPStatus, + now time.Time, + refreshInterval time.Duration, +) bool { + if us.lastWriteTime.IsZero() { + return true + } + if us.lastOnchainStatus != newStatus { + return true + } + return now.Sub(us.lastWriteTime) >= refreshInterval +} diff --git a/controlplane/telemetry/internal/bgpstatus/metrics.go b/controlplane/telemetry/internal/bgpstatus/metrics.go new file mode 100644 index 000000000..f59cfffd6 --- /dev/null +++ b/controlplane/telemetry/internal/bgpstatus/metrics.go @@ -0,0 +1,24 @@ +package bgpstatus + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +var ( + metricSubmissionsTotal = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "doublezero_bgpstatus_submissions_total", + Help: "Total onchain BGP status submissions by BGP status and result", + }, + []string{"bgp_status", "result"}, + ) + + metricSubmissionDuration = promauto.NewHistogram( + prometheus.HistogramOpts{ + Name: "doublezero_bgpstatus_submission_duration_seconds", + Help: "Duration of successful onchain BGP status submissions", + Buckets: []float64{0.1, 0.5, 1, 2, 5, 10, 30, 60}, + }, + ) +) diff --git a/controlplane/telemetry/internal/bgpstatus/submitter.go b/controlplane/telemetry/internal/bgpstatus/submitter.go new file mode 100644 index 000000000..9c119b6c9 --- /dev/null +++ b/controlplane/telemetry/internal/bgpstatus/submitter.go @@ -0,0 +1,210 @@ +//go:build linux + +package bgpstatus + +import ( + "context" + "errors" + "time" + + "github.com/gagliardetto/solana-go" + "github.com/malbeclabs/doublezero/controlplane/telemetry/internal/netns" + "github.com/malbeclabs/doublezero/controlplane/telemetry/internal/netutil" + "github.com/malbeclabs/doublezero/controlplane/telemetry/internal/state" + "github.com/malbeclabs/doublezero/smartcontract/sdk/go/serviceability" +) + +// run starts the background worker goroutine, then drives the tick 
loop, +// running an immediate first tick before waiting for the ticker. +func (s *Submitter) run(ctx context.Context) error { + go s.worker(ctx) + + ticker := s.cfg.Clock.NewTicker(s.cfg.Interval) + defer ticker.Stop() + + s.tick(ctx) + + for { + select { + case <-ctx.Done(): + return nil + case <-ticker.Chan(): + s.tick(ctx) + } + } +} + +// tick collects BGP socket state, fetches activated users for this device, +// maps each user to their tunnel peer IP, determines Up/Down status (with +// grace period), and enqueues submission tasks for users whose status needs +// updating. +func (s *Submitter) tick(ctx context.Context) { + rawSockets, err := state.GetBGPSocketStatsInNamespace(ctx, s.cfg.BGPNamespace) + if err != nil { + s.log.Error("bgpstatus: failed to collect BGP sockets", "error", err) + return + } + sockets := make([]bgpSocket, len(rawSockets)) + for i, rs := range rawSockets { + sockets[i] = bgpSocket{RemoteIP: rs.RemoteIP, State: rs.State} + } + establishedIPs := buildEstablishedIPSet(sockets) + + programData, err := s.cfg.ServiceabilityClient.GetProgramData(ctx) + if err != nil { + s.log.Error("bgpstatus: failed to fetch program data", "error", err) + return + } + + // Tunnel interfaces for user sessions live in the BGP VRF namespace (e.g. ns-vrf1), + // not the default namespace, so we must read them from there. 
+ interfaces, err := netns.RunInNamespace(s.cfg.BGPNamespace, func() ([]netutil.Interface, error) { + return s.cfg.LocalNet.Interfaces() + }) + if err != nil { + s.log.Error("bgpstatus: failed to get local interfaces", "error", err) + return + } + + now := s.cfg.Clock.Now() + + s.mu.Lock() + defer s.mu.Unlock() + + activeUserKeys := make(map[string]struct{}) + + for _, user := range programData.Users { + if user.Status != serviceability.UserStatusActivated { + continue + } + if solana.PublicKeyFromBytes(user.DevicePubKey[:]) != s.cfg.LocalDevicePK { + continue + } + + userPK := solana.PublicKeyFromBytes(user.PubKey[:]).String() + activeUserKeys[userPK] = struct{}{} + + // Seed lastOnchainStatus from the ledger on first observation (e.g. after + // a daemon restart) so a disappeared tunnel correctly transitions to Down + // rather than being skipped because Unknown != Up. + us := s.userStateFor(userPK, serviceability.BGPStatus(user.BgpStatus)) + + // Resolve the BGP peer IP for this user's /31 tunnel net. + tunnelNet := tunnelNetToIPNet(user.TunnelNet) + var observedUp bool + tunnel, err := netutil.FindLocalTunnel(interfaces, tunnelNet) + if err != nil { + if !errors.Is(err, netutil.ErrLocalTunnelNotFound) { + s.log.Warn("bgpstatus: unexpected error finding tunnel", "user", userPK, "error", err) + continue + } + s.log.Debug("bgpstatus: tunnel not found for user", "user", userPK) + // Without a tunnel, the BGP session cannot be established. + // If the last known onchain status was already Down (or never written), + // there is nothing to update — skip this user. + if us.lastOnchainStatus != serviceability.BGPStatusUp { + continue + } + // The tunnel is gone but the last known onchain status is Up. + // Fall through with observedUp=false so we submit Down. 
+ } else { + _, observedUp = establishedIPs[tunnel.TargetIP.String()] + if observedUp { + us.lastUpObservedAt = now + } + } + + effectiveStatus := computeEffectiveStatus(observedUp, us, now, s.cfg.DownGracePeriod) + + if !shouldSubmit(us, effectiveStatus, now, s.cfg.PeriodicRefreshInterval) { + continue + } + + // Skip if a submission for this user is already in-flight. + if s.pending[userPK] { + s.log.Debug("bgpstatus: submission already in-flight, skipping", "user", userPK) + continue + } + + task := submitTask{user: user, status: effectiveStatus} + select { + case s.taskCh <- task: + s.pending[userPK] = true + default: + s.log.Warn("bgpstatus: task channel full, dropping update", "user", userPK) + } + } + + // Prune userState entries for users no longer activated on this device to + // prevent unbounded memory growth as users come and go. + // Also clear pending flags so a reactivated user is not permanently blocked. + for pk := range s.userState { + if _, active := activeUserKeys[pk]; !active { + delete(s.userState, pk) + delete(s.pending, pk) + } + } +} + +// worker drains the task channel and submits each update onchain with retry. +// It updates the per-user tracking state on success and always clears the +// pending flag so the next tick can re-evaluate. 
+func (s *Submitter) worker(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case task := <-s.taskCh: + userPK := solana.PublicKeyFromBytes(task.user.PubKey[:]).String() + + sig, err := s.submitWithRetry(ctx, task) + + statusLabel := task.status.String() + s.mu.Lock() + delete(s.pending, userPK) + if err == nil { + metricSubmissionsTotal.WithLabelValues(statusLabel, "success").Inc() + us := s.userStateFor(userPK, serviceability.BGPStatusUnknown) + us.lastOnchainStatus = task.status + us.lastWriteTime = s.cfg.Clock.Now() + s.log.Info("bgpstatus: submitted BGP status", + "user", userPK, "status", task.status, "sig", sig) + } else { + metricSubmissionsTotal.WithLabelValues(statusLabel, "error").Inc() + s.log.Error("bgpstatus: failed to submit after retries", + "user", userPK, "status", task.status, "error", err) + } + s.mu.Unlock() + } + } +} + +// submitWithRetry attempts the onchain write up to submitMaxRetries times with +// exponential backoff. It returns early if the context is cancelled. 
+func (s *Submitter) submitWithRetry(ctx context.Context, task submitTask) (solana.Signature, error) {
+	update := serviceability.UserBGPStatusUpdate{
+		UserPubkey:   solana.PublicKeyFromBytes(task.user.PubKey[:]),
+		DevicePubkey: s.cfg.LocalDevicePK,
+		Status:       task.status,
+	}
+
+	var lastErr error
+	for attempt := range submitMaxRetries {
+		start := time.Now()
+		sig, err := s.cfg.Executor.SetUserBGPStatus(ctx, update)
+		if err == nil {
+			metricSubmissionDuration.Observe(time.Since(start).Seconds())
+			return sig, nil
+		}
+		lastErr = err
+		delay := submitBaseBackoff * time.Duration(1<<attempt)
+		select {
+		case <-ctx.Done():
+			return solana.Signature{}, ctx.Err()
+		case <-s.cfg.Clock.After(delay):
+		}
+	}
+	return solana.Signature{}, lastErr
+}
[NOTE(review): the span between "time.Duration(1<" and "failNext > 0" was lost during extraction; the backoff/cancellation tail above and the test-file preamble below are reconstructed from surrounding context (the doc comment promises early return on cancellation; mockExecutor's fields are established by later usage) — verify against the original patch.]
diff --git a/controlplane/telemetry/internal/bgpstatus/bgpstatus_test.go b/controlplane/telemetry/internal/bgpstatus/bgpstatus_test.go
new file mode 100644
--- /dev/null
+++ b/controlplane/telemetry/internal/bgpstatus/bgpstatus_test.go
+package bgpstatus
+
+import (
+	"context"
+	"errors"
+	"log/slog"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/gagliardetto/solana-go"
+	"github.com/jonboulle/clockwork"
+	"github.com/malbeclabs/doublezero/controlplane/telemetry/internal/netutil"
+	"github.com/malbeclabs/doublezero/smartcontract/sdk/go/serviceability"
+)
+
+// --- mock executor ---
+
+type mockExecutor struct {
+	mu       sync.Mutex
+	calls    []serviceability.UserBGPStatusUpdate
+	failNext int
+	err      error
+}
+
+func (m *mockExecutor) SetUserBGPStatus(_ context.Context, u serviceability.UserBGPStatusUpdate) (solana.Signature, error) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	m.calls = append(m.calls, u)
+	if m.failNext > 0 {
+		m.failNext--
+		return solana.Signature{}, m.err
+	}
+	return solana.Signature{}, nil
+}
+
+func (m *mockExecutor) lastCalls() []serviceability.UserBGPStatusUpdate {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	out := make([]serviceability.UserBGPStatusUpdate, len(m.calls))
+	copy(out, m.calls)
+	return out
+}
+
+// --- mock serviceability client ---
+
+type mockSvcClient struct {
+	data *serviceability.ProgramData
+}
+
+func (m *mockSvcClient) GetProgramData(_ context.Context) (*serviceability.ProgramData, error) {
+	return m.data, nil
+}
+
+// --- helpers ---
+
+func makeUser(pubkey solana.PublicKey, devicePK solana.PublicKey, tunnelNet [5]byte) serviceability.User {
+	u := serviceability.User{}
+	copy(u.PubKey[:], pubkey[:])
+	copy(u.DevicePubKey[:], devicePK[:])
+	u.TunnelNet = tunnelNet
+	u.Status = serviceability.UserStatusActivated
+	return u
+}
+
+// newTestSubmitter creates a Submitter with the given clock and executor,
+// using MockLocalNet backed by the provided interfaces.
+func newTestSubmitter( + t *testing.T, + clk clockwork.Clock, + exec BGPStatusExecutor, + svcClient ServiceabilityClient, + ifaces []netutil.Interface, + devicePK solana.PublicKey, + gracePeriod time.Duration, + refreshInterval time.Duration, +) *Submitter { + t.Helper() + s, err := NewSubmitter(Config{ + Log: newTestLogger(t), + Executor: exec, + ServiceabilityClient: svcClient, + LocalNet: &netutil.MockLocalNet{InterfacesFunc: func() ([]netutil.Interface, error) { return ifaces, nil }}, + LocalDevicePK: devicePK, + BGPNamespace: "ns-vrf1", + Interval: time.Hour, // irrelevant; tests call tick() directly + PeriodicRefreshInterval: refreshInterval, + DownGracePeriod: gracePeriod, + Clock: clk, + }) + if err != nil { + t.Fatalf("NewSubmitter: %v", err) + } + return s +} + +// newTestLogger returns a slog.Logger that discards output during tests. +func newTestLogger(t *testing.T) *slog.Logger { + t.Helper() + return slog.New(slog.NewTextHandler(testWriter{t}, &slog.HandlerOptions{Level: slog.LevelDebug})) +} + +type testWriter struct{ t *testing.T } + +func (tw testWriter) Write(p []byte) (int, error) { + tw.t.Logf("%s", p) + return len(p), nil +} + +// ============================================================ +// buildEstablishedIPSet +// ============================================================ + +func TestBuildEstablishedIPSet_OnlyEstablished(t *testing.T) { + sockets := []bgpSocket{ + {RemoteIP: "10.0.0.1", State: "ESTABLISHED"}, + {RemoteIP: "10.0.0.2", State: "TIME_WAIT"}, + {RemoteIP: "10.0.0.3", State: "ESTABLISHED"}, + } + got := buildEstablishedIPSet(sockets) + if _, ok := got["10.0.0.1"]; !ok { + t.Error("expected 10.0.0.1 in set") + } + if _, ok := got["10.0.0.3"]; !ok { + t.Error("expected 10.0.0.3 in set") + } + if _, ok := got["10.0.0.2"]; ok { + t.Error("did not expect 10.0.0.2 (TIME_WAIT) in set") + } +} + +func TestBuildEstablishedIPSet_Empty(t *testing.T) { + got := buildEstablishedIPSet(nil /* []bgpSocket */) + if len(got) != 0 { + 
t.Errorf("expected empty set, got %d entries", len(got)) + } +} + +// ============================================================ +// tunnelNetToIPNet +// ============================================================ + +func TestTunnelNetToIPNet(t *testing.T) { + // 10.0.0.0/31 + b := [5]byte{10, 0, 0, 0, 31} + net := tunnelNetToIPNet(b) + ones, bits := net.Mask.Size() + if ones != 31 || bits != 32 { + t.Errorf("expected /31 got /%d/%d", ones, bits) + } + if net.IP.String() != "10.0.0.0" { + t.Errorf("unexpected IP: %s", net.IP) + } +} + +// ============================================================ +// computeEffectiveStatus +// ============================================================ + +func TestComputeEffectiveStatus_Up(t *testing.T) { + us := &userState{} + now := time.Now() + got := computeEffectiveStatus(true, us, now, 5*time.Minute) + if got != serviceability.BGPStatusUp { + t.Errorf("expected Up, got %v", got) + } +} + +func TestComputeEffectiveStatus_DownImmediateNeverSeen(t *testing.T) { + us := &userState{} // lastUpObservedAt is zero + now := time.Now() + got := computeEffectiveStatus(false, us, now, 5*time.Minute) + if got != serviceability.BGPStatusDown { + t.Errorf("expected Down, got %v", got) + } +} + +func TestComputeEffectiveStatus_DownWithinGrace(t *testing.T) { + now := time.Now() + us := &userState{lastUpObservedAt: now.Add(-1 * time.Minute)} + got := computeEffectiveStatus(false, us, now, 5*time.Minute) + if got != serviceability.BGPStatusUp { + t.Errorf("expected Up (still within grace), got %v", got) + } +} + +func TestComputeEffectiveStatus_DownAfterGrace(t *testing.T) { + now := time.Now() + us := &userState{lastUpObservedAt: now.Add(-10 * time.Minute)} + got := computeEffectiveStatus(false, us, now, 5*time.Minute) + if got != serviceability.BGPStatusDown { + t.Errorf("expected Down (grace elapsed), got %v", got) + } +} + +func TestComputeEffectiveStatus_ZeroGracePeriod(t *testing.T) { + now := time.Now() + us := 
&userState{lastUpObservedAt: now.Add(-1 * time.Second)} + got := computeEffectiveStatus(false, us, now, 0) + if got != serviceability.BGPStatusDown { + t.Errorf("expected Down (grace=0), got %v", got) + } +} + +// ============================================================ +// shouldSubmit +// ============================================================ + +func TestShouldSubmit_FirstWrite(t *testing.T) { + us := &userState{} // lastWriteTime is zero + if !shouldSubmit(us, serviceability.BGPStatusUp, time.Now(), 6*time.Hour) { + t.Error("expected submit on first write") + } +} + +func TestShouldSubmit_StatusChanged(t *testing.T) { + now := time.Now() + us := &userState{ + lastOnchainStatus: serviceability.BGPStatusUp, + lastWriteTime: now.Add(-1 * time.Minute), + } + if !shouldSubmit(us, serviceability.BGPStatusDown, now, 6*time.Hour) { + t.Error("expected submit on status change") + } +} + +func TestShouldSubmit_NoChangeNoRefresh(t *testing.T) { + now := time.Now() + us := &userState{ + lastOnchainStatus: serviceability.BGPStatusUp, + lastWriteTime: now.Add(-1 * time.Minute), + } + if shouldSubmit(us, serviceability.BGPStatusUp, now, 6*time.Hour) { + t.Error("expected no submit when status unchanged and refresh not due") + } +} + +func TestShouldSubmit_PeriodicRefresh(t *testing.T) { + now := time.Now() + us := &userState{ + lastOnchainStatus: serviceability.BGPStatusUp, + lastWriteTime: now.Add(-7 * time.Hour), + } + if !shouldSubmit(us, serviceability.BGPStatusUp, now, 6*time.Hour) { + t.Error("expected submit when periodic refresh interval elapsed") + } +} + +// ============================================================ +// Worker retry behaviour (integration-style, no Linux syscalls) +// ============================================================ + +// workerTestSetup creates a submitter and pre-populates it with a task +// already in the taskCh, bypassing the Linux-specific tick(). 
+func workerTestSetup( + t *testing.T, + exec *mockExecutor, + gracePeriod time.Duration, + refreshInterval time.Duration, +) (*Submitter, serviceability.User) { + t.Helper() + devicePK := solana.NewWallet().PublicKey() + userPK := solana.NewWallet().PublicKey() + user := makeUser(userPK, devicePK, [5]byte{10, 0, 0, 0, 31}) + + clk := clockwork.NewFakeClock() + svc := &mockSvcClient{data: &serviceability.ProgramData{Users: []serviceability.User{user}}} + + s := newTestSubmitter(t, clk, exec, svc, nil, devicePK, gracePeriod, refreshInterval) + return s, user +} + +func TestWorker_SuccessUpdatesState(t *testing.T) { + exec := &mockExecutor{} + s, user := workerTestSetup(t, exec, 0, 6*time.Hour) + + userPK := solana.PublicKeyFromBytes(user.PubKey[:]).String() + + // Seed a task directly into the channel and mark pending. + task := submitTask{user: user, status: serviceability.BGPStatusUp} + s.mu.Lock() + s.pending[userPK] = true + s.mu.Unlock() + s.taskCh <- task + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + // Run the worker in a goroutine; wait for it to process the task. + go s.worker(ctx) + + // Poll until state is updated or timeout. + deadline := time.Now().Add(3 * time.Second) + for time.Now().Before(deadline) { + s.mu.Lock() + us, ok := s.userState[userPK] + s.mu.Unlock() + if ok && us.lastOnchainStatus == serviceability.BGPStatusUp { + break + } + time.Sleep(10 * time.Millisecond) + } + + s.mu.Lock() + us := s.userState[userPK] + pendingAfter := s.pending[userPK] + s.mu.Unlock() + + if us == nil || us.lastOnchainStatus != serviceability.BGPStatusUp { + t.Errorf("expected lastOnchainStatus=Up, got %v", us) + } + if pendingAfter { + t.Error("expected pending to be cleared after worker completion") + } + if len(exec.lastCalls()) != 1 { + t.Errorf("expected 1 executor call, got %d", len(exec.lastCalls())) + } +} + +func TestWorker_RetryOnTransientFailure(t *testing.T) { + // Fail 2 times, succeed on 3rd. 
+ exec := &mockExecutor{failNext: 2, err: errors.New("rpc timeout")} + s, user := workerTestSetup(t, exec, 0, 6*time.Hour) + + userPK := solana.PublicKeyFromBytes(user.PubKey[:]).String() + task := submitTask{user: user, status: serviceability.BGPStatusDown} + s.mu.Lock() + s.pending[userPK] = true + s.mu.Unlock() + s.taskCh <- task + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + go s.worker(ctx) + + deadline := time.Now().Add(5 * time.Second) + for time.Now().Before(deadline) { + s.mu.Lock() + us, ok := s.userState[userPK] + s.mu.Unlock() + if ok && us.lastOnchainStatus == serviceability.BGPStatusDown { + break + } + time.Sleep(10 * time.Millisecond) + } + + s.mu.Lock() + us := s.userState[userPK] + s.mu.Unlock() + + if us == nil || us.lastOnchainStatus != serviceability.BGPStatusDown { + t.Error("expected state updated after eventual success") + } + // Should have made 3 calls total (2 failures + 1 success). + if n := len(exec.lastCalls()); n != 3 { + t.Errorf("expected 3 executor calls, got %d", n) + } +} + +func TestWorker_AllRetriesExhausted_NoStateUpdate(t *testing.T) { + exec := &mockExecutor{failNext: submitMaxRetries, err: errors.New("persistent error")} + s, user := workerTestSetup(t, exec, 0, 6*time.Hour) + + userPK := solana.PublicKeyFromBytes(user.PubKey[:]).String() + task := submitTask{user: user, status: serviceability.BGPStatusUp} + s.mu.Lock() + s.pending[userPK] = true + s.mu.Unlock() + s.taskCh <- task + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + go s.worker(ctx) + + // Wait for pending to be cleared (worker completed task even on failure). 
+ deadline := time.Now().Add(5 * time.Second)
+ for time.Now().Before(deadline) {
+ s.mu.Lock()
+ p := s.pending[userPK]
+ s.mu.Unlock()
+ if !p {
+ break
+ }
+ time.Sleep(10 * time.Millisecond)
+ }
+
+ s.mu.Lock()
+ us, stateUpdated := s.userState[userPK]
+ s.mu.Unlock()
+
+ // No state entry should have been written (all retries failed).
+ if stateUpdated && us.lastOnchainStatus != 0 {
+ t.Error("expected no state update after exhausted retries")
+ }
+}
+
+// ============================================================
+// Pending deduplication
+// ============================================================
+
+func TestPendingDedup_SecondEnqueueSkipped(t *testing.T) {
+ exec := &mockExecutor{}
+ s, user := workerTestSetup(t, exec, 0, 6*time.Hour)
+
+ userPK := solana.PublicKeyFromBytes(user.PubKey[:]).String()
+
+ // Manually mark user as pending (simulating a task already in the channel).
+ s.mu.Lock()
+ s.pending[userPK] = true
+ us := s.userStateFor(userPK, serviceability.BGPStatusUnknown) // trigger creation
+ _ = us
+ s.mu.Unlock()
+
+ // A second call to the inline enqueue logic should skip because pending=true.
+ // We test this by checking that taskCh remains empty.
+ s.mu.Lock() + shouldEnqueue := !s.pending[userPK] + s.mu.Unlock() + + if shouldEnqueue { + t.Error("expected pending check to block second enqueue") + } + if len(s.taskCh) != 0 { + t.Error("expected task channel to remain empty") + } +} + +// ============================================================ +// Periodic refresh via FakeClock +// ============================================================ + +func TestPeriodicRefresh_ReenqueuesAfterInterval(t *testing.T) { + fakeClock := clockwork.NewFakeClock() + now := fakeClock.Now() + + refreshInterval := 6 * time.Hour + us := &userState{ + lastOnchainStatus: serviceability.BGPStatusUp, + lastWriteTime: now.Add(-7 * time.Hour), // older than refresh interval + } + + // shouldSubmit should return true because the refresh interval has elapsed. + if !shouldSubmit(us, serviceability.BGPStatusUp, now, refreshInterval) { + t.Error("expected shouldSubmit=true when periodic refresh interval has elapsed") + } + + // If the last write was recent, should not re-submit. 
+ us.lastWriteTime = now.Add(-1 * time.Hour) + if shouldSubmit(us, serviceability.BGPStatusUp, now, refreshInterval) { + t.Error("expected shouldSubmit=false when refresh interval has not elapsed") + } +} + +// ============================================================ +// NewSubmitter validation +// ============================================================ + +func TestNewSubmitter_MissingFields(t *testing.T) { + cases := []struct { + name string + cfg Config + }{ + {"no log", Config{Executor: &mockExecutor{}, ServiceabilityClient: &mockSvcClient{}, LocalNet: &netutil.MockLocalNet{}, LocalDevicePK: solana.NewWallet().PublicKey(), BGPNamespace: "ns-vrf1"}}, + {"no executor", Config{Log: slog.Default(), ServiceabilityClient: &mockSvcClient{}, LocalNet: &netutil.MockLocalNet{}, LocalDevicePK: solana.NewWallet().PublicKey(), BGPNamespace: "ns-vrf1"}}, + {"no svc client", Config{Log: slog.Default(), Executor: &mockExecutor{}, LocalNet: &netutil.MockLocalNet{}, LocalDevicePK: solana.NewWallet().PublicKey(), BGPNamespace: "ns-vrf1"}}, + {"no local net", Config{Log: slog.Default(), Executor: &mockExecutor{}, ServiceabilityClient: &mockSvcClient{}, LocalDevicePK: solana.NewWallet().PublicKey(), BGPNamespace: "ns-vrf1"}}, + {"zero device pk", Config{Log: slog.Default(), Executor: &mockExecutor{}, ServiceabilityClient: &mockSvcClient{}, LocalNet: &netutil.MockLocalNet{}, BGPNamespace: "ns-vrf1"}}, + {"no namespace", Config{Log: slog.Default(), Executor: &mockExecutor{}, ServiceabilityClient: &mockSvcClient{}, LocalNet: &netutil.MockLocalNet{}, LocalDevicePK: solana.NewWallet().PublicKey()}}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + _, err := NewSubmitter(tc.cfg) + if err == nil { + t.Error("expected error for invalid config") + } + }) + } +} + +// ============================================================ +// Channel full / back-pressure (non-blocking enqueue) +// ============================================================ + +func 
TestTaskChannel_DropWhenFull(t *testing.T) { + exec := &mockExecutor{} + devicePK := solana.NewWallet().PublicKey() + s, err := NewSubmitter(Config{ + Log: slog.Default(), + Executor: exec, + ServiceabilityClient: &mockSvcClient{data: &serviceability.ProgramData{}}, + LocalNet: &netutil.MockLocalNet{InterfacesFunc: func() ([]netutil.Interface, error) { return nil, nil }}, + LocalDevicePK: devicePK, + BGPNamespace: "ns-vrf1", + Interval: time.Hour, + PeriodicRefreshInterval: 6 * time.Hour, + Clock: clockwork.NewFakeClock(), + }) + if err != nil { + t.Fatal(err) + } + + // Fill the channel. + user := makeUser(solana.NewWallet().PublicKey(), devicePK, [5]byte{10, 0, 0, 0, 31}) + for range taskChannelCapacity { + s.taskCh <- submitTask{user: user, status: serviceability.BGPStatusUp} + } + + // A further non-blocking enqueue must not block (select default branch). + done := make(chan struct{}) + var dropped atomic.Bool + go func() { + defer close(done) + select { + case s.taskCh <- submitTask{user: user, status: serviceability.BGPStatusUp}: + default: + dropped.Store(true) + } + }() + + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("enqueue blocked when channel was full") + } + if !dropped.Load() { + t.Error("expected drop when channel full") + } +} diff --git a/controlplane/telemetry/internal/serviceability/cache.go b/controlplane/telemetry/internal/serviceability/cache.go new file mode 100644 index 000000000..e554b8dae --- /dev/null +++ b/controlplane/telemetry/internal/serviceability/cache.go @@ -0,0 +1,100 @@ +package serviceability + +import ( + "context" + "log/slog" + "sync" + "time" + + "github.com/malbeclabs/doublezero/smartcontract/sdk/go/serviceability" + "golang.org/x/sync/singleflight" +) + +const DefaultCacheTTL = 5 * time.Second + +// ProgramDataProvider is the interface for fetching onchain program data. 
+type ProgramDataProvider interface { + GetProgramData(ctx context.Context) (*serviceability.ProgramData, error) +} + +// CachingFetcher wraps a ProgramDataProvider and caches the result for the +// configured TTL. Multiple consumers calling GetProgramData within the TTL +// window receive the same cached data, avoiding duplicate RPC calls. +// +// The mutex is only held briefly to read/write the cache — the RPC call itself +// runs outside the lock via singleflight to avoid blocking concurrent callers. +type CachingFetcher struct { + provider ProgramDataProvider + cacheTTL time.Duration + mu sync.RWMutex + cached *serviceability.ProgramData + fetchedAt time.Time + group singleflight.Group +} + +// NewCachingFetcher creates a CachingFetcher with the given provider and TTL. +func NewCachingFetcher(provider ProgramDataProvider, cacheTTL time.Duration) *CachingFetcher { + return &CachingFetcher{ + provider: provider, + cacheTTL: cacheTTL, + } +} + +// GetProgramData returns cached program data if fresh, otherwise fetches from +// the underlying provider. On fetch error with existing cache, returns stale data. +func (f *CachingFetcher) GetProgramData(ctx context.Context) (*serviceability.ProgramData, error) { + // Fast path: check cache under read lock. + f.mu.RLock() + if f.cached != nil && time.Since(f.fetchedAt) < f.cacheTTL { + data := f.cached + f.mu.RUnlock() + return data, nil + } + f.mu.RUnlock() + + // Slow path: fetch via singleflight so concurrent callers share one RPC. + v, err, _ := f.group.Do("fetch", func() (any, error) { + // Re-check cache — another goroutine may have refreshed it while we waited. 
+ f.mu.RLock() + if f.cached != nil && time.Since(f.fetchedAt) < f.cacheTTL { + data := f.cached + f.mu.RUnlock() + return data, nil + } + cachedData := f.cached + cachedAge := time.Since(f.fetchedAt) + f.mu.RUnlock() + + // Use a detached context so a cancellation from the first caller's ctx + // does not fail all other waiters sharing this singleflight call. + // context.WithoutCancel preserves the parent deadline while dropping cancellation. + start := time.Now() + data, err := f.provider.GetProgramData(context.WithoutCancel(ctx)) + metricFetchDuration.Observe(time.Since(start).Seconds()) + + if err != nil { + if cachedData != nil { + metricFetchTotal.WithLabelValues(resultErrorStale).Inc() + metricStaleCacheAge.Set(cachedAge.Seconds()) + slog.Warn("telemetry: program data fetch failed, returning stale cached data", "age", cachedAge, "error", err) + return cachedData, nil + } + metricFetchTotal.WithLabelValues(resultErrorNoCache).Inc() + return nil, err + } + + metricFetchTotal.WithLabelValues(resultSuccess).Inc() + metricStaleCacheAge.Set(0) + + f.mu.Lock() + f.cached = data + f.fetchedAt = time.Now() + f.mu.Unlock() + + return data, nil + }) + if err != nil { + return nil, err + } + return v.(*serviceability.ProgramData), nil +} diff --git a/controlplane/telemetry/internal/serviceability/metrics.go b/controlplane/telemetry/internal/serviceability/metrics.go new file mode 100644 index 000000000..c2acfa67b --- /dev/null +++ b/controlplane/telemetry/internal/serviceability/metrics.go @@ -0,0 +1,39 @@ +package serviceability + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +const ( + labelResult = "result" + + resultSuccess = "success" + resultErrorStale = "error_stale" + resultErrorNoCache = "error_no_cache" +) + +var ( + metricFetchDuration = promauto.NewHistogram( + prometheus.HistogramOpts{ + Name: "doublezero_telemetry_programdata_fetch_duration_seconds", + Help: "Duration of 
serviceability program data RPC fetch calls (excludes cache hits)", + Buckets: []float64{0.1, 0.5, 1, 2, 5, 10, 30, 60, 120}, + }, + ) + + metricFetchTotal = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "doublezero_telemetry_programdata_fetch_total", + Help: "Total serviceability program data RPC fetch attempts by result", + }, + []string{labelResult}, + ) + + metricStaleCacheAge = promauto.NewGauge( + prometheus.GaugeOpts{ + Name: "doublezero_telemetry_programdata_stale_cache_age_seconds", + Help: "Age of stale cache data served on fetch failure (0 when cache is fresh)", + }, + ) +) diff --git a/controlplane/telemetry/internal/telemetry/config.go b/controlplane/telemetry/internal/telemetry/config.go index 1cfb2963e..ea41dc4c2 100644 --- a/controlplane/telemetry/internal/telemetry/config.go +++ b/controlplane/telemetry/internal/telemetry/config.go @@ -8,7 +8,6 @@ import ( "github.com/gagliardetto/solana-go" solanarpc "github.com/gagliardetto/solana-go/rpc" "github.com/malbeclabs/doublezero/controlplane/telemetry/internal/geoprobe" - "github.com/malbeclabs/doublezero/smartcontract/sdk/go/serviceability" twamplight "github.com/malbeclabs/doublezero/tools/twamp/pkg/light" ) @@ -54,7 +53,7 @@ type Config struct { MaxConsecutiveSenderLosses int // ServiceabilityProgramClient is the client to the serviceability program (for fetching Device/Location). - ServiceabilityProgramClient *serviceability.Client + ServiceabilityProgramClient ServiceabilityProgramClient // RPCClient is the Solana RPC client (for fetching slot). RPCClient *solanarpc.Client diff --git a/e2e/Makefile b/e2e/Makefile index fa4b75956..3d1aff9db 100644 --- a/e2e/Makefile +++ b/e2e/Makefile @@ -55,6 +55,8 @@ test-cleanup: @docker rm -f $$(docker ps -aq --filter label=dz.malbeclabs.com) 2>/dev/null || true @echo "Removing networks with label dz.malbeclabs.com..." 
@docker network rm $$(docker network ls -q --filter label=dz.malbeclabs.com) 2>/dev/null || true + @echo "Removing volumes with label dz.malbeclabs.com..." + @docker volume rm $$(docker volume ls -q --filter label=dz.malbeclabs.com) 2>/dev/null || true # ----------------------------------------------------------------------------- # Solana image build and push. diff --git a/e2e/internal/devnet/device.go b/e2e/internal/devnet/device.go index 818b80af5..a3514ee43 100644 --- a/e2e/internal/devnet/device.go +++ b/e2e/internal/devnet/device.go @@ -117,6 +117,18 @@ type DeviceTelemetrySpec struct { // MetricsAddr is the listen address for the prometheus metrics server. MetricsAddr string + + // BGPStatusEnable enables the BGP status submitter. + BGPStatusEnable bool + + // BGPStatusInterval is the interval at which to collect and submit BGP status. + BGPStatusInterval time.Duration + + // BGPStatusRefreshInterval is the interval at which to force a periodic re-submission. + BGPStatusRefreshInterval time.Duration + + // BGPStatusDownGracePeriod is how long to keep reporting Up after the BGP session drops. 
+ BGPStatusDownGracePeriod time.Duration } func (s *DeviceSpec) Validate(cyoaNetworkSpec CYOANetworkSpec) error { @@ -626,6 +638,18 @@ func (d *Device) Start(ctx context.Context) error { if d.dn.Manager.GeolocationProgramID != "" { telemetryCommandArgs = append(telemetryCommandArgs, "-geolocation-program-id", d.dn.Manager.GeolocationProgramID) } + if spec.Telemetry.BGPStatusEnable { + telemetryCommandArgs = append(telemetryCommandArgs, "-bgp-status-enable") + } + if spec.Telemetry.BGPStatusInterval > 0 { + telemetryCommandArgs = append(telemetryCommandArgs, "-bgp-status-interval", spec.Telemetry.BGPStatusInterval.String()) + } + if spec.Telemetry.BGPStatusRefreshInterval > 0 { + telemetryCommandArgs = append(telemetryCommandArgs, "-bgp-status-refresh-interval", spec.Telemetry.BGPStatusRefreshInterval.String()) + } + if spec.Telemetry.BGPStatusDownGracePeriod > 0 { + telemetryCommandArgs = append(telemetryCommandArgs, "-bgp-status-down-grace-period", spec.Telemetry.BGPStatusDownGracePeriod.String()) + } // Render the device config from go template. 
var configContents bytes.Buffer diff --git a/e2e/user_bgp_status_test.go b/e2e/user_bgp_status_test.go new file mode 100644 index 000000000..d383281bf --- /dev/null +++ b/e2e/user_bgp_status_test.go @@ -0,0 +1,215 @@ +//go:build e2e + +package e2e_test + +import ( + "encoding/json" + "os" + "path/filepath" + "testing" + "time" + + "github.com/gagliardetto/solana-go" + "github.com/malbeclabs/doublezero/e2e/internal/arista" + "github.com/malbeclabs/doublezero/e2e/internal/devnet" + serviceability "github.com/malbeclabs/doublezero/smartcontract/sdk/go/serviceability" + "github.com/stretchr/testify/require" +) + +func TestE2E_UserBGPStatus(t *testing.T) { + t.Parallel() + + log := newTestLoggerForTest(t) + + currentDir, err := os.Getwd() + require.NoError(t, err) + serviceabilityProgramKeypairPath := filepath.Join(currentDir, "data", "serviceability-program-keypair.json") + + // Generate a telemetry keypair for the device's metrics publisher. + telemetryKeypair := solana.NewWallet().PrivateKey + telemetryKeypairJSON, _ := json.Marshal(telemetryKeypair[:]) + telemetryKeypairPath := t.TempDir() + "/dz1-telemetry-keypair.json" + require.NoError(t, os.WriteFile(telemetryKeypairPath, telemetryKeypairJSON, 0600)) + telemetryKeypairPK := telemetryKeypair.PublicKey() + + minBalanceSOL := 3.0 + topUpSOL := 5.0 + dn, err := devnet.New(devnet.DevnetSpec{ + DeployID: "dz-e2e-" + t.Name(), + DeployDir: t.TempDir(), + + CYOANetwork: devnet.CYOANetworkSpec{ + CIDRPrefix: subnetCIDRPrefix, + }, + DeviceTunnelNet: "192.168.100.0/24", + Manager: devnet.ManagerSpec{ + ServiceabilityProgramKeypairPath: serviceabilityProgramKeypairPath, + }, + Funder: devnet.FunderSpec{ + Verbose: true, + MinBalanceSOL: minBalanceSOL, + TopUpSOL: topUpSOL, + Interval: 3 * time.Second, + }, + }, log, dockerClient, subnetAllocator) + require.NoError(t, err) + + err = dn.Start(t.Context(), nil) + require.NoError(t, err) + + device, err := dn.AddDevice(t.Context(), devnet.DeviceSpec{ + Code: "dz1", + Location: 
"ewr", + Exchange: "xewr", + MetricsPublisherPK: telemetryKeypairPK.String(), + // .8/29 has network address .8, allocatable up to .14, and broadcast .15 + CYOANetworkIPHostID: 8, + CYOANetworkAllocatablePrefix: 29, + LoopbackInterfaces: map[string]string{ + "Loopback255": "vpnv4", + "Loopback256": "ipv4", + }, + Telemetry: devnet.DeviceTelemetrySpec{ + Enabled: true, + KeypairPath: telemetryKeypairPath, + ManagementNS: "ns-management", + Verbose: true, + BGPStatusEnable: true, + BGPStatusInterval: 5 * time.Second, + BGPStatusRefreshInterval: 1 * time.Hour, + }, + }) + require.NoError(t, err) + + // Wait for the telemetry publisher to be funded before any onchain submissions are attempted. + requireEventuallyFunded(t, log, dn.Ledger.GetRPCClient(), telemetryKeypairPK, minBalanceSOL, "telemetry publisher") + + client, err := dn.AddClient(t.Context(), devnet.ClientSpec{ + CYOANetworkIPHostID: 100, + }) + require.NoError(t, err) + + tdn := &TestDevnet{Devnet: dn, log: log} + + t.Run("wait_for_device_activation", func(t *testing.T) { + svcClient, err := dn.Ledger.GetServiceabilityClient() + require.NoError(t, err) + require.Eventually(t, func() bool { + data, err := svcClient.GetProgramData(t.Context()) + if err != nil { + return false + } + for _, d := range data.Devices { + if d.Code == device.Spec.Code && d.Status == serviceability.DeviceStatusActivated { + return true + } + } + return false + }, 60*time.Second, 2*time.Second, "device was not activated within timeout") + }) + + if !t.Run("connect", func(t *testing.T) { + tdn.ConnectIBRLUserTunnel(t, client) + err := client.WaitForTunnelUp(t.Context(), 90*time.Second) + require.NoError(t, err) + }) { + t.FailNow() + } + + if !t.Run("wait_for_user_activation", func(t *testing.T) { + err := tdn.WaitForUserActivation(t, 1) + require.NoError(t, err) + }) { + t.FailNow() + } + + if !t.Run("wait_for_bgp_session_established", func(t *testing.T) { + require.Eventually(t, func() bool { + summary, err := 
devnet.DeviceExecAristaCliJSON[*arista.ShowIPBGPSummary](t.Context(), device, arista.ShowIPBGPSummaryCmd("vrf1")) + if err != nil { + log.Debug("bgp status: failed to fetch BGP summary", "error", err) + return false + } + vrf, ok := summary.VRFs["vrf1"] + if !ok { + log.Debug("bgp status: vrf1 not found in BGP summary") + return false + } + for ip, peer := range vrf.Peers { + if peer.PeerState == "Established" { + log.Debug("bgp status: BGP session established", "peer", ip) + return true + } + } + log.Debug("bgp status: no established BGP sessions yet", "peers", vrf.Peers) + return false + }, 90*time.Second, 5*time.Second, "BGP session never reached Established state") + }) { + t.FailNow() + } + + if !t.Run("wait_for_bgp_status_up_onchain", func(t *testing.T) { + svcClient, err := dn.Ledger.GetServiceabilityClient() + require.NoError(t, err) + + require.Eventually(t, func() bool { + data, err := svcClient.GetProgramData(t.Context()) + if err != nil { + log.Debug("bgp status: failed to fetch program data", "error", err) + return false + } + for _, user := range data.Users { + if user.Status != serviceability.UserStatusActivated { + continue + } + if user.BgpStatus == uint8(serviceability.BGPStatusUp) { + log.Debug("bgp status: user BGP status is Up onchain", "user", solana.PublicKeyFromBytes(user.PubKey[:]).String()) + return true + } + log.Debug("bgp status: user BGP status not yet Up", "bgpStatus", user.BgpStatus) + } + return false + }, 60*time.Second, 5*time.Second, "user BGP status never reached Up onchain") + }) { + t.FailNow() + } + + if !t.Run("disconnect", func(t *testing.T) { + // Kill the doublezerod daemon ungracefully (SIGKILL) to simulate an + // unexpected BGP session drop without triggering the onchain disconnect + // lifecycle. A clean disconnect via "doublezero disconnect" would delete + // the user account onchain, leaving no record to check BGP status on. 
+ // With an ungraceful kill the user stays activated onchain, giving the + // BGP status submitter a chance to detect the dropped session and submit Down. + _, err := client.Exec(t.Context(), []string{"bash", "-c", "pkill -9 doublezerod || true"}) + require.NoError(t, err) + }) { + t.FailNow() + } + + if !t.Run("wait_for_bgp_status_down_onchain", func(t *testing.T) { + svcClient, err := dn.Ledger.GetServiceabilityClient() + require.NoError(t, err) + + require.Eventually(t, func() bool { + data, err := svcClient.GetProgramData(t.Context()) + if err != nil { + log.Debug("bgp status: failed to fetch program data", "error", err) + return false + } + for _, user := range data.Users { + if user.Status != serviceability.UserStatusActivated { + continue + } + if user.BgpStatus == uint8(serviceability.BGPStatusDown) { + log.Debug("bgp status: user BGP status is Down onchain", "user", solana.PublicKeyFromBytes(user.PubKey[:]).String()) + return true + } + log.Debug("bgp status: user BGP status not yet Down", "bgpStatus", user.BgpStatus) + } + return false + }, 60*time.Second, 5*time.Second, "user BGP status never reached Down onchain") + }) { + t.Fail() + } +} diff --git a/smartcontract/sdk/go/serviceability/executor.go b/smartcontract/sdk/go/serviceability/executor.go index aa64d2b3f..0ad03e2f4 100644 --- a/smartcontract/sdk/go/serviceability/executor.go +++ b/smartcontract/sdk/go/serviceability/executor.go @@ -15,8 +15,9 @@ import ( ) const ( - instructionSetDeviceHealth = 83 - instructionSetLinkHealth = 84 + instructionSetDeviceHealth = 83 + instructionSetLinkHealth = 84 + instructionSetUserBGPStatus = 106 ) var ( @@ -163,6 +164,34 @@ func (e *Executor) SetLinkHealthBatch(ctx context.Context, updates []LinkHealthU return lastSig, ErrAllUpdatesFailed } +// UserBGPStatusUpdate holds the parameters for a single SetUserBGPStatus submission. 
+type UserBGPStatusUpdate struct { + UserPubkey solana.PublicKey + DevicePubkey solana.PublicKey + Status BGPStatus +} + +// SetUserBGPStatus submits a SetUserBGPStatus instruction for a single user. +// The executor's signer must be the device's metrics_publisher_pk. +func (e *Executor) SetUserBGPStatus(ctx context.Context, u UserBGPStatusUpdate) (solana.Signature, error) { + instr := e.buildSetUserBGPStatusInstruction(u.UserPubkey, u.DevicePubkey, u.Status) + sig, _, err := e.executeTransaction(ctx, []solana.Instruction{instr}) + return sig, err +} + +func (e *Executor) buildSetUserBGPStatusInstruction(userPubkey, devicePubkey solana.PublicKey, status BGPStatus) solana.Instruction { + return &genericInstruction{ + programID: e.programID, + accounts: solana.AccountMetaSlice{ + solana.Meta(userPubkey).WRITE(), + solana.Meta(devicePubkey), + solana.Meta(e.signer.PublicKey()).SIGNER().WRITE(), + solana.Meta(solana.SystemProgramID), + }, + data: []byte{instructionSetUserBGPStatus, byte(status)}, + } +} + func (e *Executor) buildSetDeviceHealthInstruction(devicePubkey, globalStatePubkey solana.PublicKey, health DeviceHealth) solana.Instruction { return &genericInstruction{ programID: e.programID, diff --git a/smartcontract/sdk/go/serviceability/state.go b/smartcontract/sdk/go/serviceability/state.go index 4329b3a85..065614c95 100644 --- a/smartcontract/sdk/go/serviceability/state.go +++ b/smartcontract/sdk/go/serviceability/state.go @@ -594,6 +594,31 @@ func (l LinkHealth) MarshalJSON() ([]byte, error) { return json.Marshal(l.String()) } +type BGPStatus uint8 + +const ( + BGPStatusUnknown BGPStatus = 0 + BGPStatusUp BGPStatus = 1 + BGPStatusDown BGPStatus = 2 +) + +func (b BGPStatus) String() string { + switch b { + case BGPStatusUnknown: + return "unknown" + case BGPStatusUp: + return "up" + case BGPStatusDown: + return "down" + default: + return fmt.Sprintf("BGPStatus(%d)", b) + } +} + +func (b BGPStatus) MarshalJSON() ([]byte, error) { + return 
json.Marshal(b.String()) +} + type LinkDesiredStatus uint8 const (