Skip to content

Commit 81e8d30

Browse files
YashIIT0909Mohit-Davar
authored andcommitted
Fix: nip01 replaceable tiebreaker (#416)
* fix: update event upsert query to handle duplicate timestamps using event ID comparison * test: add NIP-01 tie-breaker integration test for identical timestamps
1 parent b172235 commit 81e8d30

File tree

11 files changed

+175
-36
lines changed

11 files changed

+175
-36
lines changed

src/@types/adapters.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ export interface ICacheAdapter {
2222
getKey(key: string): Promise<string>
2323
hasKey(key: string): Promise<boolean>
2424
setKey(key: string, value: string): Promise<boolean>
25+
setKeyWithExpiry(key: string, value: string, ttl: number): Promise<boolean>
2526
addToSortedSet(key: string, set: Record<string, string> | Record<string, string>[]): Promise<number>
2627
removeRangeByScoreFromSortedSet(key: string, min: number, max: number): Promise<number>
2728
getRangeFromSortedSet(key: string, start: number, stop: number): Promise<string[]>

src/adapters/redis-adapter.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,10 +60,16 @@ export class RedisAdapter implements ICacheAdapter {
6060

6161
public async setKey(key: string, value: string): Promise<boolean> {
6262
await this.connection
63-
debug('get %s key', key)
63+
debug('set %s key', key)
6464
return 'OK' === await this.client.set(key, value)
6565
}
6666

67+
public async setKeyWithExpiry(key: string, value: string, ttl: number): Promise<boolean> {
68+
await this.connection
69+
debug('set %s key with expiry %d', key, ttl)
70+
return 'OK' === await this.client.set(key, value, { EX: ttl })
71+
}
72+
6773
public async removeRangeByScoreFromSortedSet(key: string, min: number, max: number): Promise<number> {
6874
await this.connection
6975
debug('remove %d..%d range from sorted set %s', min, max, key)

src/constants/caching.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
/**
2+
* States for user admission caching.
3+
*/
4+
export enum CacheAdmissionState {
5+
ADMITTED = 'admitted',
6+
NOT_ADMITTED = 'not-admitted',
7+
INSUFFICIENT_BALANCE = 'insufficient-balance',
8+
}

src/factories/message-handler-factory.ts

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,23 @@
1+
import { ICacheAdapter, IWebSocketAdapter } from '../@types/adapters'
12
import { IEventRepository, IUserRepository } from '../@types/repositories'
23
import { IncomingMessage, MessageType } from '../@types/messages'
34
import { createSettings } from './settings-factory'
45
import { EventMessageHandler } from '../handlers/event-message-handler'
56
import { eventStrategyFactory } from './event-strategy-factory'
6-
import { IWebSocketAdapter } from '../@types/adapters'
7+
import { getCacheClient } from '../cache/client'
8+
import { RedisAdapter } from '../adapters/redis-adapter'
79
import { slidingWindowRateLimiterFactory } from './rate-limiter-factory'
810
import { SubscribeMessageHandler } from '../handlers/subscribe-message-handler'
911
import { UnsubscribeMessageHandler } from '../handlers/unsubscribe-message-handler'
1012

13+
let cacheAdapter: ICacheAdapter | undefined = undefined
14+
const getCache = (): ICacheAdapter => {
15+
if (!cacheAdapter) {
16+
cacheAdapter = new RedisAdapter(getCacheClient())
17+
}
18+
return cacheAdapter
19+
}
20+
1121
export const messageHandlerFactory = (
1222
eventRepository: IEventRepository,
1323
userRepository: IUserRepository,
@@ -22,6 +32,7 @@ export const messageHandlerFactory = (
2232
userRepository,
2333
createSettings,
2434
slidingWindowRateLimiterFactory,
35+
getCache(),
2536
)
2637
}
2738
case MessageType.REQ:

src/handlers/event-message-handler.ts

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,11 @@ import {
1515
} from '../utils/event'
1616
import { IEventRepository, IUserRepository } from '../@types/repositories'
1717
import { IEventStrategy, IMessageHandler } from '../@types/message-handlers'
18+
import { CacheAdmissionState } from '../constants/caching'
1819
import { createCommandResult } from '../utils/messages'
1920
import { createLogger } from '../factories/logger-factory'
2021
import { Factory } from '../@types/base'
22+
import { ICacheAdapter } from '../@types/adapters'
2123
import { IncomingEventMessage } from '../@types/messages'
2224
import { IRateLimiter } from '../@types/utils'
2325
import { IWebSocketAdapter } from '../@types/adapters'
@@ -33,6 +35,7 @@ export class EventMessageHandler implements IMessageHandler {
3335
protected readonly userRepository: IUserRepository,
3436
private readonly settings: () => Settings,
3537
private readonly slidingWindowRateLimiter: Factory<IRateLimiter>,
38+
private readonly cache: ICacheAdapter,
3639
) {}
3740

3841
public async handleMessage(message: IncomingEventMessage): Promise<void> {
@@ -313,17 +316,45 @@ export class EventMessageHandler implements IMessageHandler {
313316
return
314317
}
315318

316-
// const hasKey = await this.cache.hasKey(`${event.pubkey}:is-admitted`)
317-
// TODO: use cache
319+
const cacheKey = `${event.pubkey}:is-admitted`
320+
321+
try {
322+
const cachedAdmission = await this.cache.getKey(cacheKey)
323+
switch (cachedAdmission) {
324+
case CacheAdmissionState.ADMITTED:
325+
debug('cache hit for %s admission: admitted', event.pubkey)
326+
return
327+
case CacheAdmissionState.NOT_ADMITTED:
328+
debug('cache hit for %s admission: blocked', event.pubkey)
329+
return 'blocked: pubkey not admitted'
330+
case CacheAdmissionState.INSUFFICIENT_BALANCE:
331+
debug('cache hit for %s admission: blocked (insufficient balance)', event.pubkey)
332+
return 'blocked: insufficient balance'
333+
default:
334+
break
335+
}
336+
} catch (error) {
337+
debug('cache error for %s: %o', event.pubkey, error)
338+
}
339+
318340
const user = await this.userRepository.findByPubkey(event.pubkey)
319341
if (!user || !user.isAdmitted) {
342+
this.cacheSet(cacheKey, CacheAdmissionState.NOT_ADMITTED, 60)
320343
return 'blocked: pubkey not admitted'
321344
}
322345

323346
const minBalance = currentSettings.limits?.event?.pubkey?.minBalance ?? 0n
324347
if (minBalance > 0n && user.balance < minBalance) {
348+
this.cacheSet(cacheKey, CacheAdmissionState.INSUFFICIENT_BALANCE, 60)
325349
return 'blocked: insufficient balance'
326350
}
351+
352+
this.cacheSet(cacheKey, CacheAdmissionState.ADMITTED, 300)
353+
}
354+
355+
private cacheSet(key: string, value: string, ttl: number): void {
356+
this.cache.setKeyWithExpiry(key, value, ttl)
357+
.catch((error) => debug('unable to cache %s: %o', key, error))
327358
}
328359

329360
protected addExpirationMetadata(event: Event): Event | ExpiringEvent {

src/repositories/event-repository.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,13 @@ export class EventRepository implements IEventRepository {
227227
)
228228
)
229229
.merge(omit(['event_pubkey', 'event_kind', 'event_deduplication'])(row))
230-
.where('events.event_created_at', '<', row.event_created_at)
230+
.where(function () {
231+
this.where('events.event_created_at', '<', row.event_created_at)
232+
.orWhere(function () {
233+
this.where('events.event_created_at', '=', row.event_created_at)
234+
.andWhere('events.event_id', '>', row.event_id)
235+
})
236+
})
231237

232238
return {
233239
then: <T1, T2>(onfulfilled: (value: number) => T1 | PromiseLike<T1>, onrejected: (reason: any) => T2 | PromiseLike<T2>) => query.then(prop('rowCount') as () => number).then(onfulfilled, onrejected),

test/integration/features/nip-16/nip-16.feature

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,12 @@ Feature: NIP-16 Event treatment
99
When Alice subscribes to author Alice
1010
Then Alice receives 1 replaceable_event_0 event from Alice with content "updated" and EOSE
1111

12+
Scenario: Tie-breaker on Identical Timestamps
13+
Given someone called Alice
14+
When Alice sends two identically-timestamped replaceable_event_0 events where the second has a lower ID
15+
And Alice subscribes to author Alice
16+
Then Alice receives 1 replaceable_event_0 event from Alice matching the lower ID event and EOSE
17+
1218
Scenario: Charlie sends an ephemeral event
1319
Given someone called Charlie
1420
Given someone called Alice

test/integration/features/nip-16/nip-16.feature.ts

Lines changed: 60 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import WebSocket from 'ws'
55
import { createEvent, sendEvent, waitForEventCount, waitForNextEvent } from '../helpers'
66
import { Event } from '../../../../src/@types/event'
77

8-
When(/^(\w+) sends a replaceable_event_0 event with content "([^"]+)"$/, async function(
8+
When(/^(\w+) sends a replaceable_event_0 event with content "([^"]+)"$/, async function (
99
name: string,
1010
content: string,
1111
) {
@@ -20,17 +20,17 @@ When(/^(\w+) sends a replaceable_event_0 event with content "([^"]+)"$/, async f
2020

2121
Then(
2222
/(\w+) receives a replaceable_event_0 event from (\w+) with content "([^"]+?)"/,
23-
async function(name: string, author: string, content: string) {
24-
const ws = this.parameters.clients[name] as WebSocket
25-
const subscription = this.parameters.subscriptions[name][this.parameters.subscriptions[name].length - 1]
26-
const receivedEvent = await waitForNextEvent(ws, subscription.name)
23+
async function (name: string, author: string, content: string) {
24+
const ws = this.parameters.clients[name] as WebSocket
25+
const subscription = this.parameters.subscriptions[name][this.parameters.subscriptions[name].length - 1]
26+
const receivedEvent = await waitForNextEvent(ws, subscription.name)
2727

28-
expect(receivedEvent.kind).to.equal(10000)
29-
expect(receivedEvent.pubkey).to.equal(this.parameters.identities[author].pubkey)
30-
expect(receivedEvent.content).to.equal(content)
31-
})
28+
expect(receivedEvent.kind).to.equal(10000)
29+
expect(receivedEvent.pubkey).to.equal(this.parameters.identities[author].pubkey)
30+
expect(receivedEvent.content).to.equal(content)
31+
})
3232

