-
Notifications
You must be signed in to change notification settings - Fork 3
Ability to use vanilla Postgres #97
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
de3f378
c1ac5a7
964bcf1
60f2f14
b9d02c1
acef061
adfc090
4a36311
701efd8
a3af3c2
3b2aec3
fed4e9f
fcd68db
f85367a
0e0ea88
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -11,7 +11,18 @@ | |
|
|
||
| package queries | ||
|
|
||
| import "text/template" | ||
| import ( | ||
| "text/template" | ||
|
|
||
| "github.com/jackc/pgx/v5" | ||
| "github.com/pgedge/ace/pkg/config" | ||
| ) | ||
|
|
||
| // aceTemplateFuncs provides the {{aceSchema}} function to SQL templates. | ||
| // The function is evaluated at render time (after config is loaded), not at parse time. | ||
| var aceTemplateFuncs = template.FuncMap{ | ||
| "aceSchema": func() string { return pgx.Identifier{config.Get().MTree.Schema}.Sanitize() }, | ||
| } | ||
|
coderabbitai[bot] marked this conversation as resolved.
|
||
|
|
||
| type Templates struct { | ||
| EstimateRowCount *template.Template | ||
|
|
@@ -78,6 +89,7 @@ type Templates struct { | |
| GetBlockCountSimple *template.Template | ||
| GetBlockSizeFromMetadata *template.Template | ||
| GetMaxNodeLevel *template.Template | ||
| CompareBlocksSQL *template.Template | ||
|
|
||
| DropXORFunction *template.Template | ||
| DropMetadataTable *template.Template | ||
|
|
@@ -119,6 +131,10 @@ type Templates struct { | |
| RemoveTableFromCDCMetadata *template.Template | ||
| GetSpockOriginLSNForNode *template.Template | ||
| GetSpockSlotLSNForNode *template.Template | ||
| GetNativeOriginLSNForNode *template.Template | ||
| GetNativeSlotLSNForNode *template.Template | ||
| GetReplicationOriginNames *template.Template | ||
| GetNativeNodeOriginNames *template.Template | ||
| EnsureHashVersionColumn *template.Template | ||
| GetHashVersion *template.Template | ||
| MarkAllLeavesDirty *template.Template | ||
|
|
@@ -133,8 +149,8 @@ type Templates struct { | |
|
|
||
| var SQLTemplates = Templates{ | ||
| // A template isn't needed for this query; just keeping the struct uniform | ||
| CreateMetadataTable: template.Must(template.New("createMetadataTable").Parse(` | ||
| CREATE TABLE IF NOT EXISTS spock.ace_mtree_metadata ( | ||
| CreateMetadataTable: template.Must(template.New("createMetadataTable").Funcs(aceTemplateFuncs).Parse(` | ||
| CREATE TABLE IF NOT EXISTS {{aceSchema}}.ace_mtree_metadata ( | ||
| schema_name text, | ||
| table_name text, | ||
| total_rows bigint, | ||
|
|
@@ -161,8 +177,8 @@ var SQLTemplates = Templates{ | |
| ALTER PUBLICATION {{.PublicationName}} DROP TABLE {{.TableName}} | ||
| `)), | ||
|
|
||
| RemoveTableFromCDCMetadata: template.Must(template.New("removeTableFromCDCMetadata").Parse(` | ||
| UPDATE spock.ace_cdc_metadata | ||
| RemoveTableFromCDCMetadata: template.Must(template.New("removeTableFromCDCMetadata").Funcs(aceTemplateFuncs).Parse(` | ||
| UPDATE {{aceSchema}}.ace_cdc_metadata | ||
| SET tables = array_remove(tables, $1) | ||
| WHERE publication_name = $2 | ||
| `)), | ||
|
|
@@ -179,9 +195,9 @@ var SQLTemplates = Templates{ | |
| ) | ||
| `)), | ||
|
|
||
| UpdateCDCMetadata: template.Must(template.New("updateCdcMetadata").Parse(` | ||
| UpdateCDCMetadata: template.Must(template.New("updateCdcMetadata").Funcs(aceTemplateFuncs).Parse(` | ||
| INSERT INTO | ||
| spock.ace_cdc_metadata ( | ||
| {{aceSchema}}.ace_cdc_metadata ( | ||
| publication_name, | ||
| slot_name, | ||
| start_lsn, | ||
|
|
@@ -220,17 +236,17 @@ var SQLTemplates = Templates{ | |
| CheckPIDExists: template.Must(template.New("checkPIDExists").Parse(` | ||
| SELECT pid FROM pg_stat_activity WHERE pid = $1 | ||
| `)), | ||
| DropCDCMetadataTable: template.Must(template.New("dropCDCMetadataTable").Parse(` | ||
| DROP TABLE IF EXISTS spock.ace_cdc_metadata | ||
| DropCDCMetadataTable: template.Must(template.New("dropCDCMetadataTable").Funcs(aceTemplateFuncs).Parse(` | ||
| DROP TABLE IF EXISTS {{aceSchema}}.ace_cdc_metadata | ||
| `)), | ||
|
|
||
| GetCDCMetadata: template.Must(template.New("getCDCMetadata").Parse(` | ||
| GetCDCMetadata: template.Must(template.New("getCDCMetadata").Funcs(aceTemplateFuncs).Parse(` | ||
| SELECT | ||
| slot_name, | ||
| start_lsn, | ||
| tables | ||
| FROM | ||
| spock.ace_cdc_metadata | ||
| {{aceSchema}}.ace_cdc_metadata | ||
| WHERE | ||
| publication_name = $1 | ||
| `)), | ||
|
|
@@ -317,8 +333,8 @@ var SQLTemplates = Templates{ | |
| AND mt.node_position = b.node_position; | ||
| `)), | ||
|
|
||
| CreateCDCMetadataTable: template.Must(template.New("createCDCMetadataTable").Parse(` | ||
| CREATE TABLE IF NOT EXISTS spock.ace_cdc_metadata ( | ||
| CreateCDCMetadataTable: template.Must(template.New("createCDCMetadataTable").Funcs(aceTemplateFuncs).Parse(` | ||
| CREATE TABLE IF NOT EXISTS {{aceSchema}}.ace_cdc_metadata ( | ||
| publication_name text PRIMARY KEY, | ||
| slot_name text, | ||
| start_lsn text, | ||
|
|
@@ -772,9 +788,9 @@ var SQLTemplates = Templates{ | |
| VALUES | ||
| (0, $1, {{.StartExpr}}, {{.EndExpr}}); | ||
| `)), | ||
| CreateXORFunction: template.Must(template.New("createXORFunction").Parse(` | ||
| CreateXORFunction: template.Must(template.New("createXORFunction").Funcs(aceTemplateFuncs).Parse(` | ||
| CREATE | ||
| OR REPLACE FUNCTION spock.bytea_xor(a bytea, b bytea) RETURNS bytea AS $$ | ||
| OR REPLACE FUNCTION {{aceSchema}}.bytea_xor(a bytea, b bytea) RETURNS bytea AS $$ | ||
| DECLARE | ||
| result bytea; | ||
| len int; | ||
|
|
@@ -805,7 +821,7 @@ var SQLTemplates = Templates{ | |
| CREATE OPERATOR # ( | ||
| LEFTARG = bytea, | ||
| RIGHTARG = bytea, | ||
| PROCEDURE = spock.bytea_xor | ||
| PROCEDURE = {{aceSchema}}.bytea_xor | ||
| ); | ||
| END IF; | ||
| END $$; | ||
|
|
@@ -840,9 +856,9 @@ var SQLTemplates = Templates{ | |
| AND c.relname = $2 | ||
| AND a.attname = $3 | ||
| `)), | ||
| UpdateMetadata: template.Must(template.New("updateMetadata").Parse(` | ||
| UpdateMetadata: template.Must(template.New("updateMetadata").Funcs(aceTemplateFuncs).Parse(` | ||
| INSERT INTO | ||
| spock.ace_mtree_metadata ( | ||
| {{aceSchema}}.ace_mtree_metadata ( | ||
| schema_name, | ||
| table_name, | ||
| total_rows, | ||
|
|
@@ -873,8 +889,8 @@ var SQLTemplates = Templates{ | |
| hash_version = EXCLUDED.hash_version, | ||
| last_updated = EXCLUDED.last_updated | ||
| `)), | ||
| DeleteMetadata: template.Must(template.New("deleteMetadata").Parse(` | ||
| DELETE FROM spock.ace_mtree_metadata WHERE schema_name = $1 AND table_name = $2 | ||
| DeleteMetadata: template.Must(template.New("deleteMetadata").Funcs(aceTemplateFuncs).Parse(` | ||
| DELETE FROM {{aceSchema}}.ace_mtree_metadata WHERE schema_name = $1 AND table_name = $2 | ||
| `)), | ||
| InsertBlockRanges: template.Must(template.New("insertBlockRanges").Parse(` | ||
| INSERT INTO | ||
|
|
@@ -1055,11 +1071,11 @@ var SQLTemplates = Templates{ | |
| ORDER BY | ||
| node_position | ||
| `)), | ||
| GetRowCountEstimate: template.Must(template.New("getRowCountEstimate").Parse(` | ||
| GetRowCountEstimate: template.Must(template.New("getRowCountEstimate").Funcs(aceTemplateFuncs).Parse(` | ||
| SELECT | ||
| total_rows | ||
| FROM | ||
| spock.ace_mtree_metadata | ||
| {{aceSchema}}.ace_mtree_metadata | ||
| WHERE | ||
| schema_name = $1 | ||
| AND table_name = $2 | ||
|
|
@@ -1294,11 +1310,11 @@ var SQLTemplates = Templates{ | |
| mt.range_start, | ||
| mt.range_end | ||
| `)), | ||
| GetBlockSizeFromMetadata: template.Must(template.New("getBlockSizeFromMetadata").Parse(` | ||
| GetBlockSizeFromMetadata: template.Must(template.New("getBlockSizeFromMetadata").Funcs(aceTemplateFuncs).Parse(` | ||
| SELECT | ||
| block_size | ||
| FROM | ||
| spock.ace_mtree_metadata | ||
| {{aceSchema}}.ace_mtree_metadata | ||
| WHERE | ||
| schema_name = $1 | ||
| AND table_name = $2 | ||
|
|
@@ -1309,11 +1325,19 @@ var SQLTemplates = Templates{ | |
| FROM | ||
| {{.MtreeTable}} | ||
| `)), | ||
| DropXORFunction: template.Must(template.New("dropXORFunction").Parse(` | ||
| DROP FUNCTION IF EXISTS spock.bytea_xor(bytea, bytea) CASCADE | ||
| CompareBlocksSQL: template.Must(template.New("compareBlocksSQL").Parse(` | ||
| SELECT | ||
| * | ||
| FROM | ||
| {{.TableName}} | ||
| WHERE | ||
| {{.WhereClause}} | ||
| `)), | ||
|
coderabbitai[bot] marked this conversation as resolved.
|
||
| DropXORFunction: template.Must(template.New("dropXORFunction").Funcs(aceTemplateFuncs).Parse(` | ||
| DROP FUNCTION IF EXISTS {{aceSchema}}.bytea_xor(bytea, bytea) CASCADE | ||
| `)), | ||
| DropMetadataTable: template.Must(template.New("dropMetadataTable").Parse(` | ||
| DROP TABLE IF EXISTS spock.ace_mtree_metadata CASCADE | ||
| DropMetadataTable: template.Must(template.New("dropMetadataTable").Funcs(aceTemplateFuncs).Parse(` | ||
| DROP TABLE IF EXISTS {{aceSchema}}.ace_mtree_metadata CASCADE | ||
| `)), | ||
| DropMtreeTable: template.Must(template.New("dropMtreeTable").Parse(` | ||
| DROP TABLE IF EXISTS {{.MtreeTable}} CASCADE | ||
|
|
@@ -1506,13 +1530,13 @@ var SQLTemplates = Templates{ | |
| ORDER BY rs.confirmed_flush_lsn DESC | ||
| LIMIT 1 | ||
| `)), | ||
| EnsureHashVersionColumn: template.Must(template.New("ensureHashVersionColumn").Parse(` | ||
| ALTER TABLE spock.ace_mtree_metadata | ||
| EnsureHashVersionColumn: template.Must(template.New("ensureHashVersionColumn").Funcs(aceTemplateFuncs).Parse(` | ||
| ALTER TABLE {{aceSchema}}.ace_mtree_metadata | ||
| ADD COLUMN IF NOT EXISTS hash_version int NOT NULL DEFAULT 1 | ||
| `)), | ||
| GetHashVersion: template.Must(template.New("getHashVersion").Parse(` | ||
| GetHashVersion: template.Must(template.New("getHashVersion").Funcs(aceTemplateFuncs).Parse(` | ||
| SELECT COALESCE( | ||
| (SELECT hash_version FROM spock.ace_mtree_metadata | ||
| (SELECT hash_version FROM {{aceSchema}}.ace_mtree_metadata | ||
| WHERE schema_name = $1 AND table_name = $2), | ||
| 1 | ||
| ) | ||
|
|
@@ -1522,11 +1546,41 @@ var SQLTemplates = Templates{ | |
| SET dirty = true | ||
| WHERE node_level = 0 | ||
| `)), | ||
| UpdateHashVersion: template.Must(template.New("updateHashVersion").Parse(` | ||
| UPDATE spock.ace_mtree_metadata | ||
| UpdateHashVersion: template.Must(template.New("updateHashVersion").Funcs(aceTemplateFuncs).Parse(` | ||
| UPDATE {{aceSchema}}.ace_mtree_metadata | ||
| SET hash_version = $1, last_updated = current_timestamp | ||
| WHERE schema_name = $2 AND table_name = $3 | ||
| `)), | ||
| GetNativeOriginLSNForNode: template.Must(template.New("getNativeOriginLSNForNode").Parse(` | ||
| SELECT ros.remote_lsn::text | ||
| FROM pg_catalog.pg_replication_origin_status ros | ||
| JOIN pg_catalog.pg_replication_origin ro ON ro.roident = ros.local_id | ||
| JOIN pg_catalog.pg_subscription s ON ro.roname LIKE 'pg_%' || s.oid::text | ||
| WHERE s.subname ~ ('\m' || $1 || '\M') | ||
| AND ros.remote_lsn IS NOT NULL | ||
|
Comment on lines
+1559
to
+1560
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. $1 (a user-supplied node name) is interpolated raw into a regex fragment. A node name containing |, ., or * silently
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. technically valid but low-risk in practice: node |
||
| LIMIT 1 | ||
| `)), | ||
| GetNativeSlotLSNForNode: template.Must(template.New("getNativeSlotLSNForNode").Parse(` | ||
| SELECT rs.confirmed_flush_lsn::text | ||
| FROM pg_catalog.pg_replication_slots rs | ||
| JOIN pg_catalog.pg_subscription s ON rs.slot_name = s.subslotname | ||
| WHERE s.subname ~ ('\m' || $1 || '\M') | ||
| AND rs.confirmed_flush_lsn IS NOT NULL | ||
| ORDER BY rs.confirmed_flush_lsn DESC | ||
| LIMIT 1 | ||
| `)), | ||
|
coderabbitai[bot] marked this conversation as resolved.
|
||
| GetReplicationOriginNames: template.Must(template.New("getReplicationOriginNames").Parse(` | ||
| SELECT roident::text, roname FROM pg_replication_origin; | ||
| `)), | ||
| // GetNativeNodeOriginNames maps pg_replication_origin entries to their | ||
| // corresponding pg_subscription names. This provides the native PG | ||
| // equivalent of GetSpockNodeNames — mapping origin IDs (used by | ||
| // pg_xact_commit_timestamp_origin) to human-readable node identifiers. | ||
| GetNativeNodeOriginNames: template.Must(template.New("getNativeNodeOriginNames").Parse(` | ||
| SELECT ro.roident::text, s.subname | ||
| FROM pg_catalog.pg_replication_origin ro | ||
| JOIN pg_catalog.pg_subscription s ON ro.roname = 'pg_' || s.oid::text | ||
| `)), | ||
| GetReplicationOriginByName: template.Must(template.New("getReplicationOriginByName").Parse(` | ||
| SELECT roident FROM pg_replication_origin WHERE roname = $1 | ||
| `)), | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.