From 95c48491d354979c9510f28cfd6ffda6172b7533 Mon Sep 17 00:00:00 2001 From: Taylor McKinnon Date: Fri, 13 Mar 2026 14:21:20 +0100 Subject: [PATCH 1/3] impr(CLDSRV-852): Change RateLimitClient.grantTokens() to accept resourceClass, resourceId, and measure --- lib/api/apiUtils/rateLimit/client.js | 8 +++++--- .../aws-node-sdk/test/rateLimit/client.js | 14 +++++++------- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/lib/api/apiUtils/rateLimit/client.js b/lib/api/apiUtils/rateLimit/client.js index 183ae032b5..203643fc0f 100644 --- a/lib/api/apiUtils/rateLimit/client.js +++ b/lib/api/apiUtils/rateLimit/client.js @@ -38,14 +38,16 @@ class RateLimitClient { * * Used by token reservation system to request capacity in advance. * - * @param {string} bucketName - Bucket name + * @param {string} resourceClass - Resource class name e.g. 'account' or 'bucket' + * @param {string} resourceId - Unique resource ID e.g. bucket name or account ID + * @param {string} measure - ID of rate limit measure e.g. 
'rps' * @param {number} requested - Number of tokens requested * @param {number} interval - Interval per request in ms * @param {number} burstCapacity - Burst capacity in ms * @param {RateLimitClient~grantTokens} cb - Callback */ - grantTokens(bucketName, requested, interval, burstCapacity, cb) { - const key = `ratelimit:bucket:${bucketName}:rps:emptyAt`; + grantTokens(resourceClass, resourceId, measure, requested, interval, burstCapacity, cb) { + const key = `ratelimit:${resourceClass}:${resourceId}:${measure}:emptyAt`; const now = Date.now(); this.redis.grantTokens( diff --git a/tests/functional/aws-node-sdk/test/rateLimit/client.js b/tests/functional/aws-node-sdk/test/rateLimit/client.js index d82dbf636a..083eb6271b 100644 --- a/tests/functional/aws-node-sdk/test/rateLimit/client.js +++ b/tests/functional/aws-node-sdk/test/rateLimit/client.js @@ -19,7 +19,7 @@ skipIfRateLimitDisabled('RateLimitClient', () => { after(async () => client.redis.quit().catch(() => {})); beforeEach(async () => { - const keys = await client.redis.keys('ratelimit:bucket:*'); + const keys = await client.redis.keys('ratelimit:*'); if (keys.length > 0) { await client.redis.del(...keys); } @@ -42,7 +42,7 @@ skipIfRateLimitDisabled('RateLimitClient', () => { const interval = 100; // 100ms per request = 10 req/s const burstCapacity = 1000; // 1000ms burst capacity - client.grantTokens(testBucket, requested, interval, burstCapacity, (err, granted) => { + client.grantTokens('bucket', testBucket, 'rps', requested, interval, burstCapacity, (err, granted) => { assert.ifError(err); assert.strictEqual(granted, requested); done(); @@ -55,12 +55,12 @@ skipIfRateLimitDisabled('RateLimitClient', () => { const burstCapacity = 1000; // 1000ms burst capacity // First request - client.grantTokens(testBucket, requested, interval, burstCapacity, (err, granted1) => { + client.grantTokens('bucket', testBucket, 'rps', requested, interval, burstCapacity, (err, granted1) => { assert.ifError(err); 
assert.strictEqual(granted1, requested); // Second request immediately after - client.grantTokens(testBucket, requested, interval, burstCapacity, (err, granted2) => { + client.grantTokens('bucket', testBucket, 'rps', requested, interval, burstCapacity, (err, granted2) => { assert.ifError(err); assert.strictEqual(granted2, requested); done(); @@ -73,7 +73,7 @@ skipIfRateLimitDisabled('RateLimitClient', () => { const burstCapacity = 500; // 500ms burst capacity = max 5 tokens // Request more tokens than available in burst - client.grantTokens(testBucket, 10, interval, burstCapacity, (err, granted) => { + client.grantTokens('bucket', testBucket, 'rps', 10, interval, burstCapacity, (err, granted) => { assert.ifError(err); // Should grant partial tokens (5 tokens max with 500ms burst) assert(granted > 0, 'Should grant at least some tokens'); @@ -87,12 +87,12 @@ skipIfRateLimitDisabled('RateLimitClient', () => { const burstCapacity = 100; // 100ms burst capacity = max 1 token // First request consumes the burst capacity - client.grantTokens(testBucket, 1, interval, burstCapacity, (err, granted1) => { + client.grantTokens('bucket', testBucket, 'rps', 1, interval, burstCapacity, (err, granted1) => { assert.ifError(err); assert.strictEqual(granted1, 1); // Second request immediately after should be denied - client.grantTokens(testBucket, 1, interval, burstCapacity, (err, granted2) => { + client.grantTokens('bucket', testBucket, 'rps', 1, interval, burstCapacity, (err, granted2) => { assert.ifError(err); assert.strictEqual(granted2, 0, 'Should deny tokens when quota exhausted'); done(); From e3b8cf1b47538e0924216b4d658d96c11cfa819e Mon Sep 17 00:00:00 2001 From: Taylor McKinnon Date: Fri, 13 Mar 2026 14:21:59 +0100 Subject: [PATCH 2/3] impr(CLDSRV-852): Remove unused workers params from interval calculation --- lib/Config.js | 2 +- lib/api/apiUtils/rateLimit/config.js | 25 ++- lib/api/apiUtils/rateLimit/gcra.js | 29 ++-- tests/unit/api/apiUtils/rateLimit/config.js | 176 
++++++++++---------- tests/unit/api/apiUtils/rateLimit/gcra.js | 31 ++-- 5 files changed, 125 insertions(+), 138 deletions(-) diff --git a/lib/Config.js b/lib/Config.js index 20b9cd56c0..f4fdc96044 100644 --- a/lib/Config.js +++ b/lib/Config.js @@ -1873,7 +1873,7 @@ class Config extends EventEmitter { assert(this.localCache, 'localCache must be defined when rate limiting is enabled'); // Parse and validate all rate limiting configuration - this.rateLimiting = parseRateLimitConfig(config.rateLimiting, this.clusters = this.clusters || 1); + this.rateLimiting = parseRateLimitConfig(config.rateLimiting); } diff --git a/lib/api/apiUtils/rateLimit/config.js b/lib/api/apiUtils/rateLimit/config.js index 6b7c3969e7..1b6928b94f 100644 --- a/lib/api/apiUtils/rateLimit/config.js +++ b/lib/api/apiUtils/rateLimit/config.js @@ -199,11 +199,10 @@ const rateLimitConfigSchema = Joi.object({ * * @param {string} resourceClass - Rate limit class name ('bucket' or 'account') * @param {object} validatedCfg - Already validated config from Joi - * @param {number} clusters - Number of worker processes spawned per instance * @param {number} nodes - Number of instances that requests will be load balanced across * @returns {RateLimitClassConfig} Transformed rate limit config */ -function transformClassConfig(resourceClass, validatedCfg, clusters, nodes) { +function transformClassConfig(resourceClass, validatedCfg, nodes) { const transformed = { defaultConfig: undefined, configCacheTTL: validatedCfg.configCacheTTL, @@ -213,23 +212,22 @@ function transformClassConfig(resourceClass, validatedCfg, clusters, nodes) { if (validatedCfg.defaultConfig?.requestsPerSecond) { const { limit, burstCapacity } = validatedCfg.defaultConfig.requestsPerSecond; - // Validate limit against nodes AND workers (business rule) - const minLimit = nodes * clusters; - if (limit > 0 && limit < minLimit) { + // Validate limit against nodes (business rule) + if (limit > 0 && limit < nodes) { throw new Error( 
`rateLimiting.${resourceClass}.defaultConfig.` + `requestsPerSecond.limit (${limit}) must be >= ` + - `(nodes x workers = ${nodes} x ${clusters} = ${minLimit}) ` + - 'or 0 (unlimited). Each worker enforces limit/nodes/workers locally. ' + - `With limit < ${minLimit}, per-worker rate would be < 1 req/s, effectively blocking traffic.` + `nodes (${nodes}) ` + + 'or 0 (unlimited). Each node enforces limit/nodes locally. ' + + `With limit < ${nodes}, per-node rate would be < 1 req/s, effectively blocking traffic.` ); } // Use provided burstCapacity or fall back to default const effectiveBurstCapacity = burstCapacity || transformed.defaultBurstCapacity; - // Calculate per-worker interval using distributed architecture - const interval = calculateInterval(limit, nodes, clusters); + // Calculate per-node interval using distributed architecture + const interval = calculateInterval(limit, nodes); // Store both the original limit and the calculated values transformed.defaultConfig = { @@ -248,11 +246,10 @@ function transformClassConfig(resourceClass, validatedCfg, clusters, nodes) { * Parse and validate the complete rate limiting configuration * * @param {Object} rateLimitingConfig - config.rateLimiting object from config.json - * @param {number} clusters - Number of worker clusters (must be numeric) * @returns {Object} Fully parsed and validated rate limiting configuration * @throws {Error} If configuration is invalid */ -function parseRateLimitConfig(rateLimitingConfig, clusters) { +function parseRateLimitConfig(rateLimitingConfig) { // Validate configuration using Joi schema const { error: validationError, value: validated } = rateLimitConfigSchema.validate( rateLimitingConfig, @@ -282,8 +279,8 @@ function parseRateLimitConfig(rateLimitingConfig, clusters) { ), }; - parsed.bucket = transformClassConfig('bucket', validated.bucket, clusters, parsed.nodes); - parsed.account = transformClassConfig('account', validated.account, clusters, parsed.nodes); + parsed.bucket = 
transformClassConfig('bucket', validated.bucket, parsed.nodes); + parsed.account = transformClassConfig('account', validated.account, parsed.nodes); return parsed; } diff --git a/lib/api/apiUtils/rateLimit/gcra.js b/lib/api/apiUtils/rateLimit/gcra.js index 93264aee2e..e14c2e9fc5 100644 --- a/lib/api/apiUtils/rateLimit/gcra.js +++ b/lib/api/apiUtils/rateLimit/gcra.js @@ -1,38 +1,35 @@ /** - * Calculate per-worker interval based on distributed architecture + * Calculate per-node interval based on distributed architecture * - * In a distributed setup with N nodes and W workers per node: + * In a distributed setup with N nodes: * - Global limit: R requests per second - * - Per-worker limit: R / N / W - * - Interval = 1000ms / (R / N / W) + * - Per-node limit: R / N + * - Interval = 1000ms / (R / N) * * The interval represents milliseconds between requests. We divide 1000 (milliseconds * in a second) by the rate to convert "requests per second" to "milliseconds per request". * * Examples: - * - 100 req/s ÷ 1 node ÷ 10 workers = 10 req/s per worker → interval = 100ms - * - 600 req/s ÷ 6 nodes ÷ 10 workers = 10 req/s per worker → interval = 100ms + * - 100 req/s ÷ 1 node = 100 req/s per node → interval = 100ms + * - 600 req/s ÷ 6 nodes = 100 req/s per node → interval = 100ms * * Dynamic work-stealing is achieved through Redis sync reconciliation: - * - Each worker evaluates locally at its fixed per-worker quota - * - Workers report consumed / workers to Redis - * - Redis sums all workers' shares - * - Workers overwrite local counters with Redis values + * - Each worker evaluates locally using preallocated tokens + * - Workers report processed requests to Redis + * - Redis sums all workers' requests * - Idle workers' unused capacity accumulates in Redis * - Busy workers pull back higher emptyAt values and throttle proportionally * - * IMPORTANT: Limit must be >= N * W, otherwise per-worker rate < 1 req/s + * IMPORTANT: Limit must be >= N, otherwise per-node rate < 1 req/s 
* which results in intervals > 1000ms and effectively blocks traffic. * * @param {number} limit - Global requests per second * @param {number} nodes - Total number of nodes - * @param {number} _workers - Number of workers per node (unused in token reservation) * @returns {number} Interval in milliseconds between requests */ -// eslint-disable-next-line no-unused-vars -function calculateInterval(limit, nodes, _workers) { - // Per-node rate = limit / nodes (workers NOT divided) - // This allows dynamic work-stealing - workers evaluate at node quota + +function calculateInterval(limit, nodes) { + // Per-node rate = limit / nodes const perNodeRate = limit / nodes; // Interval = 1000ms / rate diff --git a/tests/unit/api/apiUtils/rateLimit/config.js b/tests/unit/api/apiUtils/rateLimit/config.js index 8d6d5ae95b..118032603a 100644 --- a/tests/unit/api/apiUtils/rateLimit/config.js +++ b/tests/unit/api/apiUtils/rateLimit/config.js @@ -26,7 +26,7 @@ describe('parseRateLimitConfig', () => { describe('valid configurations', () => { it('should parse complete valid configuration', () => { - const result = parseRateLimitConfig(validConfig, 10); + const result = parseRateLimitConfig(validConfig); assert.strictEqual(result.enabled, false); // Default when not specified assert.strictEqual(result.serviceUserArn, validConfig.serviceUserArn); @@ -44,7 +44,7 @@ describe('parseRateLimitConfig', () => { serviceUserArn: 'arn:aws:iam::123456789012:user/rate-limit-service', }; - const result = parseRateLimitConfig(minimalConfig, 5); + const result = parseRateLimitConfig(minimalConfig); assert.strictEqual(result.enabled, false); // Default assert.strictEqual(result.serviceUserArn, minimalConfig.serviceUserArn); @@ -68,7 +68,7 @@ describe('parseRateLimitConfig', () => { }, }; - const result = parseRateLimitConfig(config, 3); + const result = parseRateLimitConfig(config); assert.strictEqual(result.bucket.configCacheTTL, 600); assert.strictEqual(result.bucket.defaultConfig, undefined); @@ -80,7 
+80,7 @@ describe('parseRateLimitConfig', () => { bucket: {}, }; - const result = parseRateLimitConfig(config, 2); + const result = parseRateLimitConfig(config); assert.strictEqual(result.bucket.configCacheTTL, constants.rateLimitDefaultConfigCacheTTL); }); @@ -91,7 +91,7 @@ describe('parseRateLimitConfig', () => { error: {}, }; - const result = parseRateLimitConfig(config, 1); + const result = parseRateLimitConfig(config); assert.strictEqual(result.error.code, errors.SlowDown.code); assert.strictEqual(result.error.description, errors.SlowDown.description); @@ -104,7 +104,7 @@ describe('parseRateLimitConfig', () => { serviceUserArn: 'arn:aws:iam::123456789012:user/rate-limit-service', }; - const result = parseRateLimitConfig(config, 1); + const result = parseRateLimitConfig(config); assert.strictEqual(result.enabled, false); }); @@ -114,7 +114,7 @@ describe('parseRateLimitConfig', () => { serviceUserArn: 'arn:aws:iam::123456789012:user/rate-limit-service', }; - const result = parseRateLimitConfig(config, 1); + const result = parseRateLimitConfig(config); assert.strictEqual(result.enabled, true); }); @@ -124,7 +124,7 @@ describe('parseRateLimitConfig', () => { serviceUserArn: 'arn:aws:iam::123456789012:user/rate-limit-service', }; - const result = parseRateLimitConfig(config, 1); + const result = parseRateLimitConfig(config); assert.strictEqual(result.enabled, false); }); @@ -135,7 +135,7 @@ describe('parseRateLimitConfig', () => { }; assert.throws( - () => parseRateLimitConfig(config, 1), + () => parseRateLimitConfig(config), /rateLimiting configuration is invalid/ ); }); @@ -146,7 +146,7 @@ describe('parseRateLimitConfig', () => { const config = { nodes: 1 }; assert.throws( - () => parseRateLimitConfig(config, 1), + () => parseRateLimitConfig(config), /rateLimiting configuration is invalid.*"serviceUserArn" is required/ ); }); @@ -157,7 +157,7 @@ describe('parseRateLimitConfig', () => { }; assert.throws( - () => parseRateLimitConfig(config, 1), + () => 
parseRateLimitConfig(config), /rateLimiting configuration is invalid.*"serviceUserArn" must be a string/ ); }); @@ -170,7 +170,7 @@ describe('parseRateLimitConfig', () => { nodes: 5, }; - const result = parseRateLimitConfig(config, 10); + const result = parseRateLimitConfig(config); assert.strictEqual(result.nodes, 5); }); @@ -179,7 +179,7 @@ describe('parseRateLimitConfig', () => { serviceUserArn: 'arn:aws:iam::123456789012:user/rate-limit-service', }; - const result = parseRateLimitConfig(config, 2); + const result = parseRateLimitConfig(config); assert.strictEqual(result.nodes, 1); }); @@ -190,7 +190,7 @@ describe('parseRateLimitConfig', () => { }; assert.throws( - () => parseRateLimitConfig(config, 1), + () => parseRateLimitConfig(config), /rateLimiting configuration is invalid.*"nodes" must be a positive number/ ); }); @@ -202,7 +202,7 @@ describe('parseRateLimitConfig', () => { }; assert.throws( - () => parseRateLimitConfig(config, 1), + () => parseRateLimitConfig(config), /rateLimiting configuration is invalid.*"nodes" must be a positive number/ ); }); @@ -214,7 +214,7 @@ describe('parseRateLimitConfig', () => { }; assert.throws( - () => parseRateLimitConfig(config, 1), + () => parseRateLimitConfig(config), /rateLimiting configuration is invalid.*"nodes" must be an integer/ ); }); @@ -226,7 +226,7 @@ describe('parseRateLimitConfig', () => { }; assert.throws( - () => parseRateLimitConfig(config, 1), + () => parseRateLimitConfig(config), /rateLimiting configuration is invalid.*"nodes" must be a number/ ); }); @@ -238,7 +238,7 @@ describe('parseRateLimitConfig', () => { serviceUserArn: 'arn:aws:iam::123456789012:user/rate-limit-service', }; - const result = parseRateLimitConfig(config, 1); + const result = parseRateLimitConfig(config); assert.strictEqual(result.tokenBucketBufferSize, 50); }); @@ -248,7 +248,7 @@ describe('parseRateLimitConfig', () => { tokenBucketBufferSize: 100, }; - const result = parseRateLimitConfig(config, 1); + const result = 
parseRateLimitConfig(config); assert.strictEqual(result.tokenBucketBufferSize, 100); }); @@ -259,7 +259,7 @@ describe('parseRateLimitConfig', () => { }; assert.throws( - () => parseRateLimitConfig(config, 1), + () => parseRateLimitConfig(config), /rateLimiting configuration is invalid.*"tokenBucketBufferSize" must be a positive number/ ); }); @@ -271,7 +271,7 @@ describe('parseRateLimitConfig', () => { }; assert.throws( - () => parseRateLimitConfig(config, 1), + () => parseRateLimitConfig(config), /rateLimiting configuration is invalid.*"tokenBucketBufferSize" must be a positive number/ ); }); @@ -283,7 +283,7 @@ describe('parseRateLimitConfig', () => { }; assert.throws( - () => parseRateLimitConfig(config, 1), + () => parseRateLimitConfig(config), /rateLimiting configuration is invalid.*"tokenBucketBufferSize" must be an integer/ ); }); @@ -295,7 +295,7 @@ describe('parseRateLimitConfig', () => { serviceUserArn: 'arn:aws:iam::123456789012:user/rate-limit-service', }; - const result = parseRateLimitConfig(config, 1); + const result = parseRateLimitConfig(config); assert.strictEqual(result.tokenBucketRefillThreshold, 20); }); @@ -305,7 +305,7 @@ describe('parseRateLimitConfig', () => { tokenBucketRefillThreshold: 30, }; - const result = parseRateLimitConfig(config, 1); + const result = parseRateLimitConfig(config); assert.strictEqual(result.tokenBucketRefillThreshold, 30); }); @@ -316,7 +316,7 @@ describe('parseRateLimitConfig', () => { }; assert.throws( - () => parseRateLimitConfig(config, 1), + () => parseRateLimitConfig(config), /rateLimiting configuration is invalid.*"tokenBucketRefillThreshold" must be a positive number/ ); }); @@ -328,7 +328,7 @@ describe('parseRateLimitConfig', () => { }; assert.throws( - () => parseRateLimitConfig(config, 1), + () => parseRateLimitConfig(config), /rateLimiting configuration is invalid.*"tokenBucketRefillThreshold" must be a positive number/ ); }); @@ -340,7 +340,7 @@ describe('parseRateLimitConfig', () => { }; assert.throws( 
- () => parseRateLimitConfig(config, 1), + () => parseRateLimitConfig(config), /rateLimiting configuration is invalid.*"tokenBucketRefillThreshold" must be an integer/ ); }); @@ -354,7 +354,7 @@ describe('parseRateLimitConfig', () => { }; assert.throws( - () => parseRateLimitConfig(config, 1), + () => parseRateLimitConfig(config), /rateLimiting configuration is invalid.*"bucket" must be of type object/ ); }); @@ -371,7 +371,7 @@ describe('parseRateLimitConfig', () => { }, }; - const result = parseRateLimitConfig(config, 5); + const result = parseRateLimitConfig(config); assert(result.bucket.defaultConfig); assert(result.bucket.defaultConfig.requestsPerSecond); @@ -386,7 +386,7 @@ describe('parseRateLimitConfig', () => { }; assert.throws( - () => parseRateLimitConfig(config, 1), + () => parseRateLimitConfig(config), /rateLimiting configuration is invalid.*"bucket.defaultConfig" must be of type object/ ); }); @@ -402,7 +402,7 @@ describe('parseRateLimitConfig', () => { }; assert.throws( - () => parseRateLimitConfig(config, 1), + () => parseRateLimitConfig(config), /rateLimiting configuration is invalid.*"bucket.defaultConfig.requestsPerSecond" must be of type object/ ); }); @@ -419,7 +419,7 @@ describe('parseRateLimitConfig', () => { }, }; - const result = parseRateLimitConfig(config, 1); + const result = parseRateLimitConfig(config); // limit = 0 means unlimited, should be accepted assert(result.bucket.defaultConfig.requestsPerSecond); }); @@ -437,7 +437,7 @@ describe('parseRateLimitConfig', () => { }; assert.throws( - () => parseRateLimitConfig(config, 1), + () => parseRateLimitConfig(config), // eslint-disable-next-line max-len /rateLimiting configuration is invalid.*"bucket.defaultConfig.requestsPerSecond.limit" must be larger than or equal to 0/ ); @@ -456,7 +456,7 @@ describe('parseRateLimitConfig', () => { }; assert.throws( - () => parseRateLimitConfig(config, 1), + () => parseRateLimitConfig(config), /rateLimiting configuration is 
invalid.*"bucket.defaultConfig.requestsPerSecond.limit" is required/ ); }); @@ -475,7 +475,7 @@ describe('parseRateLimitConfig', () => { }, }; - const result = parseRateLimitConfig(config, 1); + const result = parseRateLimitConfig(config); const bucketSize = result.bucket.defaultConfig.requestsPerSecond.bucketSize; // bucketSize = burstCapacity * 1000 assert.strictEqual(bucketSize, constants.rateLimitDefaultBurstCapacity * 1000); @@ -494,7 +494,7 @@ describe('parseRateLimitConfig', () => { }, }; - const result = parseRateLimitConfig(config, 1); + const result = parseRateLimitConfig(config); const bucketSize = result.bucket.defaultConfig.requestsPerSecond.bucketSize; assert.strictEqual(bucketSize, 20 * 1000); }); @@ -513,7 +513,7 @@ describe('parseRateLimitConfig', () => { }; assert.throws( - () => parseRateLimitConfig(config, 1), + () => parseRateLimitConfig(config), // eslint-disable-next-line max-len /rateLimiting configuration is invalid.*"bucket.defaultConfig.requestsPerSecond.burstCapacity" must be a positive number/ ); @@ -533,7 +533,7 @@ describe('parseRateLimitConfig', () => { }; assert.throws( - () => parseRateLimitConfig(config, 1), + () => parseRateLimitConfig(config), // eslint-disable-next-line max-len /rateLimiting configuration is invalid.*"bucket.defaultConfig.requestsPerSecond.burstCapacity" must be a positive number/ ); @@ -553,7 +553,7 @@ describe('parseRateLimitConfig', () => { }; assert.throws( - () => parseRateLimitConfig(config, 1), + () => parseRateLimitConfig(config), // eslint-disable-next-line max-len /rateLimiting configuration is invalid.*"bucket.defaultConfig.requestsPerSecond.burstCapacity" must be a number/ ); @@ -572,7 +572,7 @@ describe('parseRateLimitConfig', () => { }, }; - const result = parseRateLimitConfig(config, 1); + const result = parseRateLimitConfig(config); const bucketSize = result.bucket.defaultConfig.requestsPerSecond.bucketSize; assert.strictEqual(bucketSize, 1.5 * 1000); }); @@ -587,7 +587,7 @@ 
describe('parseRateLimitConfig', () => { }, }; - const result = parseRateLimitConfig(config, 1); + const result = parseRateLimitConfig(config); assert.strictEqual(result.bucket.configCacheTTL, 450); }); @@ -600,7 +600,7 @@ describe('parseRateLimitConfig', () => { }; assert.throws( - () => parseRateLimitConfig(config, 1), + () => parseRateLimitConfig(config), /rateLimiting configuration is invalid.*"bucket.configCacheTTL" must be a positive number/ ); }); @@ -614,7 +614,7 @@ describe('parseRateLimitConfig', () => { }; assert.throws( - () => parseRateLimitConfig(config, 1), + () => parseRateLimitConfig(config), /rateLimiting configuration is invalid.*"bucket.configCacheTTL" must be a positive number/ ); }); @@ -628,7 +628,7 @@ describe('parseRateLimitConfig', () => { }; assert.throws( - () => parseRateLimitConfig(config, 1), + () => parseRateLimitConfig(config), /rateLimiting configuration is invalid.*"bucket.configCacheTTL" must be an integer/ ); }); @@ -642,7 +642,7 @@ describe('parseRateLimitConfig', () => { }; assert.throws( - () => parseRateLimitConfig(config, 1), + () => parseRateLimitConfig(config), /rateLimiting configuration is invalid.*"bucket.configCacheTTL" must be a number/ ); }); @@ -656,7 +656,7 @@ describe('parseRateLimitConfig', () => { }; assert.throws( - () => parseRateLimitConfig(config, 1), + () => parseRateLimitConfig(config), /rateLimiting configuration is invalid.*"error" must be of type object/ ); }); @@ -671,7 +671,7 @@ describe('parseRateLimitConfig', () => { }, }; - const result4xx = parseRateLimitConfig(config4xx, 1); + const result4xx = parseRateLimitConfig(config4xx); assert.strictEqual(result4xx.error.code, 429); assert.strictEqual(result4xx.error.description, 'Too Many Requests'); @@ -684,7 +684,7 @@ describe('parseRateLimitConfig', () => { }, }; - const result5xx = parseRateLimitConfig(config5xx, 1); + const result5xx = parseRateLimitConfig(config5xx); assert.strictEqual(result5xx.error.code, 503); 
assert.strictEqual(result5xx.error.description, 'Service Unavailable'); }); @@ -699,7 +699,7 @@ describe('parseRateLimitConfig', () => { }; assert.throws( - () => parseRateLimitConfig(config, 1), + () => parseRateLimitConfig(config), /rateLimiting configuration is invalid.*"error.statusCode" must be larger than or equal to 400/ ); }); @@ -714,7 +714,7 @@ describe('parseRateLimitConfig', () => { }; assert.throws( - () => parseRateLimitConfig(config, 1), + () => parseRateLimitConfig(config), /rateLimiting configuration is invalid.*"error.statusCode" must be less than or equal to 599/ ); }); @@ -729,7 +729,7 @@ describe('parseRateLimitConfig', () => { }; assert.throws( - () => parseRateLimitConfig(config, 1), + () => parseRateLimitConfig(config), /rateLimiting configuration is invalid.*"error.statusCode" must be an integer/ ); }); @@ -744,7 +744,7 @@ describe('parseRateLimitConfig', () => { }; assert.throws( - () => parseRateLimitConfig(config, 1), + () => parseRateLimitConfig(config), /rateLimiting configuration is invalid.*"error.statusCode" must be a number/ ); }); @@ -757,7 +757,7 @@ describe('parseRateLimitConfig', () => { }, }; - const result = parseRateLimitConfig(config, 1); + const result = parseRateLimitConfig(config); assert.strictEqual(result.error.code, 503); assert.strictEqual(result.error.message, 'SlowDown'); assert.strictEqual(result.error.description, errors.SlowDown.description); @@ -773,7 +773,7 @@ describe('parseRateLimitConfig', () => { }; assert.throws( - () => parseRateLimitConfig(config, 1), + () => parseRateLimitConfig(config), /rateLimiting configuration is invalid.*"error.message" must be a string/ ); }); @@ -783,7 +783,7 @@ describe('parseRateLimitConfig', () => { serviceUserArn: 'arn:aws:iam::123456789012:user/rate-limit-service', }; - const result = parseRateLimitConfig(config, 1); + const result = parseRateLimitConfig(config); assert.strictEqual(result.error.code, errors.SlowDown.code); assert.strictEqual(result.error.description, 
errors.SlowDown.description); }); @@ -798,7 +798,7 @@ describe('parseRateLimitConfig', () => { }, }; - const result = parseRateLimitConfig(config, 1); + const result = parseRateLimitConfig(config); assert.strictEqual(result.error.code, 429); assert.strictEqual(result.error.message, 'TooManyRequests'); assert.strictEqual(result.error.description, 'Please slow down'); @@ -813,7 +813,7 @@ describe('parseRateLimitConfig', () => { }, }; - const result = parseRateLimitConfig(config, 1); + const result = parseRateLimitConfig(config); assert.strictEqual(result.error.code, 429); assert.strictEqual(result.error.message, 'SlowDown'); assert.strictEqual(result.error.description, 'Please slow down'); @@ -830,29 +830,29 @@ describe('parseRateLimitConfig', () => { }; assert.throws( - () => parseRateLimitConfig(config, 1), + () => parseRateLimitConfig(config), /rateLimiting configuration is invalid.*"error.code" must be a string/ ); }); }); describe('distributed rate limiting validation', () => { - it('should validate limit against nodes and workers', () => { + it('should validate limit against nodes', () => { const config = { serviceUserArn: 'arn:aws:iam::123456789012:user/rate-limit-service', nodes: 5, bucket: { defaultConfig: { requestsPerSecond: { - limit: 10, // Less than 5 nodes x 10 workers = 50 + limit: 3, // Less than 5 nodes }, }, }, }; assert.throws( - () => parseRateLimitConfig(config, 10), - /requestsPerSecond\.limit \(10\) must be >= \(nodes x workers = 5 x 10 = 50\)/ + () => parseRateLimitConfig(config), + /requestsPerSecond\.limit \(3\) must be >= nodes \(5\)/ ); }); }); @@ -873,7 +873,7 @@ describe('parseRateLimitConfig', () => { }, }; - const result = parseRateLimitConfig(config, 5); + const result = parseRateLimitConfig(config); assert(result.account); assert(result.account.defaultConfig); @@ -901,7 +901,7 @@ describe('parseRateLimitConfig', () => { }, }; - const result = parseRateLimitConfig(config, 5); + const result = parseRateLimitConfig(config); 
assert(result.bucket.defaultConfig); assert.strictEqual(result.bucket.defaultConfig.limit, 1000); @@ -909,22 +909,22 @@ describe('parseRateLimitConfig', () => { assert.strictEqual(result.account.defaultConfig.limit, 500); }); - it('should validate account limit against nodes and workers', () => { + it('should validate account limit against nodes', () => { const config = { serviceUserArn: 'arn:aws:iam::123456789012:user/rate-limit-service', nodes: 10, account: { defaultConfig: { requestsPerSecond: { - limit: 20, // Less than 10 nodes x 5 workers = 50 + limit: 7, // Less than 10 nodes }, }, }, }; assert.throws( - () => parseRateLimitConfig(config, 5), - /requestsPerSecond\.limit \(20\) must be >= \(nodes x workers = 10 x 5 = 50\)/ + () => parseRateLimitConfig(config), + /requestsPerSecond\.limit \(7\) must be >= nodes \(10\)/ ); }); @@ -935,7 +935,7 @@ describe('parseRateLimitConfig', () => { }; assert.throws( - () => parseRateLimitConfig(config, 1), + () => parseRateLimitConfig(config), /rateLimiting configuration is invalid.*"account" must be of type object/ ); }); @@ -952,7 +952,7 @@ describe('parseRateLimitConfig', () => { }, }; - const result = parseRateLimitConfig(config, 1); + const result = parseRateLimitConfig(config); assert.strictEqual(result.account.configCacheTTL, constants.rateLimitDefaultConfigCacheTTL); assert.strictEqual(result.account.defaultBurstCapacity, constants.rateLimitDefaultBurstCapacity); @@ -972,7 +972,7 @@ describe('parseRateLimitConfig', () => { }, }; - const result = parseRateLimitConfig(config, 1); + const result = parseRateLimitConfig(config); const bucketSize = result.account.defaultConfig.requestsPerSecond.bucketSize; assert.strictEqual(bucketSize, constants.rateLimitDefaultBurstCapacity * 1000); }); @@ -990,7 +990,7 @@ describe('parseRateLimitConfig', () => { }, }; - const result = parseRateLimitConfig(config, 1); + const result = parseRateLimitConfig(config); const bucketSize = 
result.account.defaultConfig.requestsPerSecond.bucketSize; assert.strictEqual(bucketSize, 20 * 1000); }); @@ -1008,7 +1008,7 @@ describe('parseRateLimitConfig', () => { }, }; - const result = parseRateLimitConfig(config, 1); + const result = parseRateLimitConfig(config); const bucketSize = result.account.defaultConfig.requestsPerSecond.bucketSize; assert.strictEqual(bucketSize, 1.5 * 1000); }); @@ -1027,7 +1027,7 @@ describe('parseRateLimitConfig', () => { }; assert.throws( - () => parseRateLimitConfig(config, 1), + () => parseRateLimitConfig(config), // eslint-disable-next-line max-len /rateLimiting configuration is invalid.*"account.defaultConfig.requestsPerSecond.burstCapacity" must be a positive number/ ); @@ -1047,7 +1047,7 @@ describe('parseRateLimitConfig', () => { }; assert.throws( - () => parseRateLimitConfig(config, 1), + () => parseRateLimitConfig(config), // eslint-disable-next-line max-len /rateLimiting configuration is invalid.*"account.defaultConfig.requestsPerSecond.burstCapacity" must be a positive number/ ); @@ -1067,7 +1067,7 @@ describe('parseRateLimitConfig', () => { }; assert.throws( - () => parseRateLimitConfig(config, 1), + () => parseRateLimitConfig(config), // eslint-disable-next-line max-len /rateLimiting configuration is invalid.*"account.defaultConfig.requestsPerSecond.burstCapacity" must be a number/ ); @@ -1082,7 +1082,7 @@ describe('parseRateLimitConfig', () => { }; assert.throws( - () => parseRateLimitConfig(config, 1), + () => parseRateLimitConfig(config), /rateLimiting configuration is invalid.*"unknownField" is not allowed/ ); }); @@ -1096,7 +1096,7 @@ describe('parseRateLimitConfig', () => { }; assert.throws( - () => parseRateLimitConfig(config, 1), + () => parseRateLimitConfig(config), /rateLimiting configuration is invalid.*"bucket.unknownField" is not allowed/ ); }); @@ -1111,7 +1111,7 @@ describe('parseRateLimitConfig', () => { }; assert.throws( - () => parseRateLimitConfig(config, 1), + () => parseRateLimitConfig(config), 
/rateLimiting configuration is invalid.*"error.unknownField" is not allowed/ ); }); @@ -1128,7 +1128,7 @@ describe('parseRateLimitConfig', () => { assert.throws(() => { try { - parseRateLimitConfig(config, 1); + parseRateLimitConfig(config); } catch (error) { // Should contain multiple errors assert(error.message.includes('serviceUserArn')); @@ -1155,11 +1155,10 @@ describe('parseRateLimitConfig', () => { }, }; - const result = parseRateLimitConfig(config, 5); // 5 workers (ignored) + const result = parseRateLimitConfig(config); - // NEW BEHAVIOR: Per-NODE rate = 100 / 2 nodes = 50 req/s (workers NOT divided) + // Per-NODE rate = 100 / 2 nodes = 50 req/s // Interval = 1000ms / 50 = 20ms - // Workers can dynamically share node quota via Redis reconciliation const interval = result.bucket.defaultConfig.requestsPerSecond.interval; assert.strictEqual(interval, 20); }); @@ -1177,7 +1176,7 @@ describe('parseRateLimitConfig', () => { }, }; - const result = parseRateLimitConfig(config, 1); + const result = parseRateLimitConfig(config); // bucketSize = burstCapacity * 1000 const bucketSize = result.bucket.defaultConfig.requestsPerSecond.bucketSize; @@ -1197,9 +1196,9 @@ describe('parseRateLimitConfig', () => { }, }; - const result = parseRateLimitConfig(config, 1); // 1 worker + const result = parseRateLimitConfig(config); - // Per-worker rate = 50 / 1 / 1 = 50 req/s + // Per-node rate = 50 / 1 = 50 req/s // Interval = 1000ms / 50 = 20ms const interval = result.bucket.defaultConfig.requestsPerSecond.interval; assert.strictEqual(interval, 20); @@ -1218,11 +1217,10 @@ describe('parseRateLimitConfig', () => { }, }; - const result = parseRateLimitConfig(config, 20); // 20 workers per node (ignored) + const result = parseRateLimitConfig(config); - // NEW BEHAVIOR: Per-NODE rate = 10000 / 10 nodes = 1000 req/s (workers NOT divided) + // Per-NODE rate = 10000 / 10 nodes = 1000 req/s // Interval = 1000ms / 1000 = 1ms - // Workers dynamically share the 1000 req/s node quota via 
Redis const interval = result.bucket.defaultConfig.requestsPerSecond.interval; assert.strictEqual(interval, 1); }); diff --git a/tests/unit/api/apiUtils/rateLimit/gcra.js b/tests/unit/api/apiUtils/rateLimit/gcra.js index 0684cb365d..484b47fa76 100644 --- a/tests/unit/api/apiUtils/rateLimit/gcra.js +++ b/tests/unit/api/apiUtils/rateLimit/gcra.js @@ -3,15 +3,14 @@ const assert = require('assert'); const { calculateInterval } = require('../../../../../lib/api/apiUtils/rateLimit/gcra'); describe('GCRA calculateInterval function', () => { - it('should calculate interval for 1770 req/s across 177 nodes (ignoring workers)', () => { + it('should calculate interval for 1770 req/s across 177 nodes', () => { const limit = 1770; const nodes = 177; - const workers = 10; // Ignored in new implementation - // Per-NODE rate = 1770 / 177 = 10 req/s (workers NOT divided) + // Per-NODE rate = 1770 / 177 = 10 req/s // Interval = 1000 / 10 = 100ms - // This allows dynamic work-stealing: busy workers can use full node quota - const interval = calculateInterval(limit, nodes, workers); + // Workers on the same node share node quota dynamically via Redis reconciliation + const interval = calculateInterval(limit, nodes); assert.strictEqual(interval, 100); }); @@ -19,24 +18,22 @@ describe('GCRA calculateInterval function', () => { it('should calculate interval for 1000 req/s with single node', () => { const limit = 1000; const nodes = 1; - const workers = 1; // Ignored - // Per-NODE rate = 1000 / 1 = 1000 req/s (workers NOT divided) + // Per-NODE rate = 1000 / 1 = 1000 req/s // Interval = 1000 / 1000 = 1ms - const interval = calculateInterval(limit, nodes, workers); + const interval = calculateInterval(limit, nodes); assert.strictEqual(interval, 1); }); - it('should calculate interval for 100 req/s with 10 workers on single node', () => { + it('should calculate interval for 100 req/s on a single node', () => { const limit = 100; const nodes = 1; - const workers = 10; // Ignored in new 
implementation - // Per-NODE rate = 100 / 1 = 100 req/s (workers NOT divided) + // Per-NODE rate = 100 / 1 = 100 req/s // Interval = 1000 / 100 = 10ms - // Each worker evaluates at node quota, Redis reconciliation shares capacity - const interval = calculateInterval(limit, nodes, workers); + // Workers share the node quota; Redis reconciliation distributes capacity + const interval = calculateInterval(limit, nodes); assert.strictEqual(interval, 10); }); @@ -44,11 +41,10 @@ describe('GCRA calculateInterval function', () => { it('should handle fractional intervals', () => { const limit = 3000; const nodes = 177; - const workers = 10; // Ignored - // Per-NODE rate = 3000 / 177 = 16.95 req/s (workers NOT divided) + // Per-NODE rate = 3000 / 177 = 16.95 req/s // Interval = 1000 / 16.95 = 58.99ms - const interval = calculateInterval(limit, nodes, workers); + const interval = calculateInterval(limit, nodes); assert.strictEqual(Math.floor(interval), 58); }); @@ -56,7 +52,6 @@ describe('GCRA calculateInterval function', () => { it('should demonstrate dynamic work-stealing behavior', () => { const limit = 600; const nodes = 6; - const workers = 10; // Per-NODE rate = 600 / 6 = 100 req/s // Interval = 1000 / 100 = 10ms per request @@ -65,7 +60,7 @@ describe('GCRA calculateInterval function', () => { // - If 1 worker is busy, 9 idle: busy worker can use ~100 req/s // - If all 10 workers are busy: they share the 100 req/s (~10 req/s each) // - Redis reconciliation dynamically balances across active workers - const interval = calculateInterval(limit, nodes, workers); + const interval = calculateInterval(limit, nodes); assert.strictEqual(interval, 10); // Worker quota is NOT pre-divided: 100 req/s node quota available From 4cc343bd666ddc9f300dbc07d27df317addee775 Mon Sep 17 00:00:00 2001 From: Taylor McKinnon Date: Fri, 13 Mar 2026 14:23:18 +0100 Subject: [PATCH 3/3] impr(CLDSRV-852): Refactor TokenBucket class - Change constructor to accept resourceClass, resourceId, measure - 
Precalculate interval for reuse in constructor - Add hasCapacity() method - Add updateLimit() method to encapsulate limit change logic
calculateInterval(this.limitConfig.limit, config.rateLimiting.nodes); this.lastRefillTime = Date.now(); this.refillCount = 0; - this.requestCounter = 0; - this.lastRequestTime = 0; + this.refillInProgress = false; + } - // Track consumption rate for adaptive refill - this.requestTimestamps = new Set(); + hasCapacity() { + return this.tokens > 0; + } + + updateLimit(updatedConfig) { + if (this.limitConfig.limit !== updatedConfig.limit || + this.limitConfig.burstCapacity !== updatedConfig.burstCapacity + ) { + const oldConfig = this.limitConfig; + this.limitConfig = updatedConfig; + this.interval = calculateInterval(updatedConfig.limit, config.rateLimiting.nodes); + return { updated: true, oldConfig }; + } + + return { updated: false }; } /** @@ -50,11 +62,8 @@ class WorkerTokenBucket { * @returns {boolean} True if request allowed, false if throttled */ tryConsume() { - // Record request for rate calculation - this.recordRequest(); - if (this.tokens > 0) { - this.tokens--; + this.tokens -= 1; return true; // ALLOWED } @@ -78,16 +87,6 @@ class WorkerTokenBucket { return; } - // Trigger async refill - await this.refill(); - } - - /** - * Request tokens from Redis using GCRA enforcement - * - * @returns {Promise} - */ - async refill() { this.refillInProgress = true; const startTime = Date.now(); @@ -100,21 +99,16 @@ class WorkerTokenBucket { } // Calculate GCRA parameters - const nodes = config.rateLimiting.nodes || 1; - const workers = config.clusters || 1; - const interval = calculateInterval(this.limitConfig.limit, nodes, workers); - const burstCapacitySeconds = - config.rateLimiting.bucket?.defaultConfig?.requestsPerSecond?.burstCapacity || 1; - const burstCapacity = burstCapacitySeconds * 1000; - let granted = requested; if (redisClient.isReady()) { // Request tokens from Redis (atomic GCRA enforcement) granted = await util.promisify(redisClient.grantTokens.bind(redisClient))( - this.bucketName, + this.resourceClass, + this.resourceId, + this.measure, requested, - 
interval, - burstCapacity, + this.interval, + this.limitConfig.burstCapacity, ); } else { // Connection to redis has failed in some way. @@ -122,7 +116,11 @@ class WorkerTokenBucket { // We grant the requested amount of tokens anyway to avoid degrading service availability. this.log.warn( 'rate limit redis client not connected. granting tokens anyway to avoid service degradation', - { bucketName: this.bucketName }, + { + resourceClass: this.resourceClass, + resourceId: this.resourceId, + measure: this.measure, + }, ); } @@ -134,7 +132,9 @@ class WorkerTokenBucket { const duration = this.lastRefillTime - startTime; this.log.debug('Token refill completed', { - bucketName: this.bucketName, + resourceClass: this.resourceClass, + resourceId: this.resourceId, + measure: this.measure, requested, granted, newBalance: this.tokens, @@ -145,21 +145,27 @@ class WorkerTokenBucket { // Warn if refill took too long or granted too few if (duration > 100) { this.log.warn('Slow token refill detected', { - bucketName: this.bucketName, + resourceClass: this.resourceClass, + resourceId: this.resourceId, + measure: this.measure, durationMs: duration, }); } if (granted === 0 && requested > 0) { this.log.trace('Token refill denied - quota exhausted', { - bucketName: this.bucketName, + resourceClass: this.resourceClass, + resourceId: this.resourceId, + measure: this.measure, requested, }); } } catch (err) { this.log.error('Token refill failed', { - bucketName: this.bucketName, + resourceClass: this.resourceClass, + resourceId: this.resourceId, + measure: this.measure, error: err.message, stack: err.stack, }); @@ -167,96 +173,39 @@ class WorkerTokenBucket { this.refillInProgress = false; } } - - /** - * Record request timestamp for rate calculation - */ - recordRequest() { - const now = Date.now(); - if (now == this.lastRequestTime) { - this.requestCounter++; - } else { - this.lastRequestTime = now; - this.requestCounter = 0; - } - - this.requestTimestamps.add(now * 1000 + 
this.requestCounter); - - // Keep only last 1 second of timestamps - const cutoff = (now - 1000) * 1000; - for (const timestamp of this.requestTimestamps.values()) { - if (timestamp < cutoff) { - this.requestTimestamps.delete(timestamp); - } - } - } - - /** - * Get current request rate (requests per second) - * - * @returns {number} - */ - getCurrentRate() { - const now = Date.now(); - const cutoff = (now - 1000) * 1000; - - let count = 0; - for (const timestamp of this.requestTimestamps.values()) { - if (timestamp >= cutoff) { - count++; - } - } - return count; - } - - /** - * Get token bucket stats for monitoring - * - * @returns {object} - */ - getStats() { - return { - tokens: this.tokens, - bufferSize: this.bufferSize, - refillThreshold: this.refillThreshold, - refillInProgress: this.refillInProgress, - lastRefillTime: this.lastRefillTime, - refillCount: this.refillCount, - currentRate: this.getCurrentRate(), - }; - } } /** * Get or create token bucket for a bucket * - * @param {string} bucketName - Bucket name + * @param {string} resourceClass - "bucket" or "account" + * @param {string} resourceId - bucket name or account canonicalId + * @param {string} measure - measure id e.g. 
"rps" * @param {object} limitConfig - Rate limit configuration * @param {object} log - Logger instance * @returns {WorkerTokenBucket} */ -function getTokenBucket(bucketName, limitConfig, log) { - let bucket = tokenBuckets.get(bucketName); - +function getTokenBucket(resourceClass, resourceId, measure, limitConfig, log) { + const cacheKey = `${resourceClass}:${resourceId}:${measure}`; + let bucket = tokenBuckets.get(cacheKey); if (!bucket) { - bucket = new WorkerTokenBucket(bucketName, limitConfig, log); - tokenBuckets.set(bucketName, bucket); + bucket = new WorkerTokenBucket(resourceClass, resourceId, measure, limitConfig, log); + tokenBuckets.set(cacheKey, bucket); log.debug('Created token bucket', { - bucketName, + cacheKey, bufferSize: bucket.bufferSize, refillThreshold: bucket.refillThreshold, }); - } else if (bucket.limitConfig.limit !== limitConfig.limit) { - // Update limit config when it changes dynamically - const oldLimit = bucket.limitConfig.limit; - bucket.limitConfig = limitConfig; - - log.info('Updated token bucket limit config', { - bucketName, - oldLimit, - newLimit: limitConfig.limit, - }); + } else { + const { updated, oldConfig } = bucket.updateLimit(limitConfig); + if (updated) { + log.info('Updated token bucket limit config', { + cacheKey, + old: oldConfig, + new: limitConfig, + }); + } } return bucket; @@ -282,15 +231,15 @@ function cleanupTokenBuckets(maxIdleMs = 60000) { const now = Date.now(); const toRemove = []; - for (const [bucketName, bucket] of tokenBuckets.entries()) { + for (const [key, bucket] of tokenBuckets.entries()) { const idleTime = now - bucket.lastRefillTime; if (idleTime > maxIdleMs && bucket.tokens === 0) { - toRemove.push(bucketName); + toRemove.push(key); } } - for (const bucketName of toRemove) { - tokenBuckets.delete(bucketName); + for (const key of toRemove) { + tokenBuckets.delete(key); } return toRemove.length; @@ -299,11 +248,13 @@ function cleanupTokenBuckets(maxIdleMs = 60000) { /** * Remove a specific token 
bucket (used when rate limit config is deleted) * - * @param {string} bucketName - Bucket name + * @param {string} resourceClass - "bucket" or "account" + * @param {string} resourceId - bucket name or account canonicalId + * @param {string} measure - measure id e.g. "rps" * @returns {boolean} True if bucket was found and removed */ -function removeTokenBucket(bucketName) { - return tokenBuckets.delete(bucketName); +function removeTokenBucket(resourceClass, resourceId, measure) { + return tokenBuckets.delete(`${resourceClass}:${resourceId}:${measure}`); } module.exports = { diff --git a/lib/api/bucketDeleteRateLimit.js b/lib/api/bucketDeleteRateLimit.js index f500463746..445a6dc2ed 100644 --- a/lib/api/bucketDeleteRateLimit.js +++ b/lib/api/bucketDeleteRateLimit.js @@ -53,7 +53,7 @@ function bucketDeleteRateLimit(authInfo, request, log, callback) { } // Invalidate cache and remove token bucket cache.deleteCachedConfig(cache.namespace.bucket, bucketName); - removeTokenBucket(bucketName); + removeTokenBucket('bucket', bucketName, 'rps'); log.debug('invalidated rate limit cache and token bucket for bucket', { bucketName }); // TODO: implement Utapi metric support return callback(null, corsHeaders); diff --git a/tests/unit/api/apiUtils/rateLimit/tokenBucket.js b/tests/unit/api/apiUtils/rateLimit/tokenBucket.js index fcb635416b..fdd490b294 100644 --- a/tests/unit/api/apiUtils/rateLimit/tokenBucket.js +++ b/tests/unit/api/apiUtils/rateLimit/tokenBucket.js @@ -18,20 +18,12 @@ describe('WorkerTokenBucket', () => { error: sinon.stub(), }; - // Stub config sandbox.stub(config, 'rateLimiting').value({ nodes: 1, - bucket: { - defaultConfig: { - requestsPerSecond: { - burstCapacity: 2, - }, - }, - }, + tokenBucketBufferSize: 50, + tokenBucketRefillThreshold: 20, }); - sandbox.stub(config, 'clusters').value(1); - // Clear token buckets map tokenBucket.getAllTokenBuckets().clear(); }); @@ -40,16 +32,16 @@ describe('WorkerTokenBucket', () => { }); describe('constructor', () => { - 
it('should initialize with default values', () => { - const bucket = new tokenBucket.WorkerTokenBucket('test-bucket', { limit: 100 }, mockLog); + it('should initialize with correct values from config', () => { + const bucket = new tokenBucket.WorkerTokenBucket('bucket', 'test-bucket', 'rps', { limit: 100 }, mockLog); - assert.strictEqual(bucket.bucketName, 'test-bucket'); + assert.strictEqual(bucket.resourceClass, 'bucket'); + assert.strictEqual(bucket.resourceId, 'test-bucket'); + assert.strictEqual(bucket.measure, 'rps'); assert.deepStrictEqual(bucket.limitConfig, { limit: 100 }); assert.strictEqual(bucket.bufferSize, 50); assert.strictEqual(bucket.refillThreshold, 20); assert.strictEqual(bucket.tokens, 50); // Starts with full buffer for fail-open - assert.strictEqual(bucket.refillInProgress, false); - assert.strictEqual(bucket.refillCount, 0); }); it('should use custom bufferSize from config.rateLimiting', () => { @@ -57,32 +49,24 @@ describe('WorkerTokenBucket', () => { sandbox.stub(config, 'rateLimiting').value({ nodes: 1, tokenBucketBufferSize: 100, - bucket: { - defaultConfig: { - requestsPerSecond: { burstCapacity: 2 }, - }, - }, + tokenBucketRefillThreshold: 20, }); - const bucket = new tokenBucket.WorkerTokenBucket('test-bucket', { limit: 100 }, mockLog); + const bucket = new tokenBucket.WorkerTokenBucket('bucket', 'test-bucket', 'rps', { limit: 100 }, mockLog); assert.strictEqual(bucket.bufferSize, 100); - assert.strictEqual(bucket.tokens, 100); // tokens = bufferSize + assert.strictEqual(bucket.tokens, 100); }); it('should use custom refillThreshold from config.rateLimiting', () => { sandbox.restore(); sandbox.stub(config, 'rateLimiting').value({ nodes: 1, + tokenBucketBufferSize: 50, tokenBucketRefillThreshold: 30, - bucket: { - defaultConfig: { - requestsPerSecond: { burstCapacity: 2 }, - }, - }, }); - const bucket = new tokenBucket.WorkerTokenBucket('test-bucket', { limit: 100 }, mockLog); + const bucket = new tokenBucket.WorkerTokenBucket('bucket', 
'test-bucket', 'rps', { limit: 100 }, mockLog); assert.strictEqual(bucket.refillThreshold, 30); }); @@ -93,35 +77,19 @@ describe('WorkerTokenBucket', () => { nodes: 1, tokenBucketBufferSize: 75, tokenBucketRefillThreshold: 25, - bucket: { - defaultConfig: { - requestsPerSecond: { burstCapacity: 2 }, - }, - }, }); - const bucket = new tokenBucket.WorkerTokenBucket('test-bucket', { limit: 100 }, mockLog); + const bucket = new tokenBucket.WorkerTokenBucket('bucket', 'test-bucket', 'rps', { limit: 100 }, mockLog); assert.strictEqual(bucket.bufferSize, 75); assert.strictEqual(bucket.refillThreshold, 25); - assert.strictEqual(bucket.tokens, 75); // tokens = bufferSize - }); - - it('should fallback to defaults when rateLimiting is undefined', () => { - sandbox.restore(); - sandbox.stub(config, 'rateLimiting').value(undefined); - - const bucket = new tokenBucket.WorkerTokenBucket('test-bucket', { limit: 100 }, mockLog); - - assert.strictEqual(bucket.bufferSize, 50); - assert.strictEqual(bucket.refillThreshold, 20); - assert.strictEqual(bucket.tokens, 50); + assert.strictEqual(bucket.tokens, 75); }); }); describe('tryConsume', () => { it('should consume token when available', () => { - const bucket = new tokenBucket.WorkerTokenBucket('test-bucket', { limit: 100 }, mockLog); + const bucket = new tokenBucket.WorkerTokenBucket('bucket', 'test-bucket', 'rps', { limit: 100 }, mockLog); bucket.tokens = 10; const result = bucket.tryConsume(); @@ -131,7 +99,7 @@ describe('WorkerTokenBucket', () => { }); it('should return false when no tokens available', () => { - const bucket = new tokenBucket.WorkerTokenBucket('test-bucket', { limit: 100 }, mockLog); + const bucket = new tokenBucket.WorkerTokenBucket('bucket', 'test-bucket', 'rps', { limit: 100 }, mockLog); bucket.tokens = 0; const result = bucket.tryConsume(); @@ -140,19 +108,8 @@ describe('WorkerTokenBucket', () => { assert.strictEqual(bucket.tokens, 0); }); - it('should record request timestamp', () => { - const bucket = new 
tokenBucket.WorkerTokenBucket('test-bucket', { limit: 100 }, mockLog); - bucket.tokens = 5; - - bucket.tryConsume(); - - assert.strictEqual(bucket.requestTimestamps.size, 1); - const firstValue = bucket.requestTimestamps.values().next().value; - assert(firstValue <= Date.now() * 1000); - }); - it('should handle multiple sequential consumptions', () => { - const bucket = new tokenBucket.WorkerTokenBucket('test-bucket', { limit: 100 }, mockLog); + const bucket = new tokenBucket.WorkerTokenBucket('bucket', 'test-bucket', 'rps', { limit: 100 }, mockLog); bucket.tokens = 3; assert.strictEqual(bucket.tryConsume(), true); @@ -169,159 +126,89 @@ describe('WorkerTokenBucket', () => { }); }); - describe('recordRequest', () => { - it('should add timestamp to array', () => { - const bucket = new tokenBucket.WorkerTokenBucket('test-bucket', { limit: 100 }, mockLog); - const before = Date.now() * 1000; - - bucket.recordRequest(); + describe('hasCapacity', () => { + it('should return true when tokens are available', () => { + const bucket = new tokenBucket.WorkerTokenBucket('bucket', 'test-bucket', 'rps', { limit: 100 }, mockLog); + bucket.tokens = 1; - assert.strictEqual(bucket.requestTimestamps.size, 1); - const firstValue = bucket.requestTimestamps.values().next().value; - assert(firstValue >= before); - assert(firstValue <= Date.now() * 1000); + assert.strictEqual(bucket.hasCapacity(), true); }); - it('should expire old timestamps beyond 1 second', () => { - const bucket = new tokenBucket.WorkerTokenBucket('test-bucket', { limit: 100 }, mockLog); - const now = Date.now() * 1000; - - // Add old timestamp (2 seconds ago) - bucket.requestTimestamps.add(now - 2000000); - - // Add recent timestamp - bucket.requestTimestamps.add(now - 500000); - - // Record new request - bucket.recordRequest(); + it('should return false when no tokens available', () => { + const bucket = new tokenBucket.WorkerTokenBucket('bucket', 'test-bucket', 'rps', { limit: 100 }, mockLog); + bucket.tokens = 0; 
- // Old timestamp should be removed - assert.strictEqual(bucket.requestTimestamps.size, 2); - const firstValue = bucket.requestTimestamps.values().next().value; - assert(firstValue >= now - 1000000); + assert.strictEqual(bucket.hasCapacity(), false); }); }); - describe('getCurrentRate', () => { - it('should return 0 when no requests', () => { - const bucket = new tokenBucket.WorkerTokenBucket('test-bucket', { limit: 100 }, mockLog); + describe('updateLimit', () => { + it('should update limitConfig and interval when limit changes', () => { + const bucket = new tokenBucket.WorkerTokenBucket( + 'bucket', 'test-bucket', 'rps', { limit: 100, burstCapacity: 1000 }, mockLog); + const oldInterval = bucket.interval; - assert.strictEqual(bucket.getCurrentRate(), 0); + const result = bucket.updateLimit({ limit: 200, burstCapacity: 1000 }); + + assert.strictEqual(result.updated, true); + assert.deepStrictEqual(result.oldConfig, { limit: 100, burstCapacity: 1000 }); + assert.strictEqual(bucket.limitConfig.limit, 200); + assert.notStrictEqual(bucket.interval, oldInterval); }); - it('should count requests in last second', () => { - const bucket = new tokenBucket.WorkerTokenBucket('test-bucket', { limit: 100 }, mockLog); - const now = Date.now() * 1000; + it('should update limitConfig when burstCapacity changes', () => { + const bucket = new tokenBucket.WorkerTokenBucket( + 'bucket', 'test-bucket', 'rps', { limit: 100, burstCapacity: 1000 }, mockLog); - // Add 5 requests in last second - bucket.requestTimestamps.add(now - 900000); - bucket.requestTimestamps.add(now - 700000); - bucket.requestTimestamps.add(now - 500000); - bucket.requestTimestamps.add(now - 300000); - bucket.requestTimestamps.add(now - 100000); + const result = bucket.updateLimit({ limit: 100, burstCapacity: 2000 }); - assert.strictEqual(bucket.getCurrentRate(), 5); + assert.strictEqual(result.updated, true); + assert.strictEqual(bucket.limitConfig.burstCapacity, 2000); }); - it('should exclude requests older 
than 1 second', () => { - const bucket = new tokenBucket.WorkerTokenBucket('test-bucket', { limit: 100 }, mockLog); - const now = Date.now() * 1000; - - // Add old requests (beyond 1 second) - bucket.requestTimestamps.add(now - 1500000); - bucket.requestTimestamps.add(now - 1200000); + it('should return updated: false when config is unchanged', () => { + const bucket = new tokenBucket.WorkerTokenBucket( + 'bucket', 'test-bucket', 'rps', { limit: 100, burstCapacity: 1000 }, mockLog); - // Add recent requests - bucket.requestTimestamps.add(now - 800000); - bucket.requestTimestamps.add(now - 400000); + const result = bucket.updateLimit({ limit: 100, burstCapacity: 1000 }); - assert.strictEqual(bucket.getCurrentRate(), 2); + assert.strictEqual(result.updated, false); }); }); describe('refillIfNeeded', () => { it('should skip refill when above threshold', async () => { - const bucket = new tokenBucket.WorkerTokenBucket('test-bucket', { limit: 100 }, mockLog); + const bucket = new tokenBucket.WorkerTokenBucket( + 'bucket', 'test-bucket', 'rps', { limit: 100 }, mockLog); bucket.tokens = 30; // Above threshold of 20 - const refillSpy = sandbox.spy(bucket, 'refill'); await bucket.refillIfNeeded(); - assert.strictEqual(refillSpy.called, false); + // refillInProgress was never set (no refill attempted) + assert.ok(!bucket.refillInProgress); }); it('should skip refill when already in progress', async () => { - const bucket = new tokenBucket.WorkerTokenBucket('test-bucket', { limit: 100 }, mockLog); + const bucket = new tokenBucket.WorkerTokenBucket( + 'bucket', 'test-bucket', 'rps', { limit: 100 }, mockLog); bucket.tokens = 10; // Below threshold bucket.refillInProgress = true; - const refillSpy = sandbox.spy(bucket, 'refill'); await bucket.refillIfNeeded(); - assert.strictEqual(refillSpy.called, false); + // Still true — function returned early without clearing it + assert.strictEqual(bucket.refillInProgress, true); }); it('should trigger refill when below threshold', async 
() => { - const bucket = new tokenBucket.WorkerTokenBucket('test-bucket', { limit: 100 }, mockLog); + const bucket = new tokenBucket.WorkerTokenBucket( + 'bucket', 'test-bucket', 'rps', { limit: 100, burstCapacity: 1000 }, mockLog); bucket.tokens = 10; // Below threshold of 20 - // Stub refill to prevent actual Redis call - sandbox.stub(bucket, 'refill').resolves(); - await bucket.refillIfNeeded(); - assert.strictEqual(bucket.refill.calledOnce, true); - }); - }); - - describe('refill', () => { - it('should skip refill when buffer is already full', async () => { - const bucket = new tokenBucket.WorkerTokenBucket('test-bucket', { limit: 100 }, mockLog); - bucket.tokens = 50; // Buffer at maximum (bufferSize = 50) - - // Mock refill to verify early return - const refillSpy = sandbox.spy(bucket, 'refill'); - - await bucket.refill(); - - // Verify refill was called but returned early (no Redis call) - assert.strictEqual(refillSpy.calledOnce, true); - - // Tokens should remain unchanged - assert.strictEqual(bucket.tokens, 50); - }); - - it('should not call Redis when requested amount is zero', async () => { - const bucket = new tokenBucket.WorkerTokenBucket('test-bucket', { limit: 100 }, mockLog); - bucket.tokens = 51; // More than buffer size (edge case) - - await bucket.refill(); - - // Should complete without errors even though requested < 0 - assert(true); - }); - - // NOTE: Full refill() tests with actual Redis calls and token granting logic - // are better suited for functional/integration tests. These unit tests verify - // the early-return logic and buffer boundary conditions without requiring Redis. 
- }); - - describe('getStats', () => { - it('should return current bucket stats', () => { - const bucket = new tokenBucket.WorkerTokenBucket('test-bucket', { limit: 100 }, mockLog); - bucket.tokens = 25; - bucket.refillCount = 5; - bucket.requestTimestamps.add(Date.now() * 1000); - - const stats = bucket.getStats(); - - assert.strictEqual(stats.tokens, 25); - assert.strictEqual(stats.bufferSize, 50); - assert.strictEqual(stats.refillThreshold, 20); - assert.strictEqual(stats.refillInProgress, false); - assert.strictEqual(stats.refillCount, 5); - assert.strictEqual(stats.currentRate, 1); - assert(typeof stats.lastRefillTime === 'number'); + // refillInProgress is cleared in finally block regardless of outcome + assert.strictEqual(bucket.refillInProgress, false); }); }); }); @@ -340,10 +227,14 @@ describe('Token bucket management functions', () => { error: sinon.stub(), }; - // Load tokenBucket module (no need for Redis client in these tests) - tokenBucket = require('../../../../../lib/api/apiUtils/rateLimit/tokenBucket'); + const { config } = require('../../../../../lib/Config'); + sandbox.stub(config, 'rateLimiting').value({ + nodes: 1, + tokenBucketBufferSize: 50, + tokenBucketRefillThreshold: 20, + }); - // Clear token buckets + tokenBucket = require('../../../../../lib/api/apiUtils/rateLimit/tokenBucket'); tokenBucket.getAllTokenBuckets().clear(); }); @@ -353,49 +244,70 @@ describe('Token bucket management functions', () => { describe('getTokenBucket', () => { it('should create new bucket on first call', () => { - const bucket = tokenBucket.getTokenBucket('test-bucket', { limit: 100 }, mockLog); + const bucket = tokenBucket.getTokenBucket('bucket', 'test-bucket', 'rps', { limit: 100 }, mockLog); assert(bucket instanceof tokenBucket.WorkerTokenBucket); - assert.strictEqual(bucket.bucketName, 'test-bucket'); + assert.strictEqual(bucket.resourceClass, 'bucket'); + assert.strictEqual(bucket.resourceId, 'test-bucket'); + assert.strictEqual(bucket.measure, 'rps'); 
assert(mockLog.debug.calledOnce); assert(mockLog.debug.firstCall.args[0].includes('Created token bucket')); }); it('should return existing bucket on subsequent calls', () => { - const bucket1 = tokenBucket.getTokenBucket('test-bucket', { limit: 100 }, mockLog); - const bucket2 = tokenBucket.getTokenBucket('test-bucket', { limit: 100 }, mockLog); + const bucket1 = tokenBucket.getTokenBucket('bucket', 'test-bucket', 'rps', { limit: 100 }, mockLog); + const bucket2 = tokenBucket.getTokenBucket('bucket', 'test-bucket', 'rps', { limit: 100 }, mockLog); assert.strictEqual(bucket1, bucket2); - assert.strictEqual(mockLog.debug.callCount, 1); // Only called once + assert.strictEqual(mockLog.debug.callCount, 1); + }); + + it('should create separate buckets for different resource IDs', () => { + const bucket1 = tokenBucket.getTokenBucket('bucket', 'bucket-1', 'rps', { limit: 100 }, mockLog); + const bucket2 = tokenBucket.getTokenBucket('bucket', 'bucket-2', 'rps', { limit: 100 }, mockLog); + + assert.notStrictEqual(bucket1, bucket2); + assert.strictEqual(bucket1.resourceId, 'bucket-1'); + assert.strictEqual(bucket2.resourceId, 'bucket-2'); + }); + + it('should create separate buckets for different measures', () => { + const bucket1 = tokenBucket.getTokenBucket('bucket', 'test-bucket', 'rps', { limit: 100 }, mockLog); + const bucket2 = tokenBucket.getTokenBucket('bucket', 'test-bucket', 'bps', { limit: 100 }, mockLog); + + assert.notStrictEqual(bucket1, bucket2); + assert.strictEqual(bucket1.measure, 'rps'); + assert.strictEqual(bucket2.measure, 'bps'); }); - it('should create separate buckets for different bucket names', () => { - const bucket1 = tokenBucket.getTokenBucket('bucket-1', { limit: 100 }, mockLog); - const bucket2 = tokenBucket.getTokenBucket('bucket-2', { limit: 100 }, mockLog); + it('should create separate buckets for different resource classes', () => { + const bucket1 = tokenBucket.getTokenBucket('bucket', 'test', 'rps', { limit: 100 }, mockLog); + const 
bucket2 = tokenBucket.getTokenBucket('account', 'test', 'rps', { limit: 100 }, mockLog); assert.notStrictEqual(bucket1, bucket2); - assert.strictEqual(bucket1.bucketName, 'bucket-1'); - assert.strictEqual(bucket2.bucketName, 'bucket-2'); + assert.strictEqual(bucket1.resourceClass, 'bucket'); + assert.strictEqual(bucket2.resourceClass, 'account'); }); it('should update limitConfig when limit changes', () => { - const bucket1 = tokenBucket.getTokenBucket('test-bucket', { limit: 100, source: 'bucket' }, mockLog); + const bucket1 = tokenBucket.getTokenBucket( + 'bucket', 'test-bucket', 'rps', { limit: 100, source: 'bucket' }, mockLog); assert.strictEqual(bucket1.limitConfig.limit, 100); - // Simulate rate limit change - const bucket2 = tokenBucket.getTokenBucket('test-bucket', { limit: 200, source: 'bucket' }, mockLog); + const bucket2 = tokenBucket.getTokenBucket( + 'bucket', 'test-bucket', 'rps', { limit: 200, source: 'bucket' }, mockLog); - assert.strictEqual(bucket1, bucket2); // Same bucket instance - assert.strictEqual(bucket2.limitConfig.limit, 200); // Limit updated + assert.strictEqual(bucket1, bucket2); + assert.strictEqual(bucket2.limitConfig.limit, 200); assert(mockLog.info.calledOnce); assert(mockLog.info.firstCall.args[0].includes('Updated token bucket limit config')); }); it('should not log update when limit is unchanged', () => { - tokenBucket.getTokenBucket('test-bucket', { limit: 100 }, mockLog); + tokenBucket.getTokenBucket('bucket', 'test-bucket', 'rps', { limit: 100 }, mockLog); mockLog.info.resetHistory(); - tokenBucket.getTokenBucket('test-bucket', { limit: 100 }, mockLog); + tokenBucket.getTokenBucket('bucket', 'test-bucket', 'rps', { limit: 100 }, mockLog); assert.strictEqual(mockLog.info.called, false); }); @@ -403,17 +315,17 @@ describe('Token bucket management functions', () => { describe('removeTokenBucket', () => { it('should remove existing bucket and return true', () => { - tokenBucket.getTokenBucket('test-bucket', { limit: 100 }, 
mockLog); + tokenBucket.getTokenBucket('bucket', 'test-bucket', 'rps', { limit: 100 }, mockLog); assert.strictEqual(tokenBucket.getAllTokenBuckets().size, 1); - const result = tokenBucket.removeTokenBucket('test-bucket'); + const result = tokenBucket.removeTokenBucket('bucket', 'test-bucket', 'rps'); assert.strictEqual(result, true); assert.strictEqual(tokenBucket.getAllTokenBuckets().size, 0); }); it('should return false when bucket does not exist', () => { - const result = tokenBucket.removeTokenBucket('non-existent-bucket'); + const result = tokenBucket.removeTokenBucket('bucket', 'non-existent-bucket', 'rps'); assert.strictEqual(result, false); }); @@ -428,46 +340,43 @@ describe('Token bucket management functions', () => { }); it('should return all created buckets', () => { - tokenBucket.getTokenBucket('bucket-1', { limit: 100 }, mockLog); - tokenBucket.getTokenBucket('bucket-2', { limit: 200 }, mockLog); - tokenBucket.getTokenBucket('bucket-3', { limit: 300 }, mockLog); + tokenBucket.getTokenBucket('bucket', 'bucket-1', 'rps', { limit: 100 }, mockLog); + tokenBucket.getTokenBucket('bucket', 'bucket-2', 'rps', { limit: 200 }, mockLog); + tokenBucket.getTokenBucket('bucket', 'bucket-3', 'rps', { limit: 300 }, mockLog); const buckets = tokenBucket.getAllTokenBuckets(); assert.strictEqual(buckets.size, 3); - assert(buckets.has('bucket-1')); - assert(buckets.has('bucket-2')); - assert(buckets.has('bucket-3')); + assert(buckets.has('bucket:bucket-1:rps')); + assert(buckets.has('bucket:bucket-2:rps')); + assert(buckets.has('bucket:bucket-3:rps')); }); }); describe('cleanupTokenBuckets', () => { it('should remove idle buckets with no tokens', () => { - const bucket1 = tokenBucket.getTokenBucket('bucket-1', { limit: 100 }, mockLog); - const bucket2 = tokenBucket.getTokenBucket('bucket-2', { limit: 200 }, mockLog); + const bucket1 = tokenBucket.getTokenBucket('bucket', 'bucket-1', 'rps', { limit: 100 }, mockLog); + const bucket2 = tokenBucket.getTokenBucket('bucket', 
'bucket-2', 'rps', { limit: 200 }, mockLog); - // Make bucket-1 idle (old lastRefillTime, no tokens) - bucket1.lastRefillTime = Date.now() - 120000; // 2 minutes ago + bucket1.lastRefillTime = Date.now() - 120000; bucket1.tokens = 0; - // Make bucket-2 active bucket2.lastRefillTime = Date.now(); bucket2.tokens = 10; - const removed = tokenBucket.cleanupTokenBuckets(60000); // 60s threshold + const removed = tokenBucket.cleanupTokenBuckets(60000); assert.strictEqual(removed, 1); assert.strictEqual(tokenBucket.getAllTokenBuckets().size, 1); - assert(tokenBucket.getAllTokenBuckets().has('bucket-2')); - assert(!tokenBucket.getAllTokenBuckets().has('bucket-1')); + assert(tokenBucket.getAllTokenBuckets().has('bucket:bucket-2:rps')); + assert(!tokenBucket.getAllTokenBuckets().has('bucket:bucket-1:rps')); }); it('should not remove idle buckets with tokens', () => { - const bucket = tokenBucket.getTokenBucket('test-bucket', { limit: 100 }, mockLog); + const bucket = tokenBucket.getTokenBucket('bucket', 'test-bucket', 'rps', { limit: 100 }, mockLog); - // Make bucket idle but with tokens bucket.lastRefillTime = Date.now() - 120000; - bucket.tokens = 10; // Has tokens + bucket.tokens = 10; const removed = tokenBucket.cleanupTokenBuckets(60000); @@ -476,10 +385,9 @@ describe('Token bucket management functions', () => { }); it('should not remove recently active buckets', () => { - const bucket = tokenBucket.getTokenBucket('test-bucket', { limit: 100 }, mockLog); + const bucket = tokenBucket.getTokenBucket('bucket', 'test-bucket', 'rps', { limit: 100 }, mockLog); - // Recently active - bucket.lastRefillTime = Date.now() - 30000; // 30s ago + bucket.lastRefillTime = Date.now() - 30000; bucket.tokens = 0; const removed = tokenBucket.cleanupTokenBuckets(60000); @@ -489,13 +397,12 @@ describe('Token bucket management functions', () => { }); it('should use default maxIdleMs if not provided', () => { - const bucket = tokenBucket.getTokenBucket('test-bucket', { limit: 100 }, mockLog); 
+ const bucket = tokenBucket.getTokenBucket('bucket', 'test-bucket', 'rps', { limit: 100 }, mockLog); - // Idle beyond default (60s) bucket.lastRefillTime = Date.now() - 70000; bucket.tokens = 0; - const removed = tokenBucket.cleanupTokenBuckets(); // No argument + const removed = tokenBucket.cleanupTokenBuckets(); assert.strictEqual(removed, 1); }); @@ -507,24 +414,22 @@ describe('Token bucket management functions', () => { }); it('should remove multiple expired buckets', () => { - const bucket1 = tokenBucket.getTokenBucket('bucket-1', { limit: 100 }, mockLog); - const bucket2 = tokenBucket.getTokenBucket('bucket-2', { limit: 200 }, mockLog); - const bucket3 = tokenBucket.getTokenBucket('bucket-3', { limit: 300 }, mockLog); + const bucket1 = tokenBucket.getTokenBucket('bucket', 'bucket-1', 'rps', { limit: 100 }, mockLog); + const bucket2 = tokenBucket.getTokenBucket('bucket', 'bucket-2', 'rps', { limit: 200 }, mockLog); + const bucket3 = tokenBucket.getTokenBucket('bucket', 'bucket-3', 'rps', { limit: 300 }, mockLog); - // Make bucket-1 and bucket-2 idle bucket1.lastRefillTime = Date.now() - 120000; bucket1.tokens = 0; bucket2.lastRefillTime = Date.now() - 120000; bucket2.tokens = 0; - // Keep bucket-3 active bucket3.lastRefillTime = Date.now(); const removed = tokenBucket.cleanupTokenBuckets(60000); assert.strictEqual(removed, 2); assert.strictEqual(tokenBucket.getAllTokenBuckets().size, 1); - assert(tokenBucket.getAllTokenBuckets().has('bucket-3')); + assert(tokenBucket.getAllTokenBuckets().has('bucket:bucket-3:rps')); }); }); });