diff --git a/docker-compose.yml b/docker-compose.yml index 01121c05..9371318a 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -8,6 +8,12 @@ services: volumes: - rabbit_data:/var/lib/rabbitmq restart: on-failure + healthcheck: + test: ["CMD", "rabbitmq-diagnostics", "-q", "ping"] + interval: 5s + timeout: 5s + retries: 10 + start_period: 10s redis: image: redis:6.2.7-alpine diff --git a/packages/amqp/lib/AbstractAmqpConsumer.ts b/packages/amqp/lib/AbstractAmqpConsumer.ts index 1cbb8d4f..04cb3d8a 100644 --- a/packages/amqp/lib/AbstractAmqpConsumer.ts +++ b/packages/amqp/lib/AbstractAmqpConsumer.ts @@ -306,7 +306,7 @@ export abstract class AbstractAmqpConsumer< } // Empty content for whatever reason - if (!resolveMessageResult.result || !resolveMessageResult.result.body) { + if (!resolveMessageResult.result?.body) { return ABORT_EARLY_EITHER } @@ -347,7 +347,7 @@ export abstract class AbstractAmqpConsumer< const resolvedMessage = resolveMessageResult.result // Empty content for whatever reason - if (!resolvedMessage || !resolvedMessage.body) return ABORT_EARLY_EITHER + if (!resolvedMessage?.body) return ABORT_EARLY_EITHER // @ts-expect-error if (this.messageIdField in resolvedMessage.body) { diff --git a/packages/amqp/package.json b/packages/amqp/package.json index 2fc627e1..ee7db7d4 100644 --- a/packages/amqp/package.json +++ b/packages/amqp/package.json @@ -23,7 +23,7 @@ "test:coverage": "npm run test -- --coverage", "lint": "biome check . && tsc", "lint:fix": "biome check --write .", - "docker:start": "docker compose up -d rabbitmq", + "docker:start": "docker compose up -d --wait rabbitmq", "docker:stop": "docker compose down", "prepublishOnly": "npm run lint && npm run build" }, @@ -45,7 +45,7 @@ "@types/node": "^25.0.2", "@vitest/coverage-v8": "^4.0.15", "amqplib": "^0.10.8", - "awilix": "^12.0.5", + "awilix": "^13.0.3", "awilix-manager": "^6.1.0", "rimraf": "^6.0.1", "typescript": "^5.9.3", diff --git a/packages/amqp/test/utils/testContext.ts b/packages/amqp/test/utils/testContext.ts index c72f44ce..1f35c263 100644 --- a/packages/amqp/test/utils/testContext.ts +++ b/packages/amqp/test/utils/testContext.ts @@ -94,7 +94,7 @@ export async function registerDependencies( dependencyOverrides: DependencyOverrides = {}, queuesEnabled = true, ) { - const diContainer = createContainer({ + const diContainer = createContainer({ injectionMode: 'PROXY', }) const awilixManager = new AwilixManager({ diff --git a/packages/core/package.json b/packages/core/package.json index bb633b3b..55f49a59 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -44,7 +44,7 @@ "@types/node": "^25.0.2", "@types/tmp": "^0.2.6", "@vitest/coverage-v8": "^4.0.15", - "awilix": "^12.0.5", + "awilix": "^13.0.3", "awilix-manager": "^6.1.0", "rimraf": "^6.0.1", "typescript": "^5.9.2", diff --git a/packages/core/test/testContext.ts b/packages/core/test/testContext.ts index 0a1cc8fc..6510b9f4 100644 --- a/packages/core/test/testContext.ts +++ b/packages/core/test/testContext.ts @@ -44,7 +44,7 @@ export const TestEvents = { export type TestEventsType = (typeof TestEvents)[keyof typeof TestEvents][] export async function registerDependencies(dependencyOverrides: DependencyOverrides = {}) { - const diContainer = createContainer({ + const diContainer = createContainer({ injectionMode: 'PROXY', }) const awilixManager = new AwilixManager({ diff --git a/packages/gcp-pubsub/package.json b/packages/gcp-pubsub/package.json index dd9bb8eb..594f4fe0 100644 --- a/packages/gcp-pubsub/package.json +++ b/packages/gcp-pubsub/package.json @@ -46,7 +46,7 @@ "@message-queue-toolkit/schemas": "*", "@types/node": "^25.0.2", "@vitest/coverage-v8": "^4.0.15", - "awilix": "^12.0.5", + "awilix": "^13.0.3", "awilix-manager": "^6.1.0", "ioredis": "^5.7.0", "rimraf": "^6.0.1", diff --git a/packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.deduplication.spec.ts b/packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.deduplication.spec.ts index 50cc00f2..2ee60272 100644 --- a/packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.deduplication.spec.ts +++ b/packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.deduplication.spec.ts @@ -230,32 +230,30 @@ describe('PubSubPermissionConsumer - Deduplication', () => { expect(consumer.addCounter).toBe(1) }) - it( - 'processes message when lock acquisition has non-timeout error', - { timeout: 15000 }, - async () => { - const messageId = randomUUID() - - // Mock acquireLock to simulate a non-timeout error (e.g., Redis connection error) - // Non-timeout errors are swallowed and message is processed normally - vi.spyOn(messageDeduplicationStore, 'acquireLock').mockResolvedValue({ - error: new Error('Redis connection error'), - }) - - const message: PERMISSIONS_ADD_MESSAGE_TYPE = { - id: messageId, - messageType: 'add', - timestamp: new Date().toISOString(), - userIds: ['user1'], - } - - await publisher.publish(message) - - // Message should be processed even though lock acquisition failed - const result = await consumer.handlerSpy.waitForMessageWithId(messageId, 'consumed') - expect(result.processingResult.status).toBe('consumed') - expect(consumer.addCounter).toBe(1) - }, - ) + it('processes message when lock acquisition has non-timeout error', { + timeout: 15000, + }, async () => { + const messageId = randomUUID() + + // Mock acquireLock to simulate a non-timeout error (e.g., Redis connection error) + // Non-timeout errors are swallowed and message is processed normally + vi.spyOn(messageDeduplicationStore, 'acquireLock').mockResolvedValue({ + error: new Error('Redis connection error'), + }) + + const message: PERMISSIONS_ADD_MESSAGE_TYPE = { + id: messageId, + messageType: 'add', + timestamp: new Date().toISOString(), + userIds: ['user1'], + } + + await publisher.publish(message) + + // Message should be processed even though lock acquisition failed + const result = await consumer.handlerSpy.waitForMessageWithId(messageId, 'consumed') + expect(result.processingResult.status).toBe('consumed') + expect(consumer.addCounter).toBe(1) + }) }) }) diff --git a/packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.payloadOffloading.spec.ts b/packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.payloadOffloading.spec.ts index 9543e596..a0d39b81 100644 --- a/packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.payloadOffloading.spec.ts +++ b/packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.payloadOffloading.spec.ts @@ -125,89 +125,85 @@ describe('PubSubPermissionConsumer - Payload Offloading', () => { expect(consumer.addCounter).toBe(1) }) - it( - 'consumes offloaded message with array field and validates schema correctly', - { timeout: 10000 }, - async () => { - // Create a large array of userIds to trigger offloading (need > 10MB) - // Each userId needs to be ~1000 chars to make 10,500 items exceed 10MB - const largeUserIdArray = Array.from( - { length: 10500 }, - (_, i) => `user-${i}-${'x'.repeat(1000)}`, - ) + it('consumes offloaded message with array field and validates schema correctly', { + timeout: 10000, + }, async () => { + // Create a large array of userIds to trigger offloading (need > 10MB) + // Each userId needs to be ~1000 chars to make 10,500 items exceed 10MB + const largeUserIdArray = Array.from( + { length: 10500 }, + (_, i) => `user-${i}-${'x'.repeat(1000)}`, + ) - const message = { - id: 'large-array-message-1', - messageType: 'add', - timestamp: new Date().toISOString(), - userIds: largeUserIdArray, - } satisfies PERMISSIONS_ADD_MESSAGE_TYPE + const message = { + id: 'large-array-message-1', + messageType: 'add', + timestamp: new Date().toISOString(), + userIds: largeUserIdArray, + } satisfies PERMISSIONS_ADD_MESSAGE_TYPE - // Verify the message is large enough to trigger offloading - expect(JSON.stringify(message).length).toBeGreaterThan(largeMessageSizeThreshold) + // Verify the message is large enough to trigger offloading + expect(JSON.stringify(message).length).toBeGreaterThan(largeMessageSizeThreshold) - await publisher.publish(message) + await publisher.publish(message) - // Wait for the message to be consumed - const consumptionResult = await consumer.handlerSpy.waitForMessageWithId( - message.id, - 'consumed', - ) + // Wait for the message to be consumed + const consumptionResult = await consumer.handlerSpy.waitForMessageWithId( + message.id, + 'consumed', + ) - // Verify the full payload was received including the large array - expect(consumptionResult.message).toMatchObject({ - id: message.id, - messageType: message.messageType, - userIds: largeUserIdArray, - }) - expect(consumptionResult.message.userIds).toHaveLength(largeUserIdArray.length) - expect(consumer.addCounter).toBe(1) - }, - ) - - it( - 'validates schema correctly after retrieving offloaded payload', - { timeout: 10000 }, - async () => { - // Create a message with metadata that will be validated against the schema - const message = { - id: 'schema-validation-1', - messageType: 'add', - timestamp: new Date().toISOString(), - metadata: { - largeField: 'x'.repeat(largeMessageSizeThreshold + 1000), - }, - userIds: ['test-user'], - } satisfies PERMISSIONS_ADD_MESSAGE_TYPE - - expect(JSON.stringify(message).length).toBeGreaterThan(largeMessageSizeThreshold) - - await publisher.publish(message) - - const consumptionResult = await consumer.handlerSpy.waitForMessageWithId( - message.id, - 'consumed', - ) + // Verify the full payload was received including the large array + expect(consumptionResult.message).toMatchObject({ + id: message.id, + messageType: message.messageType, + userIds: largeUserIdArray, + }) + expect(consumptionResult.message.userIds).toHaveLength(largeUserIdArray.length) + expect(consumer.addCounter).toBe(1) + }) - // Verify all fields were properly deserialized and validated - expect(consumptionResult.message).toMatchObject({ - id: message.id, - messageType: message.messageType, - userIds: message.userIds, - metadata: { - largeField: message.metadata.largeField, - }, - }) - - // Type guard to access metadata property - if (consumptionResult.message.messageType === 'add') { - expect(consumptionResult.message.metadata?.largeField).toHaveLength( - message.metadata.largeField.length, - ) - } - expect(consumer.addCounter).toBe(1) - }, - ) + it('validates schema correctly after retrieving offloaded payload', { + timeout: 10000, + }, async () => { + // Create a message with metadata that will be validated against the schema + const message = { + id: 'schema-validation-1', + messageType: 'add', + timestamp: new Date().toISOString(), + metadata: { + largeField: 'x'.repeat(largeMessageSizeThreshold + 1000), + }, + userIds: ['test-user'], + } satisfies PERMISSIONS_ADD_MESSAGE_TYPE + + expect(JSON.stringify(message).length).toBeGreaterThan(largeMessageSizeThreshold) + + await publisher.publish(message) + + const consumptionResult = await consumer.handlerSpy.waitForMessageWithId( + message.id, + 'consumed', + ) + + // Verify all fields were properly deserialized and validated + expect(consumptionResult.message).toMatchObject({ + id: message.id, + messageType: message.messageType, + userIds: message.userIds, + metadata: { + largeField: message.metadata.largeField, + }, + }) + + // Type guard to access metadata property + if (consumptionResult.message.messageType === 'add') { + expect(consumptionResult.message.metadata?.largeField).toHaveLength( + message.metadata.largeField.length, + ) + } + expect(consumer.addCounter).toBe(1) + }) }) describe('payload retrieval errors', () => { diff --git a/packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.subscriptionRetry.spec.ts b/packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.subscriptionRetry.spec.ts index f3839190..6ca3f62d 100644 --- a/packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.subscriptionRetry.spec.ts +++ b/packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.subscriptionRetry.spec.ts @@ -289,111 +289,109 @@ describe('PubSubPermissionConsumer - Subscription Retry', () => { await deletePubSubTopicAndSubscription(pubSubClient, TOPIC_NAME, SUBSCRIPTION_NAME) }) - it( - 'resubscribes after subscription is temporarily unavailable', - { timeout: 30000 }, - async () => { - expect.assertions(2) - - const consumer = new PubSubPermissionConsumer(diContainer.cradle, { - creationConfig: { - topic: { name: TOPIC_NAME }, - subscription: { name: SUBSCRIPTION_NAME }, - }, - subscriptionRetryOptions: { - maxRetries: 5, - baseRetryDelayMs: 500, - maxRetryDelayMs: 2000, - }, - }) - const publisher = new PubSubPermissionPublisher(diContainer.cradle, { - creationConfig: { - topic: { name: TOPIC_NAME }, - }, - }) + it('resubscribes after subscription is temporarily unavailable', { + timeout: 30000, + }, async () => { + expect.assertions(2) - try { - await consumer.start() - await publisher.init() - - // Verify consumer is working initially - const message1 = { - id: 'reconnect-test-1', - messageType: 'add' as const, - timestamp: new Date().toISOString(), - userIds: ['user1'], - } - - await publisher.publish(message1) - await consumer.handlerSpy.waitForMessageWithId('reconnect-test-1', 'consumed') - expect(consumer.addCounter).toBe(1) - - // Delete the subscription while consumer is running - await deletePubSubSubscription(pubSubClient, TOPIC_NAME, SUBSCRIPTION_NAME) - - // Wait for consumer to detect the error and reconnect - // The consumer should automatically recreate the subscription via creationConfig - await setTimeout(5000) - - // Verify consumer can process messages after reconnection - const message2 = { - id: 'reconnect-test-2', - messageType: 'add' as const, - timestamp: new Date().toISOString(), - userIds: ['user2'], - } - - await publisher.publish(message2) - await consumer.handlerSpy.waitForMessageWithId('reconnect-test-2', 'consumed') - expect(consumer.addCounter).toBe(2) - } finally { - await consumer.close() - await publisher.close() - } - }, - ) - - it( - 'retries initialization when subscription does not exist initially', - { timeout: 20000 }, - async () => { - expect.assertions(1) - - // First create the topic only (no subscription) - const topic = pubSubClient.topic(TOPIC_NAME) - const [topicExists] = await topic.exists() - if (!topicExists) { - await topic.create() + const consumer = new PubSubPermissionConsumer(diContainer.cradle, { + creationConfig: { + topic: { name: TOPIC_NAME }, + subscription: { name: SUBSCRIPTION_NAME }, + }, + subscriptionRetryOptions: { + maxRetries: 5, + baseRetryDelayMs: 500, + maxRetryDelayMs: 2000, + }, + }) + const publisher = new PubSubPermissionPublisher(diContainer.cradle, { + creationConfig: { + topic: { name: TOPIC_NAME }, + }, + }) + + try { + await consumer.start() + await publisher.init() + + // Verify consumer is working initially + const message1 = { + id: 'reconnect-test-1', + messageType: 'add' as const, + timestamp: new Date().toISOString(), + userIds: ['user1'], } - const consumer = new PubSubPermissionConsumer(diContainer.cradle, { - locatorConfig: { - topicName: TOPIC_NAME, - subscriptionName: SUBSCRIPTION_NAME, - }, - subscriptionRetryOptions: { - maxRetries: 5, - baseRetryDelayMs: 500, - maxRetryDelayMs: 2000, - }, - }) + await publisher.publish(message1) + await consumer.handlerSpy.waitForMessageWithId('reconnect-test-1', 'consumed') + expect(consumer.addCounter).toBe(1) - // Create subscription after a delay (simulating eventual consistency) - globalThis.setTimeout(async () => { - await topic.createSubscription(SUBSCRIPTION_NAME) - }, 1500) + // Delete the subscription while consumer is running + await deletePubSubSubscription(pubSubClient, TOPIC_NAME, SUBSCRIPTION_NAME) - try { - // This should retry and eventually succeed when subscription is created - await consumer.start() + // Wait for consumer to detect the error and reconnect + // The consumer should automatically recreate the subscription via creationConfig + await setTimeout(5000) - // @ts-expect-error - accessing private field for testing - expect(consumer.isConsuming).toBe(true) - } finally { - await consumer.close() + // Verify consumer can process messages after reconnection + const message2 = { + id: 'reconnect-test-2', + messageType: 'add' as const, + timestamp: new Date().toISOString(), + userIds: ['user2'], } - }, - ) + + await publisher.publish(message2) + await consumer.handlerSpy.waitForMessageWithId('reconnect-test-2', 'consumed') + expect(consumer.addCounter).toBe(2) + } finally { + await consumer.close() + await publisher.close() + } + }) + + it('retries initialization when subscription does not exist initially', { + timeout: 20000, + }, async () => { + expect.assertions(1) + + // First create the topic only (no subscription) + const topic = pubSubClient.topic(TOPIC_NAME) + const [topicExists] = await topic.exists() + if (!topicExists) { + await topic.create() + } + + const consumer = new PubSubPermissionConsumer(diContainer.cradle, { + locatorConfig: { + topicName: TOPIC_NAME, + subscriptionName: SUBSCRIPTION_NAME, + }, + subscriptionRetryOptions: { + maxRetries: 5, + baseRetryDelayMs: 500, + maxRetryDelayMs: 2000, + }, + }) + + // Create subscription after a delay (simulating eventual consistency) + globalThis.setTimeout(() => { + topic.createSubscription(SUBSCRIPTION_NAME).catch((err) => { + expect.unreachable(`Failed to create subscription in delayed callback: ${err}`) + }) + }, 1500) + + try { + // This should retry and eventually succeed when subscription is created + await consumer.start() + + // @ts-expect-error - accessing private field for testing + expect(consumer.isConsuming).toBe(true) + } finally { + await consumer.close() + } + }) it('does not attempt reconnection after close is called', { timeout: 15000 }, async () => { expect.assertions(4) diff --git a/packages/gcp-pubsub/test/utils/testContext.ts b/packages/gcp-pubsub/test/utils/testContext.ts index 5259bf96..83677f7c 100644 --- a/packages/gcp-pubsub/test/utils/testContext.ts +++ b/packages/gcp-pubsub/test/utils/testContext.ts @@ -23,7 +23,7 @@ export type DependencyOverrides = Partial const TestLogger: CommonLogger = console export async function registerDependencies(dependencyOverrides: DependencyOverrides = {}) { - const diContainer = createContainer({ + const diContainer = createContainer({ injectionMode: 'PROXY', }) const awilixManager = new AwilixManager({ diff --git a/packages/kafka/package.json b/packages/kafka/package.json index 8853aeff..b4a1b597 100644 --- a/packages/kafka/package.json +++ b/packages/kafka/package.json @@ -68,7 +68,7 @@ "@message-queue-toolkit/schemas": ">=7.0.0", "@types/node": "^25.0.2", "@vitest/coverage-v8": "^4.0.15", - "awilix": "^12.0.1", + "awilix": "^13.0.3", "awilix-manager": "^6.0.0", "rimraf": "^6.0.1", "typescript": "^5.9.2", diff --git a/packages/kafka/test/consumer/PermissionConsumer.spec.ts b/packages/kafka/test/consumer/PermissionConsumer.spec.ts index 8e41a3d7..8e3489b4 100644 --- a/packages/kafka/test/consumer/PermissionConsumer.spec.ts +++ b/packages/kafka/test/consumer/PermissionConsumer.spec.ts @@ -674,7 +674,9 @@ describe('PermissionConsumer', () => { expect(processedMessages).toContain('concurrency-3') }) - it('should process messages synchronously across different topics', async () => { + it('should process messages synchronously across different topics', { + timeout: 15000, + }, async () => { // Given const processingOrder: string[] = [] const testMessageIds = ['cross-topic-1', 'cross-topic-2', 'cross-topic-3'] diff --git a/packages/kafka/test/utils/testContext.ts b/packages/kafka/test/utils/testContext.ts index c8576817..8b3f34fb 100644 --- a/packages/kafka/test/utils/testContext.ts +++ b/packages/kafka/test/utils/testContext.ts @@ -34,7 +34,7 @@ type Dependencies = { } export const createTestContext = async (): Promise => { - const diContainer = createContainer({ + const diContainer = createContainer({ injectionMode: 'PROXY', }) const awilixManager = new AwilixManager({ diff --git a/packages/sns/package.json b/packages/sns/package.json index 6f202cfe..35719dbd 100644 --- a/packages/sns/package.json +++ b/packages/sns/package.json @@ -53,7 +53,7 @@ "@message-queue-toolkit/sqs": "*", "@types/node": "^25.0.2", "@vitest/coverage-v8": "^4.0.15", - "awilix": "^12.1.1", + "awilix": "^13.0.3", "awilix-manager": "^6.1.0", "fauxqs": "^2.0.0", "ioredis": "^5.7.0", diff --git a/packages/sns/test/utils/testContext.ts b/packages/sns/test/utils/testContext.ts index c14b0561..80cc9e79 100644 --- a/packages/sns/test/utils/testContext.ts +++ b/packages/sns/test/utils/testContext.ts @@ -67,7 +67,7 @@ export async function registerDependencies( dependencyOverrides: DependencyOverrides = {}, queuesEnabled = true, ) { - const diContainer = createContainer({ + const diContainer = createContainer({ injectionMode: 'PROXY', }) const awilixManager = new AwilixManager({ diff --git a/packages/sqs/package.json b/packages/sqs/package.json index 83dbe43e..ca65d6d8 100644 --- a/packages/sqs/package.json +++ b/packages/sqs/package.json @@ -48,7 +48,7 @@ "@message-queue-toolkit/schemas": "*", "@types/node": "^25.0.2", "@vitest/coverage-v8": "^4.0.15", - "awilix": "^12.0.5", + "awilix": "^13.0.3", "awilix-manager": "^6.1.0", "fauxqs": "^2.0.0", "ioredis": "^5.6.1", diff --git a/packages/sqs/test/utils/testContext.ts b/packages/sqs/test/utils/testContext.ts index 19503e9b..bfc86bee 100644 --- a/packages/sqs/test/utils/testContext.ts +++ b/packages/sqs/test/utils/testContext.ts @@ -25,7 +25,7 @@ export type DependencyOverrides = Partial const TestLogger: CommonLogger = console export async function registerDependencies(dependencyOverrides: DependencyOverrides = {}) { - const diContainer = createContainer({ + const diContainer = createContainer({ injectionMode: 'PROXY', }) const awilixManager = new AwilixManager({