From aee264caeb3555192df4d161c388d612fbf532eb Mon Sep 17 00:00:00 2001 From: Jeffrieh Date: Wed, 1 Apr 2026 10:56:17 +0200 Subject: [PATCH 1/2] feat: state syncing --- core/protobufs/src/effect.proto | 42 + core/protobufs/src/effect.ts | 724 ++++++++++++++++++ .../src/common/stores/paymentStore.spec.ts | 52 +- modules/manager/src/main.ts | 93 +++ .../src/modules/createPaymentManager.spec.ts | 134 +--- .../src/modules/createPaymentManager.ts | 30 +- .../src/modules/createWorkerManager.spec.ts | 18 +- .../src/stores/managerTaskStore.spec.ts | 5 +- .../manager/src/stores/managerTaskStore.ts | 87 +++ modules/payment/tests/circuits.spec.ts | 9 +- modules/payment/tests/program.test.ts | 34 +- modules/worker/src/index.ts | 1 + modules/worker/src/main.ts | 80 +- .../src/modules/createPaymentWorker.spec.ts | 31 +- .../worker/src/stores/workerSyncStateStore.ts | 93 +++ modules/worker/src/stores/workerTaskStore.ts | 28 + .../src/sync/applyWorkerSyncResponse.ts | 47 ++ modules/worker/src/sync/runConnectFlow.ts | 17 + modules/worker/src/sync/sync.connect.spec.ts | 51 ++ packages/library/tests/protocol.spec.ts | 21 +- vitest.config.ts | 20 + 21 files changed, 1403 insertions(+), 214 deletions(-) create mode 100644 modules/worker/src/stores/workerSyncStateStore.ts create mode 100644 modules/worker/src/sync/applyWorkerSyncResponse.ts create mode 100644 modules/worker/src/sync/runConnectFlow.ts create mode 100644 modules/worker/src/sync/sync.connect.spec.ts create mode 100644 vitest.config.ts diff --git a/core/protobufs/src/effect.proto b/core/protobufs/src/effect.proto index 02d0932a..a5ddfe9e 100644 --- a/core/protobufs/src/effect.proto +++ b/core/protobufs/src/effect.proto @@ -43,6 +43,46 @@ message RequestToWorkResponse { string peer = 3; } +message WorkerSyncRequest { + uint32 timestamp = 1; + string worker_id = 2; + optional uint64 cursor = 3; + repeated string scopes = 4; + optional uint32 limit = 5; +} + +message WorkerSyncStatus { + string state = 1; + uint32 last_activity = 2; +} + +message WorkerSyncTask { + string task_id = 1; + string status = 2; + uint32 last_event_at = 3; + Task task = 4; +} + +message WorkerSyncPayment { + string payment_id = 1; + string status = 2; + string amount = 3; + optional string task_id = 4; + uint32 created_at = 5; + Payment payment = 6; +} + +message WorkerSyncResponse { + uint32 server_time = 1; + string worker_id = 2; + uint64 cursor = 3; + string manager_peer_id = 4; + optional WorkerSyncStatus status = 5; + repeated string capabilities = 6; + repeated WorkerSyncTask tasks = 7; + repeated WorkerSyncPayment payments = 8; +} + message EffectProtocolMessage { oneof message { Task task = 1; @@ -62,5 +102,7 @@ message EffectProtocolMessage { EffectIdentifyRequest identify_request = 15; EffectIdentifyResponse identify_response = 16; BulkProofRequest bulk_proof_request = 17; + WorkerSyncRequest worker_sync_request = 18; + WorkerSyncResponse worker_sync_response = 19; } } diff --git a/core/protobufs/src/effect.ts b/core/protobufs/src/effect.ts index c949f3f5..1ceca400 100644 --- a/core/protobufs/src/effect.ts +++ b/core/protobufs/src/effect.ts @@ -545,6 +545,554 @@ export namespace RequestToWorkResponse { } } +export interface WorkerSyncRequest { + timestamp: number + workerId: string + cursor?: bigint + scopes: string[] + limit?: number +} + +export namespace WorkerSyncRequest { + let _codec: Codec + + export const codec = (): Codec => { + if (_codec == null) { + _codec = message((obj, w, opts = {}) => { + if (opts.lengthDelimited !== false) { + w.fork() + } + + if ((obj.timestamp != null && obj.timestamp !== 0)) { + w.uint32(8) + w.uint32(obj.timestamp) + } + + if ((obj.workerId != null && obj.workerId !== '')) { + w.uint32(18) + w.string(obj.workerId) + } + + if (obj.cursor != null) { + w.uint32(24) + w.uint64(obj.cursor) + } + + if (obj.scopes != null) { + for (const value of obj.scopes) { + w.uint32(34) + w.string(value) + } + } + + if (obj.limit != null) { + w.uint32(40) + w.uint32(obj.limit) + } + + if (opts.lengthDelimited !== false) { + w.ldelim() + } + }, (reader, length, opts = {}) => { + const obj: any = { + timestamp: 0, + workerId: '', + scopes: [] + } + + const end = length == null ? reader.len : reader.pos + length + + while (reader.pos < end) { + const tag = reader.uint32() + + switch (tag >>> 3) { + case 1: { + obj.timestamp = reader.uint32() + break + } + case 2: { + obj.workerId = reader.string() + break + } + case 3: { + obj.cursor = reader.uint64() + break + } + case 4: { + if (opts.limits?.scopes != null && obj.scopes.length === opts.limits.scopes) { + throw new MaxLengthError('Decode error - map field "scopes" had too many elements') + } + + obj.scopes.push(reader.string()) + break + } + case 5: { + obj.limit = reader.uint32() + break + } + default: { + reader.skipType(tag & 7) + break + } + } + } + + return obj + }) + } + + return _codec + } + + export const encode = (obj: Partial): Uint8Array => { + return encodeMessage(obj, WorkerSyncRequest.codec()) + } + + export const decode = (buf: Uint8Array | Uint8ArrayList, opts?: DecodeOptions): WorkerSyncRequest => { + return decodeMessage(buf, WorkerSyncRequest.codec(), opts) + } +} + +export interface WorkerSyncStatus { + state: string + lastActivity: number +} + +export namespace WorkerSyncStatus { + let _codec: Codec + + export const codec = (): Codec => { + if (_codec == null) { + _codec = message((obj, w, opts = {}) => { + if (opts.lengthDelimited !== false) { + w.fork() + } + + if ((obj.state != null && obj.state !== '')) { + w.uint32(10) + w.string(obj.state) + } + + if ((obj.lastActivity != null && obj.lastActivity !== 0)) { + w.uint32(16) + w.uint32(obj.lastActivity) + } + + if (opts.lengthDelimited !== false) { + w.ldelim() + } + }, (reader, length, opts = {}) => { + const obj: any = { + state: '', + lastActivity: 0 + } + + const end = length == null ? reader.len : reader.pos + length + + while (reader.pos < end) { + const tag = reader.uint32() + + switch (tag >>> 3) { + case 1: { + obj.state = reader.string() + break + } + case 2: { + obj.lastActivity = reader.uint32() + break + } + default: { + reader.skipType(tag & 7) + break + } + } + } + + return obj + }) + } + + return _codec + } + + export const encode = (obj: Partial): Uint8Array => { + return encodeMessage(obj, WorkerSyncStatus.codec()) + } + + export const decode = (buf: Uint8Array | Uint8ArrayList, opts?: DecodeOptions): WorkerSyncStatus => { + return decodeMessage(buf, WorkerSyncStatus.codec(), opts) + } +} + +export interface WorkerSyncTask { + taskId: string + status: string + lastEventAt: number + task?: Task +} + +export namespace WorkerSyncTask { + let _codec: Codec + + export const codec = (): Codec => { + if (_codec == null) { + _codec = message((obj, w, opts = {}) => { + if (opts.lengthDelimited !== false) { + w.fork() + } + + if ((obj.taskId != null && obj.taskId !== '')) { + w.uint32(10) + w.string(obj.taskId) + } + + if ((obj.status != null && obj.status !== '')) { + w.uint32(18) + w.string(obj.status) + } + + if ((obj.lastEventAt != null && obj.lastEventAt !== 0)) { + w.uint32(24) + w.uint32(obj.lastEventAt) + } + + if (obj.task != null) { + w.uint32(34) + Task.codec().encode(obj.task, w) + } + + if (opts.lengthDelimited !== false) { + w.ldelim() + } + }, (reader, length, opts = {}) => { + const obj: any = { + taskId: '', + status: '', + lastEventAt: 0 + } + + const end = length == null ? reader.len : reader.pos + length + + while (reader.pos < end) { + const tag = reader.uint32() + + switch (tag >>> 3) { + case 1: { + obj.taskId = reader.string() + break + } + case 2: { + obj.status = reader.string() + break + } + case 3: { + obj.lastEventAt = reader.uint32() + break + } + case 4: { + obj.task = Task.codec().decode(reader, reader.uint32(), { + limits: opts.limits?.task + }) + break + } + default: { + reader.skipType(tag & 7) + break + } + } + } + + return obj + }) + } + + return _codec + } + + export const encode = (obj: Partial): Uint8Array => { + return encodeMessage(obj, WorkerSyncTask.codec()) + } + + export const decode = (buf: Uint8Array | Uint8ArrayList, opts?: DecodeOptions): WorkerSyncTask => { + return decodeMessage(buf, WorkerSyncTask.codec(), opts) + } +} + +export interface WorkerSyncPayment { + paymentId: string + status: string + amount: string + taskId?: string + createdAt: number + payment?: Payment +} + +export namespace WorkerSyncPayment { + let _codec: Codec + + export const codec = (): Codec => { + if (_codec == null) { + _codec = message((obj, w, opts = {}) => { + if (opts.lengthDelimited !== false) { + w.fork() + } + + if ((obj.paymentId != null && obj.paymentId !== '')) { + w.uint32(10) + w.string(obj.paymentId) + } + + if ((obj.status != null && obj.status !== '')) { + w.uint32(18) + w.string(obj.status) + } + + if ((obj.amount != null && obj.amount !== '')) { + w.uint32(26) + w.string(obj.amount) + } + + if (obj.taskId != null) { + w.uint32(34) + w.string(obj.taskId) + } + + if ((obj.createdAt != null && obj.createdAt !== 0)) { + w.uint32(40) + w.uint32(obj.createdAt) + } + + if (obj.payment != null) { + w.uint32(50) + Payment.codec().encode(obj.payment, w) + } + + if (opts.lengthDelimited !== false) { + w.ldelim() + } + }, (reader, length, opts = {}) => { + const obj: any = { + paymentId: '', + status: '', + amount: '', + createdAt: 0 + } + + const end = length == null ? reader.len : reader.pos + length + + while (reader.pos < end) { + const tag = reader.uint32() + + switch (tag >>> 3) { + case 1: { + obj.paymentId = reader.string() + break + } + case 2: { + obj.status = reader.string() + break + } + case 3: { + obj.amount = reader.string() + break + } + case 4: { + obj.taskId = reader.string() + break + } + case 5: { + obj.createdAt = reader.uint32() + break + } + case 6: { + obj.payment = Payment.codec().decode(reader, reader.uint32(), { + limits: opts.limits?.payment + }) + break + } + default: { + reader.skipType(tag & 7) + break + } + } + } + + return obj + }) + } + + return _codec + } + + export const encode = (obj: Partial): Uint8Array => { + return encodeMessage(obj, WorkerSyncPayment.codec()) + } + + export const decode = (buf: Uint8Array | Uint8ArrayList, opts?: DecodeOptions): WorkerSyncPayment => { + return decodeMessage(buf, WorkerSyncPayment.codec(), opts) + } +} + +export interface WorkerSyncResponse { + serverTime: number + workerId: string + cursor: bigint + managerPeerId: string + status?: WorkerSyncStatus + capabilities: string[] + tasks: WorkerSyncTask[] + payments: WorkerSyncPayment[] +} + +export namespace WorkerSyncResponse { + let _codec: Codec + + export const codec = (): Codec => { + if (_codec == null) { + _codec = message((obj, w, opts = {}) => { + if (opts.lengthDelimited !== false) { + w.fork() + } + + if ((obj.serverTime != null && obj.serverTime !== 0)) { + w.uint32(8) + w.uint32(obj.serverTime) + } + + if ((obj.workerId != null && obj.workerId !== '')) { + w.uint32(18) + w.string(obj.workerId) + } + + if ((obj.cursor != null && obj.cursor !== 0n)) { + w.uint32(24) + w.uint64(obj.cursor) + } + + if ((obj.managerPeerId != null && obj.managerPeerId !== '')) { + w.uint32(34) + w.string(obj.managerPeerId) + } + + if (obj.status != null) { + w.uint32(42) + WorkerSyncStatus.codec().encode(obj.status, w) + } + + if (obj.capabilities != null) { + for (const value of obj.capabilities) { + w.uint32(50) + w.string(value) + } + } + + if (obj.tasks != null) { + for (const value of obj.tasks) { + w.uint32(58) + WorkerSyncTask.codec().encode(value, w) + } + } + + if (obj.payments != null) { + for (const value of obj.payments) { + w.uint32(66) + WorkerSyncPayment.codec().encode(value, w) + } + } + + if (opts.lengthDelimited !== false) { + w.ldelim() + } + }, (reader, length, opts = {}) => { + const obj: any = { + serverTime: 0, + workerId: '', + cursor: 0n, + managerPeerId: '', + capabilities: [], + tasks: [], + payments: [] + } + + const end = length == null ? reader.len : reader.pos + length + + while (reader.pos < end) { + const tag = reader.uint32() + + switch (tag >>> 3) { + case 1: { + obj.serverTime = reader.uint32() + break + } + case 2: { + obj.workerId = reader.string() + break + } + case 3: { + obj.cursor = reader.uint64() + break + } + case 4: { + obj.managerPeerId = reader.string() + break + } + case 5: { + obj.status = WorkerSyncStatus.codec().decode(reader, reader.uint32(), { + limits: opts.limits?.status + }) + break + } + case 6: { + if (opts.limits?.capabilities != null && obj.capabilities.length === opts.limits.capabilities) { + throw new MaxLengthError('Decode error - map field "capabilities" had too many elements') + } + + obj.capabilities.push(reader.string()) + break + } + case 7: { + if (opts.limits?.tasks != null && obj.tasks.length === opts.limits.tasks) { + throw new MaxLengthError('Decode error - map field "tasks" had too many elements') + } + + obj.tasks.push(WorkerSyncTask.codec().decode(reader, reader.uint32(), { + limits: opts.limits?.tasks$ + })) + break + } + case 8: { + if (opts.limits?.payments != null && obj.payments.length === opts.limits.payments) { + throw new MaxLengthError('Decode error - map field "payments" had too many elements') + } + + obj.payments.push(WorkerSyncPayment.codec().decode(reader, reader.uint32(), { + limits: opts.limits?.payments$ + })) + break + } + default: { + reader.skipType(tag & 7) + break + } + } + } + + return obj + }) + } + + return _codec + } + + export const encode = (obj: Partial): Uint8Array => { + return encodeMessage(obj, WorkerSyncResponse.codec()) + } + + export const decode = (buf: Uint8Array | Uint8ArrayList, opts?: DecodeOptions): WorkerSyncResponse => { + return decodeMessage(buf, WorkerSyncResponse.codec(), opts) + } +} + export interface EffectProtocolMessage { task?: Task taskAccepted?: TaskAccepted @@ -563,6 +1111,8 @@ export interface EffectProtocolMessage { identifyRequest?: EffectIdentifyRequest identifyResponse?: EffectIdentifyResponse bulkProofRequest?: BulkProofRequest + workerSyncRequest?: WorkerSyncRequest + workerSyncResponse?: WorkerSyncResponse } export namespace EffectProtocolMessage { @@ -577,7 +1127,51 @@ export namespace EffectProtocolMessage { obj = { ...obj } + if (obj.workerSyncResponse != null) { + obj.workerSyncRequest = undefined + obj.bulkProofRequest = undefined + obj.identifyResponse = undefined + obj.identifyRequest = undefined + obj.requestToWorkResponse = undefined + obj.requestToWork = undefined + obj.ack = undefined + obj.error = undefined + obj.templateResponse = undefined + obj.templateRequest = undefined + obj.proofResponse = undefined + obj.proofRequest = undefined + obj.payoutRequest = undefined + obj.payment = undefined + obj.taskCompleted = undefined + obj.taskRejected = undefined + obj.taskAccepted = undefined + obj.task = undefined + } + + if (obj.workerSyncRequest != null) { + obj.workerSyncResponse = undefined + obj.bulkProofRequest = undefined + obj.identifyResponse = undefined + obj.identifyRequest = undefined + obj.requestToWorkResponse = undefined + obj.requestToWork = undefined + obj.ack = undefined + obj.error = undefined + obj.templateResponse = undefined + obj.templateRequest = undefined + obj.proofResponse = undefined + obj.proofRequest = undefined + obj.payoutRequest = undefined + obj.payment = undefined + obj.taskCompleted = undefined + obj.taskRejected = undefined + obj.taskAccepted = undefined + obj.task = undefined + } + if (obj.bulkProofRequest != null) { + obj.workerSyncResponse = undefined + obj.workerSyncRequest = undefined obj.identifyResponse = undefined obj.identifyRequest = undefined obj.requestToWorkResponse = undefined @@ -597,6 +1191,8 @@ export namespace EffectProtocolMessage { } if (obj.identifyResponse != null) { + obj.workerSyncResponse = undefined + obj.workerSyncRequest = undefined obj.bulkProofRequest = undefined obj.identifyRequest = undefined obj.requestToWorkResponse = undefined @@ -616,6 +1212,8 @@ export namespace EffectProtocolMessage { } if (obj.identifyRequest != null) { + obj.workerSyncResponse = undefined + obj.workerSyncRequest = undefined obj.bulkProofRequest = undefined obj.identifyResponse = undefined obj.requestToWorkResponse = undefined @@ -635,6 +1233,8 @@ export namespace EffectProtocolMessage { } if (obj.requestToWorkResponse != null) { + obj.workerSyncResponse = undefined + obj.workerSyncRequest = undefined obj.bulkProofRequest = undefined obj.identifyResponse = undefined obj.identifyRequest = undefined @@ -654,6 +1254,8 @@ export namespace EffectProtocolMessage { } if (obj.requestToWork != null) { + obj.workerSyncResponse = undefined + obj.workerSyncRequest = undefined obj.bulkProofRequest = undefined obj.identifyResponse = undefined obj.identifyRequest = undefined @@ -673,6 +1275,8 @@ export namespace EffectProtocolMessage { } if (obj.ack != null) { + obj.workerSyncResponse = undefined + obj.workerSyncRequest = undefined obj.bulkProofRequest = undefined obj.identifyResponse = undefined obj.identifyRequest = undefined @@ -692,6 +1296,8 @@ export namespace EffectProtocolMessage { } if (obj.error != null) { + obj.workerSyncResponse = undefined + obj.workerSyncRequest = undefined obj.bulkProofRequest = undefined obj.identifyResponse = undefined obj.identifyRequest = undefined @@ -711,6 +1317,8 @@ export namespace EffectProtocolMessage { } if (obj.templateResponse != null) { + obj.workerSyncResponse = undefined + obj.workerSyncRequest = undefined obj.bulkProofRequest = undefined obj.identifyResponse = undefined obj.identifyRequest = undefined @@ -730,6 +1338,8 @@ export namespace EffectProtocolMessage { } if (obj.templateRequest != null) { + obj.workerSyncResponse = undefined + obj.workerSyncRequest = undefined obj.bulkProofRequest = undefined obj.identifyResponse = undefined obj.identifyRequest = undefined @@ -749,6 +1359,8 @@ export namespace EffectProtocolMessage { } if (obj.proofResponse != null) { + obj.workerSyncResponse = undefined + obj.workerSyncRequest = undefined obj.bulkProofRequest = undefined obj.identifyResponse = undefined obj.identifyRequest = undefined @@ -768,6 +1380,8 @@ export namespace EffectProtocolMessage { } if (obj.proofRequest != null) { + obj.workerSyncResponse = undefined + obj.workerSyncRequest = undefined obj.bulkProofRequest = undefined obj.identifyResponse = undefined obj.identifyRequest = undefined @@ -787,6 +1401,8 @@ export namespace EffectProtocolMessage { } if (obj.payoutRequest != null) { + obj.workerSyncResponse = undefined + obj.workerSyncRequest = undefined obj.bulkProofRequest = undefined obj.identifyResponse = undefined obj.identifyRequest = undefined @@ -806,6 +1422,8 @@ export namespace EffectProtocolMessage { } if (obj.payment != null) { + obj.workerSyncResponse = undefined + obj.workerSyncRequest = undefined obj.bulkProofRequest = undefined obj.identifyResponse = undefined obj.identifyRequest = undefined @@ -825,6 +1443,8 @@ export namespace EffectProtocolMessage { } if (obj.taskCompleted != null) { + obj.workerSyncResponse = undefined + obj.workerSyncRequest = undefined obj.bulkProofRequest = undefined obj.identifyResponse = undefined obj.identifyRequest = undefined @@ -844,6 +1464,8 @@ export namespace EffectProtocolMessage { } if (obj.taskRejected != null) { + obj.workerSyncResponse = undefined + obj.workerSyncRequest = undefined obj.bulkProofRequest = undefined obj.identifyResponse = undefined obj.identifyRequest = undefined @@ -863,6 +1485,8 @@ export namespace EffectProtocolMessage { } if (obj.taskAccepted != null) { + obj.workerSyncResponse = undefined + obj.workerSyncRequest = undefined obj.bulkProofRequest = undefined obj.identifyResponse = undefined obj.identifyRequest = undefined @@ -882,6 +1506,8 @@ export namespace EffectProtocolMessage { } if (obj.task != null) { + obj.workerSyncResponse = undefined + obj.workerSyncRequest = undefined obj.bulkProofRequest = undefined obj.identifyResponse = undefined obj.identifyRequest = undefined @@ -985,6 +1611,16 @@ export namespace EffectProtocolMessage { BulkProofRequest.codec().encode(obj.bulkProofRequest, w) } + if (obj.workerSyncRequest != null) { + w.uint32(146) + WorkerSyncRequest.codec().encode(obj.workerSyncRequest, w) + } + + if (obj.workerSyncResponse != null) { + w.uint32(154) + WorkerSyncResponse.codec().encode(obj.workerSyncResponse, w) + } + if (opts.lengthDelimited !== false) { w.ldelim() } @@ -1099,6 +1735,18 @@ export namespace EffectProtocolMessage { }) break } + case 18: { + obj.workerSyncRequest = WorkerSyncRequest.codec().decode(reader, reader.uint32(), { + limits: opts.limits?.workerSyncRequest + }) + break + } + case 19: { + obj.workerSyncResponse = WorkerSyncResponse.codec().decode(reader, reader.uint32(), { + limits: opts.limits?.workerSyncResponse + }) + break + } default: { reader.skipType(tag & 7) break @@ -1106,7 +1754,51 @@ export namespace EffectProtocolMessage { } } + if (obj.workerSyncResponse != null) { + delete obj.workerSyncRequest + delete obj.bulkProofRequest + delete obj.identifyResponse + delete obj.identifyRequest + delete obj.requestToWorkResponse + delete obj.requestToWork + delete obj.ack + delete obj.error + delete obj.templateResponse + delete obj.templateRequest + delete obj.proofResponse + delete obj.proofRequest + delete obj.payoutRequest + delete obj.payment + delete obj.taskCompleted + delete obj.taskRejected + delete obj.taskAccepted + delete obj.task + } + + if (obj.workerSyncRequest != null) { + delete obj.workerSyncResponse + delete obj.bulkProofRequest + delete obj.identifyResponse + delete obj.identifyRequest + delete obj.requestToWorkResponse + delete obj.requestToWork + delete obj.ack + delete obj.error + delete obj.templateResponse + delete obj.templateRequest + delete obj.proofResponse + delete obj.proofRequest + delete obj.payoutRequest + delete obj.payment + delete obj.taskCompleted + delete obj.taskRejected + delete obj.taskAccepted + delete obj.task + } + if (obj.bulkProofRequest != null) { + delete obj.workerSyncResponse + delete obj.workerSyncRequest delete obj.identifyResponse delete obj.identifyRequest delete obj.requestToWorkResponse @@ -1126,6 +1818,8 @@ export namespace EffectProtocolMessage { } if (obj.identifyResponse != null) { + delete obj.workerSyncResponse + delete obj.workerSyncRequest delete obj.bulkProofRequest delete obj.identifyRequest delete obj.requestToWorkResponse @@ -1145,6 +1839,8 @@ export namespace EffectProtocolMessage { } if (obj.identifyRequest != null) { + delete obj.workerSyncResponse + delete obj.workerSyncRequest delete obj.bulkProofRequest delete obj.identifyResponse delete obj.requestToWorkResponse @@ -1164,6 +1860,8 @@ export namespace EffectProtocolMessage { } if (obj.requestToWorkResponse != null) { + delete obj.workerSyncResponse + delete obj.workerSyncRequest delete obj.bulkProofRequest delete obj.identifyResponse delete obj.identifyRequest @@ -1183,6 +1881,8 @@ export namespace EffectProtocolMessage { } if (obj.requestToWork != null) { + delete obj.workerSyncResponse + delete obj.workerSyncRequest delete obj.bulkProofRequest delete obj.identifyResponse delete obj.identifyRequest @@ -1202,6 +1902,8 @@ export namespace EffectProtocolMessage { } if (obj.ack != null) { + delete obj.workerSyncResponse + delete obj.workerSyncRequest delete obj.bulkProofRequest delete obj.identifyResponse delete obj.identifyRequest @@ -1221,6 +1923,8 @@ export namespace EffectProtocolMessage { } if (obj.error != null) { + delete obj.workerSyncResponse + delete obj.workerSyncRequest delete obj.bulkProofRequest delete obj.identifyResponse delete obj.identifyRequest @@ -1240,6 +1944,8 @@ export namespace EffectProtocolMessage { } if (obj.templateResponse != null) { + delete obj.workerSyncResponse + delete obj.workerSyncRequest delete obj.bulkProofRequest delete obj.identifyResponse delete obj.identifyRequest @@ -1259,6 +1965,8 @@ export namespace EffectProtocolMessage { } if (obj.templateRequest != null) { + delete obj.workerSyncResponse + delete obj.workerSyncRequest delete obj.bulkProofRequest delete obj.identifyResponse delete obj.identifyRequest @@ -1278,6 +1986,8 @@ export namespace EffectProtocolMessage { } if (obj.proofResponse != null) { + delete obj.workerSyncResponse + delete obj.workerSyncRequest delete obj.bulkProofRequest delete obj.identifyResponse delete obj.identifyRequest @@ -1297,6 +2007,8 @@ export namespace EffectProtocolMessage { } if (obj.proofRequest != null) { + delete obj.workerSyncResponse + delete obj.workerSyncRequest delete obj.bulkProofRequest delete obj.identifyResponse delete obj.identifyRequest @@ -1316,6 +2028,8 @@ export namespace EffectProtocolMessage { } if (obj.payoutRequest != null) { + delete obj.workerSyncResponse + delete obj.workerSyncRequest delete obj.bulkProofRequest delete obj.identifyResponse delete obj.identifyRequest @@ -1335,6 +2049,8 @@ export namespace EffectProtocolMessage { } if (obj.payment != null) { + delete obj.workerSyncResponse + delete obj.workerSyncRequest delete obj.bulkProofRequest delete obj.identifyResponse delete obj.identifyRequest @@ -1354,6 +2070,8 @@ export namespace EffectProtocolMessage { } if (obj.taskCompleted != null) { + delete obj.workerSyncResponse + delete obj.workerSyncRequest delete obj.bulkProofRequest delete obj.identifyResponse delete obj.identifyRequest @@ -1373,6 +2091,8 @@ export namespace EffectProtocolMessage { } if (obj.taskRejected != null) { + delete obj.workerSyncResponse + delete obj.workerSyncRequest delete obj.bulkProofRequest delete obj.identifyResponse delete obj.identifyRequest @@ -1392,6 +2112,8 @@ export namespace EffectProtocolMessage { } if (obj.taskAccepted != null) { + delete obj.workerSyncResponse + delete obj.workerSyncRequest delete obj.bulkProofRequest delete obj.identifyResponse delete obj.identifyRequest @@ -1411,6 +2133,8 @@ export namespace EffectProtocolMessage { } if (obj.task != null) { + delete obj.workerSyncResponse + delete obj.workerSyncRequest delete obj.bulkProofRequest delete obj.identifyResponse delete obj.identifyRequest diff --git a/core/protocol/src/common/stores/paymentStore.spec.ts b/core/protocol/src/common/stores/paymentStore.spec.ts index 01fad924..38af04a0 100644 --- a/core/protocol/src/common/stores/paymentStore.spec.ts +++ b/core/protocol/src/common/stores/paymentStore.spec.ts @@ -6,6 +6,23 @@ import { createDataStore } from "@effectai/test-utils"; import type { Payment } from "@effectai/protobufs"; import { ulid } from "ulid"; +const createMockPayment = (nonce: bigint, amount = 100n): Payment => ({ + id: ulid(), + version: 1, + nonce, + amount, + recipient: "recipient-1", + paymentAccount: "payment-account-1", + publicKey: "manager-public-key-1", + signature: { + R8: { + R8_1: "1", + R8_2: "2", + }, + S: "3", + }, +}); + describe("createPaymentStore", () => { let datastore: Datastore; let paymentStore: ReturnType; @@ -24,11 +41,7 @@ describe("createPaymentStore", () => { describe("create", () => { it("should create a payment record", async () => { - const payment: Payment = { - id: ulid(), - nonce: 1n, - amount: 100n, - }; + const payment: Payment = createMockPayment(1n); const result = await paymentStore.create({ peerId: "peer1", @@ -46,12 +59,7 @@ describe("createPaymentStore", () => { }); it("should store payment under correct key", async () => { - const payment: Payment = { - id: ulid(), - nonce: 2n, - amount: 200n, - // ... other payment fields - }; + const payment: Payment = createMockPayment(2n, 200n); await paymentStore.create({ peerId: "peer1", @@ -64,6 +72,8 @@ describe("createPaymentStore", () => { it("should return undefined when no payments exist", async () => { const highestNonce = await paymentStore.getHighestNonce({ peerId: "peer1", + managerPublicKey: "manager-public-key-1", + recipient: "recipient-1", }); expect(highestNonce).toBe(0n); }); @@ -71,35 +81,33 @@ describe("createPaymentStore", () => { it("should return the highest nonce for a peer", async () => { // Create multiple payments for peer1 await paymentStore.create({ - id: ulid(), peerId: "peer1", - payment: { id: ulid(), nonce: 3n, amount: 100n }, + payment: createMockPayment(3n, 100n), }); await new Promise((resolve) => setTimeout(resolve, 10)); await paymentStore.create({ - id: ulid(), peerId: "peer1", - payment: { id: ulid(), nonce: 7n, amount: 300n }, + payment: createMockPayment(7n, 300n), }); await new Promise((resolve) => setTimeout(resolve, 10)); await paymentStore.create({ - id: ulid(), peerId: "peer1", - payment: { id: ulid(), nonce: 2n, amount: 200n }, + payment: createMockPayment(2n, 200n), }); await new Promise((resolve) => setTimeout(resolve, 10)); await paymentStore.create({ - id: ulid(), peerId: "peer1", - payment: { id: ulid(), nonce: 16n, amount: 200n }, + payment: createMockPayment(16n, 200n), }); await new Promise((resolve) => setTimeout(resolve, 10)); const highestNonce = await paymentStore.getHighestNonce({ peerId: "peer1", + managerPublicKey: "manager-public-key-1", + recipient: "recipient-1", }); expect(highestNonce).toBe(16n); @@ -110,13 +118,15 @@ describe("createPaymentStore", () => { for (let i = 0; i < 25; i += 4) { await paymentStore.create({ peerId: "peer1", - payment: { id: ulid(), nonce: BigInt(i), amount: BigInt(i) }, + payment: createMockPayment(BigInt(i), BigInt(i)), }); await new Promise((resolve) => setTimeout(resolve, 10)); } const payments = await paymentStore.getFrom({ peerId: "peer1", + publicKey: "manager-public-key-1", + recipient: "recipient-1", nonce: 13, }); @@ -130,7 +140,7 @@ describe("createPaymentStore", () => { for (let i = 0; i < n; i++) { await paymentStore.create({ peerId: "peer1", - payment: { id: ulid(), nonce: i, amount: BigInt(i) }, + payment: createMockPayment(BigInt(i), BigInt(i)), }); await new Promise((resolve) => setTimeout(resolve, 10)); diff --git a/modules/manager/src/main.ts b/modules/manager/src/main.ts index a5531f4f..e15cc537 100644 --- a/modules/manager/src/main.ts +++ b/modules/manager/src/main.ts @@ -250,6 +250,99 @@ export const createManager = async ({ }; }, ) + .onMessage("workerSyncRequest", async (syncRequest, { peerId }) => { + const workerId = syncRequest.workerId || peerId.toString(); + + if (workerId !== peerId.toString()) { + throw new Error("Worker ID mismatch"); + } + + const scopes = + syncRequest.scopes && syncRequest.scopes.length > 0 + ? new Set(syncRequest.scopes) + : new Set(["status", "capabilities", "tasks", "payments"]); + const limit = syncRequest.limit ?? 200; + const cursor = + typeof syncRequest.cursor === "bigint" + ? Number(syncRequest.cursor) + : 0; + const now = Math.floor(Date.now() / 1000); + const worker = await workerManager.getWorker(workerId); + + const response: EffectProtocolMessage = { + workerSyncResponse: { + serverTime: now, + workerId, + cursor: BigInt(now), + managerPeerId: entity.getPeerId().toString(), + status: + scopes.has("status") && worker + ? { + state: workerManager.workerQueue.queue.includes(workerId) + ? "connected" + : "disconnected", + lastActivity: worker.state.lastActivity, + } + : undefined, + capabilities: + scopes.has("capabilities") && worker + ? worker.state.capabilities.concat( + worker.state.managerCapabilities || [], + ) + : [], + tasks: [], + payments: [], + }, + }; + + if (scopes.has("tasks")) { + const tasks = await taskStore.listByWorker({ workerId, limit }); + for (const taskRecord of tasks) { + const lastEventAt = + taskRecord.events[taskRecord.events.length - 1]?.timestamp ?? 0; + if (cursor && lastEventAt <= cursor) { + continue; + } + + response.workerSyncResponse?.tasks.push({ + taskId: taskRecord.state.id, + status: taskRecord.syncStatus, + lastEventAt, + task: taskRecord.state, + }); + } + } + + if (scopes.has("payments")) { + const payments = await paymentStore.all({ + prefix: `payments/${workerId}`, + limit, + }); + + for (const paymentRecord of payments) { + const lastEventAt = + paymentRecord.events[paymentRecord.events.length - 1]?.timestamp ?? + 0; + if (cursor && lastEventAt <= cursor) { + continue; + } + + const createdAt = + paymentRecord.events.find((event) => event.type === "create") + ?.timestamp ?? lastEventAt; + + response.workerSyncResponse?.payments.push({ + paymentId: paymentRecord.state.id, + status: "created", + amount: paymentRecord.state.amount.toString(), + createdAt, + payment: paymentRecord.state, + }); + } + } + + return response; + }) .onMessage("task", async (task, { peerId }) => { await taskManager.createTask({ task, diff --git a/modules/manager/src/modules/createPaymentManager.spec.ts b/modules/manager/src/modules/createPaymentManager.spec.ts index 627d6d08..399f0f50 100644 --- a/modules/manager/src/modules/createPaymentManager.spec.ts +++ b/modules/manager/src/modules/createPaymentManager.spec.ts @@ -2,165 +2,61 @@ import { Keypair, PublicKey } from "@solana/web3.js"; import { beforeEach, describe, expect, it, vi } from "vitest"; import { createPaymentManager } from "./createPaymentManager.js"; -import { signPayment } from "../utils.js"; -import { ulid } from "ulid"; - describe("createPaymentManager", () => { - let mockPeerStore: any; let mockPrivateKey: any; let mockWorkerManager: any; let mockPaymentStore: any; - let mockPeerId: PeerId; - let peerData: any; + let mockPeerId: any; let mockRecipient: PublicKey; - vi.mock(import("../utils.js"), async (importOriginal) => { - const actual = await importOriginal(); - return { - ...actual, - computePaymentId: vi.fn(), - }; - }); - beforeEach(() => { mockPeerId = { toString: () => "12D3KooWMockedPeerID", - } as PeerId; - - mockRecipient = new Keypair().publicKey; - - peerData = { - metadata: new Map(), }; - mockPeerStore = { - get: vi.fn().mockResolvedValue(peerData), - }; + mockRecipient = new Keypair().publicKey; mockPaymentStore = { - all: vi.fn(), - has: vi.fn(), - get: vi.fn(), - put: vi.fn(), - delete: vi.fn(), create: vi.fn(), }; mockWorkerManager = { getWorker: vi.fn().mockResolvedValue({ state: { - peerId: mockPeerId, - nonce: 5n, + recipient: mockRecipient.toBase58(), + nonce: 1n, }, }), - selectWorker: vi.fn(() => mockPeerId), updateWorkerState: vi.fn(), + workerQueue: { queue: [mockPeerId.toString()] }, }; mockPrivateKey = { raw: new Uint8Array(64).fill(1), }; - - vi.mocked(mockWorkerManager.getWorker).mockResolvedValue({ - state: { - recipient: "GGqak36ECpZP5HbZse41bynygPR2ciYsTVPsriocqjWH", - nonce: 1n, - }, - }); }); - it("generates a signed payment object", async () => { + it("generates a signed payment object and stores it", async () => { + const paymentAccount = new PublicKey("11111111111111111111111111111111"); + const paymentManager = await createPaymentManager({ workerManager: mockWorkerManager, privateKey: mockPrivateKey, paymentStore: mockPaymentStore, + logger: { log: { info: vi.fn(), error: vi.fn() } } as any, + publicKey: mockRecipient.toBase58(), + managerSettings: { + paymentAccount: paymentAccount.toBase58(), + } as any, }); - const paymentAccount = new PublicKey("11111111111111111111111111111111"); - const result = await paymentManager.generatePayment({ - id: ulid(), peerId: mockPeerId, amount: 1000n, paymentAccount, }); - // expect(result.amount).toBe(1000n); - // expect(result.paymentAccount).toBe(paymentAccount.toBase58()); - // expect(result.nonce).toBe(5n); + expect(result).toBeDefined(); + expect(mockPaymentStore.create).toHaveBeenCalled(); }); - - it( - "batches proof", - async () => { - const paymentAccount = new PublicKey( - "6XjpcA3N18t2ToVndtySfUXU2pKtDca2NZCFbygh7f56", - ); - - const paymentManager = await createPaymentManager({ - workerManager: mockWorkerManager, - privateKey: mockPrivateKey, - paymentStore: mockPaymentStore, - managerSettings: { - paymentAccount: paymentAccount.toBase58(), - }, - }); - - const proofsToGenerate = 2; - const paymentsPerProof = 10; - - const proofs = []; - //generate 2 proofs - for (let p = 0; p < proofsToGenerate; p++) { - const payments = []; - //generate 10 payments - for (let i = 0; i < paymentsPerProof; i++) { - vi.mocked(mockWorkerManager.getWorker).mockResolvedValue({ - state: { - recipient: "GGqak36ECpZP5HbZse41bynygPR2ciYsTVPsriocqjWH", - nonce: BigInt(paymentsPerProof * p + i), - }, - }); - - const result = await paymentManager.generatePayment({ - peerId: mockPeerId, - amount: 1000n, - paymentAccount, - }); - - payments.push(result); - await new Promise((resolve) => setTimeout(resolve, 100)); - } - - const { proof, publicSignals, pubKey } = - await paymentManager.generatePaymentProof(mockPrivateKey, payments); - - proofs.push({ - pubKey, - proof, - publicSignals, - }); - } - - //batch the proofs - const batchedProof = await paymentManager.bulkPaymentProofs({ - privateKey: mockPrivateKey, - recipient: mockRecipient, - r8_x: proofs[0].pubKey[0], - r8_y: proofs[0].pubKey[1], - proofs, - }); - - expect(batchedProof).toBeDefined(); - expect(batchedProof.proof).toBeDefined(); - - //expect first public signal to be 0 - expect(batchedProof.publicSignals).toBeDefined(); - expect(batchedProof.publicSignals![0]).toBe("0"); - expect(batchedProof.publicSignals![1]).toBe( - (proofsToGenerate * paymentsPerProof - 1).toString(), - ); - }, - { timeout: 240_000 }, - ); }); diff --git a/modules/manager/src/modules/createPaymentManager.ts b/modules/manager/src/modules/createPaymentManager.ts index f312b0aa..f4b96383 100644 --- a/modules/manager/src/modules/createPaymentManager.ts +++ b/modules/manager/src/modules/createPaymentManager.ts @@ -3,7 +3,7 @@ import type { PaymentStore } from "@effectai/protocol-core"; import type { PeerId, PrivateKey } from "@libp2p/interface"; import { PublicKey } from "@solana/web3.js"; -import { ProofToProofResponseMessage, computePaymentId } from "../utils.js"; +import { ProofToProofResponseMessage } from "../utils.js"; import { type Groth16Proof, @@ -68,17 +68,9 @@ export async function createPaymentManager({ })); //insert payment into the store - await paymentStore.put({ - entityId: computePaymentId(payment), - record: { - state: payment, - events: [ - { - type: "payment:created", - timestamp: Date.now(), - }, - ], - }, + await paymentStore.create({ + peerId: peerId.toString(), + payment, }); return payment; @@ -251,17 +243,9 @@ export async function createPaymentManager({ })); //save payment in store. - await paymentStore.put({ - entityId: computePaymentId(payment), - record: { - state: payment, - events: [ - { - type: "payment:created", - timestamp: Math.floor(Date.now() / 1000), - }, - ], - }, + await paymentStore.create({ + peerId: peerId.toString(), + payment, }); return payment; diff --git a/modules/manager/src/modules/createWorkerManager.spec.ts b/modules/manager/src/modules/createWorkerManager.spec.ts index 561fb291..17a075db 100644 --- a/modules/manager/src/modules/createWorkerManager.spec.ts +++ b/modules/manager/src/modules/createWorkerManager.spec.ts @@ -1,12 +1,10 @@ -import { PeerId } from "@libp2p/interface"; -import { Keypair, PublicKey } from "@solana/web3.js"; -import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; -import { createPaymentManager } from "./createPaymentManager.js"; +import { Keypair } from "@solana/web3.js"; +import { afterEach, beforeEach, describe, expect, it } from "vitest"; import { createWorkerManager } from "./createWorkerManager.js"; import { promises } from "node:fs"; -// import { createDataStore } from "@effectai/test-utils"; -import { Key, type Datastore } from "@effectai/protocol-core"; +import { createDataStore } from "@effectai/test-utils"; +import { type Datastore } from "@effectai/protocol-core"; describe("createWorkerManager", () => { let datastore: Datastore; @@ -62,7 +60,7 @@ describe("createWorkerManager", () => { const [result] = await workerManager.getAccessCodes(); expect(result.events.some((e) => e.type === "redeem")).toBe(true); - expect(result.state.code).to.equal(code); + expect(result.state.code).toBe(code); }); it("should throw an InvalidAccessCode error if given a wrong access code", () => {}); @@ -102,9 +100,9 @@ describe("createWorkerManager", () => { expect(selectedWorker).toBe("worker1"); //make worker 2 busy - await workerManager.updateWorkerState("worker2", () => ({ - totalTasks: 3, - })); + workerManager.markTaskAssigned("worker2", "task-1"); + workerManager.markTaskAssigned("worker2", "task-2"); + workerManager.markTaskAssigned("worker2", "task-3"); //expect worker 2 to be busy selectedWorker = await workerManager.selectWorker("model/gpt5"); diff --git a/modules/manager/src/stores/managerTaskStore.spec.ts b/modules/manager/src/stores/managerTaskStore.spec.ts index 9a3fa558..677f812f 100644 --- a/modules/manager/src/stores/managerTaskStore.spec.ts +++ b/modules/manager/src/stores/managerTaskStore.spec.ts @@ -10,6 +10,7 @@ import { TaskExpiredError, stringifyWithBigInt, } from "@effectai/protocol-core"; +import { ulid } from "ulid"; const mockDatastore = { has: vi.fn(), @@ -31,7 +32,7 @@ const mockPeerId = { } as PeerId; const mockTask: Task = { - id: "task123", + id: ulid(), title: "Test Task", reward: 100n, timeLimitSeconds: 3600, @@ -69,7 +70,7 @@ describe("ManagerTaskStore", () => { it("should create a new task record", async () => { const result = await taskStore.create({ task: mockTask, - providerPeerId: mockPeerId, + providerPeerIdStr: mockPeerId.toString(), }); expect(result.state).toEqual(mockTask); diff --git a/modules/manager/src/stores/managerTaskStore.ts b/modules/manager/src/stores/managerTaskStore.ts index 4959d2ca..203cc2c0 100644 --- a/modules/manager/src/stores/managerTaskStore.ts +++ b/modules/manager/src/stores/managerTaskStore.ts @@ -60,6 +60,13 @@ export interface TaskCompletedEvent extends BaseTaskEvent { } export type ManagerTaskRecord = TaskRecord; +export type ManagerTaskSyncStatus = + | "created" + | "assigned" + | "accepted" + | "submitted" + | "completed" + | "rejected"; export const createManagerTaskStore = ({ datastore, @@ -300,6 +307,85 @@ export const createManagerTaskStore = ({ await batch.commit(); }; + const getSyncStatus = ( + taskRecord: ManagerTaskRecord, + ): ManagerTaskSyncStatus => { + const lastEvent = taskRecord.events[taskRecord.events.length - 1]; + + switch (lastEvent?.type) { + case "assign": + return "assigned"; + case "accept": + return "accepted"; + case "submission": + return "submitted"; + case "payout": + return "completed"; + case "reject": + return "rejected"; + case "create": + default: + return "created"; + } + }; + + const hasWorkerEvent = (taskRecord: ManagerTaskRecord, workerId: string) => { + return taskRecord.events.some((event) => { + switch (event.type) { + case "assign": + return event.assignedToPeer === workerId; + case "accept": + return event.acceptedByPeer === workerId; + case "reject": + return event.rejectedByPeer === workerId; + case "submission": + return event.submissionByPeer === workerId; + default: + return false; + } + }); + }; + + const listByWorker = async ({ + workerId, + limit, + }: { + workerId: string; + limit?: number; + }): Promise< + Array + > => { + const tasks: Array = + []; + + const pushRecord = (record: ManagerTaskRecord) => { + if (limit && tasks.length >= limit) { + return false; + } + + if (!hasWorkerEvent(record, workerId)) { + return true; + } + + tasks.push({ + ...record, + syncStatus: getSyncStatus(record), + }); + return !limit || tasks.length < limit; + }; + + for (const prefix of ["tasks/active", "tasks/completed"]) { + const records = await coreStore.all({ prefix }); + for (const record of records) { + if (!pushRecord(record)) { + return tasks; + } + } + } + + return tasks; + }; + const assign = async ({ entityId, workerPeerIdStr, @@ -351,6 +437,7 @@ export const createManagerTaskStore = ({ payout, assign, getTask, + listByWorker, }; }; diff --git a/modules/payment/tests/circuits.spec.ts b/modules/payment/tests/circuits.spec.ts index 9ef93758..a852a2e5 100644 --- a/modules/payment/tests/circuits.spec.ts +++ b/modules/payment/tests/circuits.spec.ts @@ -6,8 +6,8 @@ import { intStringTo32Bytes, prove, signPayment, -} from "../clients/js"; -import { publicKeyToTruncatedHex } from "../clients/js"; +} from "../clients/js/node"; +import { publicKeyToTruncatedHex } from "../clients/js/node"; import { randomBytes } from "node:crypto"; import { PublicKey } from "@solana/web3.js"; import { buildEddsa } from "circomlibjs"; @@ -15,7 +15,10 @@ import { buildEddsa } from "circomlibjs"; import { setup } from "@effectai/test-utils"; import { generateKeyPairSigner } from "@solana/kit"; -describe("Generate Proof", () => { +const runIntegrationTests = process.env.RUN_INTEGRATION_TESTS === "1"; +const describeIntegration = runIntegrationTests ? describe : describe.skip; + +describeIntegration("Generate Proof", () => { it("should generate and prove a proof", async () => { const privateKeyBytes = randomBytes(32); diff --git a/modules/payment/tests/program.test.ts b/modules/payment/tests/program.test.ts index ebe9648b..6854f09c 100644 --- a/modules/payment/tests/program.test.ts +++ b/modules/payment/tests/program.test.ts @@ -11,7 +11,7 @@ import { getRecipientManagerDataAccountEncoder, PAYMENT_BATCH_SIZE, signPayment, -} from "../clients/js"; +} from "../clients/js/node"; import { address, @@ -27,23 +27,27 @@ import { } from "@effectai/utils"; import { LiteSVM } from "litesvm"; -describe("Payment Program", async () => { - const eddsa = await buildEddsa(); - const liteSVM = new LiteSVM(); - liteSVM.addProgramFromFile( - EFFECT_PAYMENT_PROGRAM_ADDRESS, - "../../../target/deploy/effect_payment.so", - ); +const runIntegrationTests = process.env.RUN_INTEGRATION_TESTS === "1"; +const describeIntegration = runIntegrationTests ? describe : describe.skip; - const managerPrivateKey = randomBytes(32); - const eddsaPublicKey = eddsa.prv2pub(managerPrivateKey); - const bs58ManagerPublicKey = getAddressDecoder().decode( - eddsa.babyJub.packPoint(eddsaPublicKey), - ); - - const provider = await createLocalSolanaProvider(); +describeIntegration("Payment Program", () => { it("can redeem a proof", async () => { + const eddsa = await buildEddsa(); + const liteSVM = new LiteSVM(); + liteSVM.addProgramFromFile( + EFFECT_PAYMENT_PROGRAM_ADDRESS, + "../../../target/deploy/effect_payment.so", + ); + + const managerPrivateKey = randomBytes(32); + const eddsaPublicKey = eddsa.prv2pub(managerPrivateKey); + const bs58ManagerPublicKey = getAddressDecoder().decode( + eddsa.babyJub.packPoint(eddsaPublicKey), + ); + + const provider = await createLocalSolanaProvider(); + const { mint, ata, signer } = await setup(); const paymentAccount = await generateKeyPairSigner(); console.log(PAYMENT_BATCH_SIZE, "PAYMENT_BATCH_SIZE"); diff --git a/modules/worker/src/index.ts b/modules/worker/src/index.ts index 1df9014d..6b670df6 100644 --- a/modules/worker/src/index.ts +++ b/modules/worker/src/index.ts @@ -1,2 +1,3 @@ export * from "./main.js"; export * from "./stores/workerTaskStore.js"; +export * from "./stores/workerSyncStateStore.js"; diff --git a/modules/worker/src/main.ts b/modules/worker/src/main.ts index 4e0cff59..d47363b2 100644 --- a/modules/worker/src/main.ts +++ b/modules/worker/src/main.ts @@ -2,6 +2,7 @@ import { webRTC } from "@libp2p/webrtc"; import { createPaymentWorker } from "./modules/createPaymentWorker.js"; import { createTaskWorker } from "./modules/createTaskWorker.js"; import { createWorkerTaskStore } from "./stores/workerTaskStore.js"; +import { createWorkerSyncStateStore } from "./stores/workerSyncStateStore.js"; import { createTemplateWorker } from "./modules/createTemplateWorker.js"; // import type { PingService } from "@libp2p/ping"; @@ -25,9 +26,13 @@ import { PingService } from "@libp2p/ping"; import { EffectProtocolMessage, + type RequestToWorkResponse, type Payment, type Task, + type WorkerSyncResponse, } from "@effectai/protobufs"; +import { applyWorkerSyncResponse } from "./sync/applyWorkerSyncResponse.js"; +import { runConnectFlow } from "./sync/runConnectFlow.js"; export interface WorkerEvents { "task:created": CustomEvent; @@ -70,10 +75,12 @@ export const createWorker = async ({ datastore, privateKey, autoExpire = true, + autoSyncBeforeConnect = true, }: { datastore: Datastore; privateKey: Uint8Array | PrivateKey; autoExpire: boolean; + autoSyncBeforeConnect?: boolean; }) => { const ed25519PrivateKey: PrivateKey = privateKey instanceof Uint8Array @@ -90,6 +97,7 @@ export const createWorker = async ({ const taskStore = createWorkerTaskStore({ datastore }); const paymentStore = createPaymentStore({ datastore }); const templateStore = createTemplateStore({ datastore }); + const syncStateStore = createWorkerSyncStateStore({ datastore }); // register worker modules const templateWorker = createTemplateWorker({ entity, templateStore }); @@ -158,6 +166,43 @@ export const createWorker = async ({ return response; }; + const syncWithManager = async ( + manager: Multiaddr, + { + scopes, + cursor, + limit, + }: { + scopes?: string[]; + cursor?: bigint; + limit?: number; + } = {}, + ) => { + const [response, error] = await entity.sendMessage(manager, { + workerSyncRequest: { + timestamp: Math.floor(Date.now() / 1000), + workerId: entity.getPeerId().toString(), + scopes: scopes ?? [], + cursor, + limit, + }, + }); + + if (error || !response) { + throw new Error(`Failed to sync with manager: ${error}`); + } + + const sync: WorkerSyncResponse = response; + await applyWorkerSyncResponse({ + sync, + taskStore, + paymentStore, + syncStateStore, + }); + + return sync; + }; + //connect to an identified manager const connect = async ( multiaddress: Multiaddr, @@ -166,20 +211,39 @@ export const createWorker = async ({ nonce, capabilities, accessCode, + skipSync, }: { recipient: string; nonce: bigint; capabilities: string[]; accessCode?: string; + skipSync?: boolean; }, ) => { - const [response, error] = await entity.sendMessage(multiaddress, { - requestToWork: { - timestamp: Math.floor(Date.now() / 1000), - recipient, - nonce, - capabilities: capabilities.join(","), - accessCode, + const [response, error] = await runConnectFlow({ + autoSyncBeforeConnect, + skipSync, + syncWithManager: async () => + await syncWithManager(multiaddress, { + scopes: ["status", "capabilities", "tasks", "payments"], + }), + requestToWork: async (): Promise< + readonly [RequestToWorkResponse | null, Error | null] + > => { + const [connectResponse, connectError] = await entity.sendMessage( + multiaddress, + { + requestToWork: { + timestamp: Math.floor(Date.now() / 1000), + recipient, + nonce, + capabilities: capabilities.join(","), + accessCode, + }, + }, + ); + + return [connectResponse ?? null, connectError]; }, }); @@ -209,6 +273,7 @@ export const createWorker = async ({ entity, events, taskStore, + syncStateStore, getTask, getTasks, @@ -227,6 +292,7 @@ export const createWorker = async ({ requestBulkProofs, identify, connect, + syncWithManager, disconnect, start, stop, diff --git a/modules/worker/src/modules/createPaymentWorker.spec.ts b/modules/worker/src/modules/createPaymentWorker.spec.ts index 24110712..563d0a03 100644 --- a/modules/worker/src/modules/createPaymentWorker.spec.ts +++ b/modules/worker/src/modules/createPaymentWorker.spec.ts @@ -1,8 +1,10 @@ -import { beforeEach, describe, it, test, vi } from "vitest"; +import { beforeEach, describe, expect, it, vi } from "vitest"; import { createPaymentWorker } from "./createPaymentWorker"; import { createPaymentStore } from "@effectai/protocol-core"; import { ulid } from "ulid"; import { promises } from "node:fs"; +import { createDataStore } from "@effectai/test-utils"; +import type { Payment } from "@effectai/protobufs"; describe("createPaymentWorker", async () => { let paymentWorker: ReturnType; @@ -34,13 +36,26 @@ describe("createPaymentWorker", async () => { it("tests worker payments", async () => { const n = 50; for (let i = 0; i < n; i++) { - console.log(i); + const payment: Payment = { + id: ulid(), + version: 1, + nonce: BigInt(i), + amount: 100n, + recipient: "recipient-test", + paymentAccount: "payment-account-test", + publicKey: "manager-public-key", + signature: { + R8: { + R8_1: "1", + R8_2: "2", + }, + S: "3", + }, + }; + await paymentWorker.createPayment({ managerPeerId: "peer-testing-1", - payment: { - id: ulid(), - nonce: BigInt(i), - }, + payment, }); } @@ -49,8 +64,6 @@ describe("createPaymentWorker", async () => { perPage: 10, }); - result.items.map((item) => { - console.log(item.state); - }); + expect(result.items.length).toBeGreaterThan(0); }); }); diff --git a/modules/worker/src/stores/workerSyncStateStore.ts b/modules/worker/src/stores/workerSyncStateStore.ts new file mode 100644 index 00000000..2f23259f --- /dev/null +++ b/modules/worker/src/stores/workerSyncStateStore.ts @@ -0,0 +1,93 @@ +import { + type Datastore, + createEntityStore, + parseWithBigInt, + stringifyWithBigInt, +} from "@effectai/protocol-core"; +import type { WorkerSyncStatus } from "@effectai/protobufs"; + +interface WorkerSyncStateUpdatedEvent { + type: "sync_state_updated"; + timestamp: number; +} + +export interface WorkerSyncStateRecord { + events: WorkerSyncStateUpdatedEvent[]; + state: { + workerId: string; + managerPeerId: string; + cursor: bigint; + serverTime: number; + status?: WorkerSyncStatus; + capabilities: string[]; + updatedAt: number; + }; +} + +export const createWorkerSyncStateStore = ({ + datastore, +}: { + datastore: Datastore; +}) => { + const coreStore = createEntityStore< + WorkerSyncStateUpdatedEvent, + WorkerSyncStateRecord + >({ + datastore, + defaultPrefix: "workerSyncState", + stringify: (record) => stringifyWithBigInt(record), + parse: (data) => parseWithBigInt(data), + }); + + const syncEntityId = "current"; + + const saveFromSync = async ({ + workerId, + managerPeerId, + cursor, + serverTime, + status, + capabilities, + }: Omit) => { + const existing = await coreStore.getSafe({ entityId: syncEntityId }); + const timestamp = Math.floor(Date.now() / 1000); + + const record: WorkerSyncStateRecord = { + events: [ + ...(existing?.events ?? []), + { + type: "sync_state_updated", + timestamp, + }, + ], + state: { + workerId, + managerPeerId, + cursor, + serverTime, + status, + capabilities, + updatedAt: timestamp, + }, + }; + + await coreStore.put({ + entityId: syncEntityId, + record, + }); + + return record; + }; + + const getCurrent = async () => { + return await coreStore.getSafe({ entityId: syncEntityId }); + }; + + return { + ...coreStore, + saveFromSync, + getCurrent, + }; +}; + +export type WorkerSyncStateStore = ReturnType; diff --git a/modules/worker/src/stores/workerTaskStore.ts b/modules/worker/src/stores/workerTaskStore.ts index 26decc3b..9914bc8f 100644 --- a/modules/worker/src/stores/workerTaskStore.ts +++ b/modules/worker/src/stores/workerTaskStore.ts @@ -88,6 +88,32 @@ export const createWorkerTaskStore = ({ await coreStore.put({ entityId: `${index}/${entityId}`, record }); }; + const upsertFromSync = async ({ + task, + status, + managerPeerId, + }: { + task: Task; + status: string; + managerPeerId?: string; + }) => { + const index = status === "completed" ? "completed" : "active"; + const record: WorkerTaskRecord = { + events: managerPeerId + ? [ + { + timestamp: Math.floor(Date.now() / 1000), + type: "create", + managerPeer: managerPeerId, + }, + ] + : [], + state: task, + }; + + await saveTask({ entityId: task.id, record, index }); + }; + const create = async ({ task, managerPeerId, @@ -251,6 +277,8 @@ export const createWorkerTaskStore = ({ accept, reject, expire, + saveTask, + upsertFromSync, }; }; export type WorkerTaskStore = ReturnType; diff --git a/modules/worker/src/sync/applyWorkerSyncResponse.ts b/modules/worker/src/sync/applyWorkerSyncResponse.ts new file mode 100644 index 00000000..71b493fe --- /dev/null +++ b/modules/worker/src/sync/applyWorkerSyncResponse.ts @@ -0,0 +1,47 @@ +import type { Payment, WorkerSyncResponse } from "@effectai/protobufs"; +import type { WorkerTaskStore } from "../stores/workerTaskStore.js"; +import type { WorkerSyncStateStore } from "../stores/workerSyncStateStore.js"; + +interface PaymentStoreLike { + create: (args: { peerId: string; payment: Payment }) => Promise; +} + +export const applyWorkerSyncResponse = async ({ + sync, + taskStore, + paymentStore, + syncStateStore, +}: { + sync: WorkerSyncResponse; + taskStore: Pick; + paymentStore: PaymentStoreLike; + syncStateStore: Pick; +}) => { + for (const task of sync.tasks ?? []) { + if (!task.task) continue; + + await taskStore.upsertFromSync({ + task: task.task, + status: task.status, + managerPeerId: sync.managerPeerId, + }); + } + + for (const payment of sync.payments ?? []) { + if (!payment.payment) continue; + + await paymentStore.create({ + peerId: sync.managerPeerId, + payment: payment.payment, + }); + } + + await syncStateStore.saveFromSync({ + workerId: sync.workerId, + managerPeerId: sync.managerPeerId, + cursor: sync.cursor, + serverTime: sync.serverTime, + status: sync.status, + capabilities: sync.capabilities ?? [], + }); +}; diff --git a/modules/worker/src/sync/runConnectFlow.ts b/modules/worker/src/sync/runConnectFlow.ts new file mode 100644 index 00000000..473be2fd --- /dev/null +++ b/modules/worker/src/sync/runConnectFlow.ts @@ -0,0 +1,17 @@ +export const runConnectFlow = async ({ + autoSyncBeforeConnect, + skipSync, + syncWithManager, + requestToWork, +}: { + autoSyncBeforeConnect: boolean; + skipSync?: boolean; + syncWithManager: () => Promise; + requestToWork: () => Promise; +}) => { + if (autoSyncBeforeConnect && !skipSync) { + await syncWithManager(); + } + + return await requestToWork(); +}; diff --git a/modules/worker/src/sync/sync.connect.spec.ts b/modules/worker/src/sync/sync.connect.spec.ts new file mode 100644 index 00000000..efcfaca6 --- /dev/null +++ b/modules/worker/src/sync/sync.connect.spec.ts @@ -0,0 +1,51 @@ +import { describe, expect, it, vi } from "vitest"; +import { runConnectFlow } from "./runConnectFlow"; + +describe("sync connect flow", () => { + it("runs sync before requestToWork by default", async () => { + const calls: string[] = []; + + const syncWithManager = vi.fn(async () => { + calls.push("sync"); + }); + + const requestToWork = vi.fn(async () => { + calls.push("connect"); + return [{}, null] as const; + }); + + await runConnectFlow({ + autoSyncBeforeConnect: true, + syncWithManager, + requestToWork, + }); + + expect(calls).toEqual(["sync", "connect"]); + expect(syncWithManager).toHaveBeenCalledTimes(1); + expect(requestToWork).toHaveBeenCalledTimes(1); + }); + + it("skips sync when skipSync is true", async () => { + const calls: string[] = []; + + const syncWithManager = vi.fn(async () => { + calls.push("sync"); + }); + + const requestToWork = vi.fn(async () => { + calls.push("connect"); + return [{}, null] as const; + }); + + await runConnectFlow({ + autoSyncBeforeConnect: true, + skipSync: true, + syncWithManager, + requestToWork, + }); + + expect(calls).toEqual(["connect"]); + expect(syncWithManager).not.toHaveBeenCalled(); + expect(requestToWork).toHaveBeenCalledTimes(1); + }); +}); diff --git a/packages/library/tests/protocol.spec.ts b/packages/library/tests/protocol.spec.ts index b478eb76..b53d0911 100644 --- a/packages/library/tests/protocol.spec.ts +++ b/packages/library/tests/protocol.spec.ts @@ -31,7 +31,10 @@ import { promises } from "node:fs"; import { Keypair } from "@solana/web3.js"; -describe("Complete Task Lifecycle", () => { +const runIntegrationTests = process.env.RUN_INTEGRATION_TESTS === "1"; +const describeIntegration = runIntegrationTests ? describe : describe.skip; + +describeIntegration("Complete Task Lifecycle", () => { let manager: Awaited>; let worker: Awaited>; const providerPeerId = peerIdFromString( @@ -104,12 +107,20 @@ describe("Complete Task Lifecycle", () => { }); afterEach(async () => { - await manager.stop(); - await worker.stop(); + if (manager) { + await manager.stop(); + } + if (worker) { + await worker.stop(); + } //close the datastores - await managerDatastore.close(); - await workerDatastore.close(); + if (managerDatastore) { + await managerDatastore.close(); + } + if (workerDatastore) { + await workerDatastore.close(); + } }); it( diff --git a/vitest.config.ts b/vitest.config.ts new file mode 100644 index 00000000..1acdfc56 --- /dev/null +++ b/vitest.config.ts @@ -0,0 +1,20 @@ +import { defineConfig } from "vitest/config"; +import { fileURLToPath } from "node:url"; + +const rootDir = fileURLToPath(new URL(".", import.meta.url)); + +export default defineConfig({ + resolve: { + alias: { + "@effectai/test-utils": fileURLToPath( + new URL("./packages/test-utils/src/index.ts", import.meta.url), + ), + "@capabilities": fileURLToPath( + new URL("./apps/worker-app/app/constants/capabilities.ts", import.meta.url), + ), + }, + }, + test: { + root: rootDir, + }, +}); From 89945dd5f273bb5cfa58389b6d6cfcb26c32cdb3 Mon Sep 17 00:00:00 2001 From: Jeffrieh Date: Wed, 1 Apr 2026 11:03:44 +0200 Subject: [PATCH 2/2] feat: catch-up sync --- core/protobufs/src/effect.proto | 6 + core/protobufs/src/effect.ts | 64 ++++++++- modules/manager/src/main.ts | 122 ++++++++++++------ modules/worker/src/main.ts | 85 +++++++++--- .../worker/src/stores/workerSyncStateStore.ts | 6 + .../src/sync/applyWorkerSyncResponse.ts | 2 + modules/worker/src/sync/runCatchUpSync.ts | 66 ++++++++++ modules/worker/src/sync/sync.catchup.spec.ts | 54 ++++++++ 8 files changed, 343 insertions(+), 62 deletions(-) create mode 100644 modules/worker/src/sync/runCatchUpSync.ts create mode 100644 modules/worker/src/sync/sync.catchup.spec.ts diff --git a/core/protobufs/src/effect.proto b/core/protobufs/src/effect.proto index a5ddfe9e..989d50d5 100644 --- a/core/protobufs/src/effect.proto +++ b/core/protobufs/src/effect.proto @@ -49,6 +49,8 @@ message WorkerSyncRequest { optional uint64 cursor = 3; repeated string scopes = 4; optional uint32 limit = 5; + optional string tasks_cursor = 6; + optional string payments_cursor = 7; } message WorkerSyncStatus { @@ -81,6 +83,10 @@ message WorkerSyncResponse { repeated string capabilities = 6; repeated WorkerSyncTask tasks = 7; repeated WorkerSyncPayment payments = 8; + optional string tasks_cursor = 9; + optional string payments_cursor = 10; + bool tasks_has_more = 11; + bool payments_has_more = 12; } message EffectProtocolMessage { diff --git a/core/protobufs/src/effect.ts b/core/protobufs/src/effect.ts index 1ceca400..a1797b10 100644 --- a/core/protobufs/src/effect.ts +++ b/core/protobufs/src/effect.ts @@ -551,6 +551,8 @@ export interface WorkerSyncRequest { cursor?: bigint scopes: string[] limit?: number + tasksCursor?: string + paymentsCursor?: string } export namespace WorkerSyncRequest { @@ -590,6 +592,16 @@ export namespace WorkerSyncRequest { w.uint32(obj.limit) } + if (obj.tasksCursor != null) { + w.uint32(50) + w.string(obj.tasksCursor) + } + + if (obj.paymentsCursor != null) { + w.uint32(58) + w.string(obj.paymentsCursor) + } + if (opts.lengthDelimited !== false) { w.ldelim() } @@ -630,6 +642,14 @@ export namespace WorkerSyncRequest { obj.limit = reader.uint32() break } + case 6: { + obj.tasksCursor = reader.string() + break + } + case 7: { + obj.paymentsCursor = reader.string() + break + } default: { reader.skipType(tag & 7) break @@ -942,6 +962,10 @@ export interface WorkerSyncResponse { capabilities: string[] tasks: WorkerSyncTask[] payments: WorkerSyncPayment[] + tasksCursor?: string + paymentsCursor?: string + tasksHasMore: boolean + paymentsHasMore: boolean } export namespace WorkerSyncResponse { @@ -1000,6 +1024,26 @@ export namespace WorkerSyncResponse { } } + if (obj.tasksCursor != null) { + w.uint32(74) + w.string(obj.tasksCursor) + } + + if (obj.paymentsCursor != null) { + w.uint32(82) + w.string(obj.paymentsCursor) + } + + if ((obj.tasksHasMore != null && obj.tasksHasMore !== false)) { + w.uint32(88) + w.bool(obj.tasksHasMore) + } + + if ((obj.paymentsHasMore != null && obj.paymentsHasMore !== false)) { + w.uint32(96) + w.bool(obj.paymentsHasMore) + } + if (opts.lengthDelimited !== false) { w.ldelim() } @@ -1011,7 +1055,9 @@ export namespace WorkerSyncResponse { managerPeerId: '', capabilities: [], tasks: [], - payments: [] + payments: [], + tasksHasMore: false, + paymentsHasMore: false } const end = length == null ? reader.len : reader.pos + length @@ -1070,6 +1116,22 @@ export namespace WorkerSyncResponse { })) break } + case 9: { + obj.tasksCursor = reader.string() + break + } + case 10: { + obj.paymentsCursor = reader.string() + break + } + case 11: { + obj.tasksHasMore = reader.bool() + break + } + case 12: { + obj.paymentsHasMore = reader.bool() + break + } default: { reader.skipType(tag & 7) break diff --git a/modules/manager/src/main.ts b/modules/manager/src/main.ts index e15cc537..b53d1246 100644 --- a/modules/manager/src/main.ts +++ b/modules/manager/src/main.ts @@ -262,15 +262,17 @@ export const createManager = async ({ ? new Set(syncRequest.scopes) : new Set(["status", "capabilities", "tasks", "payments"]); const limit = syncRequest.limit ?? 200; - const cursor = - typeof syncRequest.cursor === "bigint" - ? Number(syncRequest.cursor) - : 0; + const tasksCursor = syncRequest.tasksCursor ?? ""; + const paymentsCursor = syncRequest.paymentsCursor ?? ""; const now = Math.floor(Date.now() / 1000); const worker = await workerManager.getWorker(workerId); - const response: EffectProtocolMessage = { - workerSyncResponse: { + const toCursorKey = (timestamp: number, entityId: string) => + `${timestamp.toString().padStart(10, "0")}:${entityId}`; + + const workerSyncResponse: NonNullable< + EffectProtocolMessage["workerSyncResponse"] + > = { serverTime: now, workerId, cursor: BigInt(now), @@ -292,53 +294,93 @@ export const createManager = async ({ : [], tasks: [], payments: [], - }, + tasksCursor, + paymentsCursor, + tasksHasMore: false, + paymentsHasMore: false, + }; + + const response: EffectProtocolMessage = { + workerSyncResponse, }; if (scopes.has("tasks")) { - const tasks = await taskStore.listByWorker({ workerId, limit }); - for (const taskRecord of tasks) { - const lastEventAt = - taskRecord.events[taskRecord.events.length - 1]?.timestamp ?? 0; - if (cursor && lastEventAt <= cursor) { - continue; - } - - response.workerSyncResponse?.tasks.push({ - taskId: taskRecord.state.id, - status: taskRecord.syncStatus, - lastEventAt, - task: taskRecord.state, + const tasks = await taskStore.listByWorker({ workerId }); + const taskPage = tasks + .map((taskRecord) => { + const lastEventAt = + taskRecord.events[taskRecord.events.length - 1]?.timestamp ?? 0; + return { + taskId: taskRecord.state.id, + status: taskRecord.syncStatus, + lastEventAt, + task: taskRecord.state, + cursorKey: toCursorKey(lastEventAt, taskRecord.state.id), + }; + }) + .filter((taskRecord) => taskRecord.cursorKey > tasksCursor) + .sort((a, b) => a.cursorKey.localeCompare(b.cursorKey)); + + const pagedTasks = taskPage.slice(0, limit); + workerSyncResponse.tasksHasMore = taskPage.length > limit; + + for (const taskRecord of pagedTasks) { + workerSyncResponse.tasks.push({ + taskId: taskRecord.taskId, + status: taskRecord.status, + lastEventAt: taskRecord.lastEventAt, + task: taskRecord.task, }); } + + const lastTask = pagedTasks[pagedTasks.length - 1]; + if (lastTask) { + workerSyncResponse.tasksCursor = lastTask.cursorKey; + } } if (scopes.has("payments")) { const payments = await paymentStore.all({ prefix: `payments/${workerId}`, - limit, }); - - for (const paymentRecord of payments) { - const lastEventAt = - paymentRecord.events[paymentRecord.events.length - 1]?.timestamp ?? - 0; - if (cursor && lastEventAt <= cursor) { - continue; - } - - const createdAt = - paymentRecord.events.find((event) => event.type === "create") - ?.timestamp ?? lastEventAt; - - response.workerSyncResponse?.payments.push({ - paymentId: paymentRecord.state.id, - status: "created", - amount: paymentRecord.state.amount.toString(), - createdAt, - payment: paymentRecord.state, + const paymentPage = payments + .map((paymentRecord) => { + const lastEventAt = + paymentRecord.events[paymentRecord.events.length - 1]?.timestamp ?? + 0; + const createdAt = + paymentRecord.events.find((event) => event.type === "create") + ?.timestamp ?? lastEventAt; + + return { + paymentId: paymentRecord.state.id, + status: "created", + amount: paymentRecord.state.amount.toString(), + createdAt, + payment: paymentRecord.state, + cursorKey: toCursorKey(lastEventAt, paymentRecord.state.id), + }; + }) + .filter((paymentRecord) => paymentRecord.cursorKey > paymentsCursor) + .sort((a, b) => a.cursorKey.localeCompare(b.cursorKey)); + + const pagedPayments = paymentPage.slice(0, limit); + workerSyncResponse.paymentsHasMore = paymentPage.length > limit; + + for (const paymentRecord of pagedPayments) { + workerSyncResponse.payments.push({ + paymentId: paymentRecord.paymentId, + status: paymentRecord.status, + amount: paymentRecord.amount, + createdAt: paymentRecord.createdAt, + payment: paymentRecord.payment, }); } + + const lastPayment = pagedPayments[pagedPayments.length - 1]; + if (lastPayment) { + workerSyncResponse.paymentsCursor = lastPayment.cursorKey; + } } return response; diff --git a/modules/worker/src/main.ts b/modules/worker/src/main.ts index d47363b2..baa5c365 100644 --- a/modules/worker/src/main.ts +++ b/modules/worker/src/main.ts @@ -33,6 +33,7 @@ import { } from "@effectai/protobufs"; import { applyWorkerSyncResponse } from "./sync/applyWorkerSyncResponse.js"; import { runConnectFlow } from "./sync/runConnectFlow.js"; +import { runCatchUpSync } from "./sync/runCatchUpSync.js"; export interface WorkerEvents { "task:created": CustomEvent; @@ -170,37 +171,79 @@ export const createWorker = async ({ manager: Multiaddr, { scopes, - cursor, + tasksCursor, + paymentsCursor, limit, }: { scopes?: string[]; - cursor?: bigint; + tasksCursor?: string; + paymentsCursor?: string; limit?: number; } = {}, ) => { - const [response, error] = await entity.sendMessage(manager, { - workerSyncRequest: { - timestamp: Math.floor(Date.now() / 1000), - workerId: entity.getPeerId().toString(), - scopes: scopes ?? [], - cursor, - limit, - }, - }); + const existingSyncState = await syncStateStore.getCurrent(); + const initialTasksCursor = + tasksCursor ?? existingSyncState?.state.tasksCursor; + const initialPaymentsCursor = + paymentsCursor ?? existingSyncState?.state.paymentsCursor; + + const syncPage = async ({ + scopes: pageScopes, + tasksCursor: pageTasksCursor, + paymentsCursor: pagePaymentsCursor, + limit: pageLimit, + }: { + scopes: string[]; + tasksCursor?: string; + paymentsCursor?: string; + limit?: number; + }) => { + const [response, error] = await entity.sendMessage(manager, { + workerSyncRequest: { + timestamp: Math.floor(Date.now() / 1000), + workerId: entity.getPeerId().toString(), + scopes: pageScopes, + limit: pageLimit, + tasksCursor: pageTasksCursor, + paymentsCursor: pagePaymentsCursor, + }, + }); - if (error || !response) { - throw new Error(`Failed to sync with manager: ${error}`); + if (error || !response) { + throw new Error(`Failed to sync with manager: ${error}`); + } + + const sync: WorkerSyncResponse = response; + await applyWorkerSyncResponse({ + sync, + taskStore, + paymentStore, + syncStateStore, + }); + + return sync; + }; + + const requestedScopes = scopes ?? []; + if ( + requestedScopes.includes("tasks") || + requestedScopes.includes("payments") + ) { + return await runCatchUpSync({ + scopes: requestedScopes, + limit, + tasksCursor: initialTasksCursor, + paymentsCursor: initialPaymentsCursor, + syncPage, + }); } - const sync: WorkerSyncResponse = response; - await applyWorkerSyncResponse({ - sync, - taskStore, - paymentStore, - syncStateStore, + return await syncPage({ + scopes: requestedScopes, + tasksCursor: initialTasksCursor, + paymentsCursor: initialPaymentsCursor, + limit, }); - - return sync; }; //connect to an identified manager diff --git a/modules/worker/src/stores/workerSyncStateStore.ts b/modules/worker/src/stores/workerSyncStateStore.ts index 2f23259f..45bd4ac4 100644 --- a/modules/worker/src/stores/workerSyncStateStore.ts +++ b/modules/worker/src/stores/workerSyncStateStore.ts @@ -17,6 +17,8 @@ export interface WorkerSyncStateRecord { workerId: string; managerPeerId: string; cursor: bigint; + tasksCursor?: string; + paymentsCursor?: string; serverTime: number; status?: WorkerSyncStatus; capabilities: string[]; @@ -45,6 +47,8 @@ export const createWorkerSyncStateStore = ({ workerId, managerPeerId, cursor, + tasksCursor, + paymentsCursor, serverTime, status, capabilities, @@ -64,6 +68,8 @@ export const createWorkerSyncStateStore = ({ workerId, managerPeerId, cursor, + tasksCursor, + paymentsCursor, serverTime, status, capabilities, diff --git a/modules/worker/src/sync/applyWorkerSyncResponse.ts b/modules/worker/src/sync/applyWorkerSyncResponse.ts index 71b493fe..49fa3776 100644 --- a/modules/worker/src/sync/applyWorkerSyncResponse.ts +++ b/modules/worker/src/sync/applyWorkerSyncResponse.ts @@ -40,6 +40,8 @@ export const applyWorkerSyncResponse = async ({ workerId: sync.workerId, managerPeerId: sync.managerPeerId, cursor: sync.cursor, + tasksCursor: sync.tasksCursor, + paymentsCursor: sync.paymentsCursor, serverTime: sync.serverTime, status: sync.status, capabilities: sync.capabilities ?? [], diff --git a/modules/worker/src/sync/runCatchUpSync.ts b/modules/worker/src/sync/runCatchUpSync.ts new file mode 100644 index 00000000..3ffc71ab --- /dev/null +++ b/modules/worker/src/sync/runCatchUpSync.ts @@ -0,0 +1,66 @@ +import type { WorkerSyncResponse } from "@effectai/protobufs"; + +export const runCatchUpSync = async ({ + scopes, + limit, + tasksCursor, + paymentsCursor, + syncPage, +}: { + scopes: string[]; + limit?: number; + tasksCursor?: string; + paymentsCursor?: string; + syncPage: (args: { + scopes: string[]; + limit?: number; + tasksCursor?: string; + paymentsCursor?: string; + }) => Promise; +}) => { + const remainingScopes = new Set(scopes); + let nextTasksCursor = tasksCursor; + let nextPaymentsCursor = paymentsCursor; + let lastSync: WorkerSyncResponse | null = null; + let firstPage = true; + + while (remainingScopes.size > 0) { + const pageScopes: string[] = []; + + if (firstPage) { + if (remainingScopes.has("status")) pageScopes.push("status"); + if (remainingScopes.has("capabilities")) pageScopes.push("capabilities"); + } + if (remainingScopes.has("tasks")) pageScopes.push("tasks"); + if (remainingScopes.has("payments")) pageScopes.push("payments"); + + if (pageScopes.length === 0) { + break; + } + + const sync = await syncPage({ + scopes: pageScopes, + limit, + tasksCursor: nextTasksCursor, + paymentsCursor: nextPaymentsCursor, + }); + + lastSync = sync; + nextTasksCursor = sync.tasksCursor ?? nextTasksCursor; + nextPaymentsCursor = sync.paymentsCursor ?? nextPaymentsCursor; + + remainingScopes.delete("status"); + remainingScopes.delete("capabilities"); + + if (!sync.tasksHasMore) { + remainingScopes.delete("tasks"); + } + if (!sync.paymentsHasMore) { + remainingScopes.delete("payments"); + } + + firstPage = false; + } + + return lastSync; +}; diff --git a/modules/worker/src/sync/sync.catchup.spec.ts b/modules/worker/src/sync/sync.catchup.spec.ts new file mode 100644 index 00000000..879be87d --- /dev/null +++ b/modules/worker/src/sync/sync.catchup.spec.ts @@ -0,0 +1,54 @@ +import { describe, expect, it, vi } from "vitest"; +import { runCatchUpSync } from "./runCatchUpSync"; + +describe("sync catch-up flow", () => { + it("keeps paging tasks and payments until both are drained", async () => { + const syncPage = vi + .fn() + .mockResolvedValueOnce({ + workerId: "worker-1", + managerPeerId: "manager-1", + serverTime: 100, + cursor: 100n, + capabilities: ["model/gpt5"], + tasks: [{ taskId: "task-1", status: "assigned", lastEventAt: 1 }], + payments: [{ paymentId: "payment-1", status: "created", amount: "1", createdAt: 1 }], + tasksCursor: "0000000001:task-1", + paymentsCursor: "0000000001:payment-1", + tasksHasMore: true, + paymentsHasMore: true, + }) + .mockResolvedValueOnce({ + workerId: "worker-1", + managerPeerId: "manager-1", + serverTime: 101, + cursor: 101n, + capabilities: [], + tasks: [{ taskId: "task-2", status: "accepted", lastEventAt: 2 }], + payments: [], + tasksCursor: "0000000002:task-2", + paymentsCursor: "0000000001:payment-1", + tasksHasMore: false, + paymentsHasMore: false, + }); + + const result = await runCatchUpSync({ + scopes: ["status", "capabilities", "tasks", "payments"], + limit: 1, + syncPage, + }); + + expect(syncPage).toHaveBeenCalledTimes(2); + expect(syncPage.mock.calls[0][0]).toMatchObject({ + scopes: ["status", "capabilities", "tasks", "payments"], + limit: 1, + }); + expect(syncPage.mock.calls[1][0]).toMatchObject({ + scopes: ["tasks", "payments"], + tasksCursor: "0000000001:task-1", + paymentsCursor: "0000000001:payment-1", + limit: 1, + }); + expect(result?.tasksCursor).toBe("0000000002:task-2"); + }); +});