33-
Then(/(\w+) receives (\d+) replaceable_event_0 events? from (\w+) with content "([^"]+?)" and EOSE/, async function(
33+
Then(/(\w+) receives (\d+) replaceable_event_0 events? from (\w+) with content "([^"]+?)" and EOSE/, async function (
3434
name: string,
3535
count: string,
3636
author: string,
@@ -46,7 +46,7 @@ Then(/(\w+) receives (\d+) replaceable_event_0 events? from (\w+) with content "
4646
expect(events[0].content).to.equal(content)
4747
})
4848

49-
When(/^(\w+) sends a ephemeral_event_0 event with content "([^"]+)"$/, async function(
49+
When(/^(\w+) sends a ephemeral_event_0 event with content "([^"]+)"$/, async function (
5050
name: string,
5151
content: string,
5252
) {
@@ -61,23 +61,66 @@ When(/^(\w+) sends a ephemeral_event_0 event with content "([^"]+)"$/, async fun
6161

6262
Then(
6363
/(\w+) receives a ephemeral_event_0 event from (\w+) with content "([^"]+?)"/,
64-
async function(name: string, author: string, content: string) {
64+
async function (name: string, author: string, content: string) {
65+
const ws = this.parameters.clients[name] as WebSocket
66+
const subscription = this.parameters.subscriptions[name][this.parameters.subscriptions[name].length - 1]
67+
const receivedEvent = await waitForNextEvent(ws, subscription.name)
68+
69+
expect(receivedEvent.kind).to.equal(20000)
70+
expect(receivedEvent.pubkey).to.equal(this.parameters.identities[author].pubkey)
71+
expect(receivedEvent.content).to.equal(content)
72+
})
73+
74+
Then(/(\w+) receives (\d+) ephemeral_event_0 events? and EOSE/, async function (
75+
name: string,
76+
count: string,
77+
) {
6578
const ws = this.parameters.clients[name] as WebSocket
6679
const subscription = this.parameters.subscriptions[name][this.parameters.subscriptions[name].length - 1]
67-
const receivedEvent = await waitForNextEvent(ws, subscription.name)
80+
const events = await waitForEventCount(ws, subscription.name, Number(count), true)
6881

69-
expect(receivedEvent.kind).to.equal(20000)
70-
expect(receivedEvent.pubkey).to.equal(this.parameters.identities[author].pubkey)
71-
expect(receivedEvent.content).to.equal(content)
82+
expect(events.length).to.equal(Number(count))
7283
})
7384

