Skip to content

Commit 7173f4d

Browse files
committed
feat: add opt-in event retention purge (#359)
1 parent 6c82d8b commit 7173f4d

File tree

13 files changed

+297
-2
lines changed

13 files changed

+297
-2
lines changed

CONFIGURATION.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ Running `nostream` for the first time creates the settings file in `<project_roo
109109
| limits.event.rateLimits[].rate | Maximum number of events during period. |
110110
| limits.event.whitelists.pubkeys | List of public keys to ignore rate limits. |
111111
| limits.event.whitelists.ipAddresses | List of IPs (IPv4 or IPv6) to ignore rate limits. |
112+
| limits.event.retentionDays | Number of days to retain events (based on created_at). Older events are purged. Defaults to -1 (disabled). Any non-positive number disables the feature. |
112113
| limits.client.subscription.maxSubscriptions | Maximum number of subscriptions per connected client. Defaults to 10. Disabled when set to zero. |
113114
| limits.client.subscription.maxFilters | Maximum number of filters per subscription. Defaults to 10. Disabled when set to zero. |
114115
| limits.message.rateLimits[].period | Rate limit period in milliseconds. |

resources/default-settings.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ limits:
7676
- "10.10.10.1"
7777
- "::ffff:10.10.10.1"
7878
event:
79+
retentionDays: -1
7980
eventId:
8081
minLeadingZeroBits: 0
8182
kind:

src/@types/repositories.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ export interface IEventRepository {
1717
upsert(event: Event): Promise<number>
1818
findByFilters(filters: SubscriptionFilter[]): IQueryResult<DBEvent[]>
1919
deleteByPubkeyAndIds(pubkey: Pubkey, ids: EventId[]): Promise<number>
20+
deleteExpiredAndRetained(retentionDays?: number): Promise<number>
2021
}
2122

2223
export interface IInvoiceRepository {

src/@types/services.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
import { Invoice } from './invoice'
22
import { Pubkey } from './base'
33

4+
export interface IMaintenanceService {
5+
clearOldEvents(): Promise<void>
6+
}
7+
48
export interface IPaymentsService {
59
getInvoiceFromPaymentsProcessor(invoice: string | Invoice): Promise<Partial<Invoice>>
610
createInvoice(

src/@types/settings.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ export interface EventLimits {
7777
content?: ContentLimits | ContentLimits[]
7878
rateLimits?: EventRateLimit[]
7979
whitelists?: EventWhitelists
80+
retentionDays?: number
8081
}
8182

8283
export interface ClientSubscriptionLimits {

src/app/maintenance-worker.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
1+
import { IMaintenanceService, IPaymentsService } from '../@types/services'
12
import { mergeDeepLeft, path, pipe } from 'ramda'
23
import { IRunnable } from '../@types/base'
34

45
import { createLogger } from '../factories/logger-factory'
56
import { delayMs } from '../utils/misc'
67
import { InvoiceStatus } from '../@types/invoice'
7-
import { IPaymentsService } from '../@types/services'
88
import { Settings } from '../@types/settings'
99

1010
const UPDATE_INVOICE_INTERVAL = 60000
@@ -17,6 +17,7 @@ export class MaintenanceWorker implements IRunnable {
1717
public constructor(
1818
private readonly process: NodeJS.Process,
1919
private readonly paymentsService: IPaymentsService,
20+
private readonly maintenanceService: IMaintenanceService,
2021
private readonly settings: () => Settings,
2122
) {
2223
this.process
@@ -34,6 +35,8 @@ export class MaintenanceWorker implements IRunnable {
3435
private async onSchedule(): Promise<void> {
3536
const currentSettings = this.settings()
3637

38+
await this.maintenanceService.clearOldEvents()
39+
3740
if (!path(['payments','enabled'], currentSettings)) {
3841
return
3942
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
import { getMasterDbClient, getReadReplicaDbClient } from '../database/client'
2+
import { createSettings } from './settings-factory'
3+
import { EventRepository } from '../repositories/event-repository'
4+
import { MaintenanceService } from '../services/maintenance-service'
5+
6+
export const createMaintenanceService = () => {
7+
return new MaintenanceService(
8+
new EventRepository(getMasterDbClient(), getReadReplicaDbClient()),
9+
createSettings
10+
)
11+
}
Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,13 @@
1+
import { createMaintenanceService } from './maintenance-service-factory'
12
import { createPaymentsService } from './payments-service-factory'
23
import { createSettings } from './settings-factory'
34
import { MaintenanceWorker } from '../app/maintenance-worker'
45

56
export const maintenanceWorkerFactory = () => {
6-
return new MaintenanceWorker(process, createPaymentsService(), createSettings)
7+
return new MaintenanceWorker(
8+
process,
9+
createPaymentsService(),
10+
createMaintenanceService(),
11+
createSettings
12+
)
713
}

src/repositories/event-repository.ts

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -251,4 +251,27 @@ export class EventRepository implements IEventRepository {
251251
deleted_at: this.masterDbClient.raw('now()'),
252252
})
253253
}
254+
255+
public deleteExpiredAndRetained(retentionDays?: number): Promise<number> {
256+
const now = Math.floor(Date.now() / 1000)
257+
258+
if (typeof retentionDays !== 'number' || isNaN(retentionDays) || retentionDays <= 0) {
259+
debug('skipping purge: retentionDays is not a positive number')
260+
return Promise.resolve(0)
261+
}
262+
263+
const retentionLimit = now - (retentionDays * 86400)
264+
265+
debug('deleting expired and retained events (retentionLimit: %d, now: %d)', retentionLimit, now)
266+
267+
return this.masterDbClient('events')
268+
.where(function () {
269+
this.where(function () {
270+
this.whereNotNull('expires_at').andWhere('expires_at', '<', now)
271+
})
272+
.orWhereNotNull('deleted_at')
273+
.orWhere('event_created_at', '<', retentionLimit)
274+
})
275+
.del()
276+
}
254277
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
import { createLogger } from '../factories/logger-factory'
2+
import { IEventRepository } from '../@types/repositories'
3+
import { IMaintenanceService } from '../@types/services'
4+
import { Settings } from '../@types/settings'
5+
6+
const debug = createLogger('maintenance-service')
7+
8+
export class MaintenanceService implements IMaintenanceService {
9+
public constructor(
10+
private readonly eventRepository: IEventRepository,
11+
private readonly settings: () => Settings,
12+
) {}
13+
14+
public async clearOldEvents(): Promise<void> {
15+
const currentSettings = this.settings()
16+
const retentionDays = currentSettings.limits?.event?.retentionDays
17+
18+
if (typeof retentionDays !== 'number' || isNaN(retentionDays) || retentionDays <= 0) {
19+
return
20+
}
21+
22+
try {
23+
debug('purging deleted, expired and old events')
24+
const deletedCount = await this.eventRepository.deleteExpiredAndRetained(retentionDays)
25+
if (deletedCount > 0) {
26+
console.info(`[Maintenance] Deleted ${deletedCount} expired and retained events.`)
27+
}
28+
} catch (error) {
29+
console.error('Unable to purge events. Reason:', error)
30+
}
31+
}
32+
}

0 commit comments

Comments
 (0)