Skip to content

Commit 791c01f

Browse files
refactor: reuse EventRepository instead of raw queries in import service
1 parent 21a85d6 commit 791c01f

File tree

2 files changed

+30
-279
lines changed

2 files changed

+30
-279
lines changed

src/import-events.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import {
1313
EventImportStats,
1414
} from './services/event-import-service'
1515
import { getMasterDbClient } from './database/client'
16+
import { EventRepository } from './repositories/event-repository'
1617

1718
interface CliOptions {
1819
batchSize: number
@@ -127,7 +128,8 @@ const run = async (): Promise<void> => {
127128
const absoluteFilePath = ensureValidInputFile(options.filePath)
128129

129130
const dbClient = getMasterDbClient()
130-
const importer = new EventImportService(createEventBatchPersister(dbClient))
131+
const eventRepository = new EventRepository(dbClient, dbClient)
132+
const importer = new EventImportService(createEventBatchPersister(eventRepository))
131133

132134
let loggedErrors = 0
133135
let suppressedErrors = 0

src/services/event-import-service.ts

Lines changed: 27 additions & 278 deletions
Original file line numberDiff line numberDiff line change
@@ -1,46 +1,23 @@
11
import fs from 'fs'
22
import readline from 'readline'
33

4-
import { Knex } from 'knex'
5-
6-
import { DatabaseClient, EventId } from '../@types/base'
74
import {
8-
getEventExpiration,
95
isDeleteEvent,
106
isEphemeralEvent,
117
isEventIdValid,
128
isEventSignatureValid,
139
isParameterizedReplaceableEvent,
1410
isReplaceableEvent,
1511
} from '../utils/event'
16-
import { toBuffer, toJSON } from '../utils/transform'
1712
import { attemptValidation } from '../utils/validation'
1813

1914
import { Event } from '../@types/event'
2015
import { eventSchema } from '../schemas/event-schema'
2116
import { EventTags } from '../constants/base'
17+
import { IEventRepository } from '../@types/repositories'
2218

2319
const DEFAULT_BATCH_SIZE = 1000
2420

25-
const REPLACEABLE_EVENT_CONFLICT_TARGET =
26-
'(event_pubkey, event_kind, event_deduplication) '
27-
+ 'WHERE (event_kind = 0 OR event_kind = 3 OR event_kind = 41 '
28-
+ 'OR (event_kind >= 10000 AND event_kind < 20000)) '
29-
+ 'OR (event_kind >= 30000 AND event_kind < 40000)'
30-
31-
interface ImportEventRow {
32-
deleted_at: null
33-
event_content: string
34-
event_created_at: number
35-
event_deduplication: string | null
36-
event_id: Buffer
37-
event_kind: number
38-
event_pubkey: Buffer
39-
event_signature: Buffer
40-
event_tags: string
41-
expires_at: number | null
42-
}
43-
4421
export interface EventImportStats {
4522
errors: number
4623
inserted: number
@@ -67,276 +44,48 @@ const getErrorMessage = (error: unknown): string => {
6744
return String(error)
6845
}
6946

70-
const getAffectedRowCount = (result: unknown): number => {
71-
if (Array.isArray(result)) {
72-
return result.length
73-
}
74-
75-
if (
76-
typeof result === 'object'
77-
&& result !== null
78-
&& 'rowCount' in result
79-
&& typeof (result as { rowCount: unknown }).rowCount === 'number'
80-
) {
81-
return Number((result as { rowCount: number }).rowCount)
82-
}
83-
84-
return 0
85-
}
86-
87-
const isEventIdUniqueViolation = (error: unknown): boolean => {
88-
if (typeof error !== 'object' || error === null) {
89-
return false
90-
}
91-
92-
const dbError = error as {
93-
code?: string
94-
constraint?: string
95-
message?: string
96-
}
97-
98-
return dbError.code === '23505'
99-
&& (
100-
dbError.constraint === 'events_event_id_unique'
101-
|| dbError.message?.includes('events_event_id_unique') === true
102-
)
103-
}
104-
105-
const isValidDeleteTag = (tag: string[]): boolean => {
106-
return tag.length >= 2
107-
&& tag[0] === EventTags.Event
108-
&& /^[0-9a-f]{64}$/.test(tag[1])
109-
}
110-
111-
const getDeleteTargetEventIds = (event: Event): EventId[] => {
112-
return event.tags.reduce((eventIds, tag) => {
113-
if (isValidDeleteTag(tag)) {
114-
eventIds.push(tag[1])
115-
}
116-
117-
return eventIds
118-
}, [] as EventId[])
119-
}
120-
121-
const isEventReplaceableForStorage = (event: Event): boolean => {
122-
return isReplaceableEvent(event) || isParameterizedReplaceableEvent(event)
123-
}
124-
125-
const getReplaceableEventDeduplication = (event: Event): string => {
126-
if (isParameterizedReplaceableEvent(event)) {
127-
const [, ...deduplication] = event.tags.find(
128-
(tag) => tag.length >= 2 && tag[0] === EventTags.Deduplication,
129-
) ?? [null, '']
130-
131-
return toJSON(deduplication)
132-
}
133-
134-
return toJSON([event.pubkey, event.kind])
135-
}
136-
137-
const getReplaceableEventKey = (event: Event): string => {
138-
return `${event.pubkey}:${event.kind}:${getReplaceableEventDeduplication(event)}`
139-
}
140-
141-
const toImportEventRow = (event: Event): ImportEventRow => {
142-
const expiresAt = getEventExpiration(event)
143-
144-
return {
145-
deleted_at: null,
146-
event_content: event.content,
147-
event_created_at: event.created_at,
148-
event_deduplication: (
149-
isReplaceableEvent(event) || isParameterizedReplaceableEvent(event)
150-
? getReplaceableEventDeduplication(event)
151-
: null
152-
),
153-
event_id: toBuffer(event.id),
154-
event_kind: event.kind,
155-
event_pubkey: toBuffer(event.pubkey),
156-
event_signature: toBuffer(event.sig),
157-
event_tags: toJSON(event.tags),
158-
expires_at: typeof expiresAt === 'number' ? expiresAt : null,
159-
}
160-
}
161-
162-
const applyDeleteEvents = async (
163-
transaction: Knex.Transaction,
164-
deleteEvent: Event,
165-
): Promise<void> => {
166-
const eventIds = getDeleteTargetEventIds(deleteEvent)
167-
if (!eventIds.length) {
168-
return
169-
}
170-
171-
await transaction('events')
172-
.where('event_pubkey', toBuffer(deleteEvent.pubkey))
173-
.whereIn('event_id', eventIds.map(toBuffer))
174-
.whereNull('deleted_at')
175-
.update({
176-
deleted_at: transaction.raw('now()'),
177-
})
178-
}
179-
180-
const insertRegularEvents = async (
181-
transaction: Knex.Transaction,
182-
events: Event[],
183-
): Promise<number> => {
184-
if (!events.length) {
185-
return 0
186-
}
187-
188-
const rows = events.map(toImportEventRow)
189-
190-
const result = await transaction('events')
191-
.insert(rows)
192-
.onConflict()
193-
.ignore()
194-
.returning('event_id')
195-
196-
return getAffectedRowCount(result)
197-
}
198-
199-
const filterOutExistingEventIds = async (
200-
transaction: Knex.Transaction,
201-
events: Event[],
202-
): Promise<Event[]> => {
203-
if (!events.length) {
204-
return []
205-
}
206-
207-
const existingRows = await transaction('events')
208-
.select('event_id')
209-
.whereIn('event_id', events.map((event) => toBuffer(event.id))) as Array<{ event_id: Buffer }>
210-
211-
const existingEventIds = new Set(existingRows.map((row) => row.event_id.toString('hex')))
212-
213-
return events.filter((event) => !existingEventIds.has(event.id))
214-
}
215-
216-
const upsertReplaceableEvents = async (
217-
transaction: Knex.Transaction,
218-
events: Event[],
219-
): Promise<number> => {
220-
if (!events.length) {
221-
return 0
222-
}
223-
224-
let pendingEvents = events
225-
226-
while (pendingEvents.length) {
227-
const deduplicatedByEventId = new Map<string, Event>()
228-
for (const event of pendingEvents) {
229-
deduplicatedByEventId.set(event.id, event)
230-
}
231-
232-
pendingEvents = Array.from(deduplicatedByEventId.values())
233-
234-
const rows = pendingEvents.map(toImportEventRow)
235-
236-
try {
237-
const result = await transaction('events')
238-
.insert(rows)
239-
.onConflict(transaction.raw(REPLACEABLE_EVENT_CONFLICT_TARGET))
240-
.merge([
241-
'deleted_at',
242-
'event_content',
243-
'event_created_at',
244-
'event_id',
245-
'event_signature',
246-
'event_tags',
247-
'expires_at',
248-
])
249-
.whereRaw('"events"."event_created_at" < "excluded"."event_created_at"')
250-
.returning('event_id')
251-
252-
return getAffectedRowCount(result)
253-
} catch (error) {
254-
if (!isEventIdUniqueViolation(error)) {
255-
throw error
256-
}
257-
258-
const filteredEvents = await filterOutExistingEventIds(transaction, pendingEvents)
259-
260-
if (filteredEvents.length === pendingEvents.length) {
261-
throw error
262-
}
263-
264-
pendingEvents = filteredEvents
265-
}
266-
}
267-
268-
return 0
269-
}
270-
27147
export const createEventBatchPersister =
272-
(dbClient: DatabaseClient) =>
48+
(eventRepository: IEventRepository) =>
27349
async (events: Event[]): Promise<number> => {
27450
if (!events.length) {
27551
return 0
27652
}
27753

278-
return dbClient.transaction(async (transaction) => {
279-
let inserted = 0
54+
let inserted = 0
28055

281-
let nonDeleteSegment: Event[] = []
282-
283-
const flushNonDeleteSegment = async () => {
284-
if (!nonDeleteSegment.length) {
285-
return
286-
}
287-
288-
const regularEvents: Event[] = []
289-
const replaceableEventsByKey = new Map<string, Event>()
290-
291-
for (const event of nonDeleteSegment) {
292-
if (isEventReplaceableForStorage(event)) {
293-
const deduplicationKey = getReplaceableEventKey(event)
294-
const existingEvent = replaceableEventsByKey.get(deduplicationKey)
295-
296-
if (!existingEvent || existingEvent.created_at < event.created_at) {
297-
replaceableEventsByKey.set(deduplicationKey, event)
298-
}
299-
300-
continue
301-
}
302-
303-
regularEvents.push(event)
304-
}
305-
306-
inserted += await insertRegularEvents(transaction, regularEvents)
56+
for (const event of events) {
57+
if (isEphemeralEvent(event)) {
58+
continue
59+
}
30760

308-
const upsertEvents = await filterOutExistingEventIds(
309-
transaction,
310-
Array.from(replaceableEventsByKey.values()),
61+
if (isDeleteEvent(event)) {
62+
const eventIdsToDelete = event.tags.reduce(
63+
(ids, tag) =>
64+
tag.length >= 2
65+
&& tag[0] === EventTags.Event
66+
&& /^[0-9a-f]{64}$/.test(tag[1])
67+
? [...ids, tag[1]]
68+
: ids,
69+
[] as string[]
31170
)
31271

313-
inserted += await upsertReplaceableEvents(transaction, upsertEvents)
314-
315-
nonDeleteSegment = []
316-
}
317-
318-
for (const event of events) {
319-
if (isEphemeralEvent(event)) {
320-
continue
72+
if (eventIdsToDelete.length) {
73+
await eventRepository.deleteByPubkeyAndIds(event.pubkey, eventIdsToDelete)
32174
}
32275

323-
if (isDeleteEvent(event)) {
324-
await flushNonDeleteSegment()
325-
326-
await applyDeleteEvents(transaction, event)
327-
328-
inserted += await insertRegularEvents(transaction, [event])
329-
330-
continue
331-
}
76+
inserted += await eventRepository.create(event)
77+
continue
78+
}
33279

333-
nonDeleteSegment.push(event)
80+
if (isReplaceableEvent(event) || isParameterizedReplaceableEvent(event)) {
81+
inserted += await eventRepository.upsert(event)
82+
continue
33483
}
33584

336-
await flushNonDeleteSegment()
85+
inserted += await eventRepository.create(event)
86+
}
33787

338-
return inserted
339-
})
88+
return inserted
34089
}
34190

34291
export class EventImportService {

0 commit comments

Comments
 (0)