-
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmigration.go
More file actions
322 lines (249 loc) · 9.73 KB
/
migration.go
File metadata and controls
322 lines (249 loc) · 9.73 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
// SPDX-FileCopyrightText: 2026 The DMorph contributors.
// SPDX-License-Identifier: MPL-2.0
// Package dmorph provides a simple database migration framework.
package dmorph
import (
"context"
"database/sql"
"errors"
"fmt"
"log/slog"
"regexp"
"slices"
"time"
)
// MigrationTableName is the default name for the migration management table in the database.
const MigrationTableName = "migrations"
// ValidTableNameRex is the regular expression used to check if a given migration table name is valid.
var ValidTableNameRex = regexp.MustCompile("^[a-zA-Z0-9_]+$")
// ErrMigrationsUnrelated signals that the set of migrations to apply and the already applied set do not have the
// same (order of) applied migrations. Applying unrelated migrations could severely harm the database.
var ErrMigrationsUnrelated = errors.New("migrations unrelated")
// ErrMigrationsUnsorted indicates that the already applied migrations were not registered in the order
// (using the timestamp) that they should have been registered (using their id).
var ErrMigrationsUnsorted = errors.New("migrations unsorted")
// ErrNoDialect signals that no dialect for the database operations was chosen.
var ErrNoDialect = errors.New("no dialect")
// ErrNoMigrations signals that no migrations were chosen to be applied.
var ErrNoMigrations = errors.New("no migrations")
// ErrNoMigrationTable occurs if there is no migration table present.
var ErrNoMigrationTable = errors.New("no migration table")
// ErrMigrationTableNameInvalid occurs if the migration table does not adhere to ValidTableNameRex.
var ErrMigrationTableNameInvalid = errors.New("invalid migration table name")
// ErrMigrationsTooOld signals that the migrations to be applied are older than the migrations that are already
// present in the database. This error can occur when an older version of the application is started using a database
// used already by a newer version of the application.
var ErrMigrationsTooOld = errors.New("migrations too old")
// Dialect is an interface describing the functionalities needed to manage migrations inside a database.
type Dialect interface {
EnsureMigrationTableExists(ctx context.Context, db *sql.DB, tableName string) error
AppliedMigrations(ctx context.Context, db *sql.DB, tableName string) ([]string, error)
RegisterMigration(ctx context.Context, tx *sql.Tx, id string, tableName string) error
}
// Migration is an interface to provide abstract information about the migration at hand.
type Migration interface {
Key() string // identifier, used for ordering
Migrate(ctx context.Context, tx *sql.Tx) error // migration functionality
}
// migrationOrder is used to order Migration instances.
func migrationOrder(m, n Migration) int {
switch {
case m.Key() < n.Key():
return -1
case m.Key() > n.Key():
return 1
default:
return 0
}
}
// Morpher contains all the required information to run a given set of database migrations on a database.
type Morpher struct {
Dialect Dialect // database vendor specific dialect
Migrations []Migration // migrations to be applied
TableName string // table name for migration management
Log *slog.Logger // logger to be used
}
// MorphOption is the type used for functional options.
type MorphOption func(*Morpher) error
// WithDialect sets the vendor-specific database dialect to be used.
func WithDialect(dialect Dialect) MorphOption {
return func(m *Morpher) error {
m.Dialect = dialect
return nil
}
}
// WithMigrations adds the given migrations to be executed.
func WithMigrations(migrations ...Migration) MorphOption {
return func(m *Morpher) error {
m.Migrations = append(m.Migrations, migrations...)
return nil
}
}
// WithLog sets the logger that is to be used. If none is supplied, the default logger
// is used instead.
func WithLog(log *slog.Logger) MorphOption {
return func(m *Morpher) error {
m.Log = log
return nil
}
}
// WithTableName sets the migration table name to the given one. If not supplied, the
// default MigrationTableName is used instead.
func WithTableName(tableName string) func(*Morpher) error {
return func(m *Morpher) error {
if len(tableName) < 1 {
return ErrMigrationTableNameInvalid
}
if !ValidTableNameRex.MatchString(tableName) {
return ErrMigrationTableNameInvalid
}
m.TableName = tableName
return nil
}
}
// NewMorpher creates a new Morpher configuring it with the given options.
// It ensures that the newly created Morpher has migrations and a database dialect configured.
// If no migration table name is given, the default MigrationTableName is used instead.
func NewMorpher(options ...MorphOption) (*Morpher, error) {
morpher := &Morpher{
TableName: MigrationTableName,
Log: slog.Default(),
}
for _, option := range options {
if err := option(morpher); err != nil {
return nil, err
}
}
if validErr := morpher.IsValid(); validErr != nil {
return nil, validErr
}
return morpher, nil
}
// IsValid checks if the Morpher contains all the required information to run.
func (m *Morpher) IsValid() error {
if m.Dialect == nil {
return ErrNoDialect
}
if len(m.Migrations) < 1 {
return ErrNoMigrations
}
if len(m.TableName) < 1 {
return ErrNoMigrationTable
}
if !ValidTableNameRex.MatchString(m.TableName) {
return ErrMigrationTableNameInvalid
}
return nil
}
// Run runs the configured Morpher on the given database. If the migrations already applied
// to the database are a superset of the migrations the Morpher would apply, ErrMigrationsTooOld is
// returned.
// Run will run each migration in a separate transaction, with the last step to register the
// migration in the migration table.
func (m *Morpher) Run(ctx context.Context, db *sql.DB) error {
if validErr := m.IsValid(); validErr != nil {
return validErr
}
if err := m.Dialect.EnsureMigrationTableExists(ctx, db, m.TableName); err != nil {
return fmt.Errorf("could not create migration table: %w", err)
}
appliedMigrations, appliedMigrationsErr := m.Dialect.AppliedMigrations(ctx, db, m.TableName)
if appliedMigrationsErr != nil {
return fmt.Errorf("could not get applied migrations: %w", appliedMigrationsErr)
}
slices.SortFunc(m.Migrations, migrationOrder)
lastMigration := ""
if len(appliedMigrations) == 0 {
m.Log.Debug("no previous migrations")
} else {
m.Log.Debug("last migration",
slog.String("file", appliedMigrations[len(appliedMigrations)-1]))
err := m.checkAppliedMigrations(appliedMigrations)
if err != nil {
return err
}
lastMigration = appliedMigrations[len(appliedMigrations)-1]
}
return m.applyMigrations(ctx, db, lastMigration)
}
// applyMigrations applies the given migrations to the database.
// This method does not check for the validity or consistency of the database.
func (m *Morpher) applyMigrations(ctx context.Context, db *sql.DB, lastMigration string) error {
var startMigration time.Time
for _, migration := range m.Migrations {
if lastMigration >= migration.Key() {
m.Log.Info("migration already applied", slog.String("file", migration.Key()))
continue
}
m.Log.Info("applying migration", slog.String("file", migration.Key()))
startMigration = time.Now()
// Check context before starting a transaction
if err := ctx.Err(); err != nil {
return fmt.Errorf("context cancelled before migration %s: %w", migration.Key(), err)
}
if err := m.runOneMigration(ctx, db, migration); err != nil {
return err
}
m.Log.Info("migration applied",
slog.String("file", migration.Key()),
slog.Duration("duration", time.Since(startMigration)),
)
}
return nil
}
// runOneMigration executes a single migration within a database transaction and logs its completion.
func (m *Morpher) runOneMigration(ctx context.Context, db *sql.DB, mig Migration) error {
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("begin tx: %w", err)
}
// Even if we are sure to catch all possibilities, we use this as a safeguard that also with later
// modifications. When a successful commit cannot be done, at least the rollback is executed, freeing
// allocated resources of the transaction.
defer func() { _ = tx.Rollback() }()
if err = mig.Migrate(ctx, tx); err != nil {
rollbackErr := tx.Rollback()
return errors.Join(err, rollbackErr)
}
if err = m.Dialect.RegisterMigration(ctx, tx, mig.Key(), m.TableName); err != nil {
rollbackErr := tx.Rollback()
return errors.Join(err, rollbackErr)
}
if commitErr := tx.Commit(); commitErr != nil {
rollbackErr := tx.Rollback()
return errors.Join(commitErr, rollbackErr)
}
return nil
}
// checkAppliedMigrations checks if the already applied migrations in the database are consistent.
// This means inherently in them and also regarding the migrations that are to be applied.
func (m *Morpher) checkAppliedMigrations(appliedMigrations []string) error {
if !slices.IsSorted(appliedMigrations) {
m.Log.Error("migrations not applied in order")
return ErrMigrationsUnsorted
}
if m.Migrations[len(m.Migrations)-1].Key() < appliedMigrations[len(appliedMigrations)-1] {
return ErrMigrationsTooOld
}
if len(m.Migrations) < len(appliedMigrations) {
// it is impossible to have a migration newer than the one already applied
// without having at least the same number of previous migrations
return ErrMigrationsUnrelated
}
// we know here that there are at least as many migrations applied as we got to apply
for i := range appliedMigrations {
if appliedMigrations[i] != m.Migrations[i].Key() {
return ErrMigrationsUnrelated
}
}
return nil
}
// Run is a convenience function to easily get the migration job done. For more control use the
// Morpher directly.
func Run(ctx context.Context, db *sql.DB, options ...MorphOption) error {
m, morphErr := NewMorpher(options...)
if morphErr != nil {
return morphErr
}
return m.Run(ctx, db)
}