WIP: reaper goroutine consolidation via pgx.Batch queries #1316

pashagolub wants to merge 11 commits into master from batch_metric_fetching

Conversation
Coverage Report for CI Build 24205226414
Coverage increased (+1.1%) to 84.471%
Coverage regressions: 5 previously-covered lines in 1 file lost coverage.
💛 - Coveralls
```go
func (sr *SourceReaper) activeMetrics() map[string]time.Duration {
	sr.md.RLock()
	defer sr.md.RUnlock()
	src := sr.md.Metrics
	if sr.md.IsInRecovery && len(sr.md.MetricsStandby) > 0 {
		src = sr.md.MetricsStandby
	}
```
The usage of RLock() confuses me. Not that we shouldn't be using it, but are we using Lock() and RLock() anywhere else that updates/reads md.Metrics? From what I can see, we don't.
1. Fixed this specific case for `md.Metrics` and `md.MetricsStandby` in d26c03d, so now `LoadMetrics()` will always grab `md.Lock()` before updating these structs, and since the source reaper always takes `md.RLock()` before accessing them, we are safe.

2. An important note: there are other places in the code that read from `md.Metrics` and `md.MetricsStandby`, but all of them execute in the main reaper routine, which is the only routine that issues a write in `LoadMetrics()` as stated above, so there is no need to grab additional `md.RLock()`s for them; the sequential ordering guarantees safety.

3. Also, I noticed that the source `RWMutex` is rarely used to protect fields on write. I doubt we have race conditions because of that, but this needs careful investigation in a separate issue.
@pashagolub I need your review on this. Specifically, what do you think of point 2 in terms of maintainability? Should we explicitly use md.RLock() everywhere so the locking discipline is extra clear, to avoid buggy changes in future updates/PRs? Or should we document it in a comment stating that callers must add RLock() if the method is accessed from another routine, or when another writer is introduced?
```go
switch {
case name == specialMetricServerLogEventCounts:
	if sr.lastFetch[name].IsZero() {
		go func() {
			if e := sr.runLogParser(ctx); e != nil {
				l.WithError(e).Error("log parser error")
			}
		}()
	}
case IsDirectlyFetchableMetric(sr.md, name):
	err = sr.fetchOSMetric(ctx, name)
case name == specialMetricChangeEvents || name == specialMetricInstanceUp:
	err = sr.fetchSpecialMetric(ctx, name)
default:
	metric, ok := metricDefs.GetMetricDef(name)
	if !ok {
		l.WithField("metric", name).Warning("metric definition not found")
		continue
	}
	if sr.isRoleExcluded(metric) {
		continue
	}
	if cached := sr.reaper.GetMeasurementCache(sr.cacheKey(metric, name)); len(cached) > 0 {
		sr.sendEnvelope(name, metric.StorageName, cached)
		break
	}
	sql := metric.GetSQL(sr.md.Version)
	if sql == "" {
		l.WithField("source", sr.md.Name).WithField("version", sr.md.Version).Warning("no SQL found for metric version")
		break
	}
	batch = append(batch, batchEntry{name: name, metric: metric, sql: sql})
}
if err != nil {
	l.WithError(err).WithField("metric", name).Error("failed to fetch metric")
}
sr.lastFetch[name] = now
```
There is an inconsistency here: for the special metrics, the lastFetch map is updated after the metric fetch completes, but for regular metrics it's updated directly after batching the query, before the batch actually executes.
The old approach was that for all metrics the sleep interval is started (which corresponds to updating lastFetch here) only after the metric is fetched, so that the interval includes the metric query's processing time.
Force-pushed from 4256426 to 2d2b750
…l metrics

Move the `isRoleExcluded` check earlier in the metric fetch loop to apply consistently to all metric types, including special metrics like `instance_up` and `change_events`. Previously, special metrics weren't excluded if they had a required `node_status` different from the server's.

Co-authored-by: Mazen Kamal <71020170+Mazen050@users.noreply.github.com>
Force-pushed from 038a773 to d26c03d
The improvements stem from consolidating N per-metric goroutines into a single per-source SourceReaper that uses a GCD-based (Greatest Common Divisor) tick loop and pgx.Batch for batched SQL execution, dramatically reducing goroutine overhead, connection contention, scheduling pressure, and memory allocation churn.

Performance Comparison: Batch Metric Fetching vs Master
Test Environment

- debug preset (55+ metrics at 30s intervals)
- pprof enabled (/debug/pprof/ on port 6060)

Branches Tested

- master @ 393b9529a
- batch_metric_fetching @ 3d70c78a4

Architecture Comparison

- master: one goroutine per metric, each sleeping on time.After
- batch_metric_fetching: a single per-source SourceReaper with a GCD tick loop, executing SQL via pgx.Batch

Goroutine Analysis (via pprof /debug/pprof/goroutine)

Total Goroutine Count

Goroutine Breakdown by Component

Components profiled: reapMetricMeasurements, SourceReaper.Run, pgxpool.backgroundHealthCheck, log.BrokerHook.poll, sinks.PostgresWriter, reaper.Reap (main loop), reaper.WriteMeasurements, webserver.Init, net/http, signal.

The batch implementation eliminates 414 per-metric goroutines, replacing them with just 6 per-source goroutines. All infrastructure goroutines (pool health checks, sinks, logging, web server) remain identical between both implementations.
Stack Memory
The 4× reduction in stack memory directly reflects the goroutine count difference — each goroutine requires its own stack.
Memory Statistics (Go runtime MemStats via logs)

Heap Allocation (Alloc: live objects on the heap)

Cumulative Allocations (TotalAlloc: total bytes allocated over time)

System Memory (Sys: total memory obtained from the OS)

Heap Profile Analysis (via pprof /debug/pprof/heap)

Docker Container Stats (at steady state)
Allocation Rate Analysis
Calculated from
TotalAllocdeltas between measurements (each 2-minute window):At steady state (after initial warmup), the batch implementation allocates approximately 8–9× less memory per minute.
GC Pressure Analysis
While GC cycle counts are similar, the CPU fraction spent on GC is ~3× lower for batch because each cycle processes far fewer live objects (45K vs 510K).
Summary

The batch_metric_fetching branch delivers significant resource improvements across all dimensions.