Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,15 @@ All notable changes to this project will be documented in this file.
- Telemetry
- Add optional TLS support to state-ingest server via `--tls-cert-file` and `--tls-key-file` flags; when set, the server listens on both HTTP (`:8080`) and HTTPS (`:8443`) simultaneously
- Remove `--additional-child-probes` CLI flag from telemetry-agent; child geoprobe discovery now relies entirely on the onchain Geolocation program
- Add BGP status submitter: on each tick, reads BGP socket state from the device namespace, maps each activated user to their tunnel peer IP, and submits `SetUserBGPStatus` onchain; supports a configurable down grace period and periodic keepalive refresh; enabled via `--bgp-status-enable` with `--bgp-status-interval`, `--bgp-status-refresh-interval`, and `--bgp-status-down-grace-period` flags
- Monitor
- Add ClickHouse as a telemetry backend for the global monitor alongside existing InfluxDB
- E2E tests
- Add `TestE2E_GeoprobeIcmpTargets` verifying end-to-end ICMP outbound offset delivery via onchain `outbound-icmp` targets
- Refactor geoprobe E2E tests to use testcontainers entrypoints and onchain target discovery
- Add `TestE2E_UserBGPStatus` verifying that the telemetry BGP status submitter correctly reports onchain status transitions as clients connect and establish BGP sessions
- SDK
- Add `BGPStatus` type (Unknown/Up/Down) and `SetUserBGPStatus` executor instruction to the Go serviceability SDK
- Smartcontract
- Implement `SetUserBGPStatus` processor: validates metrics publisher authorization, updates `bgp_status`, `last_bgp_reported_at`, and `last_bgp_up_at` fields on the user account
- Add human-readable error messages for serviceability program errors in the Go SDK, including program log extraction for enhanced debugging
Expand Down
69 changes: 66 additions & 3 deletions controlplane/telemetry/cmd/telemetry/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@ import (
"github.com/malbeclabs/doublezero/config"
"github.com/malbeclabs/doublezero/controlplane/agent/pkg/arista"
aristapb "github.com/malbeclabs/doublezero/controlplane/proto/arista/gen/pb-go/arista/EosSdkRpc"
"github.com/malbeclabs/doublezero/controlplane/telemetry/internal/bgpstatus"
"github.com/malbeclabs/doublezero/controlplane/telemetry/internal/geoprobe"
"github.com/malbeclabs/doublezero/controlplane/telemetry/internal/gnmitunnel"
"github.com/malbeclabs/doublezero/controlplane/telemetry/internal/metrics"
"github.com/malbeclabs/doublezero/controlplane/telemetry/internal/netns"
"github.com/malbeclabs/doublezero/controlplane/telemetry/internal/netutil"
telemetrysvc "github.com/malbeclabs/doublezero/controlplane/telemetry/internal/serviceability"
"github.com/malbeclabs/doublezero/controlplane/telemetry/internal/state"
"github.com/malbeclabs/doublezero/controlplane/telemetry/internal/telemetry"
telemetryconfig "github.com/malbeclabs/doublezero/controlplane/telemetry/pkg/config"
Expand Down Expand Up @@ -49,6 +51,8 @@ const (
defaultLocalDevicePubkey = ""
defaultSubmitterMaxConcurrency = 10
defaultStateCollectInterval = 60 * time.Second
defaultBGPStatusInterval = 60 * time.Second
defaultBGPStatusRefreshInterval = 6 * time.Hour

waitForNamespaceTimeout = 30 * time.Second
defaultStateIngestHTTPClientTimeout = 10 * time.Second
Expand Down Expand Up @@ -88,6 +92,12 @@ var (
// geoprobe flags
geolocationProgramID = flag.String("geolocation-program-id", "", "The ID of the geolocation program for onchain GeoProbe discovery. If env is provided, this flag is ignored.")

// bgp status submitter flags
bgpStatusEnable = flag.Bool("bgp-status-enable", false, "Enable onchain BGP status submission after each collection tick.")
bgpStatusInterval = flag.Duration("bgp-status-interval", defaultBGPStatusInterval, "Interval between BGP status collection ticks.")
bgpStatusRefreshInterval = flag.Duration("bgp-status-refresh-interval", defaultBGPStatusRefreshInterval, "Periodic re-submission interval to keep last_bgp_reported_at fresh even when status is unchanged.")
bgpStatusDownGracePeriod = flag.Duration("bgp-status-down-grace-period", 0, "Minimum duration a user must be absent before reporting Down status (0 = report immediately).")

// Set by LDFLAGS
version = "dev"
commit = "none"
Expand Down Expand Up @@ -269,11 +279,15 @@ func main() {
os.Exit(1)
}
localNet := netutil.NewLocalNet(log)
cachedSvcClient := telemetrysvc.NewCachingFetcher(
serviceability.New(rpcClient, serviceabilityProgramID),
telemetrysvc.DefaultCacheTTL,
)
peerDiscovery, err := telemetry.NewLedgerPeerDiscovery(
&telemetry.LedgerPeerDiscoveryConfig{
Logger: log,
LocalDevicePK: localDevicePK,
ProgramClient: serviceability.New(rpcClient, serviceabilityProgramID),
ProgramClient: cachedSvcClient,
LocalNet: localNet,
TWAMPPort: uint16(*twampListenPort),
RefreshInterval: *peersRefreshInterval,
Expand Down Expand Up @@ -313,7 +327,7 @@ func main() {
TWAMPReflector: reflector,
PeerDiscovery: peerDiscovery,
TelemetryProgramClient: sdktelemetry.New(log, rpcClient, &keypair, telemetryProgramID),
ServiceabilityProgramClient: serviceability.New(rpcClient, serviceabilityProgramID),
ServiceabilityProgramClient: cachedSvcClient,
RPCClient: rpcClient,
Keypair: keypair,
GetCurrentEpochFunc: func(ctx context.Context) (uint64, error) {
Expand All @@ -333,7 +347,7 @@ func main() {
os.Exit(1)
}

errCh := make(chan error, 2)
errCh := make(chan error, 3)

// Run the onchain device-link latency collector.
go func() {
Expand All @@ -354,6 +368,13 @@ func main() {
gnmiTunnelClientErrCh = startGNMITunnelClient(ctx, cancel, log, localDevicePK)
}

// Run BGP status submitter if enabled.
var bgpStatusErrCh <-chan error
if *bgpStatusEnable {
bgpStatusErrCh = startBGPStatusSubmitter(ctx, cancel, log, keypair, localDevicePK,
serviceabilityProgramID, localNet, *bgpNamespace, cachedSvcClient, rpcClient)
}

// Wait for the context to be done or an error to be returned.
select {
case <-ctx.Done():
Expand All @@ -370,7 +391,49 @@ func main() {
log.Error("gnmi tunnel client exited with error", "error", err)
cancel()
os.Exit(1)
case err := <-bgpStatusErrCh:
log.Error("BGP status submitter exited with error", "error", err)
cancel()
os.Exit(1)
}
}

func startBGPStatusSubmitter(
ctx context.Context,
cancel context.CancelFunc,
log *slog.Logger,
keypair solana.PrivateKey,
localDevicePK solana.PublicKey,
serviceabilityProgramID solana.PublicKey,
localNet netutil.LocalNet,
bgpNamespace string,
svcClient telemetrysvc.ProgramDataProvider,
rpcClient *solanarpc.Client,
) <-chan error {
executor := serviceability.NewExecutor(log, rpcClient, &keypair, serviceabilityProgramID)

sub, err := bgpstatus.NewSubmitter(bgpstatus.Config{
Log: log,
Executor: executor,
ServiceabilityClient: svcClient,
LocalNet: localNet,
LocalDevicePK: localDevicePK,
BGPNamespace: bgpNamespace,
Interval: *bgpStatusInterval,
PeriodicRefreshInterval: *bgpStatusRefreshInterval,
DownGracePeriod: *bgpStatusDownGracePeriod,
})
if err != nil {
log.Error("failed to create BGP status submitter", "error", err)
os.Exit(1)
}

log.Info("Starting BGP status submitter",
"interval", *bgpStatusInterval,
"refreshInterval", *bgpStatusRefreshInterval,
"downGracePeriod", *bgpStatusDownGracePeriod,
)
return sub.Start(ctx, cancel)
}

func startStateCollector(ctx context.Context, cancel context.CancelFunc, log *slog.Logger, keypair solana.PrivateKey, localDevicePK solana.PublicKey, bgpNamespace string) <-chan error {
Expand Down
213 changes: 213 additions & 0 deletions controlplane/telemetry/internal/bgpstatus/bgpstatus.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
package bgpstatus

import (
"context"
"errors"
"fmt"
"log/slog"
"net"
"sync"
"time"

"github.com/gagliardetto/solana-go"
"github.com/jonboulle/clockwork"
"github.com/malbeclabs/doublezero/controlplane/telemetry/internal/netutil"
"github.com/malbeclabs/doublezero/smartcontract/sdk/go/serviceability"
)

const (
taskChannelCapacity = 256
defaultInterval = 60 * time.Second
defaultRefreshInterval = 6 * time.Hour
submitMaxRetries = 3
submitBaseBackoff = 100 * time.Millisecond
)

// BGPStatusExecutor submits a SetUserBGPStatus instruction onchain.
type BGPStatusExecutor interface {
SetUserBGPStatus(ctx context.Context, u serviceability.UserBGPStatusUpdate) (solana.Signature, error)
}

// ServiceabilityClient fetches the current program state from the ledger.
type ServiceabilityClient interface {
GetProgramData(ctx context.Context) (*serviceability.ProgramData, error)
}

// Config holds all parameters for the BGP status submitter.
type Config struct {
Log *slog.Logger
Executor BGPStatusExecutor
ServiceabilityClient ServiceabilityClient
LocalNet netutil.LocalNet
LocalDevicePK solana.PublicKey
BGPNamespace string
Interval time.Duration // default: 60s
PeriodicRefreshInterval time.Duration // default: 6h
DownGracePeriod time.Duration // default: 0
Clock clockwork.Clock
}

func (c *Config) validate() error {
if c.Log == nil {
return errors.New("log is required")
}
if c.Executor == nil {
return errors.New("executor is required")
}
if c.ServiceabilityClient == nil {
return errors.New("serviceability client is required")
}
if c.LocalNet == nil {
return errors.New("local net is required")
}
if c.LocalDevicePK.IsZero() {
return errors.New("local device pubkey is required")
}
if c.BGPNamespace == "" {
return errors.New("bgp namespace is required")
}
if c.Interval <= 0 {
c.Interval = defaultInterval
}
if c.PeriodicRefreshInterval <= 0 {
c.PeriodicRefreshInterval = defaultRefreshInterval
}
if c.Clock == nil {
c.Clock = clockwork.NewRealClock()
}
return nil
}

// userState tracks submission state for a single user.
type userState struct {
lastOnchainStatus serviceability.BGPStatus
lastWriteTime time.Time
lastUpObservedAt time.Time
}

// submitTask is queued to the background worker for onchain submission.
type submitTask struct {
user serviceability.User
status serviceability.BGPStatus
}

// Submitter collects BGP socket state on each tick, determines per-user BGP
// status, and submits SetUserBGPStatus onchain via a non-blocking worker.
type Submitter struct {
cfg Config
log *slog.Logger
userState map[string]*userState // keyed by user PubKey base58
pending map[string]bool // users currently in-flight in the worker
mu sync.Mutex
taskCh chan submitTask
}

// NewSubmitter creates a Submitter after validating the config.
func NewSubmitter(cfg Config) (*Submitter, error) {
if err := cfg.validate(); err != nil {
return nil, fmt.Errorf("invalid bgpstatus config: %w", err)
}
return &Submitter{
cfg: cfg,
log: cfg.Log,
userState: make(map[string]*userState),
pending: make(map[string]bool),
taskCh: make(chan submitTask, taskChannelCapacity),
}, nil
}

// Start launches the submitter in the background and returns a channel that
// receives a fatal error (or is closed on clean shutdown). It mirrors the
// state.Collector.Start pattern.
func (s *Submitter) Start(ctx context.Context, cancel context.CancelFunc) <-chan error {
errCh := make(chan error, 1)
go func() {
defer close(errCh)
defer cancel()
if err := s.run(ctx); err != nil {
s.log.Error("bgpstatus: submitter failed", "error", err)
errCh <- err
}
}()
return errCh
}

// userStateFor returns or creates the per-user tracking entry (caller must hold s.mu).
// initialStatus is used only when creating a new entry; it seeds lastOnchainStatus so
// that a restarted submitter correctly handles users whose onchain state is already Up.
func (s *Submitter) userStateFor(key string, initialStatus serviceability.BGPStatus) *userState {
us, ok := s.userState[key]
if !ok {
us = &userState{lastOnchainStatus: initialStatus}
s.userState[key] = us
}
return us
}

// bgpSocket is the minimal BGP socket representation used by the pure helpers.
// The Linux-specific submitter.go converts state.BGPSocketState to this type.
type bgpSocket struct {
RemoteIP string
State string
}

// --- Pure helpers (no Linux syscalls; fully testable on all platforms) ---

// buildEstablishedIPSet returns a set of remote IP strings for BGP sessions
// that are currently in the ESTABLISHED state.
func buildEstablishedIPSet(sockets []bgpSocket) map[string]struct{} {
m := make(map[string]struct{}, len(sockets))
for _, sock := range sockets {
if sock.State == "ESTABLISHED" {
m[sock.RemoteIP] = struct{}{}
}
}
return m
}

// tunnelNetToIPNet parses the onchain [5]byte tunnel-net encoding into a
// *net.IPNet. The format is [4 bytes IPv4 prefix | 1 byte CIDR length].
func tunnelNetToIPNet(b [5]byte) *net.IPNet {
ip := net.IPv4(b[0], b[1], b[2], b[3])
mask := net.CIDRMask(int(b[4]), 32)
return &net.IPNet{IP: ip.To4(), Mask: mask}
}

// computeEffectiveStatus derives the BGP status to report, applying the down
// grace period: if observedUp is false but the user was last seen Up within
// gracePeriod, we still report Up to avoid transient flaps.
func computeEffectiveStatus(
observedUp bool,
us *userState,
now time.Time,
gracePeriod time.Duration,
) serviceability.BGPStatus {
if observedUp {
return serviceability.BGPStatusUp
}
if us.lastUpObservedAt.IsZero() {
return serviceability.BGPStatusDown
}
if gracePeriod > 0 && now.Sub(us.lastUpObservedAt) < gracePeriod {
return serviceability.BGPStatusUp
}
return serviceability.BGPStatusDown
}

// shouldSubmit returns true when a submission is warranted: either the status
// has changed from what was last confirmed onchain, or it is time for a
// periodic keepalive write.
func shouldSubmit(
us *userState,
newStatus serviceability.BGPStatus,
now time.Time,
refreshInterval time.Duration,
) bool {
if us.lastWriteTime.IsZero() {
return true
}
if us.lastOnchainStatus != newStatus {
return true
}
return now.Sub(us.lastWriteTime) >= refreshInterval
}
24 changes: 24 additions & 0 deletions controlplane/telemetry/internal/bgpstatus/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package bgpstatus

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

var (
metricSubmissionsTotal = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "doublezero_bgpstatus_submissions_total",
Help: "Total onchain BGP status submissions by BGP status and result",
},
[]string{"bgp_status", "result"},
)

metricSubmissionDuration = promauto.NewHistogram(
prometheus.HistogramOpts{
Name: "doublezero_bgpstatus_submission_duration_seconds",
Help: "Duration of successful onchain BGP status submissions",
Buckets: []float64{0.1, 0.5, 1, 2, 5, 10, 30, 60},
},
)
)
Loading
Loading