Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import type Stripe from 'stripe'
import { StripeSync, runMigrations } from '@supabase/stripe-sync-engine'
import { afterAll, afterEach, beforeAll, describe, expect, test, vitest } from 'vitest'
import { getConfig } from '../utils/config'
import { mockStripe } from './helpers/mockStripe'
import { logger } from '../logger'

let stripeSync: StripeSync
let schema: string

beforeAll(async () => {
process.env.REVALIDATE_OBJECTS_VIA_STRIPE_API = ''
process.env.BACKFILL_RELATED_ENTITIES = 'false'

const config = getConfig()
schema = config.schema

await runMigrations({
databaseUrl: config.databaseUrl,
schema: config.schema,
logger,
})

stripeSync = new StripeSync({
...config,
poolConfig: {
connectionString: config.databaseUrl,
},
})
const stripe = Object.assign(stripeSync.stripe, mockStripe)
vitest.spyOn(stripeSync, 'stripe', 'get').mockReturnValue(stripe)
})

afterEach(() => {
vitest.clearAllMocks()
})

afterAll(async () => {
if (stripeSync) {
await stripeSync.close()
}
})

describe('subscription tie-break on same timestamp', () => {
test('refetches subscription when same-second event would otherwise be skipped', async () => {
const baseEvent = await import('./stripe/subscription_created.json').then(
({ default: data }) => data
)
const sameTimestamp = Math.floor(Date.now() / 1000)
const subscriptionId = baseEvent.data.object.id

await stripeSync.postgresClient.query(
`delete from "${schema}"."subscription_items" where subscription = $1`,
[subscriptionId]
)
await stripeSync.postgresClient.query(`delete from "${schema}"."subscriptions" where id = $1`, [
subscriptionId,
])

const trialEvent = structuredClone(baseEvent)
trialEvent.id = 'evt_tie_trial'
trialEvent.type = 'customer.subscription.trial_will_end'
trialEvent.created = sameTimestamp
trialEvent.data.object.status = 'trialing'
trialEvent.data.object.billing_cycle_anchor = 100

const updatedEvent = structuredClone(baseEvent)
updatedEvent.id = 'evt_tie_updated'
updatedEvent.type = 'customer.subscription.updated'
updatedEvent.created = sameTimestamp
updatedEvent.data.object.status = 'active'
updatedEvent.data.object.billing_cycle_anchor = 200

mockStripe.subscriptions.retrieve.mockResolvedValueOnce({
...updatedEvent.data.object,
billing_cycle_anchor: 300,
status: 'active',
})

await stripeSync.processEvent(trialEvent as Stripe.Event)
await stripeSync.processEvent(updatedEvent as Stripe.Event)

const result = await stripeSync.postgresClient.query(
`select id, status, billing_cycle_anchor from "${schema}"."subscriptions" where id = $1`,
[subscriptionId]
)

expect(result.rows.length).toBe(1)
expect(result.rows[0].status).toBe('active')
expect(result.rows[0].billing_cycle_anchor).toBe(300)
expect(mockStripe.subscriptions.retrieve).toHaveBeenCalledTimes(1)
})
})
37 changes: 35 additions & 2 deletions packages/sync-engine/src/stripeSync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -189,15 +189,22 @@ export class StripeSync {
(subscription) =>
subscription.status === 'canceled' || subscription.status === 'incomplete_expired'
)
const syncTimestamp = this.getSyncTimestamp(event, refetched)

this.config.logger?.info(
`Received webhook ${event.id}: ${event.type} for subscription ${subscription.id}`
)

await this.upsertSubscriptions(
const upsertedSubscriptions = await this.upsertSubscriptions(
[subscription],
false,
this.getSyncTimestamp(event, refetched)
syncTimestamp
)
await this.resolveEqualTimestampSubscriptionConflict(
subscription.id,
upsertedSubscriptions,
refetched,
syncTimestamp
)
break
}
Expand Down Expand Up @@ -581,6 +588,32 @@ export class StripeSync {
return refetched ? new Date().toISOString() : new Date(event.created * 1000).toISOString()
}

private async resolveEqualTimestampSubscriptionConflict(
subscriptionId: string,
upsertedSubscriptions: Stripe.Subscription[],
refetched: boolean,
syncTimestamp: string
) {
if (refetched || upsertedSubscriptions.length > 0) return

const query = `
select 1
from "${this.config.schema}"."subscriptions"
where id = $1
and last_synced_at = $2::timestamptz
limit 1
`
const result = await this.postgresClient.query(query, [subscriptionId, syncTimestamp])
if (!result.rows.length) return

this.config.logger?.warn(
`Equal timestamp detected for subscription ${subscriptionId}. Refetching from Stripe for deterministic state.`
)

const latestSubscription = await this.stripe.subscriptions.retrieve(subscriptionId)
await this.upsertSubscriptions([latestSubscription], false, new Date().toISOString())
}

private shouldRefetchEntity(entity: { object: string }) {
return this.config.revalidateObjectsViaStripeApi?.includes(entity.object as RevalidateEntity)
}
Expand Down