diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index caead36..15ea6f0 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -17,7 +17,7 @@ jobs: - name: Set up Go uses: actions/setup-go@v4 with: - go-version: '1.24' + go-version: '1.26' - name: Run table-diff simple pkey tests run: go test -count=1 -v ./tests/integration -run 'TestTableDiffSimplePK' diff --git a/Dockerfile b/Dockerfile index 80c0b16..101f70c 100644 --- a/Dockerfile +++ b/Dockerfile @@ -48,6 +48,7 @@ COPY --chown=nonroot:nonroot ace.yaml /etc/ace/ace.yaml ENV ACE_CONFIG=/etc/ace/ace.yaml +# distroless:nonroot already runs as UID 65532; explicit for Codacy scanner USER nonroot ENTRYPOINT ["/usr/local/bin/ace"] diff --git a/db/queries/queries.go b/db/queries/queries.go index 6f20bd1..e3edfd6 100644 --- a/db/queries/queries.go +++ b/db/queries/queries.go @@ -347,7 +347,7 @@ func InsertCompositeBlockRanges(ctx context.Context, db DBQuerier, mtreeTable st return fmt.Errorf("failed to render InsertCompositeBlockRanges SQL: %w", err) } - if _, err := db.Exec(ctx, stmt, args...); err != nil { + if _, err := db.Exec(ctx, stmt, args...); err != nil { // nosemgrep return fmt.Errorf("query to insert composite block ranges for '%s' failed: %w", mtreeTable, err) } return nil @@ -411,7 +411,7 @@ func InsertBlockRangesBatchSimple(ctx context.Context, db DBQuerier, mtreeTable return fmt.Errorf("failed to render InsertBlockRangesBatchSimple SQL: %w", err) } - if _, err := db.Exec(ctx, sql, args...); err != nil { + if _, err := db.Exec(ctx, sql, args...); err != nil { // nosemgrep return fmt.Errorf("batch insert block ranges for '%s' failed: %w", mtreeTable, err) } } @@ -498,7 +498,7 @@ func InsertBlockRangesBatchComposite(ctx context.Context, db DBQuerier, mtreeTab return fmt.Errorf("failed to render InsertBlockRangesBatchComposite SQL: %w", err) } - if _, err := db.Exec(ctx, sql, args...); err != nil { + if _, err := db.Exec(ctx, sql, args...); err != nil { // nosemgrep return fmt.Errorf("batch insert composite block ranges for '%s' failed: %w", mtreeTable, err) } } @@ -512,7 +512,7 @@ func GetPkeyOffsets(ctx context.Context, db DBQuerier, schema, table string, key return nil, fmt.Errorf("failed to generate GetPkeyOffsets SQL: %w", err) } - rows, err := db.Query(ctx, sql) + rows, err := db.Query(ctx, sql) // nosemgrep if err != nil { return nil, fmt.Errorf("query to get pkey offsets for '%s.%s' failed: %w", schema, table, err) } @@ -1228,7 +1228,7 @@ func ComputeLeafHashes(ctx context.Context, db DBQuerier, schema, table string, } var leafHash []byte - if err := db.QueryRow(ctx, sql, args...).Scan(&leafHash); err != nil { + if err := db.QueryRow(ctx, sql, args...).Scan(&leafHash); err != nil { // nosemgrep return nil, fmt.Errorf("query to compute leaf hashes for '%s.%s' failed: %w", schema, table, err) } return leafHash, nil @@ -1394,7 +1394,7 @@ func GetLeafRanges(ctx context.Context, db DBQuerier, mtreeTable string, nodePos return nil, fmt.Errorf("failed to render GetLeafRanges SQL: %w", err) } - rows, err := db.Query(ctx, sql, nodePositions) + rows, err := db.Query(ctx, sql, nodePositions) // nosemgrep if err != nil { return nil, fmt.Errorf("query to get leaf ranges for '%s' failed: %w", mtreeTable, err) } @@ -1438,7 +1438,7 @@ func GetLeafRanges(ctx context.Context, db DBQuerier, mtreeTable string, nodePos return nil, fmt.Errorf("failed to render GetLeafRangesExpanded SQL: %w", err) } - rows, err := db.Query(ctx, sql, nodePositions) + rows, err := db.Query(ctx, sql, nodePositions) // nosemgrep if err != nil { return nil, fmt.Errorf("query to get expanded leaf ranges for '%s' failed: %w", mtreeTable, err) } @@ -1519,7 +1519,7 @@ func GetMaxValComposite(ctx context.Context, db DBQuerier, schema, table string, for i := range destPtrs { destPtrs[i] = &dest[i] } - if err := db.QueryRow(ctx, sql, args...).Scan(destPtrs...); err != nil { + if err := db.QueryRow(ctx, sql, args...).Scan(destPtrs...); err != nil { // nosemgrep if err == pgx.ErrNoRows { return nil, nil } @@ -1663,7 +1663,7 @@ func GetBlockRowCount(ctx context.Context, db DBQuerier, schema string, table st } var count int64 - err = db.QueryRow(ctx, sql, args...).Scan(&count) + err = db.QueryRow(ctx, sql, args...).Scan(&count) // nosemgrep if err != nil { return 0, fmt.Errorf("query to get block row count for '%s.%s' failed: %w", schema, table, err) } @@ -1681,7 +1681,7 @@ func GetDirtyAndNewBlocks(ctx context.Context, db DBQuerier, mtreeTable string, return nil, fmt.Errorf("failed to render GetDirtyAndNewBlocks SQL: %w", err) } - rows, err := db.Query(ctx, sql) + rows, err := db.Query(ctx, sql) // nosemgrep if err != nil { return nil, fmt.Errorf("query to get dirty and new blocks for '%s' failed: %w", mtreeTable, err) } @@ -1727,7 +1727,7 @@ func GetDirtyAndNewBlocks(ctx context.Context, db DBQuerier, mtreeTable string, return nil, fmt.Errorf("failed to render GetDirtyAndNewBlocksExpanded SQL: %w", err) } - rows, err := db.Query(ctx, sql) + rows, err := db.Query(ctx, sql) // nosemgrep if err != nil { return nil, fmt.Errorf("query to get dirty and new blocks for '%s' failed: %w", mtreeTable, err) } @@ -1781,7 +1781,7 @@ func FindBlocksToSplit(ctx context.Context, db DBQuerier, mtreeTable string, ins if err != nil { return nil, fmt.Errorf("failed to render FindBlocksToSplit SQL: %w", err) } - rows, err := db.Query(ctx, sql, insertsSinceUpdate, nodePositions) + rows, err := db.Query(ctx, sql, insertsSinceUpdate, nodePositions) // nosemgrep if err != nil { return nil, fmt.Errorf("query to find blocks to split for '%s' failed: %w", mtreeTable, err) } @@ -1827,7 +1827,7 @@ func FindBlocksToSplit(ctx context.Context, db DBQuerier, mtreeTable string, ins return nil, fmt.Errorf("failed to render FindBlocksToSplitExpanded SQL: %w", err) } - rows, err := db.Query(ctx, sql, insertsSinceUpdate, nodePositions) + rows, err := db.Query(ctx, sql, insertsSinceUpdate, nodePositions) // nosemgrep if err != nil { return nil, fmt.Errorf("query to find blocks to split for '%s' failed: %w", mtreeTable, err) } @@ -1928,7 +1928,7 @@ func UpdateBlockRangeEndComposite(ctx context.Context, db DBQuerier, mtreeTable return fmt.Errorf("failed to render UpdateBlockRangeEndCompositeTx SQL: %w", err) } - if _, err := db.Exec(ctx, sql, args...); err != nil { + if _, err := db.Exec(ctx, sql, args...); err != nil { // nosemgrep return fmt.Errorf("query to update composite block range end for '%s' failed: %w", mtreeTable, err) } return nil @@ -1976,7 +1976,7 @@ func UpdateBlockRangeStartComposite(ctx context.Context, db DBQuerier, mtreeTabl return fmt.Errorf("failed to render UpdateBlockRangeStartCompositeTx SQL: %w", err) } - if _, err := db.Exec(ctx, sql, args...); err != nil { + if _, err := db.Exec(ctx, sql, args...); err != nil { // nosemgrep return fmt.Errorf("query to update composite block range start for '%s' failed: %w", mtreeTable, err) } return nil @@ -2090,7 +2090,7 @@ func findBlocksToMerge(ctx context.Context, db DBQuerier, mtreeTable, schema, ta if err != nil { return nil, fmt.Errorf("failed to render FindBlocksToMerge SQL: %w", err) } - rows, err := db.Query(ctx, sql, queryArgs...) + rows, err := db.Query(ctx, sql, queryArgs...) // nosemgrep if err != nil { return nil, fmt.Errorf("query to find blocks to merge for '%s' failed: %w", mtreeTable, err) } @@ -2135,7 +2135,7 @@ func findBlocksToMerge(ctx context.Context, db DBQuerier, mtreeTable, schema, ta if err != nil { return nil, fmt.Errorf("failed to render FindBlocksToMergeExpanded SQL: %w", err) } - rows, err := db.Query(ctx, sql, queryArgs...) + rows, err := db.Query(ctx, sql, queryArgs...) // nosemgrep if err != nil { return nil, fmt.Errorf("query to find blocks to merge for '%s' failed: %w", mtreeTable, err) } @@ -2416,7 +2416,7 @@ func GetBlockWithCount(ctx context.Context, db DBQuerier, mtreeTable, schema, ta var count int64 var start, end any - row := db.QueryRow(ctx, query, position) + row := db.QueryRow(ctx, query, position) // nosemgrep, position is int64 if isComposite { // node_position, start attrs..., end attrs..., count @@ -2589,7 +2589,7 @@ func GetBulkSplitPoints(ctx context.Context, db DBQuerier, schema, table string, return nil, fmt.Errorf("failed to render GetBulkSplitPoints SQL: %w", err) } - rows, err := db.Query(ctx, query, args...) + rows, err := db.Query(ctx, query, args...) // nosemgrep if err != nil { return nil, fmt.Errorf("failed to execute bulk split points query: %w", err) } diff --git a/go.mod b/go.mod index b62244f..730eac5 100644 --- a/go.mod +++ b/go.mod @@ -161,10 +161,10 @@ require ( go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.63.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.63.0 // indirect go.opentelemetry.io/otel v1.43.0 // indirect - go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.38.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.43.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.43.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.43.0 // indirect - go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.38.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.43.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.43.0 // indirect go.opentelemetry.io/otel/metric v1.43.0 // indirect go.opentelemetry.io/otel/sdk v1.43.0 // indirect diff --git a/go.sum b/go.sum index 3d9310b..22ff0c7 100644 --- a/go.sum +++ b/go.sum @@ -546,14 +546,14 @@ go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.63.0 h1:RbKq8BG go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.63.0/go.mod h1:h06DGIukJOevXaj/xrNjhi/2098RZzcLTbc0jDAUbsg= go.opentelemetry.io/otel v1.43.0 h1:mYIM03dnh5zfN7HautFE4ieIig9amkNANT+xcVxAj9I= go.opentelemetry.io/otel v1.43.0/go.mod h1:JuG+u74mvjvcm8vj8pI5XiHy1zDeoCS2LB1spIq7Ay0= -go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.38.0 h1:vl9obrcoWVKp/lwl8tRE33853I8Xru9HFbw/skNeLs8= -go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.38.0/go.mod h1:GAXRxmLJcVM3u22IjTg74zWBrRCKq8BnOqUVLodpcpw= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.43.0 h1:8UQVDcZxOJLtX6gxtDt3vY2WTgvZqMQRzjsqiIHQdkc= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.43.0/go.mod h1:2lmweYCiHYpEjQ/lSJBYhj9jP1zvCvQW4BqL9dnT7FQ= go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.43.0 h1:w1K+pCJoPpQifuVpsKamUdn9U0zM3xUziVOqsGksUrY= go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.43.0/go.mod h1:HBy4BjzgVE8139ieRI75oXm3EcDN+6GhD88JT1Kjvxg= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.43.0 h1:88Y4s2C8oTui1LGM6bTWkw0ICGcOLCAI5l6zsD1j20k= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.43.0/go.mod h1:Vl1/iaggsuRlrHf/hfPJPvVag77kKyvrLeD10kpMl+A= -go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.38.0 h1:lwI4Dc5leUqENgGuQImwLo4WnuXFPetmPpkLi2IrX54= -go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.38.0/go.mod h1:Kz/oCE7z5wuyhPxsXDuaPteSWqjSBD5YaSdbxZYGbGk= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.43.0 h1:RAE+JPfvEmvy+0LzyUA25/SGawPwIUbZ6u0Wug54sLc= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.43.0/go.mod h1:AGmbycVGEsRx9mXMZ75CsOyhSP6MFIcj/6dnG+vhVjk= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.43.0 h1:3iZJKlCZufyRzPzlQhUIWVmfltrXuGyfjREgGP3UUjc= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.43.0/go.mod h1:/G+nUPfhq2e+qiXMGxMwumDrP5jtzU+mWN7/sjT2rak= go.opentelemetry.io/otel/metric v1.43.0 h1:d7638QeInOnuwOONPp4JAOGfbCEpYb+K6DVWvdxGzgM= diff --git a/internal/consistency/diff/table_diff.go b/internal/consistency/diff/table_diff.go index acdd7ce..a3f05f0 100644 --- a/internal/consistency/diff/table_diff.go +++ b/internal/consistency/diff/table_diff.go @@ -252,8 +252,11 @@ func (t *TableDiffTask) buildEffectiveFilter() (string, error) { } if t.resolvedAgainstOrigin != "" { - escaped := strings.ReplaceAll(t.resolvedAgainstOrigin, "'", "''") - parts = append(parts, fmt.Sprintf("(to_json(spock.xact_commit_timestamp_origin(xmin))->>'roident' = '%s')", escaped)) + nodeID, err := strconv.Atoi(t.resolvedAgainstOrigin) + if err != nil { + return "", fmt.Errorf("resolved against-origin %q is not a valid numeric node ID", t.resolvedAgainstOrigin) + } + parts = append(parts, fmt.Sprintf("(to_json(spock.xact_commit_timestamp_origin(xmin))->>'roident' = '%d')", nodeID)) } if t.untilTime != nil { @@ -333,7 +336,7 @@ func (t *TableDiffTask) estimateRowCount(pool *pgxpool.Pool, nodeName string) (i } var planJSON []byte - if err := pool.QueryRow(t.Ctx, query).Scan(&planJSON); err != nil { + if err := pool.QueryRow(t.Ctx, query).Scan(&planJSON); err != nil { // nosemgrep return 0, fmt.Errorf("failed to estimate row count on node %s: %w", nodeName, err) } @@ -350,7 +353,7 @@ func (t *TableDiffTask) ensureFilterHasRows(pool *pgxpool.Pool, nodeName string) sql := fmt.Sprintf("SELECT 1 FROM %s.%s WHERE %s LIMIT 1", schemaIdent, tableIdent, t.EffectiveFilter) var one int - if err := pool.QueryRow(t.Ctx, sql).Scan(&one); err != nil { + if err := pool.QueryRow(t.Ctx, sql).Scan(&one); err != nil { // nosemgrep if errors.Is(err, pgx.ErrNoRows) { return fmt.Errorf("table filter produced no rows") } @@ -547,7 +550,7 @@ func (t *TableDiffTask) fetchRows(nodeName string, r Range) ([]types.OrderedMap, logger.Debug("[%s] Fetching rows for range: Start=%v, End=%v. SQL: %s, Args: %v", nodeName, r.Start, r.End, querySQL, args) - pgRows, err := pool.Query(t.Ctx, querySQL, args...) + pgRows, err := pool.Query(t.Ctx, querySQL, args...) // nosemgrep if err != nil { return nil, fmt.Errorf("failed to query rows for range on node %s (SQL: %s, Args: %v): %w", nodeName, querySQL, args, err) } @@ -1015,7 +1018,7 @@ func (t *TableDiffTask) cleanupFilteredView() { continue } - if _, err := pool.Exec(t.Ctx, dropSQL); err != nil { + if _, err := pool.Exec(t.Ctx, dropSQL); err != nil { // nosemgrep logger.Warn("table-diff: failed to drop filtered view %s.%s on node %s: %v", t.Schema, t.FilteredViewName, name, err) } else { logger.Info("table-diff: dropped filtered view %s.%s on node %s", t.Schema, t.FilteredViewName, name) @@ -1819,7 +1822,7 @@ func (t *TableDiffTask) generateSubRanges( countQuery := fmt.Sprintf("SELECT COUNT(1) FROM %s %s", schemaTable, whereClause) var count int64 - err := pool.QueryRow(t.Ctx, countQuery, args...).Scan(&count) + err := pool.QueryRow(t.Ctx, countQuery, args...).Scan(&count) // nosemgrep if err != nil { logger.Debug("[%s] Failed to count rows in parent range %v-%v for splitting: %v. SQL: %s, Args: %v", node, parentRange.Start, parentRange.End, err, countQuery, args) return nil, fmt.Errorf("failed to count for split: %w", err) @@ -1849,14 +1852,14 @@ func (t *TableDiffTask) generateSubRanges( numPKCols := len(t.Key) if numPKCols == 1 { - err = pool.QueryRow(t.Ctx, medianQuery, medianQueryArgs...).Scan(&medianPKVal) + err = pool.QueryRow(t.Ctx, medianQuery, medianQueryArgs...).Scan(&medianPKVal) // nosemgrep } else { scanDest := make([]any, numPKCols) scanDestPtrs := make([]any, numPKCols) for i := range scanDest { scanDestPtrs[i] = &scanDest[i] } - err = pool.QueryRow(t.Ctx, medianQuery, medianQueryArgs...).Scan(scanDestPtrs...) + err = pool.QueryRow(t.Ctx, medianQuery, medianQueryArgs...).Scan(scanDestPtrs...) // nosemgrep if err == nil { medianPKVal = append([]any{}, scanDest...) } diff --git a/internal/consistency/diff/table_rerun.go b/internal/consistency/diff/table_rerun.go index 0399d88..f3ca7a0 100644 --- a/internal/consistency/diff/table_rerun.go +++ b/internal/consistency/diff/table_rerun.go @@ -257,7 +257,7 @@ func fetchRowsByPkeys(ctx context.Context, pool *pgxpool.Pool, t *TableDiffTask, createTempTableSQL := fmt.Sprintf("CREATE TEMPORARY TABLE %s (%s) ON COMMIT PRESERVE ROWS", sanitisedTempTable, strings.Join(pkColDefs, ", ")) logger.Debug("Creating temporary table for pkeys: %s", createTempTableSQL) - _, err = tx.Exec(ctx, createTempTableSQL) + _, err = tx.Exec(ctx, createTempTableSQL) // nosemgrep if err != nil { return nil, fmt.Errorf("failed to create temporary table: %w", err) } @@ -286,7 +286,7 @@ func fetchRowsByPkeys(ctx context.Context, pool *pgxpool.Pool, t *TableDiffTask, strings.Join(selectCols, ", "), schemaTable, sanitisedTempTable, strings.Join(joinConditions, " AND ")) logger.Debug("Fetching rows with pkeys from temporary table: %s", fetchSQL) - pgRows, err := tx.Query(ctx, fetchSQL) + pgRows, err := tx.Query(ctx, fetchSQL) // nosemgrep if err != nil { return nil, fmt.Errorf("failed to query rows using temp table join: %w", err) } diff --git a/internal/consistency/mtree/merkle.go b/internal/consistency/mtree/merkle.go index 475a39d..c80d4b1 100644 --- a/internal/consistency/mtree/merkle.go +++ b/internal/consistency/mtree/merkle.go @@ -373,10 +373,10 @@ func (m *MerkleTreeTask) processWorkItem(work CompareRangesWorkItem, pool1, pool whereClause = "TRUE" } - rowHashQuery, orderByStr := buildRowHashQuery(m.QualifiedTableName, m.Key, m.Cols, whereClause, m.ColTypes["_ref"]) + rowHashQuery, orderByStr := buildRowHashQuery(m.Schema, m.Table, m.Key, m.Cols, whereClause, m.ColTypes["_ref"]) logger.Debug("Row-hash Query: %s, Args: %v", rowHashQuery, args) - rowsH1, err := pool1.Query(m.Ctx, rowHashQuery, args...) + rowsH1, err := pool1.Query(m.Ctx, rowHashQuery, args...) // nosemgrep if err != nil { return fmt.Errorf("worker failed to get row hashes from %s: %v", work.Node1["Name"], err) } @@ -385,7 +385,7 @@ func (m *MerkleTreeTask) processWorkItem(work CompareRangesWorkItem, pool1, pool return fmt.Errorf("failed to read row hashes from %s: %v", work.Node1["Name"], err) } - rowsH2, err := pool2.Query(m.Ctx, rowHashQuery, args...) + rowsH2, err := pool2.Query(m.Ctx, rowHashQuery, args...) // nosemgrep if err != nil { return fmt.Errorf("worker failed to get row hashes from %s: %v", work.Node2["Name"], err) } @@ -437,9 +437,9 @@ func (m *MerkleTreeTask) processWorkItem(work CompareRangesWorkItem, pool1, pool } batch := mismatchedComposite[i:end] - q, qArgs := buildFetchRowsSQLComposite(m.QualifiedTableName, m.Key, orderByStr, batch) + q, qArgs := buildFetchRowsSQLComposite(m.Schema, m.Table, m.Key, orderByStr, batch) - r1, err := pool1.Query(m.Ctx, q, qArgs...) + r1, err := pool1.Query(m.Ctx, q, qArgs...) // nosemgrep if err != nil { return fmt.Errorf("failed to fetch rows (composite) from %s: %v", work.Node1["Name"], err) } @@ -448,7 +448,7 @@ func (m *MerkleTreeTask) processWorkItem(work CompareRangesWorkItem, pool1, pool return fmt.Errorf("failed to process rows (composite) from %s: %v", work.Node1["Name"], err) } - r2, err := pool2.Query(m.Ctx, q, qArgs...) + r2, err := pool2.Query(m.Ctx, q, qArgs...) // nosemgrep if err != nil { return fmt.Errorf("failed to fetch rows (composite) from %s: %v", work.Node2["Name"], err) } @@ -469,9 +469,9 @@ func (m *MerkleTreeTask) processWorkItem(work CompareRangesWorkItem, pool1, pool } batch := mismatchedSimple[i:end] - q, qArgs := buildFetchRowsSQLSimple(m.QualifiedTableName, m.Key[0], orderByStr, batch) + q, qArgs := buildFetchRowsSQLSimple(m.Schema, m.Table, m.Key[0], orderByStr, batch) - r1, err := pool1.Query(m.Ctx, q, qArgs...) + r1, err := pool1.Query(m.Ctx, q, qArgs...) // nosemgrep if err != nil { return fmt.Errorf("failed to fetch rows from %s: %v", work.Node1["Name"], err) } @@ -480,7 +480,7 @@ func (m *MerkleTreeTask) processWorkItem(work CompareRangesWorkItem, pool1, pool return fmt.Errorf("failed to process rows from %s: %v", work.Node1["Name"], err) } - r2, err := pool2.Query(m.Ctx, q, qArgs...) + r2, err := pool2.Query(m.Ctx, q, qArgs...) // nosemgrep if err != nil { return fmt.Errorf("failed to fetch rows from %s: %v", work.Node2["Name"], err) } @@ -643,7 +643,7 @@ func isNumericColType(colType string) bool { return strings.HasPrefix(lower, "numeric") || strings.HasPrefix(lower, "decimal") } -func buildRowHashQuery(tableName string, key []string, cols []string, whereClause string, colTypes map[string]string) (string, string) { +func buildRowHashQuery(schema, table string, key []string, cols []string, whereClause string, colTypes map[string]string) (string, string) { pkQuoted := make([]string, len(key)) for i, k := range key { pkQuoted[i] = pgx.Identifier{k}.Sanitize() @@ -659,10 +659,11 @@ func buildRowHashQuery(tableName string, key []string, cols []string, whereClaus } concatExpr := fmt.Sprintf("concat_ws('|', %s)", strings.Join(colExprs, ", ")) + qualifiedTable := fmt.Sprintf("%s.%s", pgx.Identifier{schema}.Sanitize(), pgx.Identifier{table}.Sanitize()) orderBy := strings.Join(pkQuoted, ", ") selectList := strings.Join(pkQuoted, ", ") + ", encode(digest(" + concatExpr + ",'sha256'),'hex') as row_hash" - query := fmt.Sprintf("SELECT %s FROM %s WHERE %s ORDER BY %s", selectList, tableName, whereClause, orderBy) + query := fmt.Sprintf("SELECT %s FROM %s WHERE %s ORDER BY %s", selectList, qualifiedTable, whereClause, orderBy) return query, orderBy } @@ -704,20 +705,21 @@ func splitCompositeKey(k string) []any { return res } -func buildFetchRowsSQLSimple(tableName, pk string, orderBy string, keys []any) (string, []any) { +func buildFetchRowsSQLSimple(schema, table, pk string, orderBy string, keys []any) (string, []any) { placeholders := make([]string, len(keys)) args := make([]any, len(keys)) for i := range keys { placeholders[i] = fmt.Sprintf("$%d", i+1) args[i] = keys[i] } + qualifiedTable := fmt.Sprintf("%s.%s", pgx.Identifier{schema}.Sanitize(), pgx.Identifier{table}.Sanitize()) where := fmt.Sprintf("%s IN (%s)", pgx.Identifier{pk}.Sanitize(), strings.Join(placeholders, ",")) selectCols := "pg_xact_commit_timestamp(xmin) as commit_ts, to_json(spock.xact_commit_timestamp_origin(xmin))->>'roident' as node_origin, *" - q := fmt.Sprintf("SELECT %s FROM %s WHERE %s ORDER BY %s", selectCols, tableName, where, orderBy) + q := fmt.Sprintf("SELECT %s FROM %s WHERE %s ORDER BY %s", selectCols, qualifiedTable, where, orderBy) return q, args } -func buildFetchRowsSQLComposite(tableName string, pk []string, orderBy string, keys [][]any) (string, []any) { +func buildFetchRowsSQLComposite(schema, table string, pk []string, orderBy string, keys [][]any) (string, []any) { tupleCols := make([]string, len(pk)) for i, k := range pk { tupleCols[i] = pgx.Identifier{k}.Sanitize() @@ -734,9 +736,10 @@ func buildFetchRowsSQLComposite(tableName string, pk []string, orderBy string, k } tuples = append(tuples, fmt.Sprintf("(%s)", strings.Join(ph, ","))) } + qualifiedTable := fmt.Sprintf("%s.%s", pgx.Identifier{schema}.Sanitize(), pgx.Identifier{table}.Sanitize()) where := fmt.Sprintf("( %s ) IN ( %s )", strings.Join(tupleCols, ","), strings.Join(tuples, ",")) selectCols := "pg_xact_commit_timestamp(xmin) as commit_ts, to_json(spock.xact_commit_timestamp_origin(xmin))->>'roident' as node_origin, *" - q := fmt.Sprintf("SELECT %s FROM %s WHERE %s ORDER BY %s", selectCols, tableName, where, orderBy) + q := fmt.Sprintf("SELECT %s FROM %s WHERE %s ORDER BY %s", selectCols, qualifiedTable, where, orderBy) return q, args } diff --git a/internal/consistency/mtree/merkle_test.go b/internal/consistency/mtree/merkle_test.go index eb6b1f2..ed0310b 100644 --- a/internal/consistency/mtree/merkle_test.go +++ b/internal/consistency/mtree/merkle_test.go @@ -37,7 +37,8 @@ func TestIsNumericColType(t *testing.T) { func TestBuildRowHashQuery(t *testing.T) { tests := []struct { name string - tableName string + schema string + table string key []string cols []string whereClause string @@ -48,7 +49,8 @@ func TestBuildRowHashQuery(t *testing.T) { }{ { name: "nil colTypes - no trim_scale", - tableName: `"public"."orders"`, + schema: "public", + table: "orders", key: []string{"id"}, cols: []string{"id", "name", "amount"}, whereClause: "TRUE", @@ -70,7 +72,8 @@ func TestBuildRowHashQuery(t *testing.T) { }, { name: "numeric column gets trim_scale", - tableName: `"public"."orders"`, + schema: "public", + table: "orders", key: []string{"id"}, cols: []string{"id", "name", "price"}, whereClause: "TRUE", @@ -85,7 +88,8 @@ func TestBuildRowHashQuery(t *testing.T) { }, { name: "decimal column gets trim_scale", - tableName: `"public"."ledger"`, + schema: "public", + table: "ledger", key: []string{"txn_id"}, cols: []string{"txn_id", "debit", "credit"}, whereClause: `"txn_id" >= $1`, @@ -100,7 +104,8 @@ func TestBuildRowHashQuery(t *testing.T) { }, { name: "composite primary key", - tableName: `"sales"."line_items"`, + schema: "sales", + table: "line_items", key: []string{"order_id", "line_num"}, cols: []string{"order_id", "line_num", "qty", "unit_price"}, whereClause: "TRUE", @@ -114,7 +119,8 @@ func TestBuildRowHashQuery(t *testing.T) { }, { name: "no numeric columns - no trim_scale even with colTypes", - tableName: `"public"."users"`, + schema: "public", + table: "users", key: []string{"user_id"}, cols: []string{"user_id", "email", "created_at"}, whereClause: "TRUE", @@ -133,7 +139,7 @@ func TestBuildRowHashQuery(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - query, orderBy := buildRowHashQuery(tt.tableName, tt.key, tt.cols, tt.whereClause, tt.colTypes) + query, orderBy := buildRowHashQuery(tt.schema, tt.table, tt.key, tt.cols, tt.whereClause, tt.colTypes) if orderBy != tt.wantOrderBy { t.Errorf("orderBy = %q, want %q", orderBy, tt.wantOrderBy) diff --git a/internal/consistency/repair/stale_repair.go b/internal/consistency/repair/stale_repair.go index 65bffbe..3e7a661 100644 --- a/internal/consistency/repair/stale_repair.go +++ b/internal/consistency/repair/stale_repair.go @@ -193,7 +193,7 @@ func fetchCommitTimestamps(ctx context.Context, tx pgx.Tx, task *TableRepairTask } sb.WriteString(")") - rows, err := tx.Query(ctx, sb.String(), args...) + rows, err := tx.Query(ctx, sb.String(), args...) // nosemgrep if err != nil { return nil, fmt.Errorf("query current commit timestamp on %s: %w", nodeName, err) } @@ -294,7 +294,7 @@ func fetchCommitTimestamps(ctx context.Context, tx pgx.Tx, task *TableRepairTask } sb.WriteString(")") - rows, err := tx.Query(ctx, sb.String(), args...) + rows, err := tx.Query(ctx, sb.String(), args...) // nosemgrep if err != nil { return nil, fmt.Errorf("query current commit timestamp on %s: %w", nodeName, err) } diff --git a/internal/consistency/repair/table_repair.go b/internal/consistency/repair/table_repair.go index 71c422f..a2c70f9 100644 --- a/internal/consistency/repair/table_repair.go +++ b/internal/consistency/repair/table_repair.go @@ -153,7 +153,7 @@ func (t *TableRepairTask) setRole(tx pgx.Tx, nodeName string) error { } roleSQL := fmt.Sprintf("SET ROLE %s", pgx.Identifier{role}.Sanitize()) - if _, err := tx.Exec(t.Ctx, roleSQL); err != nil { + if _, err := tx.Exec(t.Ctx, roleSQL); err != nil { // nosemgrep return fmt.Errorf("setting role %s on %s: %w", role, nodeName, err) } logger.Debug("SET ROLE %s on %s", role, nodeName) @@ -2113,7 +2113,7 @@ func executeDeletes(ctx context.Context, tx pgx.Tx, task *TableRepairTask, nodeN deleteSQL.WriteString(")") } - cmdTag, err := tx.Exec(ctx, deleteSQL.String(), args...) + cmdTag, err := tx.Exec(ctx, deleteSQL.String(), args...) // nosemgrep if err != nil { return totalDeletedCount, fmt.Errorf("error executing delete batch: %w (SQL: %s, Args: %v)", err, deleteSQL.String(), args) } @@ -2434,7 +2434,7 @@ func executeUpsertBatch(tx pgx.Tx, task *TableRepairTask, upserts map[string]map } } - cmdTag, err := tx.Exec(task.Ctx, upsertSQL.String(), args...) + cmdTag, err := tx.Exec(task.Ctx, upsertSQL.String(), args...) // nosemgrep if err != nil { return totalUpsertedCount, fmt.Errorf("error executing upsert batch: %w (SQL: %s, Args: %v)", err, upsertSQL.String(), args) } diff --git a/internal/infra/cdc/listen.go b/internal/infra/cdc/listen.go index 82f68a5..65e976f 100644 --- a/internal/infra/cdc/listen.go +++ b/internal/infra/cdc/listen.go @@ -655,7 +655,7 @@ func tupleValueToString(tupCol *pglogrepl.TupleDataColumn, relCol *pglogrepl.Rel func getTypeNameFromOID(ctx context.Context, pool *pgxpool.Pool, oid uint32) (string, error) { var typeName string - err := pool.QueryRow(ctx, "SELECT typname FROM pg_type WHERE oid = $1", oid).Scan(&typeName) + err := pool.QueryRow(ctx, "SELECT typname FROM pg_type WHERE oid = $1", oid).Scan(&typeName) // nosemgrep if err != nil { return "", err } diff --git a/internal/infra/db/auth.go b/internal/infra/db/auth.go index 247eac6..fe4ac8d 100644 --- a/internal/infra/db/auth.go +++ b/internal/infra/db/auth.go @@ -279,7 +279,7 @@ func applyConnectionOptions(cfg *pgxpool.Config, opts ConnectionOptions) { if conn == nil { return fmt.Errorf("nil connection when applying role %s", role) } - if _, err := conn.Exec(ctx, roleSQL); err != nil { + if _, err := conn.Exec(ctx, roleSQL); err != nil { // nosemgrep return fmt.Errorf("failed to set role %q: %w", role, err) } return nil