Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 87 additions & 7 deletions src/internal/database/migrations/progressive.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { areMigrationsUpToDate } from '@internal/database/migrations/migrate'
import { ErrorCode, isStorageError } from '@internal/errors'
import { RunMigrationsOnTenants } from '@storage/events'
import { getConfig } from '../../../config'
import { logger, logSchema } from '../../monitoring'
Expand All @@ -9,6 +10,8 @@ const { dbMigrationFreezeAt } = getConfig()
export class ProgressiveMigrations {
protected tenants: string[] = []
protected emittingJobs = false
protected inFlightCreateJobs?: Promise<void>
protected pendingCreateJobsMax = 0
protected watchInterval: NodeJS.Timeout | undefined

constructor(protected readonly options: { maxSize: number; interval: number; watch?: boolean }) {
Expand Down Expand Up @@ -96,9 +99,33 @@ export class ProgressiveMigrations {
}, this.options.interval)
}

protected async createJobs(maxJobs: number) {
protected createJobs(maxJobs: number) {
this.pendingCreateJobsMax = Math.max(this.pendingCreateJobsMax, maxJobs)

if (this.inFlightCreateJobs) {
return this.inFlightCreateJobs
}

this.emittingJobs = true
const tenantsBatch = this.tenants.splice(0, maxJobs)
this.inFlightCreateJobs = this.runCreateJobs().finally(() => {
this.emittingJobs = false
this.inFlightCreateJobs = undefined
this.pendingCreateJobsMax = 0
Comment thread
ferhatelmas marked this conversation as resolved.
})

return this.inFlightCreateJobs
}

protected async runCreateJobs() {
while (this.pendingCreateJobsMax > 0) {
const maxJobs = this.pendingCreateJobsMax
this.pendingCreateJobsMax = 0
await this.createJobsBatch(maxJobs)
}
}

protected async createJobsBatch(maxJobs: number) {
const tenantsBatch = this.tenants.slice(0, maxJobs)
const jobs = await Promise.allSettled(
tenantsBatch.map(async (tenant) => {
const tenantConfig = await getTenantConfig(tenant)
Expand Down Expand Up @@ -127,15 +154,68 @@ export class ProgressiveMigrations {
})
)

const completedTenants = new Set<string>()
const droppedTenants = new Set<string>()
const retryableFailedTenants = new Set<string>()
const validJobs = jobs
.map((job) => {
if (job.status === 'fulfilled' && job.value) {
return job.value
.map((job, index) => {
const tenant = tenantsBatch[index]

if (job.status === 'rejected') {
// If there are more terminal errors later, we need to extend this check.
if (isStorageError(ErrorCode.TenantNotFound, job.reason)) {
droppedTenants.add(tenant)
logSchema.warning(
logger,
`[Migrations] Failed to prepare migration job for tenant ${tenant}; dropping tenant from queue because it no longer exists`,
{
type: 'migrations',
error: job.reason,
project: tenant,
metadata: JSON.stringify({
strategy: 'progressive',
}),
}
)
return
}

retryableFailedTenants.add(tenant)
logSchema.warning(
logger,
`[Migrations] Failed to prepare migration job for tenant ${tenant}; keeping tenant queued for retry`,
{
Comment thread
ferhatelmas marked this conversation as resolved.
type: 'migrations',
error: job.reason,
project: tenant,
metadata: JSON.stringify({
strategy: 'progressive',
}),
}
)
return
}

completedTenants.add(tenant)
return job.value
})
.filter((job) => job)

await RunMigrationsOnTenants.batchSend(validJobs as RunMigrationsOnTenants[])
this.emittingJobs = false
if (validJobs.length > 0) {
await RunMigrationsOnTenants.batchSend(validJobs as RunMigrationsOnTenants[])
Comment thread
ferhatelmas marked this conversation as resolved.
}

if (completedTenants.size > 0 || droppedTenants.size > 0 || retryableFailedTenants.size > 0) {
const remainingTenants = this.tenants.filter(
(tenant) =>
!completedTenants.has(tenant) &&
!droppedTenants.has(tenant) &&
!retryableFailedTenants.has(tenant)
)
const failedTenantsInQueue = this.tenants.filter((tenant) =>
retryableFailedTenants.has(tenant)
)
this.tenants = remainingTenants.concat(failedTenantsInQueue)
}
}
}
Loading