3 changes: 2 additions & 1 deletion docker/Dockerfile
@@ -42,6 +42,7 @@ FROM go-deps AS go-builder
ARG VERSION
ARG GIT_HASH
ARG GIT_TIME
ARG GO_TAGS

# Copy source code
COPY . .
@@ -50,7 +51,7 @@ COPY --from=webui-builder /webserver/build ./internal/webserver/build

# Generate protobuf and build the application
RUN go generate ./api/pb/ && \
CGO_ENABLED=0 go build -ldflags "\
CGO_ENABLED=0 go build -tags "${GO_TAGS}" -ldflags "\
-X 'main.commit=${GIT_HASH}' \
-X 'main.date=${GIT_TIME}' \
-X 'main.version=${VERSION}'" ./cmd/pgwatch
187 changes: 187 additions & 0 deletions docs/developer/reaper-batch-consolidation.md
@@ -0,0 +1,187 @@
# Reaper Batch Consolidation — Implementation Summary

## Overview

This document summarizes the implementation of the pgwatch reaper goroutine consolidation, which replaces the previous one-goroutine-per-(source × metric) architecture with a one-goroutine-per-source model using pgx Batch queries.

---

## What Changed

### Architecture: Before vs After

| Aspect | Before | After |
|--------|--------|-------|
| Goroutine model | 1 goroutine per (source × metric) | 1 goroutine per source |
| SQL execution | 1 `Query()` call per metric per tick | 1 `SendBatch()` call per source per tick |
| Query protocol | Individual queries, each a network round-trip | pgx pipeline protocol — multiple queries in one round-trip |
| Cancel granularity | Per `source¤¤¤metric` key | Per source name |
| Config hot-reload | Cancel + re-spawn per metric goroutine | `UpdateSchedules()` on existing `SourceReaper` |

### Goroutine Reduction

| Scenario | Before | After | Reduction |
|----------|--------|-------|-----------|
| 10 sources × exhaustive (32 metrics) | 320 | 10 | **97%** |
| 50 sources × exhaustive | 1,600 | 50 | **97%** |
| 1 source × basic (4 metrics) | 4 | 1 | **75%** |

### Network Round-Trip Reduction

With the `exhaustive` preset at 60-second alignment, ~12 SQL metrics are due simultaneously.

| Before | After | Reduction |
|--------|-------|-----------|
| 12 separate `Query()` calls | 1 `SendBatch()` call | **~92%** |

At peak alignment (t = 7200s, all 32 metrics due): **32 → 1 round-trip = 97% reduction**.

---

## Implementation Phases

### Phase 1: Core Infrastructure ✅

- Added `SendBatch(ctx, *pgx.Batch) pgx.BatchResults` to `PgxPoolIface` interface
- Created `SourceReaper` struct with per-source state: metric schedules, tick interval, connection
- Implemented `GCD()` / `GCDSlice()` for computing tick interval from metric intervals
- Implemented `isDue()` check with zero-value = "never fetched" semantics
- Minimum tick interval floor: **5 seconds** (prevents excessive wake-ups for coprime intervals)

### Phase 2: Batch Query Execution ✅

- `executeBatch()`: Builds `pgx.Batch` from due metrics, sends in one round-trip, dispatches results per-metric
- Preserves: instance-level caching, primary/standby filtering, `AddSysinfoToMeasurements`, server restart detection
- `fetchSequentialMetric()`: Fallback for non-Postgres sources (pgbouncer, pgpool) using the simple query protocol
- `fetchOSMetric()`, `fetchSpecialMetric()`: Handle gopsutil and special metrics inline
- `BatchQueryMeasurements()`: Standalone batch helper with deterministic (sorted) key ordering
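
Because `pgx.BatchResults` are read back in the order the queries were queued, a deterministic queue order is what lets each result be dispatched to the right metric. A minimal sketch of that ordering (plus the empty-SQL skip), using a simplified `map[metricName]SQL` rather than the real metric definition types:

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// orderedBatchKeys returns metric names in deterministic (sorted) order, the
// same idea BatchQueryMeasurements uses: batch position N then always maps to
// the same metric when results are read back sequentially. Metrics with
// empty/whitespace SQL are skipped and never queued.
func orderedBatchKeys(metricSQL map[string]string) []string {
	keys := make([]string, 0, len(metricSQL))
	for name, sql := range metricSQL {
		if strings.TrimSpace(sql) == "" {
			continue // nothing to execute for this metric
		}
		keys = append(keys, name)
	}
	sort.Strings(keys) // map iteration order is random; sorting makes it stable
	return keys
}

func main() {
	due := map[string]string{
		"wal":      "select pg_wal_lsn_diff(...)",
		"backends": "select count(*) from pg_stat_activity",
		"noop":     "   ", // whitespace-only SQL, silently skipped
	}
	fmt.Println(orderedBatchKeys(due)) // [backends wal]
}
```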

### Phase 3: Main Loop Integration ✅

- `Reap()` now spawns `go sr.Run(sourceCtx)` per source instead of per-metric goroutines
- `cancelFuncs` map simplified: still `map[string]context.CancelFunc`, but keyed by source name instead of `db¤¤¤metric`
- Added `sourceReapers map[string]*SourceReaper` to `Reaper` struct
- `ShutdownOldWorkers()` simplified — only checks if source was removed from config
- Removed dead `reapMetricMeasurements()` function (was ~100 lines)

### Phase 4: Change Detection Batching ✅

- `GetObjectChangesMeasurement()` now calls `prefetchChangeDetectionData()` to batch-fetch all hash queries (`sproc_hashes`, `table_hashes`, `index_hashes`, `privilege_changes`) in one `pgx.Batch`
- Added `Detect*ChangesWithData()` variants that accept pre-fetched data, falling back to original methods if nil
- Configuration changes (`DetectConfigurationChanges`) remain unbatched — different `Scan()` pattern with typed variables
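
The `Detect*ChangesWithData()` pattern reduces to "use the prefetched batch data if present, otherwise fall back to the original round-trip". A minimal sketch with hypothetical names (`detectTableChangesWithData` is illustrative, not the exact pgwatch signature):

```go
package main

import "fmt"

// detectTableChangesWithData uses hash rows prefetched by the change-detection
// pgx.Batch when available; a nil map falls back to the original per-query
// method, preserving the old behavior.
func detectTableChangesWithData(prefetched map[string]string, fetch func() map[string]string) map[string]string {
	if prefetched != nil {
		return prefetched // data already fetched in the shared batch
	}
	return fetch() // original individual round-trip path
}

func main() {
	fallback := func() map[string]string {
		return map[string]string{"t1": "hash-from-db"}
	}
	fmt.Println(detectTableChangesWithData(nil, fallback)["t1"])                                      // hash-from-db
	fmt.Println(detectTableChangesWithData(map[string]string{"t1": "prefetched"}, fallback)["t1"]) // prefetched
}
```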

### Phase 5: Cleanup & Observability ✅

- New Prometheus metrics in `observability.go`:
  - `pgwatch_reaper_batch_size` (histogram) — queries per batch
  - `pgwatch_reaper_batch_duration_seconds` (histogram) — wall-clock time per batch
  - `pgwatch_reaper_metric_fetch_total` (counter, labels: source, status) — success/error counts
  - `pgwatch_reaper_active_source_reapers` (gauge) — currently running source reapers
- Batch timeout: 80% of tick interval, prevents slow queries from blocking the next tick

---

## Files Changed

### New Files

| File | Lines | Purpose |
|------|-------|---------|
| `internal/reaper/source_reaper.go` | 494 | SourceReaper struct, GCD, batch execution, Run loop |
| `internal/reaper/source_reaper_test.go` | 471 | pgxmock unit tests (12 test functions, 20+ subtests) |
| `internal/reaper/source_reaper_integration_test.go` | 301 | testcontainers integration tests (6 test functions) |
| `internal/reaper/observability.go` | 38 | Prometheus metrics for batch observability |

### Modified Files

| File | Changes | Purpose |
|------|---------|---------|
| `internal/db/conn.go` | +1 line | Added `SendBatch` to `PgxPoolIface` interface |
| `internal/reaper/reaper.go` | +21 / -186 lines | Per-source goroutines, simplified cancel management |
| `internal/reaper/database.go` | +303 lines | Batched change detection, `prefetchChangeDetectionData` |
| `internal/reaper/reaper_test.go` | +15 / -14 lines | Updated tests for per-source cancel pattern |

**Total: 4 new files (1,304 lines), 4 modified files (+340 / -200 net)**

---

## Test Coverage

### Unit Tests (pgxmock) — 12 test functions, 20+ subtests

| Test | What it verifies |
|------|-----------------|
| `TestGCD` | Euclidean GCD for two integers |
| `TestGCDSlice` | GCD across slices: empty, single, coprime, exhaustive preset (30s) |
| `TestCalcTickInterval` | Tick interval calculation: normal, floor to 5s, single metric, empty |
| `TestIsDue` | Never-fetched (always due), recently fetched (not due), past interval (due) |
| `TestSourceReaper_DueMetrics` | Correct partitioning of due vs not-yet-due metrics |
| `TestNewSourceReaper` | Constructor sets schedules and tick interval |
| `TestUpdateSchedules` | Hot-reload preserves lastFetch for retained metrics, purges removed |
| `TestSourceReaper_ExecuteBatch` | Full batch path: 2 metrics → pgxmock batch → 2 envelopes on channel |
| `TestSourceReaper_RunOneIteration` | Run() loop fires, collects metrics, exits on context cancel |
| `TestSourceReaper_DetectServerRestart` | Detects uptime regression → emits `object_changes` envelope |
| `TestSourceReaper_NonPostgresSequential` | Sequential fallback path for a non-Postgres source |
| `TestBatchQueryMeasurements` | Standalone batch (Postgres) and sequential (non-Postgres) paths |

### Integration Tests (testcontainers) — 6 test functions

| Test | What it verifies |
|------|-----------------|
| `TestIntegration_BatchQueryMeasurements` | 4 real SQL queries batched against Postgres 18, all return correct data |
| `TestIntegration_ExecuteBatch` | Full `executeBatch()` path with 2 metric definitions → envelopes arrive |
| `TestIntegration_SourceReaper_RunCollectsMetrics` | `Run()` loop starts, collects 2 metrics within 15s, exits cleanly |
| `TestIntegration_BatchVsSequentialConsistency` | Batch and sequential paths return identical results for same query |
| `TestIntegration_BatchEmptySQL` | Empty/whitespace SQL queries are silently skipped |
| `TestIntegration_BatchMultipleMetricsSameRoundTrip` | 10 queries sent in one batch, all 10 return results |

### Existing Tests — 0 regressions

All 152 test cases in `internal/reaper/` pass, including all pre-existing tests for `DetectSprocChanges`, `DetectTableChanges`, `DetectIndexChanges`, `DetectPrivilegeChanges`, `DetectConfigurationChanges`, `FetchMetric`, `LoadSources`, `LoadMetrics`, log parser tests, and OS metric tests.

---

## Design Decisions

| Decision | Rationale |
|----------|-----------|
| GCD-based tick loop (Option A) | Zero external dependencies, natural fit with `context` cancellation, easy to reason about |
| No external scheduler (gocron) | gocron v2 uses one goroutine per job, defeating the consolidation purpose |
| 5-second minimum tick | Prevents excessive wake-ups for coprime intervals (e.g., GCD(7, 13) = 1) |
| Sequential fallback for non-Postgres | pgbouncer/pgpool use `SimpleProtocol` and don't support pipeline batching |
| `server_log_event_counts` keeps its own goroutine | Streaming CSV parser with long-running I/O, not batchable |
| Batch timeout = 80% of tick interval | Prevents slow queries from blocking the next tick cycle |
| Sorted keys in `BatchQueryMeasurements` | Ensures deterministic batch ordering for testing and debugging |
| `Detect*ChangesWithData()` variants | Allow pre-fetched batch data while preserving original methods as fallbacks |

---

## Expected Production Impact

### Resource Usage

- **Memory**: ~97% reduction in goroutine stack allocations (320 × 2KB initial stacks → 10 × 2KB; Go goroutines start with a 2KB stack)
- **CPU scheduling**: Fewer goroutines means less scheduler overhead and context switching
- **Connection pool**: Batch acquires 1 connection per tick instead of N concurrent acquires; reduces pool contention

### Network

- **Round-trips**: ~92% reduction at 60s alignment for exhaustive preset
- **Latency**: Batch queries benefit from TCP connection reuse and PostgreSQL's pipeline protocol
- **Bandwidth**: Slight reduction from fewer TCP handshakes and acknowledgements

### Observability

- `pgwatch_reaper_batch_size` histogram reveals how many queries are batched per tick
- `pgwatch_reaper_batch_duration_seconds` tracks end-to-end batch latency
- `pgwatch_reaper_metric_fetch_total` with source/status labels enables per-source error rate alerting
- `pgwatch_reaper_active_source_reapers` gauge shows current source count

---

## Future Enhancements

1. **Overflow workers (Option D)**: Offload known-slow metrics (e.g., `table_bloat_approx_summary_sql`) to a separate goroutine if they exceed a time threshold, preventing them from blocking the batch
2. **Adaptive tick interval**: Dynamically adjust tick interval based on observed query latency
3. **Per-metric batch timeout**: Use `SET LOCAL statement_timeout` within the batch for metrics with `StatementTimeoutSeconds` configured
4. **Batch configuration changes**: Batch the `DetectConfigurationChanges` hash queries (currently excluded due to different `Scan()` pattern)
4 changes: 2 additions & 2 deletions docs/reference/metric_definitions.md
@@ -115,8 +115,8 @@ only floats can be stored!
extra attributes, if any (see below for options). Hit the "ADD METRIC"
button to store.
1. Activate the newly added metric by including it in some existing
    *Preset Config* in the "PRESETS" page or add it directly to the monitored DB,
together with an interval, into the "METRICS" tab when editing a source on the "SOURCES" page.
*Preset Config* in the "PRESETS" page or add it directly to the monitored DB,
together with a fetching interval (an integral number of seconds) into the "METRICS" tab when editing a source on the "SOURCES" page.

### For *YAML* based setups

15 changes: 6 additions & 9 deletions docs/tutorial/custom_installation.md
@@ -354,27 +354,24 @@ The content of a file is an array of source definitions, like this:

```yaml
- name: test1 # An arbitrary unique name for the monitored source
kind: postgres # One of the:
kind: postgres # One of:
# - postgres
# - postgres-continuous-discovery
# - pgbouncer
# - pgpool
# - patroni
# - patroni-continuous-discovery
# - patroni-namespace-discover
# Defaults to postgres if not specified
conn_str: postgresql://pgwatch:xyz@somehost/mydb
preset_metrics: exhaustive # from list of presets defined in "metrics.yaml" or in the config DB
custom_metrics: # map of metrics and intervals, if both preset_metrics and custom_metrics are specified, custom wins
backends: 300
custom_metrics: # map of metrics and intervals, if both preset_metrics and custom_metrics are specified, preset wins
backends: 300 # integral number of seconds
archiver: 120
preset_metrics_standby: # optional preset configuration for standby state, same as preset_metrics
custom_metrics_standby: # optional custom metrics for standby state, same as custom_metrics
include_pattern: # regex to filter databases to actually monitor for the "continuous" modes
exclude_pattern:
is_enabled: true
group: default # just for logical grouping of DB hosts or for "sharding", i.e. splitting the workload between many gatherer daemons
custom_tags: # option to add arbitrary tags for every stored data row,
aws_instance_id: i-0af01c0123456789a # for example to fetch data from some other source onto a same Grafana graph
group: default # just for logical grouping of DB hosts or for "sharding", i.e. splitting the workload between many gatherer daemons (via --group option)
custom_tags: # option to add arbitrary tags for every stored data row,
aws_instance_id: i-0af01c0123456789a # for example to fetch data from some other source onto a same Grafana graph
...
```
1 change: 1 addition & 0 deletions internal/db/conn.go
@@ -43,6 +43,7 @@ type PgxPoolIface interface {
Close()
Config() *pgxpool.Config
Ping(ctx context.Context) error
SendBatch(ctx context.Context, b *pgx.Batch) pgx.BatchResults
Stat() *pgxpool.Stat
}