74-
Then(/(\w+) receives (\d+) ephemeral_event_0 events? and EOSE/, async function(
85+
When(/^(\w+) sends two identically-timestamped replaceable_event_0 events where the second has a lower ID$/, async function (
86+
name: string
87+
) {
88+
const ws = this.parameters.clients[name] as WebSocket
89+
const { pubkey, privkey } = this.parameters.identities[name]
90+
91+
const commonTimestamp = Math.floor(Date.now() / 1000)
92+
93+
const event1 = await createEvent({ pubkey, kind: 10000, content: 'first content', created_at: commonTimestamp }, privkey)
94+
95+
let nonce = 0
96+
let event2: Event
97+
for (; ;) {
98+
event2 = await createEvent({ pubkey, kind: 10000, content: `second content ${nonce++}`, created_at: commonTimestamp }, privkey)
99+
100+
if (event2.id < event1.id) {
101+
break
102+
}
103+
}
104+
105+
await sendEvent(ws, event1)
106+
await sendEvent(ws, event2)
107+
108+
this.parameters.events[name].push(event1, event2)
109+
this.parameters.lowerIdEventContent = event2.content
110+
})
111+
112+
Then(/(\w+) receives (\d+) replaceable_event_0 event from (\w+) matching the lower ID event and EOSE/, async function (
75113
name: string,
76114
count: string,
115+
author: string,
77116
) {
78117
const ws = this.parameters.clients[name] as WebSocket
79118
const subscription = this.parameters.subscriptions[name][this.parameters.subscriptions[name].length - 1]
80119
const events = await waitForEventCount(ws, subscription.name, Number(count), true)
81120

82121
expect(events.length).to.equal(Number(count))
122+
expect(events[0].kind).to.equal(10000)
123+
expect(events[0].pubkey).to.equal(this.parameters.identities[author].pubkey)
124+
expect(events[0].content).to.equal(this.parameters.lowerIdEventContent)
83125
})
126+

test/unit/handlers/event-message-handler.spec.ts

Lines changed: 36 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ chai.use(chaiAsPromised)
1010

1111
import { EventLimits, Settings } from '../../../src/@types/settings'
1212
import { IncomingEventMessage, MessageType } from '../../../src/@types/messages'
13+
import { CacheAdmissionState } from '../../../src/constants/caching'
1314
import { Event } from '../../../src/@types/event'
1415
import { EventKinds } from '../../../src/constants/base'
1516
import { EventMessageHandler } from '../../../src/handlers/event-message-handler'
@@ -88,7 +89,8 @@ describe('EventMessageHandler', () => {
8889
() => ({
8990
info: { relay_url: 'relay_url' },
9091
}) as any,
91-
() => ({ hit: async () => false })
92+
() => ({ hit: async () => false }),
93+
{ hasKey: async () => false, setKey: async () => true } as any,
9294
)
9395
})
9496

@@ -262,7 +264,8 @@ describe('EventMessageHandler', () => {
262264
{ hasActiveRequestToVanish: async () => false } as any,
263265
userRepository,
264266
() => settings,
265-
() => ({ hit: async () => false })
267+
() => ({ hit: async () => false }),
268+
{ hasKey: async () => false, setKey: async () => true } as any,
266269
)
267270
})
268271

