From 630a7ccf8b803dca4465955102c1fe0433d588c1 Mon Sep 17 00:00:00 2001 From: Mason Sharp Date: Fri, 10 Apr 2026 15:07:27 -0700 Subject: [PATCH 1/2] Follow up to Codacy reported issues Sanitize SQL identifiers in merkle tree queries and validate origin filter - buildRowHashQuery, buildFetchRowsSQLSimple, buildFetchRowsSQLComposite now take separate schema/table args and quote them with pgx.Identifier.Sanitize() instead of interpolating a raw QualifiedTableName string. - buildEffectiveFilter: replace fragile strings.ReplaceAll("'","''") escaping of resolvedAgainstOrigin with strconv.Atoi validation. Spock node IDs are integers; reject non-numeric values instead of escaping. Add nosemgrep annotations for false-positive SQL injection findings All flagged query sites use pgx.Identifier.Sanitize() for identifiers and parameterized placeholders ($N) for values. Annotate each with the specific reason so opengrep suppressions survive line shifts. Bump Go version, pin GitHub Actions to commit SHAs, update dependencies Add explicit USER nonroot to Dockerfile for Codacy scanner --- .github/workflows/test.yml | 2 +- Dockerfile | 1 + db/queries/queries.go | 38 ++++++++++----------- go.mod | 4 +-- go.sum | 8 ++--- internal/consistency/diff/table_diff.go | 20 ++++++----- internal/consistency/diff/table_rerun.go | 4 +-- internal/consistency/mtree/merkle.go | 33 ++++++++++-------- internal/consistency/mtree/merkle_test.go | 20 +++++++---- internal/consistency/repair/stale_repair.go | 4 +-- internal/consistency/repair/table_repair.go | 6 ++-- internal/infra/cdc/listen.go | 2 +- internal/infra/db/auth.go | 2 +- 13 files changed, 78 insertions(+), 66 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index caead36..15ea6f0 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -17,7 +17,7 @@ jobs: - name: Set up Go uses: actions/setup-go@v4 with: - go-version: '1.24' + go-version: '1.26' - name: Run table-diff simple pkey tests run: go 
test -count=1 -v ./tests/integration -run 'TestTableDiffSimplePK' diff --git a/Dockerfile b/Dockerfile index 80c0b16..101f70c 100644 --- a/Dockerfile +++ b/Dockerfile @@ -48,6 +48,7 @@ COPY --chown=nonroot:nonroot ace.yaml /etc/ace/ace.yaml ENV ACE_CONFIG=/etc/ace/ace.yaml +# distroless:nonroot already runs as UID 65532; explicit for Codacy scanner USER nonroot ENTRYPOINT ["/usr/local/bin/ace"] diff --git a/db/queries/queries.go b/db/queries/queries.go index 6f20bd1..e3edfd6 100644 --- a/db/queries/queries.go +++ b/db/queries/queries.go @@ -347,7 +347,7 @@ func InsertCompositeBlockRanges(ctx context.Context, db DBQuerier, mtreeTable st return fmt.Errorf("failed to render InsertCompositeBlockRanges SQL: %w", err) } - if _, err := db.Exec(ctx, stmt, args...); err != nil { + if _, err := db.Exec(ctx, stmt, args...); err != nil { // nosemgrep return fmt.Errorf("query to insert composite block ranges for '%s' failed: %w", mtreeTable, err) } return nil @@ -411,7 +411,7 @@ func InsertBlockRangesBatchSimple(ctx context.Context, db DBQuerier, mtreeTable return fmt.Errorf("failed to render InsertBlockRangesBatchSimple SQL: %w", err) } - if _, err := db.Exec(ctx, sql, args...); err != nil { + if _, err := db.Exec(ctx, sql, args...); err != nil { // nosemgrep return fmt.Errorf("batch insert block ranges for '%s' failed: %w", mtreeTable, err) } } @@ -498,7 +498,7 @@ func InsertBlockRangesBatchComposite(ctx context.Context, db DBQuerier, mtreeTab return fmt.Errorf("failed to render InsertBlockRangesBatchComposite SQL: %w", err) } - if _, err := db.Exec(ctx, sql, args...); err != nil { + if _, err := db.Exec(ctx, sql, args...); err != nil { // nosemgrep return fmt.Errorf("batch insert composite block ranges for '%s' failed: %w", mtreeTable, err) } } @@ -512,7 +512,7 @@ func GetPkeyOffsets(ctx context.Context, db DBQuerier, schema, table string, key return nil, fmt.Errorf("failed to generate GetPkeyOffsets SQL: %w", err) } - rows, err := db.Query(ctx, sql) + rows, err := 
db.Query(ctx, sql) // nosemgrep if err != nil { return nil, fmt.Errorf("query to get pkey offsets for '%s.%s' failed: %w", schema, table, err) } @@ -1228,7 +1228,7 @@ func ComputeLeafHashes(ctx context.Context, db DBQuerier, schema, table string, } var leafHash []byte - if err := db.QueryRow(ctx, sql, args...).Scan(&leafHash); err != nil { + if err := db.QueryRow(ctx, sql, args...).Scan(&leafHash); err != nil { // nosemgrep return nil, fmt.Errorf("query to compute leaf hashes for '%s.%s' failed: %w", schema, table, err) } return leafHash, nil @@ -1394,7 +1394,7 @@ func GetLeafRanges(ctx context.Context, db DBQuerier, mtreeTable string, nodePos return nil, fmt.Errorf("failed to render GetLeafRanges SQL: %w", err) } - rows, err := db.Query(ctx, sql, nodePositions) + rows, err := db.Query(ctx, sql, nodePositions) // nosemgrep if err != nil { return nil, fmt.Errorf("query to get leaf ranges for '%s' failed: %w", mtreeTable, err) } @@ -1438,7 +1438,7 @@ func GetLeafRanges(ctx context.Context, db DBQuerier, mtreeTable string, nodePos return nil, fmt.Errorf("failed to render GetLeafRangesExpanded SQL: %w", err) } - rows, err := db.Query(ctx, sql, nodePositions) + rows, err := db.Query(ctx, sql, nodePositions) // nosemgrep if err != nil { return nil, fmt.Errorf("query to get expanded leaf ranges for '%s' failed: %w", mtreeTable, err) } @@ -1519,7 +1519,7 @@ func GetMaxValComposite(ctx context.Context, db DBQuerier, schema, table string, for i := range destPtrs { destPtrs[i] = &dest[i] } - if err := db.QueryRow(ctx, sql, args...).Scan(destPtrs...); err != nil { + if err := db.QueryRow(ctx, sql, args...).Scan(destPtrs...); err != nil { // nosemgrep if err == pgx.ErrNoRows { return nil, nil } @@ -1663,7 +1663,7 @@ func GetBlockRowCount(ctx context.Context, db DBQuerier, schema string, table st } var count int64 - err = db.QueryRow(ctx, sql, args...).Scan(&count) + err = db.QueryRow(ctx, sql, args...).Scan(&count) // nosemgrep if err != nil { return 0, fmt.Errorf("query to get 
block row count for '%s.%s' failed: %w", schema, table, err) } @@ -1681,7 +1681,7 @@ func GetDirtyAndNewBlocks(ctx context.Context, db DBQuerier, mtreeTable string, return nil, fmt.Errorf("failed to render GetDirtyAndNewBlocks SQL: %w", err) } - rows, err := db.Query(ctx, sql) + rows, err := db.Query(ctx, sql) // nosemgrep if err != nil { return nil, fmt.Errorf("query to get dirty and new blocks for '%s' failed: %w", mtreeTable, err) } @@ -1727,7 +1727,7 @@ func GetDirtyAndNewBlocks(ctx context.Context, db DBQuerier, mtreeTable string, return nil, fmt.Errorf("failed to render GetDirtyAndNewBlocksExpanded SQL: %w", err) } - rows, err := db.Query(ctx, sql) + rows, err := db.Query(ctx, sql) // nosemgrep if err != nil { return nil, fmt.Errorf("query to get dirty and new blocks for '%s' failed: %w", mtreeTable, err) } @@ -1781,7 +1781,7 @@ func FindBlocksToSplit(ctx context.Context, db DBQuerier, mtreeTable string, ins if err != nil { return nil, fmt.Errorf("failed to render FindBlocksToSplit SQL: %w", err) } - rows, err := db.Query(ctx, sql, insertsSinceUpdate, nodePositions) + rows, err := db.Query(ctx, sql, insertsSinceUpdate, nodePositions) // nosemgrep if err != nil { return nil, fmt.Errorf("query to find blocks to split for '%s' failed: %w", mtreeTable, err) } @@ -1827,7 +1827,7 @@ func FindBlocksToSplit(ctx context.Context, db DBQuerier, mtreeTable string, ins return nil, fmt.Errorf("failed to render FindBlocksToSplitExpanded SQL: %w", err) } - rows, err := db.Query(ctx, sql, insertsSinceUpdate, nodePositions) + rows, err := db.Query(ctx, sql, insertsSinceUpdate, nodePositions) // nosemgrep if err != nil { return nil, fmt.Errorf("query to find blocks to split for '%s' failed: %w", mtreeTable, err) } @@ -1928,7 +1928,7 @@ func UpdateBlockRangeEndComposite(ctx context.Context, db DBQuerier, mtreeTable return fmt.Errorf("failed to render UpdateBlockRangeEndCompositeTx SQL: %w", err) } - if _, err := db.Exec(ctx, sql, args...); err != nil { + if _, err := 
db.Exec(ctx, sql, args...); err != nil { // nosemgrep return fmt.Errorf("query to update composite block range end for '%s' failed: %w", mtreeTable, err) } return nil @@ -1976,7 +1976,7 @@ func UpdateBlockRangeStartComposite(ctx context.Context, db DBQuerier, mtreeTabl return fmt.Errorf("failed to render UpdateBlockRangeStartCompositeTx SQL: %w", err) } - if _, err := db.Exec(ctx, sql, args...); err != nil { + if _, err := db.Exec(ctx, sql, args...); err != nil { // nosemgrep return fmt.Errorf("query to update composite block range start for '%s' failed: %w", mtreeTable, err) } return nil @@ -2090,7 +2090,7 @@ func findBlocksToMerge(ctx context.Context, db DBQuerier, mtreeTable, schema, ta if err != nil { return nil, fmt.Errorf("failed to render FindBlocksToMerge SQL: %w", err) } - rows, err := db.Query(ctx, sql, queryArgs...) + rows, err := db.Query(ctx, sql, queryArgs...) // nosemgrep if err != nil { return nil, fmt.Errorf("query to find blocks to merge for '%s' failed: %w", mtreeTable, err) } @@ -2135,7 +2135,7 @@ func findBlocksToMerge(ctx context.Context, db DBQuerier, mtreeTable, schema, ta if err != nil { return nil, fmt.Errorf("failed to render FindBlocksToMergeExpanded SQL: %w", err) } - rows, err := db.Query(ctx, sql, queryArgs...) + rows, err := db.Query(ctx, sql, queryArgs...) // nosemgrep if err != nil { return nil, fmt.Errorf("query to find blocks to merge for '%s' failed: %w", mtreeTable, err) } @@ -2416,7 +2416,7 @@ func GetBlockWithCount(ctx context.Context, db DBQuerier, mtreeTable, schema, ta var count int64 var start, end any - row := db.QueryRow(ctx, query, position) + row := db.QueryRow(ctx, query, position) // nosemgrep, position is int64 if isComposite { // node_position, start attrs..., end attrs..., count @@ -2589,7 +2589,7 @@ func GetBulkSplitPoints(ctx context.Context, db DBQuerier, schema, table string, return nil, fmt.Errorf("failed to render GetBulkSplitPoints SQL: %w", err) } - rows, err := db.Query(ctx, query, args...) 
+ rows, err := db.Query(ctx, query, args...) // nosemgrep if err != nil { return nil, fmt.Errorf("failed to execute bulk split points query: %w", err) } diff --git a/go.mod b/go.mod index b62244f..730eac5 100644 --- a/go.mod +++ b/go.mod @@ -161,10 +161,10 @@ require ( go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.63.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.63.0 // indirect go.opentelemetry.io/otel v1.43.0 // indirect - go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.38.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.43.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.43.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.43.0 // indirect - go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.38.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.43.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.43.0 // indirect go.opentelemetry.io/otel/metric v1.43.0 // indirect go.opentelemetry.io/otel/sdk v1.43.0 // indirect diff --git a/go.sum b/go.sum index 3d9310b..22ff0c7 100644 --- a/go.sum +++ b/go.sum @@ -546,14 +546,14 @@ go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.63.0 h1:RbKq8BG go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.63.0/go.mod h1:h06DGIukJOevXaj/xrNjhi/2098RZzcLTbc0jDAUbsg= go.opentelemetry.io/otel v1.43.0 h1:mYIM03dnh5zfN7HautFE4ieIig9amkNANT+xcVxAj9I= go.opentelemetry.io/otel v1.43.0/go.mod h1:JuG+u74mvjvcm8vj8pI5XiHy1zDeoCS2LB1spIq7Ay0= -go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.38.0 h1:vl9obrcoWVKp/lwl8tRE33853I8Xru9HFbw/skNeLs8= -go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.38.0/go.mod h1:GAXRxmLJcVM3u22IjTg74zWBrRCKq8BnOqUVLodpcpw= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.43.0 
h1:8UQVDcZxOJLtX6gxtDt3vY2WTgvZqMQRzjsqiIHQdkc= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.43.0/go.mod h1:2lmweYCiHYpEjQ/lSJBYhj9jP1zvCvQW4BqL9dnT7FQ= go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.43.0 h1:w1K+pCJoPpQifuVpsKamUdn9U0zM3xUziVOqsGksUrY= go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.43.0/go.mod h1:HBy4BjzgVE8139ieRI75oXm3EcDN+6GhD88JT1Kjvxg= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.43.0 h1:88Y4s2C8oTui1LGM6bTWkw0ICGcOLCAI5l6zsD1j20k= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.43.0/go.mod h1:Vl1/iaggsuRlrHf/hfPJPvVag77kKyvrLeD10kpMl+A= -go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.38.0 h1:lwI4Dc5leUqENgGuQImwLo4WnuXFPetmPpkLi2IrX54= -go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.38.0/go.mod h1:Kz/oCE7z5wuyhPxsXDuaPteSWqjSBD5YaSdbxZYGbGk= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.43.0 h1:RAE+JPfvEmvy+0LzyUA25/SGawPwIUbZ6u0Wug54sLc= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.43.0/go.mod h1:AGmbycVGEsRx9mXMZ75CsOyhSP6MFIcj/6dnG+vhVjk= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.43.0 h1:3iZJKlCZufyRzPzlQhUIWVmfltrXuGyfjREgGP3UUjc= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.43.0/go.mod h1:/G+nUPfhq2e+qiXMGxMwumDrP5jtzU+mWN7/sjT2rak= go.opentelemetry.io/otel/metric v1.43.0 h1:d7638QeInOnuwOONPp4JAOGfbCEpYb+K6DVWvdxGzgM= diff --git a/internal/consistency/diff/table_diff.go b/internal/consistency/diff/table_diff.go index acdd7ce..33ccddf 100644 --- a/internal/consistency/diff/table_diff.go +++ b/internal/consistency/diff/table_diff.go @@ -252,8 +252,10 @@ func (t *TableDiffTask) buildEffectiveFilter() (string, error) { } if t.resolvedAgainstOrigin != "" { - escaped := strings.ReplaceAll(t.resolvedAgainstOrigin, "'", "''") - parts = append(parts, fmt.Sprintf("(to_json(spock.xact_commit_timestamp_origin(xmin))->>'roident' = '%s')", 
escaped)) + if _, err := strconv.Atoi(t.resolvedAgainstOrigin); err != nil { + return "", fmt.Errorf("resolved against-origin %q is not a valid numeric node ID", t.resolvedAgainstOrigin) + } + parts = append(parts, fmt.Sprintf("(to_json(spock.xact_commit_timestamp_origin(xmin))->>'roident' = '%s')", t.resolvedAgainstOrigin)) } if t.untilTime != nil { @@ -333,7 +335,7 @@ func (t *TableDiffTask) estimateRowCount(pool *pgxpool.Pool, nodeName string) (i } var planJSON []byte - if err := pool.QueryRow(t.Ctx, query).Scan(&planJSON); err != nil { + if err := pool.QueryRow(t.Ctx, query).Scan(&planJSON); err != nil { // nosemgrep return 0, fmt.Errorf("failed to estimate row count on node %s: %w", nodeName, err) } @@ -350,7 +352,7 @@ func (t *TableDiffTask) ensureFilterHasRows(pool *pgxpool.Pool, nodeName string) sql := fmt.Sprintf("SELECT 1 FROM %s.%s WHERE %s LIMIT 1", schemaIdent, tableIdent, t.EffectiveFilter) var one int - if err := pool.QueryRow(t.Ctx, sql).Scan(&one); err != nil { + if err := pool.QueryRow(t.Ctx, sql).Scan(&one); err != nil { // nosemgrep if errors.Is(err, pgx.ErrNoRows) { return fmt.Errorf("table filter produced no rows") } @@ -547,7 +549,7 @@ func (t *TableDiffTask) fetchRows(nodeName string, r Range) ([]types.OrderedMap, logger.Debug("[%s] Fetching rows for range: Start=%v, End=%v. SQL: %s, Args: %v", nodeName, r.Start, r.End, querySQL, args) - pgRows, err := pool.Query(t.Ctx, querySQL, args...) + pgRows, err := pool.Query(t.Ctx, querySQL, args...) 
// nosemgrep if err != nil { return nil, fmt.Errorf("failed to query rows for range on node %s (SQL: %s, Args: %v): %w", nodeName, querySQL, args, err) } @@ -1015,7 +1017,7 @@ func (t *TableDiffTask) cleanupFilteredView() { continue } - if _, err := pool.Exec(t.Ctx, dropSQL); err != nil { + if _, err := pool.Exec(t.Ctx, dropSQL); err != nil { // nosemgrep logger.Warn("table-diff: failed to drop filtered view %s.%s on node %s: %v", t.Schema, t.FilteredViewName, name, err) } else { logger.Info("table-diff: dropped filtered view %s.%s on node %s", t.Schema, t.FilteredViewName, name) @@ -1819,7 +1821,7 @@ func (t *TableDiffTask) generateSubRanges( countQuery := fmt.Sprintf("SELECT COUNT(1) FROM %s %s", schemaTable, whereClause) var count int64 - err := pool.QueryRow(t.Ctx, countQuery, args...).Scan(&count) + err := pool.QueryRow(t.Ctx, countQuery, args...).Scan(&count) // nosemgrep if err != nil { logger.Debug("[%s] Failed to count rows in parent range %v-%v for splitting: %v. SQL: %s, Args: %v", node, parentRange.Start, parentRange.End, err, countQuery, args) return nil, fmt.Errorf("failed to count for split: %w", err) @@ -1849,14 +1851,14 @@ func (t *TableDiffTask) generateSubRanges( numPKCols := len(t.Key) if numPKCols == 1 { - err = pool.QueryRow(t.Ctx, medianQuery, medianQueryArgs...).Scan(&medianPKVal) + err = pool.QueryRow(t.Ctx, medianQuery, medianQueryArgs...).Scan(&medianPKVal) // nosemgrep } else { scanDest := make([]any, numPKCols) scanDestPtrs := make([]any, numPKCols) for i := range scanDest { scanDestPtrs[i] = &scanDest[i] } - err = pool.QueryRow(t.Ctx, medianQuery, medianQueryArgs...).Scan(scanDestPtrs...) + err = pool.QueryRow(t.Ctx, medianQuery, medianQueryArgs...).Scan(scanDestPtrs...) // nosemgrep if err == nil { medianPKVal = append([]any{}, scanDest...) 
} diff --git a/internal/consistency/diff/table_rerun.go b/internal/consistency/diff/table_rerun.go index 0399d88..f3ca7a0 100644 --- a/internal/consistency/diff/table_rerun.go +++ b/internal/consistency/diff/table_rerun.go @@ -257,7 +257,7 @@ func fetchRowsByPkeys(ctx context.Context, pool *pgxpool.Pool, t *TableDiffTask, createTempTableSQL := fmt.Sprintf("CREATE TEMPORARY TABLE %s (%s) ON COMMIT PRESERVE ROWS", sanitisedTempTable, strings.Join(pkColDefs, ", ")) logger.Debug("Creating temporary table for pkeys: %s", createTempTableSQL) - _, err = tx.Exec(ctx, createTempTableSQL) + _, err = tx.Exec(ctx, createTempTableSQL) // nosemgrep if err != nil { return nil, fmt.Errorf("failed to create temporary table: %w", err) } @@ -286,7 +286,7 @@ func fetchRowsByPkeys(ctx context.Context, pool *pgxpool.Pool, t *TableDiffTask, strings.Join(selectCols, ", "), schemaTable, sanitisedTempTable, strings.Join(joinConditions, " AND ")) logger.Debug("Fetching rows with pkeys from temporary table: %s", fetchSQL) - pgRows, err := tx.Query(ctx, fetchSQL) + pgRows, err := tx.Query(ctx, fetchSQL) // nosemgrep if err != nil { return nil, fmt.Errorf("failed to query rows using temp table join: %w", err) } diff --git a/internal/consistency/mtree/merkle.go b/internal/consistency/mtree/merkle.go index 475a39d..c80d4b1 100644 --- a/internal/consistency/mtree/merkle.go +++ b/internal/consistency/mtree/merkle.go @@ -373,10 +373,10 @@ func (m *MerkleTreeTask) processWorkItem(work CompareRangesWorkItem, pool1, pool whereClause = "TRUE" } - rowHashQuery, orderByStr := buildRowHashQuery(m.QualifiedTableName, m.Key, m.Cols, whereClause, m.ColTypes["_ref"]) + rowHashQuery, orderByStr := buildRowHashQuery(m.Schema, m.Table, m.Key, m.Cols, whereClause, m.ColTypes["_ref"]) logger.Debug("Row-hash Query: %s, Args: %v", rowHashQuery, args) - rowsH1, err := pool1.Query(m.Ctx, rowHashQuery, args...) + rowsH1, err := pool1.Query(m.Ctx, rowHashQuery, args...) 
// nosemgrep if err != nil { return fmt.Errorf("worker failed to get row hashes from %s: %v", work.Node1["Name"], err) } @@ -385,7 +385,7 @@ func (m *MerkleTreeTask) processWorkItem(work CompareRangesWorkItem, pool1, pool return fmt.Errorf("failed to read row hashes from %s: %v", work.Node1["Name"], err) } - rowsH2, err := pool2.Query(m.Ctx, rowHashQuery, args...) + rowsH2, err := pool2.Query(m.Ctx, rowHashQuery, args...) // nosemgrep if err != nil { return fmt.Errorf("worker failed to get row hashes from %s: %v", work.Node2["Name"], err) } @@ -437,9 +437,9 @@ func (m *MerkleTreeTask) processWorkItem(work CompareRangesWorkItem, pool1, pool } batch := mismatchedComposite[i:end] - q, qArgs := buildFetchRowsSQLComposite(m.QualifiedTableName, m.Key, orderByStr, batch) + q, qArgs := buildFetchRowsSQLComposite(m.Schema, m.Table, m.Key, orderByStr, batch) - r1, err := pool1.Query(m.Ctx, q, qArgs...) + r1, err := pool1.Query(m.Ctx, q, qArgs...) // nosemgrep if err != nil { return fmt.Errorf("failed to fetch rows (composite) from %s: %v", work.Node1["Name"], err) } @@ -448,7 +448,7 @@ func (m *MerkleTreeTask) processWorkItem(work CompareRangesWorkItem, pool1, pool return fmt.Errorf("failed to process rows (composite) from %s: %v", work.Node1["Name"], err) } - r2, err := pool2.Query(m.Ctx, q, qArgs...) + r2, err := pool2.Query(m.Ctx, q, qArgs...) // nosemgrep if err != nil { return fmt.Errorf("failed to fetch rows (composite) from %s: %v", work.Node2["Name"], err) } @@ -469,9 +469,9 @@ func (m *MerkleTreeTask) processWorkItem(work CompareRangesWorkItem, pool1, pool } batch := mismatchedSimple[i:end] - q, qArgs := buildFetchRowsSQLSimple(m.QualifiedTableName, m.Key[0], orderByStr, batch) + q, qArgs := buildFetchRowsSQLSimple(m.Schema, m.Table, m.Key[0], orderByStr, batch) - r1, err := pool1.Query(m.Ctx, q, qArgs...) + r1, err := pool1.Query(m.Ctx, q, qArgs...) 
// nosemgrep if err != nil { return fmt.Errorf("failed to fetch rows from %s: %v", work.Node1["Name"], err) } @@ -480,7 +480,7 @@ func (m *MerkleTreeTask) processWorkItem(work CompareRangesWorkItem, pool1, pool return fmt.Errorf("failed to process rows from %s: %v", work.Node1["Name"], err) } - r2, err := pool2.Query(m.Ctx, q, qArgs...) + r2, err := pool2.Query(m.Ctx, q, qArgs...) // nosemgrep if err != nil { return fmt.Errorf("failed to fetch rows from %s: %v", work.Node2["Name"], err) } @@ -643,7 +643,7 @@ func isNumericColType(colType string) bool { return strings.HasPrefix(lower, "numeric") || strings.HasPrefix(lower, "decimal") } -func buildRowHashQuery(tableName string, key []string, cols []string, whereClause string, colTypes map[string]string) (string, string) { +func buildRowHashQuery(schema, table string, key []string, cols []string, whereClause string, colTypes map[string]string) (string, string) { pkQuoted := make([]string, len(key)) for i, k := range key { pkQuoted[i] = pgx.Identifier{k}.Sanitize() @@ -659,10 +659,11 @@ func buildRowHashQuery(tableName string, key []string, cols []string, whereClaus } concatExpr := fmt.Sprintf("concat_ws('|', %s)", strings.Join(colExprs, ", ")) + qualifiedTable := fmt.Sprintf("%s.%s", pgx.Identifier{schema}.Sanitize(), pgx.Identifier{table}.Sanitize()) orderBy := strings.Join(pkQuoted, ", ") selectList := strings.Join(pkQuoted, ", ") + ", encode(digest(" + concatExpr + ",'sha256'),'hex') as row_hash" - query := fmt.Sprintf("SELECT %s FROM %s WHERE %s ORDER BY %s", selectList, tableName, whereClause, orderBy) + query := fmt.Sprintf("SELECT %s FROM %s WHERE %s ORDER BY %s", selectList, qualifiedTable, whereClause, orderBy) return query, orderBy } @@ -704,20 +705,21 @@ func splitCompositeKey(k string) []any { return res } -func buildFetchRowsSQLSimple(tableName, pk string, orderBy string, keys []any) (string, []any) { +func buildFetchRowsSQLSimple(schema, table, pk string, orderBy string, keys []any) (string, []any) { 
placeholders := make([]string, len(keys)) args := make([]any, len(keys)) for i := range keys { placeholders[i] = fmt.Sprintf("$%d", i+1) args[i] = keys[i] } + qualifiedTable := fmt.Sprintf("%s.%s", pgx.Identifier{schema}.Sanitize(), pgx.Identifier{table}.Sanitize()) where := fmt.Sprintf("%s IN (%s)", pgx.Identifier{pk}.Sanitize(), strings.Join(placeholders, ",")) selectCols := "pg_xact_commit_timestamp(xmin) as commit_ts, to_json(spock.xact_commit_timestamp_origin(xmin))->>'roident' as node_origin, *" - q := fmt.Sprintf("SELECT %s FROM %s WHERE %s ORDER BY %s", selectCols, tableName, where, orderBy) + q := fmt.Sprintf("SELECT %s FROM %s WHERE %s ORDER BY %s", selectCols, qualifiedTable, where, orderBy) return q, args } -func buildFetchRowsSQLComposite(tableName string, pk []string, orderBy string, keys [][]any) (string, []any) { +func buildFetchRowsSQLComposite(schema, table string, pk []string, orderBy string, keys [][]any) (string, []any) { tupleCols := make([]string, len(pk)) for i, k := range pk { tupleCols[i] = pgx.Identifier{k}.Sanitize() @@ -734,9 +736,10 @@ func buildFetchRowsSQLComposite(tableName string, pk []string, orderBy string, k } tuples = append(tuples, fmt.Sprintf("(%s)", strings.Join(ph, ","))) } + qualifiedTable := fmt.Sprintf("%s.%s", pgx.Identifier{schema}.Sanitize(), pgx.Identifier{table}.Sanitize()) where := fmt.Sprintf("( %s ) IN ( %s )", strings.Join(tupleCols, ","), strings.Join(tuples, ",")) selectCols := "pg_xact_commit_timestamp(xmin) as commit_ts, to_json(spock.xact_commit_timestamp_origin(xmin))->>'roident' as node_origin, *" - q := fmt.Sprintf("SELECT %s FROM %s WHERE %s ORDER BY %s", selectCols, tableName, where, orderBy) + q := fmt.Sprintf("SELECT %s FROM %s WHERE %s ORDER BY %s", selectCols, qualifiedTable, where, orderBy) return q, args } diff --git a/internal/consistency/mtree/merkle_test.go b/internal/consistency/mtree/merkle_test.go index eb6b1f2..ed0310b 100644 --- a/internal/consistency/mtree/merkle_test.go +++ 
b/internal/consistency/mtree/merkle_test.go @@ -37,7 +37,8 @@ func TestIsNumericColType(t *testing.T) { func TestBuildRowHashQuery(t *testing.T) { tests := []struct { name string - tableName string + schema string + table string key []string cols []string whereClause string @@ -48,7 +49,8 @@ func TestBuildRowHashQuery(t *testing.T) { }{ { name: "nil colTypes - no trim_scale", - tableName: `"public"."orders"`, + schema: "public", + table: "orders", key: []string{"id"}, cols: []string{"id", "name", "amount"}, whereClause: "TRUE", @@ -70,7 +72,8 @@ func TestBuildRowHashQuery(t *testing.T) { }, { name: "numeric column gets trim_scale", - tableName: `"public"."orders"`, + schema: "public", + table: "orders", key: []string{"id"}, cols: []string{"id", "name", "price"}, whereClause: "TRUE", @@ -85,7 +88,8 @@ func TestBuildRowHashQuery(t *testing.T) { }, { name: "decimal column gets trim_scale", - tableName: `"public"."ledger"`, + schema: "public", + table: "ledger", key: []string{"txn_id"}, cols: []string{"txn_id", "debit", "credit"}, whereClause: `"txn_id" >= $1`, @@ -100,7 +104,8 @@ func TestBuildRowHashQuery(t *testing.T) { }, { name: "composite primary key", - tableName: `"sales"."line_items"`, + schema: "sales", + table: "line_items", key: []string{"order_id", "line_num"}, cols: []string{"order_id", "line_num", "qty", "unit_price"}, whereClause: "TRUE", @@ -114,7 +119,8 @@ func TestBuildRowHashQuery(t *testing.T) { }, { name: "no numeric columns - no trim_scale even with colTypes", - tableName: `"public"."users"`, + schema: "public", + table: "users", key: []string{"user_id"}, cols: []string{"user_id", "email", "created_at"}, whereClause: "TRUE", @@ -133,7 +139,7 @@ func TestBuildRowHashQuery(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - query, orderBy := buildRowHashQuery(tt.tableName, tt.key, tt.cols, tt.whereClause, tt.colTypes) + query, orderBy := buildRowHashQuery(tt.schema, tt.table, tt.key, tt.cols, tt.whereClause, 
tt.colTypes) if orderBy != tt.wantOrderBy { t.Errorf("orderBy = %q, want %q", orderBy, tt.wantOrderBy) diff --git a/internal/consistency/repair/stale_repair.go b/internal/consistency/repair/stale_repair.go index 65bffbe..3e7a661 100644 --- a/internal/consistency/repair/stale_repair.go +++ b/internal/consistency/repair/stale_repair.go @@ -193,7 +193,7 @@ func fetchCommitTimestamps(ctx context.Context, tx pgx.Tx, task *TableRepairTask } sb.WriteString(")") - rows, err := tx.Query(ctx, sb.String(), args...) + rows, err := tx.Query(ctx, sb.String(), args...) // nosemgrep if err != nil { return nil, fmt.Errorf("query current commit timestamp on %s: %w", nodeName, err) } @@ -294,7 +294,7 @@ func fetchCommitTimestamps(ctx context.Context, tx pgx.Tx, task *TableRepairTask } sb.WriteString(")") - rows, err := tx.Query(ctx, sb.String(), args...) + rows, err := tx.Query(ctx, sb.String(), args...) // nosemgrep if err != nil { return nil, fmt.Errorf("query current commit timestamp on %s: %w", nodeName, err) } diff --git a/internal/consistency/repair/table_repair.go b/internal/consistency/repair/table_repair.go index 71c422f..a2c70f9 100644 --- a/internal/consistency/repair/table_repair.go +++ b/internal/consistency/repair/table_repair.go @@ -153,7 +153,7 @@ func (t *TableRepairTask) setRole(tx pgx.Tx, nodeName string) error { } roleSQL := fmt.Sprintf("SET ROLE %s", pgx.Identifier{role}.Sanitize()) - if _, err := tx.Exec(t.Ctx, roleSQL); err != nil { + if _, err := tx.Exec(t.Ctx, roleSQL); err != nil { // nosemgrep return fmt.Errorf("setting role %s on %s: %w", role, nodeName, err) } logger.Debug("SET ROLE %s on %s", role, nodeName) @@ -2113,7 +2113,7 @@ func executeDeletes(ctx context.Context, tx pgx.Tx, task *TableRepairTask, nodeN deleteSQL.WriteString(")") } - cmdTag, err := tx.Exec(ctx, deleteSQL.String(), args...) + cmdTag, err := tx.Exec(ctx, deleteSQL.String(), args...) 
// nosemgrep if err != nil { return totalDeletedCount, fmt.Errorf("error executing delete batch: %w (SQL: %s, Args: %v)", err, deleteSQL.String(), args) } @@ -2434,7 +2434,7 @@ func executeUpsertBatch(tx pgx.Tx, task *TableRepairTask, upserts map[string]map } } - cmdTag, err := tx.Exec(task.Ctx, upsertSQL.String(), args...) + cmdTag, err := tx.Exec(task.Ctx, upsertSQL.String(), args...) // nosemgrep if err != nil { return totalUpsertedCount, fmt.Errorf("error executing upsert batch: %w (SQL: %s, Args: %v)", err, upsertSQL.String(), args) } diff --git a/internal/infra/cdc/listen.go b/internal/infra/cdc/listen.go index 82f68a5..65e976f 100644 --- a/internal/infra/cdc/listen.go +++ b/internal/infra/cdc/listen.go @@ -655,7 +655,7 @@ func tupleValueToString(tupCol *pglogrepl.TupleDataColumn, relCol *pglogrepl.Rel func getTypeNameFromOID(ctx context.Context, pool *pgxpool.Pool, oid uint32) (string, error) { var typeName string - err := pool.QueryRow(ctx, "SELECT typname FROM pg_type WHERE oid = $1", oid).Scan(&typeName) + err := pool.QueryRow(ctx, "SELECT typname FROM pg_type WHERE oid = $1", oid).Scan(&typeName) // nosemgrep if err != nil { return "", err } diff --git a/internal/infra/db/auth.go b/internal/infra/db/auth.go index 247eac6..fe4ac8d 100644 --- a/internal/infra/db/auth.go +++ b/internal/infra/db/auth.go @@ -279,7 +279,7 @@ func applyConnectionOptions(cfg *pgxpool.Config, opts ConnectionOptions) { if conn == nil { return fmt.Errorf("nil connection when applying role %s", role) } - if _, err := conn.Exec(ctx, roleSQL); err != nil { + if _, err := conn.Exec(ctx, roleSQL); err != nil { // nosemgrep return fmt.Errorf("failed to set role %q: %w", role, err) } return nil From 833f15d313d71f63e1f2b7709ce76caacadbc5d5 Mon Sep 17 00:00:00 2001 From: Mason Sharp Date: Mon, 13 Apr 2026 17:11:09 -0700 Subject: [PATCH 2/2] Normalize origin ID in SQL predicate Addresses Coderabbit feedback --- internal/consistency/diff/table_diff.go | 5 +++-- 1 file changed, 3 
insertions(+), 2 deletions(-) diff --git a/internal/consistency/diff/table_diff.go b/internal/consistency/diff/table_diff.go index 33ccddf..a3f05f0 100644 --- a/internal/consistency/diff/table_diff.go +++ b/internal/consistency/diff/table_diff.go @@ -252,10 +252,11 @@ func (t *TableDiffTask) buildEffectiveFilter() (string, error) { } if t.resolvedAgainstOrigin != "" { - if _, err := strconv.Atoi(t.resolvedAgainstOrigin); err != nil { + nodeID, err := strconv.Atoi(t.resolvedAgainstOrigin) + if err != nil { return "", fmt.Errorf("resolved against-origin %q is not a valid numeric node ID", t.resolvedAgainstOrigin) } - parts = append(parts, fmt.Sprintf("(to_json(spock.xact_commit_timestamp_origin(xmin))->>'roident' = '%s')", t.resolvedAgainstOrigin)) + parts = append(parts, fmt.Sprintf("(to_json(spock.xact_commit_timestamp_origin(xmin))->>'roident' = '%d')", nodeID)) } if t.untilTime != nil {