Skip to content

Commit 3b3abf0

Browse files
committed
feat: add opt-in event retention purge (#359)
1 parent 1410824 commit 3b3abf0

File tree

13 files changed

+517
-4
lines changed

13 files changed

+517
-4
lines changed

CONFIGURATION.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,9 @@ Running `nostream` for the first time creates the settings file in `<project_roo
109109
| limits.event.rateLimits[].rate | Maximum number of events during period. |
110110
| limits.event.whitelists.pubkeys | List of public keys to ignore rate limits. |
111111
| limits.event.whitelists.ipAddresses | List of IPs (IPv4 or IPv6) to ignore rate limits. |
112+
| limits.event.retention.maxDays | Maximum number of days to retain events. Purge deletes events that are expired (`expires_at`), soft-deleted (`deleted_at`), or older than this window (`created_at`). Any non-positive value disables retention purge. |
113+
| limits.event.retention.kind.whitelist | Event kinds excluded from retention purge. NIP-62 `REQUEST_TO_VANISH` is always excluded from retention purge, even if not listed here. |
114+
| limits.event.retention.pubkey.whitelist | Public keys excluded from retention purge. |
112115
| limits.client.subscription.maxSubscriptions | Maximum number of subscriptions per connected client. Defaults to 10. Disabled when set to zero. |
113116
| limits.client.subscription.maxFilters | Maximum number of filters per subscription. Defaults to 10. Disabled when set to zero. |
114117
| limits.message.rateLimits[].period | Rate limit period in milliseconds. |

resources/default-settings.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,13 @@ limits:
7676
- "10.10.10.1"
7777
- "::ffff:10.10.10.1"
7878
event:
79+
retention:
80+
maxDays: -1
81+
kind:
82+
whitelist:
83+
- 62
84+
pubkey:
85+
whitelist: []
7986
eventId:
8087
minLeadingZeroBits: 0
8188
kind:

src/@types/repositories.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,24 @@ import { PassThrough } from 'stream'
22

33
import { DatabaseClient, EventId, Pubkey } from './base'
44
import { DBEvent, Event } from './event'
5+
import { EventKinds } from '../constants/base'
6+
import { EventKindsRange } from './settings'
57
import { Invoice } from './invoice'
68
import { SubscriptionFilter } from './subscription'
79
import { User } from './user'
810

11+
export interface EventRetentionOptions {
12+
maxDays?: number
13+
kindWhitelist?: (EventKinds | EventKindsRange)[]
14+
pubkeyWhitelist?: Pubkey[]
15+
}
16+
17+
export interface EventPurgeCounts {
18+
deleted: number
19+
expired: number
20+
retained: number
21+
}
22+
923
export type ExposedPromiseKeys = 'then' | 'catch' | 'finally'
1024

1125
export interface IQueryResult<T> extends Pick<Promise<T>, keyof Promise<T> & ExposedPromiseKeys> {
@@ -21,6 +35,7 @@ export interface IEventRepository {
2135
deleteByPubkeyAndIds(pubkey: Pubkey, ids: EventId[]): Promise<number>
2236
deleteByPubkeyExceptKinds(pubkey: Pubkey, excludedKinds: number[]): Promise<number>
2337
hasActiveRequestToVanish(pubkey: Pubkey): Promise<boolean>
38+
deleteExpiredAndRetained(options?: EventRetentionOptions): Promise<EventPurgeCounts>
2439
}
2540

2641
export interface IInvoiceRepository {

src/@types/services.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
import { Invoice } from './invoice'
22
import { Pubkey } from './base'
33

4+
export interface IMaintenanceService {
5+
clearOldEvents(): Promise<void>
6+
}
7+
48
export interface IPaymentsService {
59
getInvoiceFromPaymentsProcessor(invoice: string | Invoice): Promise<Partial<Invoice>>
610
createInvoice(

src/@types/settings.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,20 @@ export interface EventWhitelists {
6969
ipAddresses?: string[]
7070
}
7171

72+
export interface EventRetentionKindLimits {
73+
whitelist?: (EventKinds | EventKindsRange)[]
74+
}
75+
76+
export interface EventRetentionPubkeyLimits {
77+
whitelist?: Pubkey[]
78+
}
79+
80+
export interface EventRetentionLimits {
81+
maxDays?: number
82+
kind?: EventRetentionKindLimits
83+
pubkey?: EventRetentionPubkeyLimits
84+
}
85+
7286
export interface EventLimits {
7387
eventId?: EventIdLimits
7488
pubkey?: PubkeyLimits
@@ -77,6 +91,7 @@ export interface EventLimits {
7791
content?: ContentLimits | ContentLimits[]
7892
rateLimits?: EventRateLimit[]
7993
whitelists?: EventWhitelists
94+
retention?: EventRetentionLimits
8095
}
8196

8297
export interface ClientSubscriptionLimits {

src/app/maintenance-worker.ts

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,25 @@
1+
import { IMaintenanceService, IPaymentsService } from '../@types/services'
12
import { mergeDeepLeft, path, pipe } from 'ramda'
23
import { IRunnable } from '../@types/base'
34

45
import { createLogger } from '../factories/logger-factory'
56
import { delayMs } from '../utils/misc'
67
import { InvoiceStatus } from '../@types/invoice'
7-
import { IPaymentsService } from '../@types/services'
88
import { Settings } from '../@types/settings'
99

1010
const UPDATE_INVOICE_INTERVAL = 60000
11+
const CLEAR_OLD_EVENTS_TIMEOUT_MS = 5000
1112

1213
const debug = createLogger('maintenance-worker')
1314

1415
export class MaintenanceWorker implements IRunnable {
1516
private interval: NodeJS.Timeout | undefined
17+
private isRunning = false
1618

1719
public constructor(
1820
private readonly process: NodeJS.Process,
1921
private readonly paymentsService: IPaymentsService,
22+
private readonly maintenanceService: IMaintenanceService,
2023
private readonly settings: () => Settings,
2124
) {
2225
this.process
@@ -27,14 +30,43 @@ export class MaintenanceWorker implements IRunnable {
2730
.on('unhandledRejection', this.onError.bind(this))
2831
}
2932

33+
private async clearOldEventsSafely(): Promise<void> {
34+
try {
35+
await Promise.race([
36+
this.maintenanceService.clearOldEvents(),
37+
delayMs(CLEAR_OLD_EVENTS_TIMEOUT_MS).then(() => {
38+
throw new Error(`clearOldEvents timed out after ${CLEAR_OLD_EVENTS_TIMEOUT_MS}ms`)
39+
}),
40+
])
41+
} catch (error) {
42+
debug('unable to clear old events: %o', error)
43+
}
44+
}
45+
3046
public run(): void {
31-
this.interval = setInterval(() => this.onSchedule(), UPDATE_INVOICE_INTERVAL)
47+
this.interval = setInterval(async () => {
48+
if (this.isRunning) {
49+
debug('skipping scheduled maintenance run because previous run is still in progress')
50+
return
51+
}
52+
53+
this.isRunning = true
54+
try {
55+
await this.onSchedule()
56+
} catch (error) {
57+
this.onError(error as Error)
58+
} finally {
59+
this.isRunning = false
60+
}
61+
}, UPDATE_INVOICE_INTERVAL)
3262
}
3363

3464
private async onSchedule(): Promise<void> {
3565
const currentSettings = this.settings()
66+
const clearOldEventsPromise = this.clearOldEventsSafely()
3667

3768
if (!path(['payments','enabled'], currentSettings)) {
69+
await clearOldEventsPromise
3870
return
3971
}
4072

@@ -84,6 +116,8 @@ export class MaintenanceWorker implements IRunnable {
84116

85117
debug('updated %d of %d invoices successfully', successful, invoices.length)
86118
}
119+
120+
await clearOldEventsPromise
87121
}
88122

89123
private onError(error: Error) {
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
import { getMasterDbClient, getReadReplicaDbClient } from '../database/client'
2+
import { createSettings } from './settings-factory'
3+
import { EventRepository } from '../repositories/event-repository'
4+
import { MaintenanceService } from '../services/maintenance-service'
5+
6+
export const createMaintenanceService = () => {
7+
return new MaintenanceService(
8+
new EventRepository(getMasterDbClient(), getReadReplicaDbClient()),
9+
createSettings
10+
)
11+
}
Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,13 @@
1+
import { createMaintenanceService } from './maintenance-service-factory'
12
import { createPaymentsService } from './payments-service-factory'
23
import { createSettings } from './settings-factory'
34
import { MaintenanceWorker } from '../app/maintenance-worker'
45

56
export const maintenanceWorkerFactory = () => {
6-
return new MaintenanceWorker(process, createPaymentsService(), createSettings)
7+
return new MaintenanceWorker(
8+
process,
9+
createPaymentsService(),
10+
createMaintenanceService(),
11+
createSettings
12+
)
713
}

src/repositories/event-repository.ts

Lines changed: 96 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ import {
3131
import { ContextMetadataKey, EventDeduplicationMetadataKey, EventExpirationTimeMetadataKey, EventKinds } from '../constants/base'
3232
import { DatabaseClient, EventId } from '../@types/base'
3333
import { DBEvent, Event } from '../@types/event'
34-
import { IEventRepository, IQueryResult } from '../@types/repositories'
34+
import { EventPurgeCounts, EventRetentionOptions, IEventRepository, IQueryResult } from '../@types/repositories'
3535
import { toBuffer, toJSON } from '../utils/transform'
3636
import { createLogger } from '../factories/logger-factory'
3737
import { isGenericTagQuery } from '../utils/filter'
@@ -322,4 +322,99 @@ export class EventRepository implements IEventRepository {
322322

323323
return Boolean(result)
324324
}
325+
326+
public deleteExpiredAndRetained(options?: EventRetentionOptions): Promise<EventPurgeCounts> {
327+
const now = Math.floor(Date.now() / 1000)
328+
const maxDays = options?.maxDays
329+
330+
if (typeof maxDays !== 'number' || isNaN(maxDays) || maxDays <= 0) {
331+
debug('skipping purge: retention.maxDays is not a positive number')
332+
return Promise.resolve({
333+
deleted: 0,
334+
expired: 0,
335+
retained: 0,
336+
})
337+
}
338+
339+
const retentionLimit = now - (maxDays * 86400)
340+
const batchSize = 1000
341+
342+
debug('deleting expired and retained events (retentionLimit: %d, now: %d, batchSize: %d)', retentionLimit, now, batchSize)
343+
344+
const kindWhitelist = [
345+
...(Array.isArray(options?.kindWhitelist) ? options.kindWhitelist : []),
346+
EventKinds.REQUEST_TO_VANISH,
347+
].reduce<(number | [number, number])[]>((result, item) => {
348+
const key = Array.isArray(item)
349+
? `range:${item[0]}-${item[1]}`
350+
: `kind:${item}`
351+
352+
if (!result.some((existing) => {
353+
const existingKey = Array.isArray(existing)
354+
? `range:${existing[0]}-${existing[1]}`
355+
: `kind:${existing}`
356+
return existingKey === key
357+
})) {
358+
result.push(item)
359+
}
360+
361+
return result
362+
}, [])
363+
364+
const candidates = this.masterDbClient('events')
365+
.select('event_id')
366+
.where(function () {
367+
this.where('expires_at', '<', now)
368+
.orWhereNotNull('deleted_at')
369+
.orWhere('event_created_at', '<', retentionLimit)
370+
})
371+
.modify((query) => {
372+
query.whereNot((builder) => {
373+
kindWhitelist.forEach((kindOrRange) => {
374+
if (Array.isArray(kindOrRange)) {
375+
builder.orWhereBetween('event_kind', kindOrRange)
376+
} else {
377+
builder.orWhere('event_kind', kindOrRange)
378+
}
379+
})
380+
})
381+
382+
if (Array.isArray(options?.pubkeyWhitelist) && options.pubkeyWhitelist.length > 0) {
383+
query.whereNotIn('event_pubkey', map(toBuffer)(options.pubkeyWhitelist))
384+
}
385+
})
386+
.limit(batchSize)
387+
388+
const query = this.masterDbClient('events')
389+
.whereIn('event_id', candidates)
390+
.del(['deleted_at', 'expires_at', 'event_created_at'])
391+
392+
const mapToCounts = (deletedRows: Pick<DBEvent, 'deleted_at' | 'expires_at' | 'event_created_at'>[]): EventPurgeCounts => deletedRows.reduce((counts, row) => {
393+
if (row.deleted_at) {
394+
counts.deleted += 1
395+
} else if (typeof row.expires_at === 'number' && row.expires_at < now) {
396+
counts.expired += 1
397+
} else if (row.event_created_at < retentionLimit) {
398+
counts.retained += 1
399+
}
400+
401+
return counts
402+
}, {
403+
deleted: 0,
404+
expired: 0,
405+
retained: 0,
406+
})
407+
408+
const getPromise = () => query.then((rows: any) => mapToCounts(rows))
409+
410+
return {
411+
then: <T1, T2>(
412+
onfulfilled?: ((value: EventPurgeCounts) => T1 | PromiseLike<T1>) | null,
413+
onrejected?: ((reason: any) => T2 | PromiseLike<T2>) | null,
414+
) => getPromise().then(onfulfilled as any, onrejected as any),
415+
catch: <T>(onrejected?: ((reason: any) => T | PromiseLike<T>) | null) => getPromise().catch(onrejected as any),
416+
finally: (onfinally?: (() => void) | null) => getPromise().finally(onfinally as any),
417+
toString: (): string => query.toString(),
418+
} as Promise<EventPurgeCounts> & { toString(): string }
419+
}
325420
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
import { createLogger } from '../factories/logger-factory'
2+
import { IEventRepository } from '../@types/repositories'
3+
import { IMaintenanceService } from '../@types/services'
4+
import { Settings } from '../@types/settings'
5+
6+
const debug = createLogger('maintenance-service')
7+
8+
export class MaintenanceService implements IMaintenanceService {
9+
public constructor(
10+
private readonly eventRepository: IEventRepository,
11+
private readonly settings: () => Settings,
12+
) {}
13+
14+
public async clearOldEvents(): Promise<void> {
15+
const currentSettings = this.settings()
16+
const retention = currentSettings.limits?.event?.retention
17+
const maxDays = retention?.maxDays
18+
19+
if (typeof maxDays !== 'number' || isNaN(maxDays) || maxDays <= 0) {
20+
return
21+
}
22+
23+
try {
24+
debug('purging deleted, expired and old events')
25+
const deletedCounts = await this.eventRepository.deleteExpiredAndRetained({
26+
maxDays,
27+
kindWhitelist: retention?.kind?.whitelist,
28+
pubkeyWhitelist: retention?.pubkey?.whitelist,
29+
})
30+
const totalDeleted = deletedCounts.deleted + deletedCounts.expired + deletedCounts.retained
31+
if (totalDeleted > 0) {
32+
console.info(`[Maintenance] Deleted events: deleted=${deletedCounts.deleted}, expired=${deletedCounts.expired}, retained=${deletedCounts.retained}.`)
33+
}
34+
} catch (error) {
35+
console.error('Unable to purge events. Reason:', error)
36+
}
37+
}
38+
}

0 commit comments

Comments
 (0)