From 3b00c85d1d9039baff38a2f524216d9a2a39a8a0 Mon Sep 17 00:00:00 2001 From: Juan Olveira Date: Wed, 8 Apr 2026 10:30:34 +0000 Subject: [PATCH 01/10] smartcontract/sdk/go: add BGPStatus type and SetUserBGPStatus executor instruction Adds the BGPStatus enum (Unknown/Up/Down) with JSON serialization and the SetUserBGPStatus instruction (code 106) to the Go serviceability SDK. --- .../sdk/go/serviceability/executor.go | 33 +++++++++++++++++-- smartcontract/sdk/go/serviceability/state.go | 25 ++++++++++++++ 2 files changed, 56 insertions(+), 2 deletions(-) diff --git a/smartcontract/sdk/go/serviceability/executor.go b/smartcontract/sdk/go/serviceability/executor.go index aa64d2b3f..0ad03e2f4 100644 --- a/smartcontract/sdk/go/serviceability/executor.go +++ b/smartcontract/sdk/go/serviceability/executor.go @@ -15,8 +15,9 @@ import ( ) const ( - instructionSetDeviceHealth = 83 - instructionSetLinkHealth = 84 + instructionSetDeviceHealth = 83 + instructionSetLinkHealth = 84 + instructionSetUserBGPStatus = 106 ) var ( @@ -163,6 +164,34 @@ func (e *Executor) SetLinkHealthBatch(ctx context.Context, updates []LinkHealthU return lastSig, ErrAllUpdatesFailed } +// UserBGPStatusUpdate holds the parameters for a single SetUserBGPStatus submission. +type UserBGPStatusUpdate struct { + UserPubkey solana.PublicKey + DevicePubkey solana.PublicKey + Status BGPStatus +} + +// SetUserBGPStatus submits a SetUserBGPStatus instruction for a single user. +// The executor's signer must be the device's metrics_publisher_pk. 
+func (e *Executor) SetUserBGPStatus(ctx context.Context, u UserBGPStatusUpdate) (solana.Signature, error) { + instr := e.buildSetUserBGPStatusInstruction(u.UserPubkey, u.DevicePubkey, u.Status) + sig, _, err := e.executeTransaction(ctx, []solana.Instruction{instr}) + return sig, err +} + +func (e *Executor) buildSetUserBGPStatusInstruction(userPubkey, devicePubkey solana.PublicKey, status BGPStatus) solana.Instruction { + return &genericInstruction{ + programID: e.programID, + accounts: solana.AccountMetaSlice{ + solana.Meta(userPubkey).WRITE(), + solana.Meta(devicePubkey), + solana.Meta(e.signer.PublicKey()).SIGNER().WRITE(), + solana.Meta(solana.SystemProgramID), + }, + data: []byte{instructionSetUserBGPStatus, byte(status)}, + } +} + func (e *Executor) buildSetDeviceHealthInstruction(devicePubkey, globalStatePubkey solana.PublicKey, health DeviceHealth) solana.Instruction { return &genericInstruction{ programID: e.programID, diff --git a/smartcontract/sdk/go/serviceability/state.go b/smartcontract/sdk/go/serviceability/state.go index 4329b3a85..065614c95 100644 --- a/smartcontract/sdk/go/serviceability/state.go +++ b/smartcontract/sdk/go/serviceability/state.go @@ -594,6 +594,31 @@ func (l LinkHealth) MarshalJSON() ([]byte, error) { return json.Marshal(l.String()) } +type BGPStatus uint8 + +const ( + BGPStatusUnknown BGPStatus = 0 + BGPStatusUp BGPStatus = 1 + BGPStatusDown BGPStatus = 2 +) + +func (b BGPStatus) String() string { + switch b { + case BGPStatusUnknown: + return "unknown" + case BGPStatusUp: + return "up" + case BGPStatusDown: + return "down" + default: + return fmt.Sprintf("BGPStatus(%d)", b) + } +} + +func (b BGPStatus) MarshalJSON() ([]byte, error) { + return json.Marshal(b.String()) +} + type LinkDesiredStatus uint8 const ( From d1981f4f00ef962c04a7d77dcf63a86007bb6310 Mon Sep 17 00:00:00 2001 From: Juan Olveira Date: Wed, 8 Apr 2026 10:30:45 +0000 Subject: [PATCH 02/10] telemetry: add BGP status submitter Introduces the bgpstatus package, 
which reads BGP neighbor state from Arista devices and submits SetUserBGPStatus instructions onchain. Wires it into the telemetry binary with four new flags: --bgp-status-enable, --bgp-status-interval, --bgp-status-refresh-interval, and --bgp-status-down-grace-period. --- controlplane/telemetry/cmd/telemetry/main.go | 60 +- .../telemetry/internal/bgpstatus/bgpstatus.go | 211 +++++++ .../telemetry/internal/bgpstatus/submitter.go | 175 ++++++ .../internal/bgpstatus/submitter_test.go | 529 ++++++++++++++++++ 4 files changed, 974 insertions(+), 1 deletion(-) create mode 100644 controlplane/telemetry/internal/bgpstatus/bgpstatus.go create mode 100644 controlplane/telemetry/internal/bgpstatus/submitter.go create mode 100644 controlplane/telemetry/internal/bgpstatus/submitter_test.go diff --git a/controlplane/telemetry/cmd/telemetry/main.go b/controlplane/telemetry/cmd/telemetry/main.go index 403a94a09..b8c13814d 100644 --- a/controlplane/telemetry/cmd/telemetry/main.go +++ b/controlplane/telemetry/cmd/telemetry/main.go @@ -17,6 +17,7 @@ import ( "github.com/malbeclabs/doublezero/config" "github.com/malbeclabs/doublezero/controlplane/agent/pkg/arista" aristapb "github.com/malbeclabs/doublezero/controlplane/proto/arista/gen/pb-go/arista/EosSdkRpc" + "github.com/malbeclabs/doublezero/controlplane/telemetry/internal/bgpstatus" "github.com/malbeclabs/doublezero/controlplane/telemetry/internal/geoprobe" "github.com/malbeclabs/doublezero/controlplane/telemetry/internal/gnmitunnel" "github.com/malbeclabs/doublezero/controlplane/telemetry/internal/metrics" @@ -49,6 +50,8 @@ const ( defaultLocalDevicePubkey = "" defaultSubmitterMaxConcurrency = 10 defaultStateCollectInterval = 60 * time.Second + defaultBGPStatusInterval = 60 * time.Second + defaultBGPStatusRefreshInterval = 6 * time.Hour waitForNamespaceTimeout = 30 * time.Second defaultStateIngestHTTPClientTimeout = 10 * time.Second @@ -88,6 +91,12 @@ var ( // geoprobe flags geolocationProgramID = 
flag.String("geolocation-program-id", "", "The ID of the geolocation program for onchain GeoProbe discovery. If env is provided, this flag is ignored.") + // bgp status submitter flags + bgpStatusEnable = flag.Bool("bgp-status-enable", false, "Enable onchain BGP status submission after each collection tick.") + bgpStatusInterval = flag.Duration("bgp-status-interval", defaultBGPStatusInterval, "Interval between BGP status collection ticks.") + bgpStatusRefreshInterval = flag.Duration("bgp-status-refresh-interval", defaultBGPStatusRefreshInterval, "Periodic re-submission interval to keep last_bgp_reported_at fresh even when status is unchanged.") + bgpStatusDownGracePeriod = flag.Duration("bgp-status-down-grace-period", 0, "Minimum duration a user must be absent before reporting Down status (0 = report immediately).") + // Set by LDFLAGS version = "dev" commit = "none" @@ -333,7 +342,7 @@ func main() { os.Exit(1) } - errCh := make(chan error, 2) + errCh := make(chan error, 3) // Run the onchain device-link latency collector. go func() { @@ -354,6 +363,13 @@ func main() { gnmiTunnelClientErrCh = startGNMITunnelClient(ctx, cancel, log, localDevicePK) } + // Run BGP status submitter if enabled. + var bgpStatusErrCh <-chan error + if *bgpStatusEnable { + bgpStatusErrCh = startBGPStatusSubmitter(ctx, cancel, log, keypair, localDevicePK, + rpcClient, serviceabilityProgramID, localNet, *bgpNamespace) + } + // Wait for the context to be done or an error to be returned. 
select { case <-ctx.Done(): @@ -370,9 +386,51 @@ func main() { log.Error("gnmi tunnel client exited with error", "error", err) cancel() os.Exit(1) + case err := <-bgpStatusErrCh: + log.Error("BGP status submitter exited with error", "error", err) + cancel() + os.Exit(1) } } +func startBGPStatusSubmitter( + ctx context.Context, + cancel context.CancelFunc, + log *slog.Logger, + keypair solana.PrivateKey, + localDevicePK solana.PublicKey, + rpcClient *solanarpc.Client, + serviceabilityProgramID solana.PublicKey, + localNet netutil.LocalNet, + bgpNamespace string, +) <-chan error { + executor := serviceability.NewExecutor(log, rpcClient, &keypair, serviceabilityProgramID) + svcClient := serviceability.New(rpcClient, serviceabilityProgramID) + + sub, err := bgpstatus.NewSubmitter(bgpstatus.Config{ + Log: log, + Executor: executor, + ServiceabilityClient: svcClient, + LocalNet: localNet, + LocalDevicePK: localDevicePK, + BGPNamespace: bgpNamespace, + Interval: *bgpStatusInterval, + PeriodicRefreshInterval: *bgpStatusRefreshInterval, + DownGracePeriod: *bgpStatusDownGracePeriod, + }) + if err != nil { + log.Error("failed to create BGP status submitter", "error", err) + os.Exit(1) + } + + log.Info("Starting BGP status submitter", + "interval", *bgpStatusInterval, + "refreshInterval", *bgpStatusRefreshInterval, + "downGracePeriod", *bgpStatusDownGracePeriod, + ) + return sub.Start(ctx, cancel) +} + func startStateCollector(ctx context.Context, cancel context.CancelFunc, log *slog.Logger, keypair solana.PrivateKey, localDevicePK solana.PublicKey, bgpNamespace string) <-chan error { // Build state ingest HTTP client. 
var stateIngestHTTPClient *http.Client diff --git a/controlplane/telemetry/internal/bgpstatus/bgpstatus.go b/controlplane/telemetry/internal/bgpstatus/bgpstatus.go new file mode 100644 index 000000000..064b075c7 --- /dev/null +++ b/controlplane/telemetry/internal/bgpstatus/bgpstatus.go @@ -0,0 +1,211 @@ +package bgpstatus + +import ( + "context" + "errors" + "fmt" + "log/slog" + "net" + "sync" + "time" + + "github.com/gagliardetto/solana-go" + "github.com/jonboulle/clockwork" + "github.com/malbeclabs/doublezero/controlplane/telemetry/internal/netutil" + "github.com/malbeclabs/doublezero/smartcontract/sdk/go/serviceability" +) + +const ( + taskChannelCapacity = 256 + defaultInterval = 60 * time.Second + defaultRefreshInterval = 6 * time.Hour + submitMaxRetries = 3 + submitBaseBackoff = 100 * time.Millisecond +) + +// BGPStatusExecutor submits a SetUserBGPStatus instruction onchain. +type BGPStatusExecutor interface { + SetUserBGPStatus(ctx context.Context, u serviceability.UserBGPStatusUpdate) (solana.Signature, error) +} + +// ServiceabilityClient fetches the current program state from the ledger. +type ServiceabilityClient interface { + GetProgramData(ctx context.Context) (*serviceability.ProgramData, error) +} + +// Config holds all parameters for the BGP status submitter. 
+type Config struct { + Log *slog.Logger + Executor BGPStatusExecutor + ServiceabilityClient ServiceabilityClient + LocalNet netutil.LocalNet + LocalDevicePK solana.PublicKey + BGPNamespace string + Interval time.Duration // default: 60s + PeriodicRefreshInterval time.Duration // default: 6h + DownGracePeriod time.Duration // default: 0 + Clock clockwork.Clock +} + +func (c *Config) validate() error { + if c.Log == nil { + return errors.New("log is required") + } + if c.Executor == nil { + return errors.New("executor is required") + } + if c.ServiceabilityClient == nil { + return errors.New("serviceability client is required") + } + if c.LocalNet == nil { + return errors.New("local net is required") + } + if c.LocalDevicePK.IsZero() { + return errors.New("local device pubkey is required") + } + if c.BGPNamespace == "" { + return errors.New("bgp namespace is required") + } + if c.Interval <= 0 { + c.Interval = defaultInterval + } + if c.PeriodicRefreshInterval <= 0 { + c.PeriodicRefreshInterval = defaultRefreshInterval + } + if c.Clock == nil { + c.Clock = clockwork.NewRealClock() + } + return nil +} + +// userState tracks submission state for a single user. +type userState struct { + lastOnchainStatus serviceability.BGPStatus + lastWriteTime time.Time + lastUpObservedAt time.Time +} + +// submitTask is queued to the background worker for onchain submission. +type submitTask struct { + user serviceability.User + status serviceability.BGPStatus +} + +// Submitter collects BGP socket state on each tick, determines per-user BGP +// status, and submits SetUserBGPStatus onchain via a non-blocking worker. +type Submitter struct { + cfg Config + log *slog.Logger + userState map[string]*userState // keyed by user PubKey base58 + pending map[string]bool // users currently in-flight in the worker + mu sync.Mutex + taskCh chan submitTask +} + +// NewSubmitter creates a Submitter after validating the config. 
+func NewSubmitter(cfg Config) (*Submitter, error) { + if err := cfg.validate(); err != nil { + return nil, fmt.Errorf("invalid bgpstatus config: %w", err) + } + return &Submitter{ + cfg: cfg, + log: cfg.Log, + userState: make(map[string]*userState), + pending: make(map[string]bool), + taskCh: make(chan submitTask, taskChannelCapacity), + }, nil +} + +// Start launches the submitter in the background and returns a channel that +// receives a fatal error (or is closed on clean shutdown). It mirrors the +// state.Collector.Start pattern. +func (s *Submitter) Start(ctx context.Context, cancel context.CancelFunc) <-chan error { + errCh := make(chan error, 1) + go func() { + defer close(errCh) + defer cancel() + if err := s.run(ctx); err != nil { + s.log.Error("bgpstatus: submitter failed", "error", err) + errCh <- err + } + }() + return errCh +} + +// userStateFor returns or creates the per-user tracking entry (caller must hold s.mu). +func (s *Submitter) userStateFor(key string) *userState { + us, ok := s.userState[key] + if !ok { + us = &userState{} + s.userState[key] = us + } + return us +} + +// bgpSocket is the minimal BGP socket representation used by the pure helpers. +// The Linux-specific submitter.go converts state.BGPSocketState to this type. +type bgpSocket struct { + RemoteIP string + State string +} + +// --- Pure helpers (no Linux syscalls; fully testable on all platforms) --- + +// buildEstablishedIPSet returns a set of remote IP strings for BGP sessions +// that are currently in the ESTABLISHED state. +func buildEstablishedIPSet(sockets []bgpSocket) map[string]struct{} { + m := make(map[string]struct{}, len(sockets)) + for _, sock := range sockets { + if sock.State == "ESTABLISHED" { + m[sock.RemoteIP] = struct{}{} + } + } + return m +} + +// tunnelNetToIPNet parses the onchain [5]byte tunnel-net encoding into a +// *net.IPNet. The format is [4 bytes IPv4 prefix | 1 byte CIDR length]. 
+func tunnelNetToIPNet(b [5]byte) *net.IPNet { + ip := net.IPv4(b[0], b[1], b[2], b[3]) + mask := net.CIDRMask(int(b[4]), 32) + return &net.IPNet{IP: ip.To4(), Mask: mask} +} + +// computeEffectiveStatus derives the BGP status to report, applying the down +// grace period: if observedUp is false but the user was last seen Up within +// gracePeriod, we still report Up to avoid transient flaps. +func computeEffectiveStatus( + observedUp bool, + us *userState, + now time.Time, + gracePeriod time.Duration, +) serviceability.BGPStatus { + if observedUp { + return serviceability.BGPStatusUp + } + if us.lastUpObservedAt.IsZero() { + return serviceability.BGPStatusDown + } + if gracePeriod > 0 && now.Sub(us.lastUpObservedAt) < gracePeriod { + return serviceability.BGPStatusUp + } + return serviceability.BGPStatusDown +} + +// shouldSubmit returns true when a submission is warranted: either the status +// has changed from what was last confirmed onchain, or it is time for a +// periodic keepalive write. 
+func shouldSubmit( + us *userState, + newStatus serviceability.BGPStatus, + now time.Time, + refreshInterval time.Duration, +) bool { + if us.lastWriteTime.IsZero() { + return true + } + if us.lastOnchainStatus != newStatus { + return true + } + return now.Sub(us.lastWriteTime) >= refreshInterval +} diff --git a/controlplane/telemetry/internal/bgpstatus/submitter.go b/controlplane/telemetry/internal/bgpstatus/submitter.go new file mode 100644 index 000000000..4fa8be048 --- /dev/null +++ b/controlplane/telemetry/internal/bgpstatus/submitter.go @@ -0,0 +1,175 @@ +//go:build linux + +package bgpstatus + +import ( + "context" + "errors" + "time" + + "github.com/gagliardetto/solana-go" + "github.com/malbeclabs/doublezero/controlplane/telemetry/internal/netutil" + "github.com/malbeclabs/doublezero/controlplane/telemetry/internal/state" + "github.com/malbeclabs/doublezero/smartcontract/sdk/go/serviceability" +) + +// run starts the background worker goroutine, then drives the tick loop, +// running an immediate first tick before waiting for the ticker. +func (s *Submitter) run(ctx context.Context) error { + go s.worker(ctx) + + ticker := s.cfg.Clock.NewTicker(s.cfg.Interval) + defer ticker.Stop() + + s.tick(ctx) + + for { + select { + case <-ctx.Done(): + return nil + case <-ticker.Chan(): + s.tick(ctx) + } + } +} + +// tick collects BGP socket state, fetches activated users for this device, +// maps each user to their tunnel peer IP, determines Up/Down status (with +// grace period), and enqueues submission tasks for users whose status needs +// updating. 
+func (s *Submitter) tick(ctx context.Context) { + rawSockets, err := state.GetBGPSocketStatsInNamespace(ctx, s.cfg.BGPNamespace) + if err != nil { + s.log.Error("bgpstatus: failed to collect BGP sockets", "error", err) + return + } + sockets := make([]bgpSocket, len(rawSockets)) + for i, rs := range rawSockets { + sockets[i] = bgpSocket{RemoteIP: rs.RemoteIP, State: rs.State} + } + establishedIPs := buildEstablishedIPSet(sockets) + + programData, err := s.cfg.ServiceabilityClient.GetProgramData(ctx) + if err != nil { + s.log.Error("bgpstatus: failed to fetch program data", "error", err) + return + } + + interfaces, err := s.cfg.LocalNet.Interfaces() + if err != nil { + s.log.Error("bgpstatus: failed to get local interfaces", "error", err) + return + } + + now := s.cfg.Clock.Now() + + s.mu.Lock() + defer s.mu.Unlock() + + for _, user := range programData.Users { + if user.Status != serviceability.UserStatusActivated { + continue + } + if solana.PublicKeyFromBytes(user.DevicePubKey[:]) != s.cfg.LocalDevicePK { + continue + } + + userPK := solana.PublicKeyFromBytes(user.PubKey[:]).String() + us := s.userStateFor(userPK) + + // Resolve the BGP peer IP for this user's /31 tunnel net. + tunnelNet := tunnelNetToIPNet(user.TunnelNet) + tunnel, err := netutil.FindLocalTunnel(interfaces, tunnelNet) + if err != nil { + if !errors.Is(err, netutil.ErrLocalTunnelNotFound) { + s.log.Warn("bgpstatus: unexpected error finding tunnel", "user", userPK, "error", err) + } + // Tunnel not up — user cannot be Up. + s.log.Debug("bgpstatus: tunnel not found for user", "user", userPK) + continue + } + + _, observedUp := establishedIPs[tunnel.TargetIP.String()] + if observedUp { + us.lastUpObservedAt = now + } + + effectiveStatus := computeEffectiveStatus(observedUp, us, now, s.cfg.DownGracePeriod) + + if !shouldSubmit(us, effectiveStatus, now, s.cfg.PeriodicRefreshInterval) { + continue + } + + // Skip if a submission for this user is already in-flight. 
+ if s.pending[userPK] { + s.log.Debug("bgpstatus: submission already in-flight, skipping", "user", userPK) + continue + } + + task := submitTask{user: user, status: effectiveStatus} + select { + case s.taskCh <- task: + s.pending[userPK] = true + default: + s.log.Warn("bgpstatus: task channel full, dropping update", "user", userPK) + } + } +} + +// worker drains the task channel and submits each update onchain with retry. +// It updates the per-user tracking state on success and always clears the +// pending flag so the next tick can re-evaluate. +func (s *Submitter) worker(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case task := <-s.taskCh: + userPK := solana.PublicKeyFromBytes(task.user.PubKey[:]).String() + + sig, err := s.submitWithRetry(ctx, task) + + s.mu.Lock() + delete(s.pending, userPK) + if err == nil { + us := s.userStateFor(userPK) + us.lastOnchainStatus = task.status + us.lastWriteTime = s.cfg.Clock.Now() + s.log.Info("bgpstatus: submitted BGP status", + "user", userPK, "status", task.status, "sig", sig) + } else { + s.log.Error("bgpstatus: failed to submit after retries", + "user", userPK, "status", task.status, "error", err) + } + s.mu.Unlock() + } + } +} + +// submitWithRetry attempts the onchain write up to submitMaxRetries times with +// exponential backoff. It returns early if the context is cancelled. 
+func (s *Submitter) submitWithRetry(ctx context.Context, task submitTask) (solana.Signature, error) { + update := serviceability.UserBGPStatusUpdate{ + UserPubkey: solana.PublicKeyFromBytes(task.user.PubKey[:]), + DevicePubkey: s.cfg.LocalDevicePK, + Status: task.status, + } + + var lastErr error + for attempt := range submitMaxRetries { + sig, err := s.cfg.Executor.SetUserBGPStatus(ctx, update) + if err == nil { + return sig, nil + } + lastErr = err + delay := submitBaseBackoff * time.Duration(1< 0 { + m.failNext-- + return solana.Signature{}, m.err + } + return solana.Signature{}, nil +} + +func (m *mockExecutor) lastCalls() []serviceability.UserBGPStatusUpdate { + m.mu.Lock() + defer m.mu.Unlock() + out := make([]serviceability.UserBGPStatusUpdate, len(m.calls)) + copy(out, m.calls) + return out +} + +// --- mock serviceability client --- + +type mockSvcClient struct { + data *serviceability.ProgramData +} + +func (m *mockSvcClient) GetProgramData(_ context.Context) (*serviceability.ProgramData, error) { + return m.data, nil +} + +// --- helpers --- + +func makeUser(pubkey solana.PublicKey, devicePK solana.PublicKey, tunnelNet [5]byte) serviceability.User { + u := serviceability.User{} + copy(u.PubKey[:], pubkey[:]) + copy(u.DevicePubKey[:], devicePK[:]) + u.TunnelNet = tunnelNet + u.Status = serviceability.UserStatusActivated + return u +} + +// newTestSubmitter creates a Submitter with the given clock and executor, +// using MockLocalNet backed by the provided interfaces. 
+func newTestSubmitter( + t *testing.T, + clk clockwork.Clock, + exec BGPStatusExecutor, + svcClient ServiceabilityClient, + ifaces []netutil.Interface, + devicePK solana.PublicKey, + gracePeriod time.Duration, + refreshInterval time.Duration, +) *Submitter { + t.Helper() + s, err := NewSubmitter(Config{ + Log: newTestLogger(t), + Executor: exec, + ServiceabilityClient: svcClient, + LocalNet: &netutil.MockLocalNet{InterfacesFunc: func() ([]netutil.Interface, error) { return ifaces, nil }}, + LocalDevicePK: devicePK, + BGPNamespace: "ns-vrf1", + Interval: time.Hour, // irrelevant; tests call tick() directly + PeriodicRefreshInterval: refreshInterval, + DownGracePeriod: gracePeriod, + Clock: clk, + }) + if err != nil { + t.Fatalf("NewSubmitter: %v", err) + } + return s +} + +// newTestLogger returns a slog.Logger that discards output during tests. +func newTestLogger(t *testing.T) *slog.Logger { + t.Helper() + return slog.New(slog.NewTextHandler(testWriter{t}, &slog.HandlerOptions{Level: slog.LevelDebug})) +} + +type testWriter struct{ t *testing.T } + +func (tw testWriter) Write(p []byte) (int, error) { + tw.t.Logf("%s", p) + return len(p), nil +} + +// ============================================================ +// buildEstablishedIPSet +// ============================================================ + +func TestBuildEstablishedIPSet_OnlyEstablished(t *testing.T) { + sockets := []bgpSocket{ + {RemoteIP: "10.0.0.1", State: "ESTABLISHED"}, + {RemoteIP: "10.0.0.2", State: "TIME_WAIT"}, + {RemoteIP: "10.0.0.3", State: "ESTABLISHED"}, + } + got := buildEstablishedIPSet(sockets) + if _, ok := got["10.0.0.1"]; !ok { + t.Error("expected 10.0.0.1 in set") + } + if _, ok := got["10.0.0.3"]; !ok { + t.Error("expected 10.0.0.3 in set") + } + if _, ok := got["10.0.0.2"]; ok { + t.Error("did not expect 10.0.0.2 (TIME_WAIT) in set") + } +} + +func TestBuildEstablishedIPSet_Empty(t *testing.T) { + got := buildEstablishedIPSet(nil /* []bgpSocket */) + if len(got) != 0 { + 
t.Errorf("expected empty set, got %d entries", len(got)) + } +} + +// ============================================================ +// tunnelNetToIPNet +// ============================================================ + +func TestTunnelNetToIPNet(t *testing.T) { + // 10.0.0.0/31 + b := [5]byte{10, 0, 0, 0, 31} + net := tunnelNetToIPNet(b) + ones, bits := net.Mask.Size() + if ones != 31 || bits != 32 { + t.Errorf("expected /31 got /%d/%d", ones, bits) + } + if net.IP.String() != "10.0.0.0" { + t.Errorf("unexpected IP: %s", net.IP) + } +} + +// ============================================================ +// computeEffectiveStatus +// ============================================================ + +func TestComputeEffectiveStatus_Up(t *testing.T) { + us := &userState{} + now := time.Now() + got := computeEffectiveStatus(true, us, now, 5*time.Minute) + if got != serviceability.BGPStatusUp { + t.Errorf("expected Up, got %v", got) + } +} + +func TestComputeEffectiveStatus_DownImmediateNeverSeen(t *testing.T) { + us := &userState{} // lastUpObservedAt is zero + now := time.Now() + got := computeEffectiveStatus(false, us, now, 5*time.Minute) + if got != serviceability.BGPStatusDown { + t.Errorf("expected Down, got %v", got) + } +} + +func TestComputeEffectiveStatus_DownWithinGrace(t *testing.T) { + now := time.Now() + us := &userState{lastUpObservedAt: now.Add(-1 * time.Minute)} + got := computeEffectiveStatus(false, us, now, 5*time.Minute) + if got != serviceability.BGPStatusUp { + t.Errorf("expected Up (still within grace), got %v", got) + } +} + +func TestComputeEffectiveStatus_DownAfterGrace(t *testing.T) { + now := time.Now() + us := &userState{lastUpObservedAt: now.Add(-10 * time.Minute)} + got := computeEffectiveStatus(false, us, now, 5*time.Minute) + if got != serviceability.BGPStatusDown { + t.Errorf("expected Down (grace elapsed), got %v", got) + } +} + +func TestComputeEffectiveStatus_ZeroGracePeriod(t *testing.T) { + now := time.Now() + us := 
&userState{lastUpObservedAt: now.Add(-1 * time.Second)} + got := computeEffectiveStatus(false, us, now, 0) + if got != serviceability.BGPStatusDown { + t.Errorf("expected Down (grace=0), got %v", got) + } +} + +// ============================================================ +// shouldSubmit +// ============================================================ + +func TestShouldSubmit_FirstWrite(t *testing.T) { + us := &userState{} // lastWriteTime is zero + if !shouldSubmit(us, serviceability.BGPStatusUp, time.Now(), 6*time.Hour) { + t.Error("expected submit on first write") + } +} + +func TestShouldSubmit_StatusChanged(t *testing.T) { + now := time.Now() + us := &userState{ + lastOnchainStatus: serviceability.BGPStatusUp, + lastWriteTime: now.Add(-1 * time.Minute), + } + if !shouldSubmit(us, serviceability.BGPStatusDown, now, 6*time.Hour) { + t.Error("expected submit on status change") + } +} + +func TestShouldSubmit_NoChangeNoRefresh(t *testing.T) { + now := time.Now() + us := &userState{ + lastOnchainStatus: serviceability.BGPStatusUp, + lastWriteTime: now.Add(-1 * time.Minute), + } + if shouldSubmit(us, serviceability.BGPStatusUp, now, 6*time.Hour) { + t.Error("expected no submit when status unchanged and refresh not due") + } +} + +func TestShouldSubmit_PeriodicRefresh(t *testing.T) { + now := time.Now() + us := &userState{ + lastOnchainStatus: serviceability.BGPStatusUp, + lastWriteTime: now.Add(-7 * time.Hour), + } + if !shouldSubmit(us, serviceability.BGPStatusUp, now, 6*time.Hour) { + t.Error("expected submit when periodic refresh interval elapsed") + } +} + +// ============================================================ +// Worker retry behaviour (integration-style, no Linux syscalls) +// ============================================================ + +// workerTestSetup creates a submitter and pre-populates it with a task +// already in the taskCh, bypassing the Linux-specific tick(). 
+func workerTestSetup( + t *testing.T, + exec *mockExecutor, + gracePeriod time.Duration, + refreshInterval time.Duration, +) (*Submitter, serviceability.User) { + t.Helper() + devicePK := solana.NewWallet().PublicKey() + userPK := solana.NewWallet().PublicKey() + user := makeUser(userPK, devicePK, [5]byte{10, 0, 0, 0, 31}) + + clk := clockwork.NewFakeClock() + svc := &mockSvcClient{data: &serviceability.ProgramData{Users: []serviceability.User{user}}} + + s := newTestSubmitter(t, clk, exec, svc, nil, devicePK, gracePeriod, refreshInterval) + return s, user +} + +func TestWorker_SuccessUpdatesState(t *testing.T) { + exec := &mockExecutor{} + s, user := workerTestSetup(t, exec, 0, 6*time.Hour) + + userPK := solana.PublicKeyFromBytes(user.PubKey[:]).String() + + // Seed a task directly into the channel and mark pending. + task := submitTask{user: user, status: serviceability.BGPStatusUp} + s.mu.Lock() + s.pending[userPK] = true + s.mu.Unlock() + s.taskCh <- task + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + // Run the worker in a goroutine; wait for it to process the task. + go s.worker(ctx) + + // Poll until state is updated or timeout. + deadline := time.Now().Add(3 * time.Second) + for time.Now().Before(deadline) { + s.mu.Lock() + us, ok := s.userState[userPK] + s.mu.Unlock() + if ok && us.lastOnchainStatus == serviceability.BGPStatusUp { + break + } + time.Sleep(10 * time.Millisecond) + } + + s.mu.Lock() + us := s.userState[userPK] + pendingAfter := s.pending[userPK] + s.mu.Unlock() + + if us == nil || us.lastOnchainStatus != serviceability.BGPStatusUp { + t.Errorf("expected lastOnchainStatus=Up, got %v", us) + } + if pendingAfter { + t.Error("expected pending to be cleared after worker completion") + } + if len(exec.lastCalls()) != 1 { + t.Errorf("expected 1 executor call, got %d", len(exec.lastCalls())) + } +} + +func TestWorker_RetryOnTransientFailure(t *testing.T) { + // Fail 2 times, succeed on 3rd. 
+ exec := &mockExecutor{failNext: 2, err: errors.New("rpc timeout")} + s, user := workerTestSetup(t, exec, 0, 6*time.Hour) + + userPK := solana.PublicKeyFromBytes(user.PubKey[:]).String() + task := submitTask{user: user, status: serviceability.BGPStatusDown} + s.mu.Lock() + s.pending[userPK] = true + s.mu.Unlock() + s.taskCh <- task + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + go s.worker(ctx) + + deadline := time.Now().Add(5 * time.Second) + for time.Now().Before(deadline) { + s.mu.Lock() + us, ok := s.userState[userPK] + s.mu.Unlock() + if ok && us.lastOnchainStatus == serviceability.BGPStatusDown { + break + } + time.Sleep(10 * time.Millisecond) + } + + s.mu.Lock() + us := s.userState[userPK] + s.mu.Unlock() + + if us == nil || us.lastOnchainStatus != serviceability.BGPStatusDown { + t.Error("expected state updated after eventual success") + } + // Should have made 3 calls total (2 failures + 1 success). + if n := len(exec.lastCalls()); n != 3 { + t.Errorf("expected 3 executor calls, got %d", n) + } +} + +func TestWorker_AllRetriesExhausted_NoStateUpdate(t *testing.T) { + exec := &mockExecutor{failNext: submitMaxRetries, err: errors.New("persistent error")} + s, user := workerTestSetup(t, exec, 0, 6*time.Hour) + + userPK := solana.PublicKeyFromBytes(user.PubKey[:]).String() + task := submitTask{user: user, status: serviceability.BGPStatusUp} + s.mu.Lock() + s.pending[userPK] = true + s.mu.Unlock() + s.taskCh <- task + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + go s.worker(ctx) + + // Wait for pending to be cleared (worker completed task even on failure). 
+ deadline := time.Now().Add(5 * time.Second) + for time.Now().Before(deadline) { + s.mu.Lock() + p := s.pending[userPK] + s.mu.Unlock() + if !p { + break + } + time.Sleep(10 * time.Millisecond) + } + + s.mu.Lock() + _, stateUpdated := s.userState[userPK] + s.mu.Unlock() + + // No state entry should have been written (all retries failed). + if stateUpdated && s.userState[userPK].lastOnchainStatus != 0 { + t.Error("expected no state update after exhausted retries") + } +} + +// ============================================================ +// Pending deduplication +// ============================================================ + +func TestPendingDedup_SecondEnqueueSkipped(t *testing.T) { + exec := &mockExecutor{} + s, user := workerTestSetup(t, exec, 0, 6*time.Hour) + + userPK := solana.PublicKeyFromBytes(user.PubKey[:]).String() + + // Manually mark user as pending (simulating a task already in the channel). + s.mu.Lock() + s.pending[userPK] = true + us := s.userStateFor(userPK) // trigger creation + _ = us + s.mu.Unlock() + + // A second call to the inline enqueue logic should skip because pending=true. + // We test this by checking that taskCh remains empty. + s.mu.Lock() + shouldEnqueue := !s.pending[userPK] + s.mu.Unlock() + + if shouldEnqueue { + t.Error("expected pending check to block second enqueue") + } + if len(s.taskCh) != 0 { + t.Error("expected task channel to remain empty") + } +} + +// ============================================================ +// Periodic refresh via FakeClock +// ============================================================ + +func TestPeriodicRefresh_ReenqueuesAfterInterval(t *testing.T) { + fakeClock := clockwork.NewFakeClock() + now := fakeClock.Now() + + refreshInterval := 6 * time.Hour + us := &userState{ + lastOnchainStatus: serviceability.BGPStatusUp, + lastWriteTime: now.Add(-7 * time.Hour), // older than refresh interval + } + + // shouldSubmit should return true because the refresh interval has elapsed. 
+ if !shouldSubmit(us, serviceability.BGPStatusUp, now, refreshInterval) { + t.Error("expected shouldSubmit=true when periodic refresh interval has elapsed") + } + + // If the last write was recent, should not re-submit. + us.lastWriteTime = now.Add(-1 * time.Hour) + if shouldSubmit(us, serviceability.BGPStatusUp, now, refreshInterval) { + t.Error("expected shouldSubmit=false when refresh interval has not elapsed") + } +} + +// ============================================================ +// NewSubmitter validation +// ============================================================ + +func TestNewSubmitter_MissingFields(t *testing.T) { + cases := []struct { + name string + cfg Config + }{ + {"no log", Config{Executor: &mockExecutor{}, ServiceabilityClient: &mockSvcClient{}, LocalNet: &netutil.MockLocalNet{}, LocalDevicePK: solana.NewWallet().PublicKey(), BGPNamespace: "ns-vrf1"}}, + {"no executor", Config{Log: slog.Default(), ServiceabilityClient: &mockSvcClient{}, LocalNet: &netutil.MockLocalNet{}, LocalDevicePK: solana.NewWallet().PublicKey(), BGPNamespace: "ns-vrf1"}}, + {"no svc client", Config{Log: slog.Default(), Executor: &mockExecutor{}, LocalNet: &netutil.MockLocalNet{}, LocalDevicePK: solana.NewWallet().PublicKey(), BGPNamespace: "ns-vrf1"}}, + {"no local net", Config{Log: slog.Default(), Executor: &mockExecutor{}, ServiceabilityClient: &mockSvcClient{}, LocalDevicePK: solana.NewWallet().PublicKey(), BGPNamespace: "ns-vrf1"}}, + {"zero device pk", Config{Log: slog.Default(), Executor: &mockExecutor{}, ServiceabilityClient: &mockSvcClient{}, LocalNet: &netutil.MockLocalNet{}, BGPNamespace: "ns-vrf1"}}, + {"no namespace", Config{Log: slog.Default(), Executor: &mockExecutor{}, ServiceabilityClient: &mockSvcClient{}, LocalNet: &netutil.MockLocalNet{}, LocalDevicePK: solana.NewWallet().PublicKey()}}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + _, err := NewSubmitter(tc.cfg) + if err == nil { + t.Error("expected error for invalid 
config") + } + }) + } +} + +// ============================================================ +// Channel full / back-pressure (non-blocking enqueue) +// ============================================================ + +func TestTaskChannel_DropWhenFull(t *testing.T) { + exec := &mockExecutor{} + devicePK := solana.NewWallet().PublicKey() + s, err := NewSubmitter(Config{ + Log: slog.Default(), + Executor: exec, + ServiceabilityClient: &mockSvcClient{data: &serviceability.ProgramData{}}, + LocalNet: &netutil.MockLocalNet{InterfacesFunc: func() ([]netutil.Interface, error) { return nil, nil }}, + LocalDevicePK: devicePK, + BGPNamespace: "ns-vrf1", + Interval: time.Hour, + PeriodicRefreshInterval: 6 * time.Hour, + Clock: clockwork.NewFakeClock(), + }) + if err != nil { + t.Fatal(err) + } + + // Fill the channel. + user := makeUser(solana.NewWallet().PublicKey(), devicePK, [5]byte{10, 0, 0, 0, 31}) + for range taskChannelCapacity { + s.taskCh <- submitTask{user: user, status: serviceability.BGPStatusUp} + } + + // A further non-blocking enqueue must not block (select default branch). + done := make(chan struct{}) + var dropped atomic.Bool + go func() { + defer close(done) + select { + case s.taskCh <- submitTask{user: user, status: serviceability.BGPStatusUp}: + default: + dropped.Store(true) + } + }() + + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("enqueue blocked when channel was full") + } + if !dropped.Load() { + t.Error("expected drop when channel full") + } +} From 0e543e8b85cb549e71e712235309740043f18bbf Mon Sep 17 00:00:00 2001 From: Juan Olveira Date: Wed, 8 Apr 2026 10:30:50 +0000 Subject: [PATCH 03/10] e2e: add TestE2E_UserBGPStatus and update user list fixtures Adds an end-to-end test that starts a devnet with the BGP status submitter enabled and verifies the onchain state reflects the actual BGP session status. 
Also extends DeviceTelemetrySpec with the four BGP status flags and updates user list golden files to include the new bgp_status column. --- e2e/internal/devnet/device.go | 24 +++++ e2e/user_bgp_status_test.go | 187 ++++++++++++++++++++++++++++++++++ 2 files changed, 211 insertions(+) create mode 100644 e2e/user_bgp_status_test.go diff --git a/e2e/internal/devnet/device.go b/e2e/internal/devnet/device.go index 818b80af5..a3514ee43 100644 --- a/e2e/internal/devnet/device.go +++ b/e2e/internal/devnet/device.go @@ -117,6 +117,18 @@ type DeviceTelemetrySpec struct { // MetricsAddr is the listen address for the prometheus metrics server. MetricsAddr string + + // BGPStatusEnable enables the BGP status submitter. + BGPStatusEnable bool + + // BGPStatusInterval is the interval at which to collect and submit BGP status. + BGPStatusInterval time.Duration + + // BGPStatusRefreshInterval is the interval at which to force a periodic re-submission. + BGPStatusRefreshInterval time.Duration + + // BGPStatusDownGracePeriod is how long to keep reporting Up after the BGP session drops. 
+ BGPStatusDownGracePeriod time.Duration } func (s *DeviceSpec) Validate(cyoaNetworkSpec CYOANetworkSpec) error { @@ -626,6 +638,18 @@ func (d *Device) Start(ctx context.Context) error { if d.dn.Manager.GeolocationProgramID != "" { telemetryCommandArgs = append(telemetryCommandArgs, "-geolocation-program-id", d.dn.Manager.GeolocationProgramID) } + if spec.Telemetry.BGPStatusEnable { + telemetryCommandArgs = append(telemetryCommandArgs, "-bgp-status-enable") + } + if spec.Telemetry.BGPStatusInterval > 0 { + telemetryCommandArgs = append(telemetryCommandArgs, "-bgp-status-interval", spec.Telemetry.BGPStatusInterval.String()) + } + if spec.Telemetry.BGPStatusRefreshInterval > 0 { + telemetryCommandArgs = append(telemetryCommandArgs, "-bgp-status-refresh-interval", spec.Telemetry.BGPStatusRefreshInterval.String()) + } + if spec.Telemetry.BGPStatusDownGracePeriod > 0 { + telemetryCommandArgs = append(telemetryCommandArgs, "-bgp-status-down-grace-period", spec.Telemetry.BGPStatusDownGracePeriod.String()) + } // Render the device config from go template. 
var configContents bytes.Buffer diff --git a/e2e/user_bgp_status_test.go b/e2e/user_bgp_status_test.go new file mode 100644 index 000000000..839b38142 --- /dev/null +++ b/e2e/user_bgp_status_test.go @@ -0,0 +1,187 @@ +//go:build e2e + +package e2e_test + +import ( + "encoding/json" + "os" + "path/filepath" + "testing" + "time" + + "github.com/gagliardetto/solana-go" + "github.com/malbeclabs/doublezero/e2e/internal/arista" + "github.com/malbeclabs/doublezero/e2e/internal/devnet" + serviceability "github.com/malbeclabs/doublezero/smartcontract/sdk/go/serviceability" + "github.com/stretchr/testify/require" +) + +func TestE2E_UserBGPStatus(t *testing.T) { + t.Parallel() + + log := newTestLoggerForTest(t) + + currentDir, err := os.Getwd() + require.NoError(t, err) + serviceabilityProgramKeypairPath := filepath.Join(currentDir, "data", "serviceability-program-keypair.json") + + // Generate a telemetry keypair for the device's metrics publisher. + telemetryKeypair := solana.NewWallet().PrivateKey + telemetryKeypairJSON, _ := json.Marshal(telemetryKeypair[:]) + telemetryKeypairPath := t.TempDir() + "/dz1-telemetry-keypair.json" + require.NoError(t, os.WriteFile(telemetryKeypairPath, telemetryKeypairJSON, 0600)) + telemetryKeypairPK := telemetryKeypair.PublicKey() + + minBalanceSOL := 3.0 + topUpSOL := 5.0 + dn, err := devnet.New(devnet.DevnetSpec{ + DeployID: "dz-e2e-" + t.Name(), + DeployDir: t.TempDir(), + + CYOANetwork: devnet.CYOANetworkSpec{ + CIDRPrefix: subnetCIDRPrefix, + }, + DeviceTunnelNet: "192.168.100.0/24", + Manager: devnet.ManagerSpec{ + ServiceabilityProgramKeypairPath: serviceabilityProgramKeypairPath, + }, + Funder: devnet.FunderSpec{ + Verbose: true, + MinBalanceSOL: minBalanceSOL, + TopUpSOL: topUpSOL, + Interval: 3 * time.Second, + }, + }, log, dockerClient, subnetAllocator) + require.NoError(t, err) + + err = dn.Start(t.Context(), nil) + require.NoError(t, err) + + device, err := dn.AddDevice(t.Context(), devnet.DeviceSpec{ + Code: "dz1", + Location: 
"ewr", + Exchange: "xewr", + MetricsPublisherPK: telemetryKeypairPK.String(), + // .8/29 has network address .8, allocatable up to .14, and broadcast .15 + CYOANetworkIPHostID: 8, + CYOANetworkAllocatablePrefix: 29, + Telemetry: devnet.DeviceTelemetrySpec{ + Enabled: true, + KeypairPath: telemetryKeypairPath, + ManagementNS: "ns-management", + Verbose: true, + BGPStatusEnable: true, + BGPStatusInterval: 5 * time.Second, + BGPStatusRefreshInterval: 1 * time.Hour, + }, + }) + require.NoError(t, err) + + // Wait for the telemetry publisher to be funded before any onchain submissions are attempted. + requireEventuallyFunded(t, log, dn.Ledger.GetRPCClient(), telemetryKeypairPK, minBalanceSOL, "telemetry publisher") + + client, err := dn.AddClient(t.Context(), devnet.ClientSpec{ + CYOANetworkIPHostID: 100, + }) + require.NoError(t, err) + + tdn := &TestDevnet{Devnet: dn, log: log} + + if !t.Run("wait_for_user_activation", func(t *testing.T) { + err := tdn.WaitForUserActivation(t, 1) + require.NoError(t, err) + }) { + t.FailNow() + } + + if !t.Run("connect", func(t *testing.T) { + tdn.ConnectIBRLUserTunnel(t, client) + err := client.WaitForTunnelUp(t.Context(), 90*time.Second) + require.NoError(t, err) + }) { + t.FailNow() + } + + if !t.Run("wait_for_bgp_session_established", func(t *testing.T) { + require.Eventually(t, func() bool { + summary, err := devnet.DeviceExecAristaCliJSON[*arista.ShowIPBGPSummary](t.Context(), device, arista.ShowIPBGPSummaryCmd("vrf1")) + if err != nil { + log.Debug("bgp status: failed to fetch BGP summary", "error", err) + return false + } + vrf, ok := summary.VRFs["vrf1"] + if !ok { + log.Debug("bgp status: vrf1 not found in BGP summary") + return false + } + for ip, peer := range vrf.Peers { + if peer.PeerState == "Established" { + log.Debug("bgp status: BGP session established", "peer", ip) + return true + } + } + log.Debug("bgp status: no established BGP sessions yet", "peers", vrf.Peers) + return false + }, 90*time.Second, 5*time.Second, 
"BGP session never reached Established state") + }) { + t.FailNow() + } + + if !t.Run("wait_for_bgp_status_up_onchain", func(t *testing.T) { + svcClient, err := dn.Ledger.GetServiceabilityClient() + require.NoError(t, err) + + require.Eventually(t, func() bool { + data, err := svcClient.GetProgramData(t.Context()) + if err != nil { + log.Debug("bgp status: failed to fetch program data", "error", err) + return false + } + for _, user := range data.Users { + if user.Status != serviceability.UserStatusActivated { + continue + } + if user.BgpStatus == uint8(serviceability.BGPStatusUp) { + log.Debug("bgp status: user BGP status is Up onchain", "user", solana.PublicKeyFromBytes(user.PubKey[:]).String()) + return true + } + log.Debug("bgp status: user BGP status not yet Up", "bgpStatus", user.BgpStatus) + } + return false + }, 60*time.Second, 5*time.Second, "user BGP status never reached Up onchain") + }) { + t.FailNow() + } + + if !t.Run("disconnect", func(t *testing.T) { + tdn.DisconnectUserTunnel(t, client) + }) { + t.FailNow() + } + + if !t.Run("wait_for_bgp_status_down_onchain", func(t *testing.T) { + svcClient, err := dn.Ledger.GetServiceabilityClient() + require.NoError(t, err) + + require.Eventually(t, func() bool { + data, err := svcClient.GetProgramData(t.Context()) + if err != nil { + log.Debug("bgp status: failed to fetch program data", "error", err) + return false + } + for _, user := range data.Users { + if user.Status != serviceability.UserStatusActivated { + continue + } + if user.BgpStatus == uint8(serviceability.BGPStatusDown) { + log.Debug("bgp status: user BGP status is Down onchain", "user", solana.PublicKeyFromBytes(user.PubKey[:]).String()) + return true + } + log.Debug("bgp status: user BGP status not yet Down", "bgpStatus", user.BgpStatus) + } + return false + }, 60*time.Second, 5*time.Second, "user BGP status never reached Down onchain") + }) { + t.Fail() + } +} From 38082191925f442f93d1cfe21a8e5949dee984c3 Mon Sep 17 00:00:00 2001 From: Juan 
Olveira Date: Wed, 8 Apr 2026 10:48:43 +0000 Subject: [PATCH 04/10] telemetry: add BGP status submitter CHANGELOG entry --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index fe698c1ca..ec72f06f3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,11 +15,15 @@ All notable changes to this project will be documented in this file. - Telemetry - Add optional TLS support to state-ingest server via `--tls-cert-file` and `--tls-key-file` flags; when set, the server listens on both HTTP (`:8080`) and HTTPS (`:8443`) simultaneously - Remove `--additional-child-probes` CLI flag from telemetry-agent; child geoprobe discovery now relies entirely on the onchain Geolocation program + - Add BGP status submitter: on each tick, reads BGP socket state from the device namespace, maps each activated user to their tunnel peer IP, and submits `SetUserBGPStatus` onchain; supports a configurable down grace period and periodic keepalive refresh; enabled via `--bgp-status-enable` with `--bgp-status-interval`, `--bgp-status-refresh-interval`, and `--bgp-status-down-grace-period` flags - Monitor - Add ClickHouse as a telemetry backend for the global monitor alongside existing InfluxDB - E2E tests - Add `TestE2E_GeoprobeIcmpTargets` verifying end-to-end ICMP outbound offset delivery via onchain `outbound-icmp` targets - Refactor geoprobe E2E tests to use testcontainers entrypoints and onchain target discovery + - Add `TestE2E_UserBGPStatus` verifying that the telemetry BGP status submitter correctly reports onchain status transitions as clients connect and establish BGP sessions +- SDK + - Add `BGPStatus` type (Unknown/Up/Down) and `SetUserBGPStatus` executor instruction to the Go serviceability SDK - Smartcontract - Implement `SetUserBGPStatus` processor: validates metrics publisher authorization, updates `bgp_status`, `last_bgp_reported_at`, and `last_bgp_up_at` fields on the user account - Add human-readable error messages for serviceability 
program errors in the Go SDK, including program log extraction for enhanced debugging From 929160904019747c7bb0b0bc2d94a09a8b667a53 Mon Sep 17 00:00:00 2001 From: Juan Olveira Date: Wed, 8 Apr 2026 20:33:13 +0000 Subject: [PATCH 05/10] telemetry: fix BGP status Down not submitted after tunnel disappears - In the bgpstatus submitter, when FindLocalTunnel returns ErrLocalTunnelNotFound but the last known onchain status is Up, fall through with observedUp=false so the Down transition is submitted. Previously the submitter always continued on tunnel-not-found, so a clean-disconnect scenario (tunnel interface removed) never triggered a Down submission. - Change the e2e disconnect step to kill doublezerod ungracefully (SIGKILL) instead of running "doublezero disconnect": a clean disconnect deletes the user account onchain before the submitter can record Down, whereas killing the daemon drops the BGP session while the user remains activated. - Add volume cleanup to "make e2e-test-cleanup" so persistent ledger volumes from test-keep runs don't carry stale state into subsequent runs. 
--- .../telemetry/internal/bgpstatus/submitter.go | 29 ++++++++++---- e2e/Makefile | 2 + e2e/user_bgp_status_test.go | 40 ++++++++++++++++--- 3 files changed, 57 insertions(+), 14 deletions(-) diff --git a/controlplane/telemetry/internal/bgpstatus/submitter.go b/controlplane/telemetry/internal/bgpstatus/submitter.go index 4fa8be048..c7305d08e 100644 --- a/controlplane/telemetry/internal/bgpstatus/submitter.go +++ b/controlplane/telemetry/internal/bgpstatus/submitter.go @@ -8,6 +8,7 @@ import ( "time" "github.com/gagliardetto/solana-go" + "github.com/malbeclabs/doublezero/controlplane/telemetry/internal/netns" "github.com/malbeclabs/doublezero/controlplane/telemetry/internal/netutil" "github.com/malbeclabs/doublezero/controlplane/telemetry/internal/state" "github.com/malbeclabs/doublezero/smartcontract/sdk/go/serviceability" @@ -55,7 +56,11 @@ func (s *Submitter) tick(ctx context.Context) { return } - interfaces, err := s.cfg.LocalNet.Interfaces() + // Tunnel interfaces for user sessions live in the BGP VRF namespace (e.g. ns-vrf1), + // not the default namespace, so we must read them from there. + interfaces, err := netns.RunInNamespace(s.cfg.BGPNamespace, func() ([]netutil.Interface, error) { + return s.cfg.LocalNet.Interfaces() + }) if err != nil { s.log.Error("bgpstatus: failed to get local interfaces", "error", err) return @@ -79,19 +84,27 @@ func (s *Submitter) tick(ctx context.Context) { // Resolve the BGP peer IP for this user's /31 tunnel net. tunnelNet := tunnelNetToIPNet(user.TunnelNet) + var observedUp bool tunnel, err := netutil.FindLocalTunnel(interfaces, tunnelNet) if err != nil { if !errors.Is(err, netutil.ErrLocalTunnelNotFound) { s.log.Warn("bgpstatus: unexpected error finding tunnel", "user", userPK, "error", err) + continue } - // Tunnel not up — user cannot be Up. 
s.log.Debug("bgpstatus: tunnel not found for user", "user", userPK) - continue - } - - _, observedUp := establishedIPs[tunnel.TargetIP.String()] - if observedUp { - us.lastUpObservedAt = now + // Without a tunnel, the BGP session cannot be established. + // If the last known onchain status was already Down (or never written), + // there is nothing to update — skip this user. + if us.lastOnchainStatus != serviceability.BGPStatusUp { + continue + } + // The tunnel is gone but the last known onchain status is Up. + // Fall through with observedUp=false so we submit Down. + } else { + _, observedUp = establishedIPs[tunnel.TargetIP.String()] + if observedUp { + us.lastUpObservedAt = now + } } effectiveStatus := computeEffectiveStatus(observedUp, us, now, s.cfg.DownGracePeriod) diff --git a/e2e/Makefile b/e2e/Makefile index fa4b75956..3d1aff9db 100644 --- a/e2e/Makefile +++ b/e2e/Makefile @@ -55,6 +55,8 @@ test-cleanup: @docker rm -f $$(docker ps -aq --filter label=dz.malbeclabs.com) 2>/dev/null || true @echo "Removing networks with label dz.malbeclabs.com..." @docker network rm $$(docker network ls -q --filter label=dz.malbeclabs.com) 2>/dev/null || true + @echo "Removing volumes with label dz.malbeclabs.com..." + @docker volume rm $$(docker volume ls -q --filter label=dz.malbeclabs.com) 2>/dev/null || true # ----------------------------------------------------------------------------- # Solana image build and push. 
diff --git a/e2e/user_bgp_status_test.go b/e2e/user_bgp_status_test.go index 839b38142..d383281bf 100644 --- a/e2e/user_bgp_status_test.go +++ b/e2e/user_bgp_status_test.go @@ -65,6 +65,10 @@ func TestE2E_UserBGPStatus(t *testing.T) { // .8/29 has network address .8, allocatable up to .14, and broadcast .15 CYOANetworkIPHostID: 8, CYOANetworkAllocatablePrefix: 29, + LoopbackInterfaces: map[string]string{ + "Loopback255": "vpnv4", + "Loopback256": "ipv4", + }, Telemetry: devnet.DeviceTelemetrySpec{ Enabled: true, KeypairPath: telemetryKeypairPath, @@ -87,12 +91,22 @@ func TestE2E_UserBGPStatus(t *testing.T) { tdn := &TestDevnet{Devnet: dn, log: log} - if !t.Run("wait_for_user_activation", func(t *testing.T) { - err := tdn.WaitForUserActivation(t, 1) + t.Run("wait_for_device_activation", func(t *testing.T) { + svcClient, err := dn.Ledger.GetServiceabilityClient() require.NoError(t, err) - }) { - t.FailNow() - } + require.Eventually(t, func() bool { + data, err := svcClient.GetProgramData(t.Context()) + if err != nil { + return false + } + for _, d := range data.Devices { + if d.Code == device.Spec.Code && d.Status == serviceability.DeviceStatusActivated { + return true + } + } + return false + }, 60*time.Second, 2*time.Second, "device was not activated within timeout") + }) if !t.Run("connect", func(t *testing.T) { tdn.ConnectIBRLUserTunnel(t, client) @@ -102,6 +116,13 @@ func TestE2E_UserBGPStatus(t *testing.T) { t.FailNow() } + if !t.Run("wait_for_user_activation", func(t *testing.T) { + err := tdn.WaitForUserActivation(t, 1) + require.NoError(t, err) + }) { + t.FailNow() + } + if !t.Run("wait_for_bgp_session_established", func(t *testing.T) { require.Eventually(t, func() bool { summary, err := devnet.DeviceExecAristaCliJSON[*arista.ShowIPBGPSummary](t.Context(), device, arista.ShowIPBGPSummaryCmd("vrf1")) @@ -154,7 +175,14 @@ func TestE2E_UserBGPStatus(t *testing.T) { } if !t.Run("disconnect", func(t *testing.T) { - tdn.DisconnectUserTunnel(t, client) + // Kill 
the doublezerod daemon ungracefully (SIGKILL) to simulate an + // unexpected BGP session drop without triggering the onchain disconnect + // lifecycle. A clean disconnect via "doublezero disconnect" would delete + // the user account onchain, leaving no record to check BGP status on. + // With an ungraceful kill the user stays activated onchain, giving the + // BGP status submitter a chance to detect the dropped session and submit Down. + _, err := client.Exec(t.Context(), []string{"bash", "-c", "pkill -9 doublezerod || true"}) + require.NoError(t, err) }) { t.FailNow() } From c88506f18e88c88b873389e7a71d373ab2a86961 Mon Sep 17 00:00:00 2001 From: Juan Olveira Date: Wed, 8 Apr 2026 22:03:57 +0000 Subject: [PATCH 06/10] telemetry: cache shared GetProgramData calls with CachingFetcher Introduce a CachingFetcher in controlplane/telemetry/internal/serviceability/ that wraps any ProgramDataProvider and deduplicates RPC calls within a 5s TTL window using sync.RWMutex + singleflight, mirroring the pattern in client/doublezerod/internal/onchain/fetcher.go. Wire the BGP status submitter through the cached client so multiple telemetry components can share a single GetProgramData result per window instead of each issuing independent RPCs. 
--- controlplane/telemetry/cmd/telemetry/main.go | 11 ++- .../internal/serviceability/cache.go | 97 +++++++++++++++++++ 2 files changed, 105 insertions(+), 3 deletions(-) create mode 100644 controlplane/telemetry/internal/serviceability/cache.go diff --git a/controlplane/telemetry/cmd/telemetry/main.go b/controlplane/telemetry/cmd/telemetry/main.go index b8c13814d..eedb2f147 100644 --- a/controlplane/telemetry/cmd/telemetry/main.go +++ b/controlplane/telemetry/cmd/telemetry/main.go @@ -23,6 +23,7 @@ import ( "github.com/malbeclabs/doublezero/controlplane/telemetry/internal/metrics" "github.com/malbeclabs/doublezero/controlplane/telemetry/internal/netns" "github.com/malbeclabs/doublezero/controlplane/telemetry/internal/netutil" + telemetrysvc "github.com/malbeclabs/doublezero/controlplane/telemetry/internal/serviceability" "github.com/malbeclabs/doublezero/controlplane/telemetry/internal/state" "github.com/malbeclabs/doublezero/controlplane/telemetry/internal/telemetry" telemetryconfig "github.com/malbeclabs/doublezero/controlplane/telemetry/pkg/config" @@ -366,8 +367,12 @@ func main() { // Run BGP status submitter if enabled. var bgpStatusErrCh <-chan error if *bgpStatusEnable { + cachedSvcClient := telemetrysvc.NewCachingFetcher( + serviceability.New(rpcClient, serviceabilityProgramID), + telemetrysvc.DefaultCacheTTL, + ) bgpStatusErrCh = startBGPStatusSubmitter(ctx, cancel, log, keypair, localDevicePK, - rpcClient, serviceabilityProgramID, localNet, *bgpNamespace) + serviceabilityProgramID, localNet, *bgpNamespace, cachedSvcClient, rpcClient) } // Wait for the context to be done or an error to be returned. 
@@ -399,13 +404,13 @@ func startBGPStatusSubmitter( log *slog.Logger, keypair solana.PrivateKey, localDevicePK solana.PublicKey, - rpcClient *solanarpc.Client, serviceabilityProgramID solana.PublicKey, localNet netutil.LocalNet, bgpNamespace string, + svcClient telemetrysvc.ProgramDataProvider, + rpcClient *solanarpc.Client, ) <-chan error { executor := serviceability.NewExecutor(log, rpcClient, &keypair, serviceabilityProgramID) - svcClient := serviceability.New(rpcClient, serviceabilityProgramID) sub, err := bgpstatus.NewSubmitter(bgpstatus.Config{ Log: log, diff --git a/controlplane/telemetry/internal/serviceability/cache.go b/controlplane/telemetry/internal/serviceability/cache.go new file mode 100644 index 000000000..404418962 --- /dev/null +++ b/controlplane/telemetry/internal/serviceability/cache.go @@ -0,0 +1,97 @@ +package serviceability + +import ( + "context" + "log/slog" + "sync" + "time" + + "github.com/malbeclabs/doublezero/smartcontract/sdk/go/serviceability" + "golang.org/x/sync/singleflight" +) + +const DefaultCacheTTL = 5 * time.Second + +// ProgramDataProvider is the interface for fetching onchain program data. +type ProgramDataProvider interface { + GetProgramData(ctx context.Context) (*serviceability.ProgramData, error) +} + +// CachingFetcher wraps a ProgramDataProvider and caches the result for the +// configured TTL. Multiple consumers calling GetProgramData within the TTL +// window receive the same cached data, avoiding duplicate RPC calls. +// +// The mutex is only held briefly to read/write the cache — the RPC call itself +// runs outside the lock via singleflight to avoid blocking concurrent callers. +type CachingFetcher struct { + provider ProgramDataProvider + cacheTTL time.Duration + mu sync.RWMutex + cached *serviceability.ProgramData + fetchedAt time.Time + group singleflight.Group +} + +// NewCachingFetcher creates a CachingFetcher with the given provider and TTL. 
+func NewCachingFetcher(provider ProgramDataProvider, cacheTTL time.Duration) *CachingFetcher { + return &CachingFetcher{ + provider: provider, + cacheTTL: cacheTTL, + } +} + +// GetProgramData returns cached program data if fresh, otherwise fetches from +// the underlying provider. On fetch error with existing cache, returns stale data. +func (f *CachingFetcher) GetProgramData(ctx context.Context) (*serviceability.ProgramData, error) { + // Fast path: check cache under read lock. + f.mu.RLock() + if f.cached != nil && time.Since(f.fetchedAt) < f.cacheTTL { + data := f.cached + f.mu.RUnlock() + return data, nil + } + f.mu.RUnlock() + + // Slow path: fetch via singleflight so concurrent callers share one RPC. + v, err, _ := f.group.Do("fetch", func() (any, error) { + // Re-check cache — another goroutine may have refreshed it while we waited. + f.mu.RLock() + if f.cached != nil && time.Since(f.fetchedAt) < f.cacheTTL { + data := f.cached + f.mu.RUnlock() + return data, nil + } + cachedData := f.cached + cachedAge := time.Since(f.fetchedAt) + f.mu.RUnlock() + + start := time.Now() + data, err := f.provider.GetProgramData(ctx) + metricFetchDuration.Observe(time.Since(start).Seconds()) + + if err != nil { + if cachedData != nil { + metricFetchTotal.WithLabelValues(resultErrorStale).Inc() + metricStaleCacheAge.Set(cachedAge.Seconds()) + slog.Warn("telemetry: program data fetch failed, returning stale cached data", "age", cachedAge, "error", err) + return cachedData, nil + } + metricFetchTotal.WithLabelValues(resultErrorNoCache).Inc() + return nil, err + } + + metricFetchTotal.WithLabelValues(resultSuccess).Inc() + metricStaleCacheAge.Set(0) + + f.mu.Lock() + f.cached = data + f.fetchedAt = time.Now() + f.mu.Unlock() + + return data, nil + }) + if err != nil { + return nil, err + } + return v.(*serviceability.ProgramData), nil +} From a47d4e0eeb3e03c58816a41319e0ee8d215c396e Mon Sep 17 00:00:00 2001 From: Juan Olveira Date: Wed, 8 Apr 2026 22:04:03 +0000 Subject: [PATCH 
07/10] telemetry: add Prometheus metrics for BGP status submitter Add observability for the BGP status pipeline: CachingFetcher (controlplane/telemetry/internal/serviceability): - doublezero_telemetry_programdata_fetch_duration_seconds: RPC latency - doublezero_telemetry_programdata_fetch_total{result}: fetch outcomes - doublezero_telemetry_programdata_stale_cache_age_seconds: staleness on error BGP status submitter (controlplane/telemetry/internal/bgpstatus): - doublezero_bgpstatus_submissions_total{bgp_status,result}: onchain submissions by status and outcome - doublezero_bgpstatus_submission_duration_seconds: onchain transaction latency --- .../telemetry/internal/bgpstatus/metrics.go | 24 ++++++++++++ .../telemetry/internal/bgpstatus/submitter.go | 5 +++ .../internal/serviceability/metrics.go | 39 +++++++++++++++++++ 3 files changed, 68 insertions(+) create mode 100644 controlplane/telemetry/internal/bgpstatus/metrics.go create mode 100644 controlplane/telemetry/internal/serviceability/metrics.go diff --git a/controlplane/telemetry/internal/bgpstatus/metrics.go b/controlplane/telemetry/internal/bgpstatus/metrics.go new file mode 100644 index 000000000..f59cfffd6 --- /dev/null +++ b/controlplane/telemetry/internal/bgpstatus/metrics.go @@ -0,0 +1,24 @@ +package bgpstatus + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +var ( + metricSubmissionsTotal = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "doublezero_bgpstatus_submissions_total", + Help: "Total onchain BGP status submissions by BGP status and result", + }, + []string{"bgp_status", "result"}, + ) + + metricSubmissionDuration = promauto.NewHistogram( + prometheus.HistogramOpts{ + Name: "doublezero_bgpstatus_submission_duration_seconds", + Help: "Duration of successful onchain BGP status submissions", + Buckets: []float64{0.1, 0.5, 1, 2, 5, 10, 30, 60}, + }, + ) +) diff --git 
a/controlplane/telemetry/internal/bgpstatus/submitter.go b/controlplane/telemetry/internal/bgpstatus/submitter.go index c7305d08e..02dd2f8b9 100644 --- a/controlplane/telemetry/internal/bgpstatus/submitter.go +++ b/controlplane/telemetry/internal/bgpstatus/submitter.go @@ -142,15 +142,18 @@ func (s *Submitter) worker(ctx context.Context) { sig, err := s.submitWithRetry(ctx, task) + statusLabel := task.status.String() s.mu.Lock() delete(s.pending, userPK) if err == nil { + metricSubmissionsTotal.WithLabelValues(statusLabel, "success").Inc() us := s.userStateFor(userPK) us.lastOnchainStatus = task.status us.lastWriteTime = s.cfg.Clock.Now() s.log.Info("bgpstatus: submitted BGP status", "user", userPK, "status", task.status, "sig", sig) } else { + metricSubmissionsTotal.WithLabelValues(statusLabel, "error").Inc() s.log.Error("bgpstatus: failed to submit after retries", "user", userPK, "status", task.status, "error", err) } @@ -170,8 +173,10 @@ func (s *Submitter) submitWithRetry(ctx context.Context, task submitTask) (solan var lastErr error for attempt := range submitMaxRetries { + start := time.Now() sig, err := s.cfg.Executor.SetUserBGPStatus(ctx, update) if err == nil { + metricSubmissionDuration.Observe(time.Since(start).Seconds()) return sig, nil } lastErr = err diff --git a/controlplane/telemetry/internal/serviceability/metrics.go b/controlplane/telemetry/internal/serviceability/metrics.go new file mode 100644 index 000000000..c2acfa67b --- /dev/null +++ b/controlplane/telemetry/internal/serviceability/metrics.go @@ -0,0 +1,39 @@ +package serviceability + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +const ( + labelResult = "result" + + resultSuccess = "success" + resultErrorStale = "error_stale" + resultErrorNoCache = "error_no_cache" +) + +var ( + metricFetchDuration = promauto.NewHistogram( + prometheus.HistogramOpts{ + Name: 
"doublezero_telemetry_programdata_fetch_duration_seconds", + Help: "Duration of serviceability program data RPC fetch calls (excludes cache hits)", + Buckets: []float64{0.1, 0.5, 1, 2, 5, 10, 30, 60, 120}, + }, + ) + + metricFetchTotal = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "doublezero_telemetry_programdata_fetch_total", + Help: "Total serviceability program data RPC fetch attempts by result", + }, + []string{labelResult}, + ) + + metricStaleCacheAge = promauto.NewGauge( + prometheus.GaugeOpts{ + Name: "doublezero_telemetry_programdata_stale_cache_age_seconds", + Help: "Age of stale cache data served on fetch failure (0 when cache is fresh)", + }, + ) +) From cfc67299cf9b13fe06b3c5ed6ae7a7e23cc4ca90 Mon Sep 17 00:00:00 2001 From: Juan Olveira Date: Wed, 8 Apr 2026 22:08:00 +0000 Subject: [PATCH 08/10] telemetry: pass shared CachingFetcher to peer discovery and collector MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Create a single CachingFetcher instance in main() and pass it to all three consumers — ledger peer discovery, the telemetry collector, and the BGP status submitter — so they share one GetProgramData result per TTL window instead of each issuing independent RPCs. Change Config.ServiceabilityProgramClient from *serviceability.Client to the ServiceabilityProgramClient interface so the cached wrapper can be injected there too. 
--- controlplane/telemetry/cmd/telemetry/main.go | 12 ++++++------ controlplane/telemetry/internal/telemetry/config.go | 3 +-- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/controlplane/telemetry/cmd/telemetry/main.go b/controlplane/telemetry/cmd/telemetry/main.go index eedb2f147..3132a866f 100644 --- a/controlplane/telemetry/cmd/telemetry/main.go +++ b/controlplane/telemetry/cmd/telemetry/main.go @@ -279,11 +279,15 @@ func main() { os.Exit(1) } localNet := netutil.NewLocalNet(log) + cachedSvcClient := telemetrysvc.NewCachingFetcher( + serviceability.New(rpcClient, serviceabilityProgramID), + telemetrysvc.DefaultCacheTTL, + ) peerDiscovery, err := telemetry.NewLedgerPeerDiscovery( &telemetry.LedgerPeerDiscoveryConfig{ Logger: log, LocalDevicePK: localDevicePK, - ProgramClient: serviceability.New(rpcClient, serviceabilityProgramID), + ProgramClient: cachedSvcClient, LocalNet: localNet, TWAMPPort: uint16(*twampListenPort), RefreshInterval: *peersRefreshInterval, @@ -323,7 +327,7 @@ func main() { TWAMPReflector: reflector, PeerDiscovery: peerDiscovery, TelemetryProgramClient: sdktelemetry.New(log, rpcClient, &keypair, telemetryProgramID), - ServiceabilityProgramClient: serviceability.New(rpcClient, serviceabilityProgramID), + ServiceabilityProgramClient: cachedSvcClient, RPCClient: rpcClient, Keypair: keypair, GetCurrentEpochFunc: func(ctx context.Context) (uint64, error) { @@ -367,10 +371,6 @@ func main() { // Run BGP status submitter if enabled. 
var bgpStatusErrCh <-chan error if *bgpStatusEnable { - cachedSvcClient := telemetrysvc.NewCachingFetcher( - serviceability.New(rpcClient, serviceabilityProgramID), - telemetrysvc.DefaultCacheTTL, - ) bgpStatusErrCh = startBGPStatusSubmitter(ctx, cancel, log, keypair, localDevicePK, serviceabilityProgramID, localNet, *bgpNamespace, cachedSvcClient, rpcClient) } diff --git a/controlplane/telemetry/internal/telemetry/config.go b/controlplane/telemetry/internal/telemetry/config.go index 1cfb2963e..ea41dc4c2 100644 --- a/controlplane/telemetry/internal/telemetry/config.go +++ b/controlplane/telemetry/internal/telemetry/config.go @@ -8,7 +8,6 @@ import ( "github.com/gagliardetto/solana-go" solanarpc "github.com/gagliardetto/solana-go/rpc" "github.com/malbeclabs/doublezero/controlplane/telemetry/internal/geoprobe" - "github.com/malbeclabs/doublezero/smartcontract/sdk/go/serviceability" twamplight "github.com/malbeclabs/doublezero/tools/twamp/pkg/light" ) @@ -54,7 +53,7 @@ type Config struct { MaxConsecutiveSenderLosses int // ServiceabilityProgramClient is the client to the serviceability program (for fetching Device/Location). - ServiceabilityProgramClient *serviceability.Client + ServiceabilityProgramClient ServiceabilityProgramClient // RPCClient is the Solana RPC client (for fetching slot). RPCClient *solanarpc.Client From 6223851c3ae6483a6a437a54e9ec661bd47dc4bd Mon Sep 17 00:00:00 2001 From: Juan Olveira Date: Thu, 9 Apr 2026 10:36:04 +0000 Subject: [PATCH 09/10] telemetry/bgpstatus: fix three review issues MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. Seed lastOnchainStatus from onchain state on restart. userStateFor now accepts an initialStatus arg. The tick loop seeds it from user.BgpStatus so a restarted submitter correctly transitions Up→Down when the tunnel has disappeared, rather than skipping the user because Unknown != Up. 2. Detach singleflight RPC from the first caller's context. 
Use context.WithoutCancel inside the singleflight callback so a cancelled caller does not propagate failure to all other waiters. 3. Prune userState on each tick. Delete entries for users no longer activated on this device to prevent unbounded memory growth as users are deactivated or moved. --- .../telemetry/internal/bgpstatus/bgpstatus.go | 6 ++++-- .../telemetry/internal/bgpstatus/submitter.go | 19 +++++++++++++++++-- .../internal/bgpstatus/submitter_test.go | 2 +- .../internal/serviceability/cache.go | 5 ++++- 4 files changed, 26 insertions(+), 6 deletions(-) diff --git a/controlplane/telemetry/internal/bgpstatus/bgpstatus.go b/controlplane/telemetry/internal/bgpstatus/bgpstatus.go index 064b075c7..36fd01139 100644 --- a/controlplane/telemetry/internal/bgpstatus/bgpstatus.go +++ b/controlplane/telemetry/internal/bgpstatus/bgpstatus.go @@ -133,10 +133,12 @@ func (s *Submitter) Start(ctx context.Context, cancel context.CancelFunc) <-chan } // userStateFor returns or creates the per-user tracking entry (caller must hold s.mu). -func (s *Submitter) userStateFor(key string) *userState { +// initialStatus is used only when creating a new entry; it seeds lastOnchainStatus so +// that a restarted submitter correctly handles users whose onchain state is already Up. 
+func (s *Submitter) userStateFor(key string, initialStatus serviceability.BGPStatus) *userState { us, ok := s.userState[key] if !ok { - us = &userState{} + us = &userState{lastOnchainStatus: initialStatus} s.userState[key] = us } return us diff --git a/controlplane/telemetry/internal/bgpstatus/submitter.go b/controlplane/telemetry/internal/bgpstatus/submitter.go index 02dd2f8b9..d90bfb5fa 100644 --- a/controlplane/telemetry/internal/bgpstatus/submitter.go +++ b/controlplane/telemetry/internal/bgpstatus/submitter.go @@ -71,6 +71,8 @@ func (s *Submitter) tick(ctx context.Context) { s.mu.Lock() defer s.mu.Unlock() + activeUserKeys := make(map[string]struct{}) + for _, user := range programData.Users { if user.Status != serviceability.UserStatusActivated { continue @@ -80,7 +82,12 @@ func (s *Submitter) tick(ctx context.Context) { } userPK := solana.PublicKeyFromBytes(user.PubKey[:]).String() - us := s.userStateFor(userPK) + activeUserKeys[userPK] = struct{}{} + + // Seed lastOnchainStatus from the ledger on first observation (e.g. after + // a daemon restart) so a disappeared tunnel correctly transitions to Down + // rather than being skipped because Unknown != Up. + us := s.userStateFor(userPK, serviceability.BGPStatus(user.BgpStatus)) // Resolve the BGP peer IP for this user's /31 tunnel net. tunnelNet := tunnelNetToIPNet(user.TunnelNet) @@ -127,6 +134,14 @@ func (s *Submitter) tick(ctx context.Context) { s.log.Warn("bgpstatus: task channel full, dropping update", "user", userPK) } } + + // Prune userState entries for users no longer activated on this device to + // prevent unbounded memory growth as users come and go. + for pk := range s.userState { + if _, active := activeUserKeys[pk]; !active { + delete(s.userState, pk) + } + } } // worker drains the task channel and submits each update onchain with retry. 
@@ -147,7 +162,7 @@ func (s *Submitter) worker(ctx context.Context) { delete(s.pending, userPK) if err == nil { metricSubmissionsTotal.WithLabelValues(statusLabel, "success").Inc() - us := s.userStateFor(userPK) + us := s.userStateFor(userPK, serviceability.BGPStatusUnknown) us.lastOnchainStatus = task.status us.lastWriteTime = s.cfg.Clock.Now() s.log.Info("bgpstatus: submitted BGP status", diff --git a/controlplane/telemetry/internal/bgpstatus/submitter_test.go b/controlplane/telemetry/internal/bgpstatus/submitter_test.go index f29dd862b..742a2076a 100644 --- a/controlplane/telemetry/internal/bgpstatus/submitter_test.go +++ b/controlplane/telemetry/internal/bgpstatus/submitter_test.go @@ -408,7 +408,7 @@ func TestPendingDedup_SecondEnqueueSkipped(t *testing.T) { // Manually mark user as pending (simulating a task already in the channel). s.mu.Lock() s.pending[userPK] = true - us := s.userStateFor(userPK, serviceability.BGPStatusUnknown) // trigger creation + us := s.userStateFor(userPK, serviceability.BGPStatusUnknown) // trigger creation _ = us s.mu.Unlock() diff --git a/controlplane/telemetry/internal/serviceability/cache.go b/controlplane/telemetry/internal/serviceability/cache.go index 404418962..e554b8dae 100644 --- a/controlplane/telemetry/internal/serviceability/cache.go +++ b/controlplane/telemetry/internal/serviceability/cache.go @@ -65,8 +65,11 @@ func (f *CachingFetcher) GetProgramData(ctx context.Context) (*serviceability.Pr cachedAge := time.Since(f.fetchedAt) f.mu.RUnlock() + // Use a detached context so a cancellation from the first caller's ctx + // does not fail all other waiters sharing this singleflight call. + // Note: context.WithoutCancel also drops the parent's deadline (only context values carry over), so this fetch relies on the RPC client's own timeout. 
start := time.Now() - data, err := f.provider.GetProgramData(ctx) + data, err := f.provider.GetProgramData(context.WithoutCancel(ctx)) metricFetchDuration.Observe(time.Since(start).Seconds()) if err != nil { From 80faf918cb73c20c44d6cf94716a08e3a4f94210 Mon Sep 17 00:00:00 2001 From: Juan Olveira Date: Thu, 9 Apr 2026 10:53:31 +0000 Subject: [PATCH 10/10] bgpstatus: clear pending flag when pruning deactivated user state A user deactivated while a submission was in-flight would retain a stale pending flag after the userState entry was pruned, permanently blocking resubmission if the user was later reactivated. --- controlplane/telemetry/internal/bgpstatus/submitter.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/controlplane/telemetry/internal/bgpstatus/submitter.go b/controlplane/telemetry/internal/bgpstatus/submitter.go index d90bfb5fa..9c119b6c9 100644 --- a/controlplane/telemetry/internal/bgpstatus/submitter.go +++ b/controlplane/telemetry/internal/bgpstatus/submitter.go @@ -137,9 +137,11 @@ func (s *Submitter) tick(ctx context.Context) { // Prune userState entries for users no longer activated on this device to // prevent unbounded memory growth as users come and go. + // Also clear pending flags so a reactivated user is not permanently blocked. for pk := range s.userState { if _, active := activeUserKeys[pk]; !active { delete(s.userState, pk) + delete(s.pending, pk) } } }