telemetry: submit BGP session status onchain per user #3487
juan-malbeclabs wants to merge 8 commits into main
Conversation
Force-pushed from 31debd6 to 971082d
Force-pushed from 4d1f467 to c6fd942
…r instruction
Adds the BGPStatus enum (Unknown/Up/Down) with JSON serialization and the SetUserBGPStatus instruction (code 106) to the Go serviceability SDK.
Introduces the bgpstatus package, which reads BGP neighbor state from Arista devices and submits SetUserBGPStatus instructions onchain. Wires it into the telemetry binary with four new flags: --bgp-status-enable, --bgp-status-interval, --bgp-status-refresh-interval, and --bgp-status-down-grace-period.
Adds an end-to-end test that starts a devnet with the BGP status submitter enabled and verifies the onchain state reflects the actual BGP session status. Also extends DeviceTelemetrySpec with the four BGP status flags and updates user list golden files to include the new bgp_status column.
- In the bgpstatus submitter, when FindLocalTunnel returns ErrLocalTunnelNotFound but the last known onchain status is Up, fall through with observedUp=false so the Down transition is submitted. Previously the submitter always continued on tunnel-not-found, so a clean-disconnect scenario (tunnel interface removed) never triggered a Down submission.
- Change the e2e disconnect step to kill doublezerod ungracefully (SIGKILL) instead of running "doublezero disconnect": a clean disconnect deletes the user account onchain before the submitter can record Down, whereas killing the daemon drops the BGP session while the user remains activated.
- Add volume cleanup to "make e2e-test-cleanup" so persistent ledger volumes from test-keep runs don't carry stale state into subsequent runs.
Introduce a CachingFetcher in controlplane/telemetry/internal/serviceability/ that wraps any ProgramDataProvider and deduplicates RPC calls within a 5s TTL window using sync.RWMutex + singleflight, mirroring the pattern in client/doublezerod/internal/onchain/fetcher.go. Wire the BGP status submitter through the cached client so multiple telemetry components can share a single GetProgramData result per window instead of each issuing independent RPCs.
Add observability for the BGP status pipeline:
CachingFetcher (controlplane/telemetry/internal/serviceability):
- doublezero_telemetry_programdata_fetch_duration_seconds: RPC latency
- doublezero_telemetry_programdata_fetch_total{result}: fetch outcomes
- doublezero_telemetry_programdata_stale_cache_age_seconds: staleness on error
BGP status submitter (controlplane/telemetry/internal/bgpstatus):
- doublezero_bgpstatus_submissions_total{bgp_status,result}: onchain submissions by status and outcome
- doublezero_bgpstatus_submission_duration_seconds: onchain transaction latency
Create a single CachingFetcher instance in main() and pass it to all three consumers — ledger peer discovery, the telemetry collector, and the BGP status submitter — so they share one GetProgramData result per TTL window instead of each issuing independent RPCs. Change Config.ServiceabilityProgramClient from *serviceability.Client to the ServiceabilityProgramClient interface so the cached wrapper can be injected there too.
Force-pushed from 43ab70b to cfc6729
    continue
}

userPK := solana.PublicKeyFromBytes(user.PubKey[:]).String()
After a telemetry restart, userState is empty so lastOnchainStatus is zero (BGPStatusUnknown). If a user was previously Up onchain but the tunnel has since disappeared, this branch hits continue because Unknown != Up, and the Down transition is never submitted — the user stays Up onchain indefinitely.
Seed lastOnchainStatus from the onchain user.BgpStatus field (already available on the User struct) when creating the userState entry, or hydrate state from program data before the first tick.
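The suggested fix is a one-line change at the point where the state entry is created. A sketch of the seeding approach, with the surrounding types reduced to minimal stand-ins (state, ensureState, and the field names are illustrative; user.BgpStatus comes from the review comment):

```go
package main

import "fmt"

type BGPStatus int

const (
	BGPStatusUnknown BGPStatus = iota
	BGPStatusUp
	BGPStatusDown
)

// User stands in for the onchain user account, which already carries
// the last recorded BgpStatus.
type User struct {
	PubKey    string
	BgpStatus BGPStatus
}

type state struct{ lastOnchainStatus BGPStatus }

// ensureState returns the tracked state for a user, seeding a new entry
// from the onchain BgpStatus field rather than the zero value (Unknown),
// so transitions are still detected after a telemetry restart.
func ensureState(userState map[string]*state, user User) *state {
	st, ok := userState[user.PubKey]
	if !ok {
		st = &state{lastOnchainStatus: user.BgpStatus}
		userState[user.PubKey] = st
	}
	return st
}

func main() {
	// Simulated restart: in-memory state is empty, but the user is Up onchain
	// and the tunnel has since disappeared.
	userState := map[string]*state{}
	st := ensureState(userState, User{PubKey: "user-1", BgpStatus: BGPStatusUp})

	observedUp := false // FindLocalTunnel returned ErrLocalTunnelNotFound
	if !observedUp && st.lastOnchainStatus == BGPStatusUp {
		fmt.Println("submit Down") // transition fires even after restart
		st.lastOnchainStatus = BGPStatusDown
	}
}
```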
// Slow path: fetch via singleflight so concurrent callers share one RPC.
v, err, _ := f.group.Do("fetch", func() (any, error) {
	// Re-check cache — another goroutine may have refreshed it while we waited.
The singleflight.Do callback captures the first caller's ctx. If that caller's context is cancelled while the RPC is in-flight, the fetch fails for all waiters — even if their contexts are still valid. On a cold cache (no stale data to fall back on), one cancelled context causes all concurrent callers to get an error.
Use context.WithoutCancel(ctx) (or a detached context with a timeout) for the RPC call inside the singleflight callback.
// Start launches the submitter in the background and returns a channel that
// receives a fatal error (or is closed on clean shutdown). It mirrors the
// state.Collector.Start pattern.
func (s *Submitter) Start(ctx context.Context, cancel context.CancelFunc) <-chan error {
This map grows as users are seen but is never pruned. If a user is deactivated or moved to another device, their entry persists forever — a slow memory leak over long uptimes.
Sweep userState at the end of each tick to remove keys not present in the current programData.Users for this device.
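The sweep is a straightforward mark-and-delete over the map at the end of each tick. A minimal sketch (the state value type and function names are illustrative):

```go
package main

import "fmt"

// pruneStale drops state entries whose user pubkey is absent from the
// current program data for this device, preventing unbounded growth.
func pruneStale(userState map[string]int, currentUsers map[string]struct{}) {
	for pk := range userState {
		if _, ok := currentUsers[pk]; !ok {
			delete(userState, pk)
		}
	}
}

func main() {
	// Per-user submitter state accumulated across ticks.
	userState := map[string]int{"user-a": 1, "user-b": 2, "user-c": 3}
	// This tick's program data: user-b was deactivated or moved devices.
	current := map[string]struct{}{"user-a": {}, "user-c": {}}

	pruneStale(userState, current) // end-of-tick sweep
	fmt.Println(len(userState))    // 2
}
```

Deleting from a Go map while ranging over it is safe for keys the range has already visited, so the sweep needs no intermediate slice.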
Resolves: #3465
Summary of Changes
Diff Breakdown
Mostly new code: a self-contained submitter package, a shared caching layer, and their tests.
Key files
Testing Verification