postgres

import "github.com/Aleph-Alpha/std/v1/postgres"

Package postgres provides a PostgreSQL client built on top of GORM.

The package exposes a small, database-agnostic interface (`Client`) plus a fluent query builder (`QueryBuilder`). The concrete implementation (`*Postgres`) wraps a `*gorm.DB` and adds:

  • Connection establishment + pool configuration
  • Periodic health checks and automatic reconnection (via `MonitorConnection` + `RetryConnection`)
  • Standardized CRUD and query-builder helpers
  • Optional error normalization (`TranslateError`)

Concurrency model

The active `*gorm.DB` connection pointer is stored in an `atomic.Pointer`. Calls that need a DB snapshot simply load the pointer and run the operation without holding any package-level locks. Reconnection swaps the pointer atomically.
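The snapshot-and-swap idea can be sketched with the standard library alone. This is a minimal illustration, not the package's actual code: the `conn` and `holder` types and their methods are hypothetical stand-ins for `*gorm.DB` and the unexported fields of `*Postgres`.

```go
package main

import (
    "fmt"
    "sync/atomic"
)

// conn stands in for *gorm.DB in this sketch; it is not a real GORM type.
type conn struct{ id int }

// holder keeps the active connection in an atomic pointer (hypothetical type).
type holder struct {
    active atomic.Pointer[conn]
}

// snapshot returns the connection that is current at the time of the call.
func (h *holder) snapshot() *conn { return h.active.Load() }

// reconnect atomically publishes a replacement and returns the previous connection.
func (h *holder) reconnect(next *conn) *conn { return h.active.Swap(next) }

func main() {
    h := &holder{}
    h.active.Store(&conn{id: 1})

    inFlight := h.snapshot() // an operation that started before the swap
    old := h.reconnect(&conn{id: 2})

    // The in-flight operation keeps its snapshot; new callers see the new pointer.
    fmt.Println(inFlight.id, old.id, h.snapshot().id) // prints "1 1 2"
}
```

Readers never block on a lock here; an operation that loaded the old pointer simply finishes (or fails) against the old connection.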

Basic usage

cfg := postgres.Config{
    Connection: postgres.Connection{
        Host:     "localhost",
        Port:     "5432",
        User:     "postgres",
        Password: "password",
        DbName:   "mydb",
        SSLMode:  "disable",
    },
}

pg, err := postgres.NewPostgres(cfg)
if err != nil {
    // handle
}
defer pg.GracefulShutdown()

var users []User
if err := pg.Find(ctx, &users, "active = ?", true); err != nil {
    // handle
}

Transaction usage

err := pg.Transaction(ctx, func(tx postgres.Client) error {
    if err := tx.Create(ctx, &User{Name: "Alice"}); err != nil {
        return err
    }
    return nil
})

Observability (Observer hook)

The Postgres client supports optional observability through the unified `observability.Observer` interface (`github.com/Aleph-Alpha/std/v1/observability`). If an observer is attached, it will be notified after each operation completes (success or error) with an `observability.OperationContext`.

Non-Fx usage (builder pattern):

pg, err := postgres.NewPostgres(cfg)
if err != nil {
    return err
}
pg = pg.WithObserver(myObserver)

Fx usage (optional injection):

app := fx.New(
    postgres.FXModule,
    fx.Provide(loadPostgresConfig),
    fx.Provide(func() observability.Observer {
        return myObserver
    }),
)

The Postgres client emits (at least) the following operation names:

  • Basic ops: "find", "first", "create", "save", "update", "update_column", "update_columns", "update_where", "delete", "count", "exec"
  • Query builder terminal ops: "scan", "last", "pluck", "create_in_batches", "first_or_init", "first_or_create"
  • Transactions: "transaction"
  • Migrations: "auto_migrate", "migrate_up", "migrate_down", "migration_status"

Resource conventions:

  • Resource: table name when available (otherwise falls back to database name)
  • SubResource: optional extra context (e.g. migration id / migrations dir)

Fx integration

The package provides `FXModule` which constructs `*Postgres` and also exposes it as the `Client` interface, plus lifecycle hooks for starting/stopping monitoring.

app := fx.New(
    postgres.FXModule,
    fx.Provide(loadPostgresConfig),
)

Package postgres provides PostgreSQL database operations with an interface-first design. The interfaces defined here (Client, QueryBuilder) can be implemented by other database packages (like mariadb) to provide a consistent API across different databases.

Variables

Common database error types that can be used by consumers of this package. These provide a standardized set of errors that abstract away the underlying database-specific error details.

var (
    // ErrRecordNotFound is returned when a query doesn't find any matching records
    ErrRecordNotFound = errors.New("record not found")

    // ErrDuplicateKey is returned when an insert or update violates a unique constraint
    ErrDuplicateKey = errors.New("duplicate key violation")

    // ErrForeignKey is returned when an operation violates a foreign key constraint
    ErrForeignKey = errors.New("foreign key violation")

    // ErrInvalidData is returned when the data being saved doesn't meet validation rules
    ErrInvalidData = errors.New("invalid data")

    // ErrConnectionFailed is returned when database connection cannot be established
    ErrConnectionFailed = errors.New("database connection failed")

    // ErrTransactionFailed is returned when a transaction fails to commit or rollback
    ErrTransactionFailed = errors.New("transaction failed")

    // ErrQueryTimeout is returned when a query exceeds the allowed timeout
    ErrQueryTimeout = errors.New("query timeout exceeded")

    // ErrInvalidQuery is returned when the SQL query is malformed or invalid
    ErrInvalidQuery = errors.New("invalid query")

    // ErrPermissionDenied is returned when the user lacks necessary permissions
    ErrPermissionDenied = errors.New("permission denied")

    // ErrTableNotFound is returned when trying to access a non-existent table
    ErrTableNotFound = errors.New("table not found")

    // ErrColumnNotFound is returned when trying to access a non-existent column
    ErrColumnNotFound = errors.New("column not found")

    // ErrConstraintViolation is returned for general constraint violations
    ErrConstraintViolation = errors.New("constraint violation")

    // ErrCheckConstraintViolation is returned when a check constraint is violated
    ErrCheckConstraintViolation = errors.New("check constraint violation")

    // ErrNotNullViolation is returned when trying to insert null into a not-null column
    ErrNotNullViolation = errors.New("not null constraint violation")

    // ErrDataTooLong is returned when data exceeds column length limits
    ErrDataTooLong = errors.New("data too long for column")

    // ErrDeadlock is returned when a deadlock is detected during transaction
    ErrDeadlock = errors.New("deadlock detected")

    // ErrLockTimeout is returned when unable to acquire lock within timeout
    ErrLockTimeout = errors.New("lock acquisition timeout")

    // ErrInvalidDataType is returned when data type conversion fails
    ErrInvalidDataType = errors.New("invalid data type")

    // ErrDivisionByZero is returned for division by zero operations
    ErrDivisionByZero = errors.New("division by zero")

    // ErrNumericOverflow is returned when numeric operation causes overflow
    ErrNumericOverflow = errors.New("numeric value overflow")

    // ErrDiskFull is returned when database storage is full
    ErrDiskFull = errors.New("disk full")

    // ErrTooManyConnections is returned when connection pool is exhausted
    ErrTooManyConnections = errors.New("too many connections")

    // ErrInvalidJSON is returned when JSON data is malformed
    ErrInvalidJSON = errors.New("invalid JSON data")

    // ErrIndexCorruption is returned when database index is corrupted
    ErrIndexCorruption = errors.New("index corruption detected")

    // ErrConfigurationError is returned for database configuration issues
    ErrConfigurationError = errors.New("database configuration error")

    // ErrUnsupportedOperation is returned for operations not supported by the database
    ErrUnsupportedOperation = errors.New("unsupported operation")

    // ErrMigrationFailed is returned when database migration fails
    ErrMigrationFailed = errors.New("migration failed")

    // ErrBackupFailed is returned when database backup operation fails
    ErrBackupFailed = errors.New("backup operation failed")

    // ErrRestoreFailed is returned when database restore operation fails
    ErrRestoreFailed = errors.New("restore operation failed")

    // ErrSchemaValidation is returned when schema validation fails
    ErrSchemaValidation = errors.New("schema validation failed")

    // ErrSerializationFailure is returned when transaction serialization fails
    ErrSerializationFailure = errors.New("serialization failure")

    // ErrInsufficientPrivileges is returned when user lacks required privileges
    ErrInsufficientPrivileges = errors.New("insufficient privileges")

    // ErrInvalidPassword is returned for authentication failures
    ErrInvalidPassword = errors.New("invalid password")

    // ErrAccountLocked is returned when user account is locked
    ErrAccountLocked = errors.New("account locked")

    // ErrDatabaseNotFound is returned when specified database doesn't exist
    ErrDatabaseNotFound = errors.New("database not found")

    // ErrSchemaNotFound is returned when specified schema doesn't exist
    ErrSchemaNotFound = errors.New("schema not found")

    // ErrFunctionNotFound is returned when specified function doesn't exist
    ErrFunctionNotFound = errors.New("function not found")

    // ErrTriggerNotFound is returned when specified trigger doesn't exist
    ErrTriggerNotFound = errors.New("trigger not found")

    // ErrIndexNotFound is returned when specified index doesn't exist
    ErrIndexNotFound = errors.New("index not found")

    // ErrViewNotFound is returned when specified view doesn't exist
    ErrViewNotFound = errors.New("view not found")

    // ErrSequenceNotFound is returned when specified sequence doesn't exist
    ErrSequenceNotFound = errors.New("sequence not found")

    // ErrInvalidCursor is returned when cursor operation fails
    ErrInvalidCursor = errors.New("invalid cursor")

    // ErrCursorNotFound is returned when specified cursor doesn't exist
    ErrCursorNotFound = errors.New("cursor not found")

    // ErrStatementTimeout is returned when statement execution exceeds timeout
    ErrStatementTimeout = errors.New("statement timeout")

    // ErrIdleInTransaction is returned when transaction is idle too long
    ErrIdleInTransaction = errors.New("idle in transaction timeout")

    // ErrConnectionLost is returned when database connection is lost
    ErrConnectionLost = errors.New("connection lost")

    // ErrProtocolViolation is returned for database protocol violations
    ErrProtocolViolation = errors.New("protocol violation")

    // ErrInternalError is returned for internal database errors
    ErrInternalError = errors.New("internal database error")

    // ErrSystemError is returned for system-level database errors
    ErrSystemError = errors.New("system error")
)
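Callers would typically branch on these sentinels with `errors.Is` once an error has been normalized. The sketch below is self-contained and therefore redeclares two of the sentinels locally; the `describe` function and its outcome strings are hypothetical.

```go
package main

import (
    "errors"
    "fmt"
)

// Local stand-ins for two of the exported sentinels listed above.
var (
    ErrRecordNotFound = errors.New("record not found")
    ErrDuplicateKey   = errors.New("duplicate key violation")
)

// describe maps an already-normalized error to a caller-facing outcome.
// The outcome strings are hypothetical.
func describe(err error) string {
    switch {
    case err == nil:
        return "ok"
    case errors.Is(err, ErrRecordNotFound):
        return "not found"
    case errors.Is(err, ErrDuplicateKey):
        return "conflict"
    default:
        return "internal"
    }
}

func main() {
    // Wrapping with %w keeps the sentinel reachable through errors.Is.
    wrapped := fmt.Errorf("load user 42: %w", ErrRecordNotFound)
    fmt.Println(describe(wrapped))             // prints "not found"
    fmt.Println(describe(errors.New("other"))) // prints "internal"
}
```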

FXModule is an fx module that provides the Postgres database component. It registers the Postgres constructor for dependency injection and sets up lifecycle hooks to properly initialize and shut down the database connection.

This module provides:

  • *Postgres (concrete type) - for direct use and lifecycle management
  • Client (interface) - for consumers who want database abstraction

Consumers can inject either:

  • *Postgres for full access to all methods
  • Client for interface-based programming

var FXModule = fx.Module("postgres",
    fx.Provide(
        NewPostgresClientWithDI,
        fx.Annotate(
            ProvideClient,
            fx.As(new(Client)),
        ),
    ),
    fx.Invoke(RegisterPostgresLifecycle),
)

func RegisterPostgresLifecycle(params PostgresLifeCycleParams)

RegisterPostgresLifecycle registers lifecycle hooks for the Postgres database component. It sets up:

  1. Connection monitoring on application start
  2. Automatic reconnection mechanism on application start
  3. Graceful shutdown of database connections on application stop

The function uses a WaitGroup to ensure that all goroutines complete before the application terminates.

type Client

Client is the main database client interface that provides CRUD operations, query building, and transaction management.

This interface allows applications to:

  • Switch between PostgreSQL and MariaDB without code changes
  • Write database-agnostic business logic
  • Mock database operations easily for testing
  • Depend on abstractions rather than concrete implementations

The Postgres type implements this interface.

type Client interface {
    // Basic CRUD operations
    Find(ctx context.Context, dest interface{}, conditions ...interface{}) error
    First(ctx context.Context, dest interface{}, conditions ...interface{}) error
    Create(ctx context.Context, value interface{}) error
    Save(ctx context.Context, value interface{}) error
    Update(ctx context.Context, model interface{}, attrs interface{}) (int64, error)
    UpdateColumn(ctx context.Context, model interface{}, columnName string, value interface{}) (int64, error)
    UpdateColumns(ctx context.Context, model interface{}, columnValues map[string]interface{}) (int64, error)
    Delete(ctx context.Context, value interface{}, conditions ...interface{}) (int64, error)
    Count(ctx context.Context, model interface{}, count *int64, conditions ...interface{}) error
    UpdateWhere(ctx context.Context, model interface{}, attrs interface{}, condition string, args ...interface{}) (int64, error)
    Exec(ctx context.Context, sql string, values ...interface{}) (int64, error)

    // Query builder for complex queries
    // Returns the QueryBuilder interface for method chaining
    Query(ctx context.Context) QueryBuilder

    // Transaction support
    // The callback function receives a Client interface (not a concrete type)
    // This allows the same transaction code to work with any database implementation
    Transaction(ctx context.Context, fn func(tx Client) error) error

    // Raw GORM access for advanced use cases
    // Use this when you need direct access to GORM's functionality
    DB() *gorm.DB

    // Error translation / classification.
    //
    // std deliberately returns raw GORM/driver errors from CRUD/query methods.
    // Use TranslateError to normalize errors to std's exported sentinels (ErrRecordNotFound,
    // ErrDuplicateKey, ...), especially when working with the Client interface (e.g. inside
    // Transaction callbacks).
    TranslateError(err error) error
    GetErrorCategory(err error) ErrorCategory
    IsRetryable(err error) bool
    IsTemporary(err error) bool
    IsCritical(err error) bool

    // Lifecycle management
    GracefulShutdown() error
}
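One way to use the interface-first design in tests is to depend on a narrow, consumer-defined subset of `Client` and substitute an in-memory fake. In this sketch the `creator` interface, `fakeDB`, `User`, and `registerUser` are all hypothetical; only the `Create` signature is copied from the interface above.

```go
package main

import (
    "context"
    "errors"
    "fmt"
)

// creator is a hypothetical, consumer-defined subset of the Client interface.
// Its single method copies the documented Create signature.
type creator interface {
    Create(ctx context.Context, value interface{}) error
}

// fakeDB records created values in memory instead of touching a database.
type fakeDB struct{ created []interface{} }

func (f *fakeDB) Create(_ context.Context, value interface{}) error {
    f.created = append(f.created, value)
    return nil
}

// User is a hypothetical model.
type User struct{ Name string }

// registerUser is the business logic under test; it only sees the interface.
func registerUser(ctx context.Context, db creator, name string) error {
    if name == "" {
        return errors.New("name must not be empty")
    }
    return db.Create(ctx, &User{Name: name})
}

func main() {
    db := &fakeDB{}
    err := registerUser(context.Background(), db, "Alice")
    fmt.Println(err, len(db.created)) // prints "<nil> 1"
}
```

Because any type with a matching `Create` method satisfies `creator`, production code can pass the real client while tests pass the fake.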

func ProvideClient(pg *Postgres) Client

ProvideClient wraps the concrete *Postgres and returns it as the Client interface. This enables applications to depend on the interface rather than the concrete type.

type Config

Config represents the complete configuration for a PostgreSQL database connection. It encapsulates both the basic connection parameters and detailed connection pool settings.

type Config struct {
    // Connection contains the essential parameters needed to establish a database connection
    Connection Connection

    // ConnectionDetails contains configuration for the connection pool behavior
    ConnectionDetails ConnectionDetails
}

Connection holds the basic parameters required to connect to a PostgreSQL database. These parameters are used to construct the database connection string.

type Connection struct {
    // Host specifies the database server hostname or IP address
    Host string

    // Port specifies the TCP port on which the database server is listening
    Port string

    // User specifies the database username for authentication
    User string

    // Password specifies the database user password for authentication
    Password string

    // DbName specifies the name of the database to connect to
    DbName string

    // SSLMode specifies the SSL mode for the connection (e.g., "disable", "require", "verify-ca", "verify-full")
    // For production environments, it's recommended to use at least "require"
    SSLMode string
}
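For illustration, the fields above could be rendered into a libpq-style keyword/value connection string as follows. Whether the package builds exactly this string is an assumption; the `buildDSN` helper is hypothetical, and the struct is redeclared locally so the sketch compiles on its own.

```go
package main

import "fmt"

// Connection mirrors the fields documented above (local copy for this sketch).
type Connection struct {
    Host, Port, User, Password, DbName, SSLMode string
}

// buildDSN renders the fields as a keyword/value connection string.
// It does not quote or escape values, so values containing spaces or quotes
// would need extra handling in real code.
func buildDSN(c Connection) string {
    return fmt.Sprintf("host=%s port=%s user=%s password=%s dbname=%s sslmode=%s",
        c.Host, c.Port, c.User, c.Password, c.DbName, c.SSLMode)
}

func main() {
    dsn := buildDSN(Connection{
        Host: "localhost", Port: "5432", User: "postgres",
        Password: "password", DbName: "mydb", SSLMode: "disable",
    })
    fmt.Println(dsn)
    // prints "host=localhost port=5432 user=postgres password=password dbname=mydb sslmode=disable"
}
```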

ConnectionDetails holds configuration settings for the database connection pool. These settings help optimize performance and resource usage by controlling how database connections are created, reused, and expired.

type ConnectionDetails struct {
    // MaxOpenConns controls the maximum number of open connections to the database.
    // Setting this appropriately helps prevent overwhelming the database with too many connections.
    // If set to 0, the package default is used.
    MaxOpenConns int

    // MaxIdleConns controls the maximum number of connections in the idle connection pool.
    // A higher value can improve performance under concurrent load but consumes more resources.
    // If set to 0, the package default is used.
    MaxIdleConns int

    // ConnMaxLifetime is the maximum amount of time a connection may be reused.
    // Expired connections are closed and removed from the pool during connection acquisition.
    // This helps ensure database-enforced timeouts are respected.
    // If set to 0, the package default is used.
    ConnMaxLifetime time.Duration
}

ErrorCategory represents different categories of database errors.

type ErrorCategory int

const (
    CategoryUnknown ErrorCategory = iota
    CategoryConnection
    CategoryQuery
    CategoryData
    CategoryConstraint
    CategoryPermission
    CategoryTransaction
    CategoryResource
    CategorySystem
    CategorySchema
    CategoryOperation
)

type Logger

Logger is an interface that matches the std/v1/logger.Logger interface. It provides context-aware structured logging with optional error and field parameters.

type Logger interface {
    // InfoWithContext logs an informational message with trace context.
    InfoWithContext(ctx context.Context, msg string, err error, fields ...map[string]interface{})

    // WarnWithContext logs a warning message with trace context.
    WarnWithContext(ctx context.Context, msg string, err error, fields ...map[string]interface{})

    // ErrorWithContext logs an error message with trace context.
    ErrorWithContext(ctx context.Context, msg string, err error, fields ...map[string]interface{})
}

Migration represents a single database migration with all its metadata and content. Each migration contains the SQL to execute and information about its purpose and identity.

type Migration struct {
    // ID is a unique identifier for the migration, typically a timestamp (YYYYMMDDHHMMSS)
    // that also helps establish the execution order.
    ID  string

    // Name is a descriptive label for the migration, typically describing what it does.
    Name string

    // Type categorizes the migration as either schema or data.
    Type MigrationType

    // Direction indicates whether this is an up (apply) or down (rollback) migration.
    Direction MigrationDirection

    // SQL contains the actual SQL statements to execute for this migration.
    SQL string
}

MigrationDirection specifies the direction of the migration, indicating whether it's applying a change or reverting one.

type MigrationDirection string

const (
    // UpMigration indicates a forward migration that applies a change.
    UpMigration MigrationDirection = "up"

    // DownMigration indicates a rollback migration that reverts a change.
    DownMigration MigrationDirection = "down"
)

MigrationHistoryRecord represents a record in the migration history table. It tracks when and how each migration was applied, enabling the system to determine which migrations have been run and providing an audit trail.

type MigrationHistoryRecord struct {
    // ID matches the migration ID that was applied.
    ID  string

    // Name is the descriptive name of the migration.
    Name string

    // Type indicates whether this was a schema or data migration.
    Type string

    // ExecutedAt records when the migration was applied.
    ExecutedAt time.Time

    // ExecutedBy tracks who or what system applied the migration.
    ExecutedBy string

    // Duration measures how long the migration took to execute in milliseconds.
    Duration int64

    // Status indicates whether the migration completed successfully or failed.
    Status string

    // ErrorMessage contains details if the migration failed.
    ErrorMessage string `gorm:"type:text"`
}

MigrationType defines the type of migration, categorizing the purpose of the change. This helps track and organize migrations based on their impact on the database.

type MigrationType string

const (
    // SchemaType represents schema changes (tables, columns, indexes, etc.)
    // These migrations modify the structure of the database.
    SchemaType MigrationType = "schema"

    // DataType represents data manipulations (inserts, updates, etc.)
    // These migrations modify the content within the database.
    DataType MigrationType = "data"
)

Postgres is a wrapper around gorm.DB that provides connection monitoring, automatic reconnection, and standardized database operations.

Implements both postgres.Client (deprecated) and database.Client interfaces.

Concurrency: the active `*gorm.DB` pointer is stored in an atomic pointer and can be swapped during reconnection without blocking readers.

type Postgres struct {
    // contains filtered or unexported fields
}

func NewPostgres(cfg Config) (*Postgres, error)

NewPostgres creates a new Postgres instance with the provided configuration and Logger. It establishes the initial database connection and sets up the internal state for connection monitoring and recovery. If the initial connection fails, it logs a fatal error and terminates.

Returns *Postgres concrete type (following Go best practice: "accept interfaces, return structs").

func NewPostgresClientWithDI(params PostgresParams) (*Postgres, error)

NewPostgresClientWithDI creates a new Postgres Client using dependency injection. This function is designed to be used with Uber's fx dependency injection framework where the Config, Logger, and Observer dependencies are automatically provided via the PostgresParams struct.

Parameters:

  • params: A PostgresParams struct containing the Config instance and, optionally, Logger and Observer instances used to initialize the Postgres Client. This struct embeds fx.In to enable automatic injection of these dependencies.

Returns:

  • *Postgres: A fully initialized Postgres Client (concrete type). The FX module also provides this as Client interface for consumers who want abstraction.

Example usage with fx:

app := fx.New(
    postgres.FXModule,
    logger.FXModule,  // Optional: provides logger
    fx.Provide(
        func() postgres.Config {
            return loadPostgresConfig() // Your config loading function
        },
        func(metrics *prometheus.Metrics) observability.Observer {
            return &MyObserver{metrics: metrics}  // Optional observer
        },
    ),
)

This function creates the client and injects the optional logger and observer before returning.

func (*Postgres) AutoMigrate

func (p *Postgres) AutoMigrate(models ...interface{}) error

AutoMigrate is a wrapper around GORM's AutoMigrate with additional features. It tracks migrations in the migration history table and provides better error handling.

Parameters:

  • models: The GORM models to auto-migrate

Returns a GORM error if any part of the migration process fails.

This method is useful during development or for simple applications, but for production systems, explicit migrations are recommended.

func (*Postgres) Count

func (p *Postgres) Count(ctx context.Context, model interface{}, count *int64, conditions ...interface{}) error

Count determines the number of records that match the given conditions. It populates the count parameter with the result.

Parameters:

  • ctx: Context for the database operation
  • model: The model type to count
  • count: Pointer to an int64 where the count will be stored
  • conditions: Query conditions to filter the records to count

Returns a GORM error if the query fails or nil on success. Use TranslateError() to convert to standardized error types if needed.

Example:

var count int64
err := db.Count(ctx, &User{}, &count, "age > ?", 18)

func (*Postgres) Create

func (p *Postgres) Create(ctx context.Context, value interface{}) error

Create inserts a new record into the database. It processes the value parameter according to GORM conventions, performing hooks and validations defined on the model.

Parameters:

  • ctx: Context for the database operation
  • value: The struct or slice of structs to be created

Returns a GORM error if the creation fails or nil on success. Use TranslateError() to convert to standardized error types if needed.

Example:

user := User{Name: "John", Email: "john@example.com"}
err := db.Create(ctx, &user)

func (*Postgres) CreateMigration

func (p *Postgres) CreateMigration(migrationsDir, name string, migrationType MigrationType) (string, error)

CreateMigration generates a new migration file. It creates both up and down migration files with appropriate names and timestamps.

Parameters:

  • migrationsDir: Directory where migration files should be created
  • name: Descriptive name for the migration
  • migrationType: Whether this is a schema or data migration

Returns the base filename of the created migration or a wrapped error if creation fails.

Example:

filename, err := db.CreateMigration("./migrations", "create_users_table", postgres.SchemaType)
if err == nil {
    fmt.Printf("Created migration: %s\n", filename)
}
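The timestamp-based ID described for `Migration.ID` (YYYYMMDDHHMMSS) can be produced with Go's reference-time layout. The sketch below is only illustrative: the `newID` and `fileNames` helpers are hypothetical, and the `<id>_<name>.<direction>.sql` file layout is an assumption rather than the package's documented naming scheme.

```go
package main

import (
    "fmt"
    "time"
)

// newID formats t as YYYYMMDDHHMMSS, the ID shape described for Migration.ID.
func newID(t time.Time) string {
    return t.UTC().Format("20060102150405")
}

// fileNames returns hypothetical up/down file names for a migration.
// The "<id>_<name>.<direction>.sql" layout is an assumption.
func fileNames(id, name string) (up, down string) {
    base := id + "_" + name
    return base + ".up.sql", base + ".down.sql"
}

func main() {
    id := newID(time.Date(2024, time.March, 1, 12, 30, 0, 0, time.UTC))
    up, down := fileNames(id, "create_users_table")
    fmt.Println(up)   // prints "20240301123000_create_users_table.up.sql"
    fmt.Println(down) // prints "20240301123000_create_users_table.down.sql"
}
```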

func (*Postgres) DB

func (p *Postgres) DB() *gorm.DB

DB returns the underlying GORM DB Client instance. This method provides direct access to the database connection while maintaining thread safety through an atomic load.

Use this method when you need to perform operations not covered by the wrapper methods or when you need to access specific GORM functionality. Note that direct usage bypasses some of the safety mechanisms, so use it with care.

func (*Postgres) Delete

func (p *Postgres) Delete(ctx context.Context, value interface{}, conditions ...interface{}) (int64, error)

Delete removes records that match the given value and conditions from the database. It respects soft delete if implemented on the model.

Parameters:

  • ctx: Context for the database operation
  • value: The model to delete or a slice for batch delete
  • conditions: Additional conditions to filter records to delete

Returns:

  • int64: Number of rows affected by the delete operation
  • error: GORM error if the deletion fails, nil on success

Example:

// Delete user with ID=1
rowsAffected, err := db.Delete(ctx, &User{}, "id = ?", 1)
if err != nil {
    return err
}
fmt.Printf("Deleted %d rows\n", rowsAffected)

// Or with a model instance
user := User{ID: 1}
rowsAffected, err = db.Delete(ctx, &user) // reuse the variables declared above

func (*Postgres) Exec

func (p *Postgres) Exec(ctx context.Context, sql string, values ...interface{}) (int64, error)

Exec executes raw SQL directly against the database. This is useful for operations not easily expressed through GORM's API or for performance-critical code.

Parameters:

  • ctx: Context for the database operation
  • sql: The SQL statement to execute
  • values: Parameters for the SQL statement

Returns:

  • int64: Number of rows affected by the SQL execution
  • error: GORM error if the execution fails, nil on success

Example:

rowsAffected, err := db.Exec(ctx, "UPDATE users SET status = ? WHERE last_login < ?",
                             "inactive", time.Now().AddDate(0, -6, 0))
if err != nil {
    return err
}
fmt.Printf("Updated %d users\n", rowsAffected)

func (*Postgres) Find

func (p *Postgres) Find(ctx context.Context, dest interface{}, conditions ...interface{}) error

Find retrieves records from the database that match the given conditions. It populates the dest parameter with the query results.

Parameters:

  • ctx: Context for the database operation
  • dest: Pointer to a slice where the results will be stored
  • conditions: Optional query conditions (follows GORM conventions)

Returns a GORM error if the query fails or nil on success. Use TranslateError() to convert to standardized error types if needed.

Example:

var users []User
err := db.Find(ctx, &users, "name LIKE ?", "%john%")

func (*Postgres) First

func (p *Postgres) First(ctx context.Context, dest interface{}, conditions ...interface{}) error

First retrieves the first record that matches the given conditions. It populates the dest parameter with the result or returns an error if no matching record exists.

Parameters:

  • ctx: Context for the database operation
  • dest: Pointer to a struct where the result will be stored
  • conditions: Optional query conditions (follows GORM conventions)

Returns gorm.ErrRecordNotFound if no matching record exists, or another GORM error if the query fails. Use TranslateError() to convert to standardized error types if needed.

Example:

var user User
err := db.First(ctx, &user, "email = ?", "user@example.com")
if errors.Is(err, gorm.ErrRecordNotFound) {
    // Handle not found
}

func (*Postgres) GetErrorCategory

func (p *Postgres) GetErrorCategory(err error) ErrorCategory

GetErrorCategory returns the category of the given error.

func (*Postgres) GetMigrationStatus

func (p *Postgres) GetMigrationStatus(ctx context.Context, migrationsDir string) ([]map[string]interface{}, error)

GetMigrationStatus returns the status of all migrations. It compares available migrations with those that have been applied to build a comprehensive status report.

Parameters:

  • ctx: Context for database operations
  • migrationsDir: Directory containing the migration SQL files

Returns a slice of maps with status information for each migration, or a wrapped error if the status cannot be determined. The error wraps the underlying GORM error with additional context.

Example:

status, err := db.GetMigrationStatus(ctx, "./migrations")
if err == nil {
    for _, m := range status {
        fmt.Printf("Migration %s: %v\n", m["id"], m["applied"])
    }
}

func (*Postgres) GracefulShutdown

func (p *Postgres) GracefulShutdown() error

func (*Postgres) IsCritical

func (p *Postgres) IsCritical(err error) bool

IsCritical returns true if the error indicates a serious system problem.

func (*Postgres) IsRetryable

func (p *Postgres) IsRetryable(err error) bool

IsRetryable returns true if the error might be resolved by retrying the operation.

func (*Postgres) IsTemporary

func (p *Postgres) IsTemporary(err error) bool

IsTemporary returns true if the error is likely temporary and might resolve itself.
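A classifier such as IsRetryable is typically paired with a bounded retry loop. The following is a minimal sketch under stated assumptions: the `retry` helper, its doubling delay, and the `errTransient` error are hypothetical, and the predicate passed in stands in for a call like `pg.IsRetryable`.

```go
package main

import (
    "errors"
    "fmt"
    "time"
)

// errTransient is a hypothetical retryable error used only in this sketch.
var errTransient = errors.New("transient failure")

// retry runs op up to attempts times, sleeping between tries with a doubling delay.
// It stops early on success or when isRetryable reports false.
func retry(attempts int, delay time.Duration, isRetryable func(error) bool, op func() error) error {
    var err error
    for i := 0; i < attempts; i++ {
        if err = op(); err == nil || !isRetryable(err) {
            return err
        }
        if i < attempts-1 {
            time.Sleep(delay)
            delay *= 2
        }
    }
    return err
}

func main() {
    calls := 0
    err := retry(5, time.Millisecond,
        // Stand-in for a classifier such as pg.IsRetryable.
        func(err error) bool { return errors.Is(err, errTransient) },
        func() error {
            calls++
            if calls < 3 {
                return errTransient
            }
            return nil
        })
    fmt.Println(calls, err) // prints "3 <nil>"
}
```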

func (*Postgres) MigrateDown

func (p *Postgres) MigrateDown(ctx context.Context, migrationsDir string) error

MigrateDown rolls back the last applied migration. It finds the most recently applied migration and executes its corresponding down migration to revert the changes.

Parameters:

  • ctx: Context for database operations
  • migrationsDir: Directory containing the migration SQL files

Returns a wrapped error if the rollback fails or if the down migration can't be found. The error wraps the underlying GORM error with additional context.

Example:

err := db.MigrateDown(ctx, "./migrations")

func (*Postgres) MigrateUp

func (p *Postgres) MigrateUp(ctx context.Context, migrationsDir string) error

MigrateUp applies all pending migrations from the specified directory. It identifies which migrations haven't been applied yet, sorts them by ID, and applies them in order within transactions.

Parameters:

  • ctx: Context for database operations
  • migrationsDir: Directory containing the migration SQL files

Returns a wrapped error if any migration fails or if there are issues accessing the migrations. The error wraps the underlying GORM error with additional context.

Example:

err := db.MigrateUp(ctx, "./migrations")
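The "identify pending, sort by ID" step can be sketched in a few lines. This is not the package's implementation; the `pending` helper is hypothetical. It relies on the fact that fixed-width YYYYMMDDHHMMSS IDs sort chronologically when compared as strings.

```go
package main

import (
    "fmt"
    "sort"
)

// pending returns the IDs from available that are not in applied, in ascending order.
// Fixed-width timestamp IDs sort chronologically under plain string comparison.
func pending(available []string, applied map[string]bool) []string {
    var out []string
    for _, id := range available {
        if !applied[id] {
            out = append(out, id)
        }
    }
    sort.Strings(out)
    return out
}

func main() {
    available := []string{"20240301120000", "20240101090000", "20240201100000"}
    applied := map[string]bool{"20240101090000": true}
    fmt.Println(pending(available, applied)) // prints "[20240201100000 20240301120000]"
}
```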

func (*Postgres) MonitorConnection

func (p *Postgres) MonitorConnection(ctx context.Context)

MonitorConnection periodically checks the health of the database connection and triggers reconnection attempts when necessary. It runs as a goroutine that performs health checks at regular intervals (10 seconds) and signals the RetryConnection goroutine when a failure is detected.

The function respects context cancellation and shutdown signals, ensuring proper resource cleanup and graceful termination when requested.
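A monitor of this shape can be sketched with a ticker and a non-blocking send. The code below is an illustration under assumptions, not the package's source: `monitor`, `checkOnce`, `errDown`, and the buffered `retry` channel are hypothetical names, and the interval is shortened so the demo finishes quickly.

```go
package main

import (
    "context"
    "errors"
    "fmt"
    "time"
)

// errDown is a hypothetical health-check failure.
var errDown = errors.New("database unreachable")

// checkOnce runs one health check. On failure it tries a non-blocking send on
// retry and reports whether a signal was actually queued.
func checkOnce(ping func() error, retry chan<- struct{}) bool {
    if ping() == nil {
        return false
    }
    select {
    case retry <- struct{}{}:
        return true
    default: // a retry is already pending; do not block the monitor
        return false
    }
}

// monitor calls checkOnce on every tick until ctx is cancelled.
func monitor(ctx context.Context, interval time.Duration, ping func() error, retry chan<- struct{}) {
    ticker := time.NewTicker(interval)
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            checkOnce(ping, retry)
        }
    }
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    retry := make(chan struct{}, 1)
    // The documented interval is 10 seconds; 10ms keeps the demo fast.
    go monitor(ctx, 10*time.Millisecond, func() error { return errDown }, retry)

    <-retry // blocks until the first failed health check
    fmt.Println("retry signalled")
}
```

The non-blocking send means repeated failures while a reconnection is already pending do not pile up or stall the health-check loop.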

func (*Postgres) Query

func (p *Postgres) Query(ctx context.Context) QueryBuilder

Query provides a flexible way to build complex queries. It returns a QueryBuilder interface which can be used to chain query methods in a fluent interface. The builder snapshots the current `*gorm.DB` connection and does not hold any package-level locks while the chain is being built or executed.

Parameters:

  • ctx: Context for the database operation

Returns a QueryBuilder interface instance that can be used to construct the query.

Note: QueryBuilder methods return GORM errors directly. Use Postgres.TranslateError() to convert them to standardized error types if needed.

Example:

users := []User{}
err := db.Query(ctx).
    Where("age > ?", 18).
    Order("created_at DESC").
    Limit(10).
    Find(&users)
if err != nil {
    err = db.TranslateError(err) // Optional: translate to standardized error
}
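
The snapshot behaviour can be illustrated with `atomic.Pointer` from the standard library (Go 1.19 or later). This is a sketch under assumptions: `conn` stands in for `*gorm.DB`, and the `client`, `snapshot`, and `reconnect` names are hypothetical.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// conn is a hypothetical stand-in for *gorm.DB.
type conn struct{ name string }

// client keeps the active connection in an atomic pointer, so readers
// never need a lock.
type client struct {
	active atomic.Pointer[conn]
}

// snapshot returns the connection that is active right now.
func (c *client) snapshot() *conn { return c.active.Load() }

// reconnect atomically replaces the active connection.
func (c *client) reconnect(next *conn) { c.active.Store(next) }

func main() {
	c := &client{}
	c.reconnect(&conn{name: "conn-1"})

	snap := c.snapshot()               // a query chain would capture this
	c.reconnect(&conn{name: "conn-2"}) // reconnection swaps the pointer

	// The earlier snapshot still refers to the old connection.
	fmt.Println(snap.name, c.snapshot().name)
}
```

A chain that has already taken its snapshot keeps using the connection it started with, while chains started after the swap pick up the new one.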

func (*Postgres) RetryConnection

func (p *Postgres) RetryConnection(ctx context.Context)

RetryConnection continuously attempts to reconnect to the PostgreSQL database when notified of a connection failure. It operates as a goroutine that waits for signals on retryChanSignal before attempting reconnection. The function respects context cancellation and shutdown signals, ensuring graceful termination when requested.

It implements two nested loops:

  • The outer loop waits for retry signals
  • The inner loop attempts reconnection until successful

func (*Postgres) Save

func (p *Postgres) Save(ctx context.Context, value interface{}) error

Save updates the database record if the primary key exists, otherwise it creates a new record. It performs a full update of all fields, not just changed fields.

Parameters:

  • ctx: Context for the database operation
  • value: The struct to be saved

Returns a GORM error if the operation fails or nil on success. Use TranslateError() to convert to standardized error types if needed.

Example:

user.Name = "Updated Name"
err := db.Save(ctx, &user)

func (*Postgres) Transaction

func (p *Postgres) Transaction(ctx context.Context, fn func(tx Client) error) error

Transaction executes the given function within a database transaction. It creates a transaction-specific Postgres instance and passes it to the function as a Client. If the function returns an error, the transaction is rolled back; otherwise, it is committed.

This method provides a clean way to execute multiple database operations as a single atomic unit, with automatic handling of commit/rollback based on the execution result.

Returns the error returned by the callback function, or a GORM error if the transaction itself fails.

Example usage:

err := pg.Transaction(ctx, func(tx Client) error {
	if err := tx.Create(ctx, user); err != nil {
		return err
	}
	return tx.Create(ctx, userProfile)
})

func (*Postgres) TranslateError

func (p *Postgres) TranslateError(err error) error

TranslateError converts GORM/database-specific errors into standardized application errors. This function provides abstraction from the underlying database implementation details, allowing application code to handle errors in a database-agnostic way.

It maps common database errors to the standardized error types defined above. If an error doesn't match any known type, it's returned unchanged.

func (*Postgres) Update

func (p *Postgres) Update(ctx context.Context, model interface{}, attrs interface{}) (int64, error)

Update updates records that match the given model's non-zero fields or primary key. It only updates the fields provided in attrs and only affects records that match the model's primary key or query conditions.

Parameters:

  • ctx: Context for the database operation
  • model: The model instance with primary key set, or struct with query conditions
  • attrs: Map, struct, or individual field values to update

Returns:

  • int64: Number of rows affected by the update operation
  • error: GORM error if the update fails, nil on success

Note: The current implementation has a bug where it executes the query twice. This should be fixed to execute only once and return both values properly.

Example:

// Update user with ID=1
rowsAffected, err := db.Update(ctx, &User{ID: 1}, map[string]interface{}{
    "name": "New Name",
    "age": 30,
})
if err != nil {
    return err
}
fmt.Printf("Updated %d rows\n", rowsAffected)

func (*Postgres) UpdateColumn

func (p *Postgres) UpdateColumn(ctx context.Context, model interface{}, columnName string, value interface{}) (int64, error)

UpdateColumn updates a single column's value for records that match the given model. Unlike Update, it doesn't run hooks and can be used to update fields that are zero values (like setting a string to empty or a number to zero).

Parameters:

  • ctx: Context for the database operation
  • model: The model instance with primary key set, or struct with query conditions
  • columnName: Name of the column to update
  • value: New value for the column

Returns:

  • int64: Number of rows affected by the update operation
  • error: GORM error if the update fails, nil on success

Example:

// Set status to "inactive" for user with ID=1
rowsAffected, err := db.UpdateColumn(ctx, &User{ID: 1}, "status", "inactive")
if err != nil {
    return err
}
fmt.Printf("Updated %d rows\n", rowsAffected)

func (*Postgres) UpdateColumns

func (p *Postgres) UpdateColumns(ctx context.Context, model interface{}, columnValues map[string]interface{}) (int64, error)

UpdateColumns updates multiple columns with name/value pairs for records that match the given model. Like UpdateColumn, it doesn't run hooks and can update zero-value fields.

Parameters:

  • ctx: Context for the database operation
  • model: The model instance with primary key set, or struct with query conditions
  • columnValues: Map of column names to their new values

Returns:

  • int64: Number of rows affected by the update operation
  • error: GORM error if the update fails, nil on success

Example:

// Update multiple fields for user with ID=1
rowsAffected, err := db.UpdateColumns(ctx, &User{ID: 1}, map[string]interface{}{
    "status": "inactive",
    "last_login": time.Now(),
})
if err != nil {
    return err
}
fmt.Printf("Updated %d rows\n", rowsAffected)

func (*Postgres) UpdateWhere

func (p *Postgres) UpdateWhere(ctx context.Context, model interface{}, attrs interface{}, condition string, args ...interface{}) (int64, error)

UpdateWhere updates records that match the specified WHERE condition. This method provides more flexibility than Update for complex conditions.

Parameters:

  • ctx: Context for the database operation
  • model: The model type to update
  • attrs: Fields to update (map, struct, or name/value pairs)
  • condition: WHERE condition as a string
  • args: Arguments for the WHERE condition

Returns:

  • int64: Number of rows affected by the update operation
  • error: GORM error if the update fails, nil on success

Example:

// Update all users who haven't logged in for 6 months
rowsAffected, err := db.UpdateWhere(ctx, &User{},
                                    map[string]interface{}{"status": "inactive"},
                                    "last_login < ?",
                                    time.Now().AddDate(0, -6, 0))
if err != nil {
    return err
}
fmt.Printf("Updated %d users to inactive status\n", rowsAffected)

func (*Postgres) WithLogger

func (p *Postgres) WithLogger(logger Logger) *Postgres

WithLogger attaches a logger to the Postgres client for internal logging. This method uses the builder pattern and returns the client for method chaining.

The logger will be used for lifecycle events, connection monitoring, and background operations.

Example:

client, err := postgres.NewPostgres(config)
if err != nil {
    return err
}
client = client.WithLogger(myLogger)
defer client.GracefulShutdown()

func (*Postgres) WithObserver

func (p *Postgres) WithObserver(observer observability.Observer) *Postgres

WithObserver attaches an observer to the Postgres client for observability hooks. This method uses the builder pattern and returns the client for method chaining.

The observer will be notified of all database operations, allowing external systems to track metrics, traces, or other observability data.

Example:

client, err := postgres.NewPostgres(config)
if err != nil {
    return err
}
client = client.WithObserver(myObserver)
defer client.GracefulShutdown()

PostgresLifeCycleParams groups the dependencies needed for Postgres lifecycle management. This struct combines all the components required to properly manage the lifecycle of a Postgres Client within an fx application, including startup, monitoring, and graceful shutdown.

The embedded fx.In marker enables automatic injection of the struct fields from the dependency container when this struct is used as a parameter in lifecycle registration functions.

type PostgresLifeCycleParams struct {
    fx.In

    Lifecycle fx.Lifecycle
    Postgres  *Postgres
}

PostgresParams groups the dependencies needed to create a Postgres Client via dependency injection. This struct is designed to work with Uber's fx dependency injection framework and provides the necessary parameters for initializing a Postgres database connection.

The embedded fx.In marker enables automatic injection of the struct fields from the dependency container when this struct is used as a parameter in provider functions.

type PostgresParams struct {
    fx.In

    Config   Config
    Logger   Logger                 `optional:"true"`
    Observer observability.Observer `optional:"true"`
}

QueryBuilder provides a fluent interface for building complex database queries. All chainable methods return the QueryBuilder interface, allowing method chaining. Terminal operations (like Find, First, Create) execute the query and return results.

Example:

var users []User
err := db.Query(ctx).
    Where("age > ?", 18).
    Order("created_at DESC").
    Limit(10).
    Find(&users)

type QueryBuilder interface {
    // Query modifiers - these return QueryBuilder for chaining
    Select(query interface{}, args ...interface{}) QueryBuilder
    Where(query interface{}, args ...interface{}) QueryBuilder
    Or(query interface{}, args ...interface{}) QueryBuilder
    Not(query interface{}, args ...interface{}) QueryBuilder
    Joins(query string, args ...interface{}) QueryBuilder
    LeftJoin(query string, args ...interface{}) QueryBuilder
    RightJoin(query string, args ...interface{}) QueryBuilder
    Preload(query string, args ...interface{}) QueryBuilder
    Group(query string) QueryBuilder
    Having(query interface{}, args ...interface{}) QueryBuilder
    Order(value interface{}) QueryBuilder
    Limit(limit int) QueryBuilder
    Offset(offset int) QueryBuilder
    Raw(sql string, values ...interface{}) QueryBuilder
    Model(value interface{}) QueryBuilder
    Distinct(args ...interface{}) QueryBuilder
    Table(name string) QueryBuilder
    Unscoped() QueryBuilder
    Scopes(funcs ...func(*gorm.DB) *gorm.DB) QueryBuilder

    // Locking methods
    ForUpdate() QueryBuilder
    ForShare() QueryBuilder
    ForUpdateSkipLocked() QueryBuilder
    ForShareSkipLocked() QueryBuilder
    ForUpdateNoWait() QueryBuilder
    ForNoKeyUpdate() QueryBuilder // PostgreSQL-specific
    ForKeyShare() QueryBuilder    // PostgreSQL-specific

    // Conflict handling and returning
    OnConflict(onConflict interface{}) QueryBuilder
    Returning(columns ...string) QueryBuilder

    // Custom clauses
    Clauses(conds ...interface{}) QueryBuilder

    // Terminal operations - these execute the query
    Scan(dest interface{}) error
    Find(dest interface{}) error
    First(dest interface{}) error
    Last(dest interface{}) error
    Count(count *int64) error
    Updates(values interface{}) (int64, error)
    Delete(value interface{}) (int64, error)
    Pluck(column string, dest interface{}) (int64, error)
    Create(value interface{}) (int64, error)
    CreateInBatches(value interface{}, batchSize int) (int64, error)
    FirstOrInit(dest interface{}, conds ...interface{}) error
    FirstOrCreate(dest interface{}, conds ...interface{}) error

    // Utility methods
    Done()                // Finalize builder (currently a no-op)
    ToSubquery() *gorm.DB // Convert to GORM subquery
}
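
The split between chainable modifiers and terminal operations can be seen in a toy builder. This sketch is not related to the package's implementation: `selectBuilder`, `from`, and the `SQL` terminal method are hypothetical, and conditions are passed as literal strings only to keep the example short (real queries should use placeholders and arguments, as `Where` above does).

```go
package main

import (
	"fmt"
	"strings"
)

// selectBuilder is a hypothetical, much smaller cousin of QueryBuilder.
type selectBuilder struct {
	table  string
	wheres []string
	order  string
	limit  int
}

// from starts a new chain for the given table.
func from(table string) *selectBuilder { return &selectBuilder{table: table} }

// Where, Order and Limit are modifiers: each returns the builder for chaining.
func (b *selectBuilder) Where(cond string) *selectBuilder {
	b.wheres = append(b.wheres, cond)
	return b
}

func (b *selectBuilder) Order(expr string) *selectBuilder { b.order = expr; return b }

func (b *selectBuilder) Limit(n int) *selectBuilder { b.limit = n; return b }

// SQL is the terminal operation: it ends the chain and produces a result.
func (b *selectBuilder) SQL() string {
	var sb strings.Builder
	sb.WriteString("SELECT * FROM " + b.table)
	if len(b.wheres) > 0 {
		sb.WriteString(" WHERE " + strings.Join(b.wheres, " AND "))
	}
	if b.order != "" {
		sb.WriteString(" ORDER BY " + b.order)
	}
	if b.limit > 0 {
		sb.WriteString(fmt.Sprintf(" LIMIT %d", b.limit))
	}
	return sb.String()
}

func main() {
	fmt.Println(from("users").Where("age > 18").Order("created_at DESC").Limit(10).SQL())
}
```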

RowScanner provides an interface for scanning a single row of data. It abstracts the process of parsing column values into Go variables, allowing for efficient handling of individual rows returned from a query.

type RowScanner interface {
    // Scan copies the column values from the current row into the values pointed to by dest.
    // The number of values in dest must match the number of columns in the row.
    Scan(dest ...interface{}) error
}

RowsScanner provides an interface for iterating through rows of data returned by a query. It extends RowScanner functionality with methods for navigation and error handling, allowing for efficient processing of result sets with multiple rows.

type RowsScanner interface {
    // Next prepares the next row for reading. It returns false when there are no more rows.
    Next() bool

    // Scan copies column values from the current row into the provided destination variables.
    Scan(dest ...interface{}) error

    // Close closes the rows iterator, releasing any associated resources.
    Close() error

    // Err returns any error encountered during iteration.
    Err() error
}
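
A typical consumption pattern for an iterator of this shape is: defer `Close`, loop on `Next`, `Scan` each row, then check `Err` once the loop ends. The sketch below redeclares the interface locally so it compiles on its own; `memRows` and `collectNames` are hypothetical helpers that exist only to demonstrate the loop.

```go
package main

import (
	"errors"
	"fmt"
)

// RowsScanner mirrors the interface documented above.
type RowsScanner interface {
	Next() bool
	Scan(dest ...interface{}) error
	Close() error
	Err() error
}

// memRows is a hypothetical in-memory RowsScanner over one string column.
type memRows struct {
	data   []string
	pos    int
	closed bool
}

func (m *memRows) Next() bool {
	if m.pos >= len(m.data) {
		return false
	}
	m.pos++
	return true
}

func (m *memRows) Scan(dest ...interface{}) error {
	if m.pos == 0 {
		return errors.New("Scan called before Next")
	}
	if len(dest) != 1 {
		return errors.New("expected exactly one destination")
	}
	p, ok := dest[0].(*string)
	if !ok {
		return errors.New("destination must be *string")
	}
	*p = m.data[m.pos-1]
	return nil
}

func (m *memRows) Close() error { m.closed = true; return nil }
func (m *memRows) Err() error   { return nil }

// collectNames drains rows into a slice and always closes the iterator.
func collectNames(rows RowsScanner) ([]string, error) {
	defer rows.Close()
	var names []string
	for rows.Next() {
		var name string
		if err := rows.Scan(&name); err != nil {
			return nil, err
		}
		names = append(names, name)
	}
	// Next returning false can mean "no more rows" or "iteration failed".
	return names, rows.Err()
}

func main() {
	names, err := collectNames(&memRows{data: []string{"alice", "bob"}})
	fmt.Println(names, err)
}
```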

Generated by gomarkdoc