@@ -738,7 +741,8 @@ describe('EventMessageHandler', () => {
738741
{ hasActiveRequestToVanish: async () => false } as any,
739742
userRepository,
740743
() => settings,
741-
() => ({ hit: rateLimiterHitStub })
744+
() => ({ hit: rateLimiterHitStub }),
745+
{ hasKey: async () => false, setKey: async () => true, setKeyWithExpiry: async () => true } as any,
742746
)
743747
})
744748

@@ -953,6 +957,7 @@ describe('EventMessageHandler', () => {
953957
let webSocket: IWebSocketAdapter
954958
let getRelayPublicKeyStub: SinonStub
955959
let userRepositoryFindByPubkeyStub: SinonStub
960+
let cacheGetKeyStub: SinonStub
956961

957962
beforeEach(() => {
958963
settings = {
@@ -994,6 +999,7 @@ describe('EventMessageHandler', () => {
994999
getRelayPublicKeyStub = sandbox.stub(EventMessageHandler.prototype, 'getRelayPublicKey' as any)
9951000
getClientAddressStub = sandbox.stub()
9961001
userRepositoryFindByPubkeyStub = sandbox.stub()
1002+
cacheGetKeyStub = sandbox.stub().resolves(null)
9971003
webSocket = {
9981004
getClientAddress: getClientAddressStub,
9991005
} as any
@@ -1006,11 +1012,18 @@ describe('EventMessageHandler', () => {
10061012
{ hasActiveRequestToVanish: async () => false } as any,
10071013
userRepository,
10081014
() => settings,
1009-
() => ({ hit: async () => false })
1015+
() => ({ hit: async () => false }),
1016+
{
1017+
hasKey: async () => false,
1018+
getKey: cacheGetKeyStub,
1019+
setKey: async () => true,
1020+
setKeyWithExpiry: async () => true,
1021+
setKeyExpiry: async () => undefined,
1022+
} as any,
10101023
)
10111024
})
10121025

1013-
it ('fulfills with undefined if payments are disabled', async () => {
1026+
it('fulfills with undefined if payments are disabled', async () => {
10141027
settings.payments.enabled = false
10151028

10161029
return expect((handler as any).isUserAdmitted(event)).to.eventually.be.undefined
@@ -1089,12 +1102,6 @@ describe('EventMessageHandler', () => {
10891102
return expect((handler as any).isUserAdmitted(event)).to.eventually.equal('blocked: pubkey not admitted')
10901103
})
10911104

1092-
it('fulfills with reason if user is not admitted', async () => {
1093-
userRepositoryFindByPubkeyStub.resolves({ isAdmitted: false })
1094-
1095-
return expect((handler as any).isUserAdmitted(event)).to.eventually.equal('blocked: pubkey not admitted')
1096-
})
1097-
10981105
it('fulfills with reason if user does not meet minimum balance', async () => {
10991106
settings.limits.event.pubkey.minBalance = 1000n
11001107
userRepositoryFindByPubkeyStub.resolves({ isAdmitted: true, balance: 999n })
@@ -1108,5 +1115,23 @@ describe('EventMessageHandler', () => {
11081115

11091116
return expect((handler as any).isUserAdmitted(event)).to.eventually.be.undefined
11101117
})
1118+
1119+
it('fulfills with undefined if user is admitted in cache', async () => {
1120+
cacheGetKeyStub.resolves(CacheAdmissionState.ADMITTED)
1121+
1122+
return expect((handler as any).isUserAdmitted(event)).to.eventually.be.undefined
1123+
})
1124+
1125+
it('fulfills with reason if user is blocked in cache', async () => {
1126+
cacheGetKeyStub.resolves(CacheAdmissionState.NOT_ADMITTED)
1127+
1128+
return expect((handler as any).isUserAdmitted(event)).to.eventually.equal('blocked: pubkey not admitted')
1129+
})
1130+
1131+
it('fulfills with reason if user has insufficient balance in cache', async () => {
1132+
cacheGetKeyStub.resolves(CacheAdmissionState.INSUFFICIENT_BALANCE)
1133+
1134+
return expect((handler as any).isUserAdmitted(event)).to.eventually.equal('blocked: insufficient balance')
1135+
})
11111136
})
11121137
})

0 commit comments

Comments
 (0)