Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/Config.js
Original file line number Diff line number Diff line change
Expand Up @@ -1873,7 +1873,7 @@ class Config extends EventEmitter {
assert(this.localCache, 'localCache must be defined when rate limiting is enabled');

// Parse and validate all rate limiting configuration
this.rateLimiting = parseRateLimitConfig(config.rateLimiting, this.clusters = this.clusters || 1);
this.rateLimiting = parseRateLimitConfig(config.rateLimiting);
}


Expand Down
8 changes: 5 additions & 3 deletions lib/api/apiUtils/rateLimit/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,16 @@ class RateLimitClient {
*
* Used by token reservation system to request capacity in advance.
*
* @param {string} bucketName - Bucket name
* @param {string} resourceClass - Resource class name e.g. 'account' or 'bucket'
* @param {string} resourceId - Unique resource ID e.g. bucket name or account ID
* @param {string} measure - ID of rate limit measure e.g. 'rps'
* @param {number} requested - Number of tokens requested
* @param {number} interval - Interval per request in ms
* @param {number} burstCapacity - Burst capacity in ms
* @param {RateLimitClient~grantTokens} cb - Callback
*/
grantTokens(bucketName, requested, interval, burstCapacity, cb) {
const key = `ratelimit:bucket:${bucketName}:rps:emptyAt`;
grantTokens(resourceClass, resourceId, measure, requested, interval, burstCapacity, cb) {
const key = `ratelimit:${resourceClass}:${resourceId}:${measure}:emptyAt`;
const now = Date.now();

this.redis.grantTokens(
Expand Down
25 changes: 11 additions & 14 deletions lib/api/apiUtils/rateLimit/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -199,11 +199,10 @@ const rateLimitConfigSchema = Joi.object({
*
* @param {string} resourceClass - Rate limit class name ('bucket' or 'account')
* @param {object} validatedCfg - Already validated config from Joi
* @param {number} clusters - Number of worker processes spawned per instance
* @param {number} nodes - Number of instances that requests will be load balanced across
* @returns {RateLimitClassConfig} Transformed rate limit config
*/
function transformClassConfig(resourceClass, validatedCfg, clusters, nodes) {
function transformClassConfig(resourceClass, validatedCfg, nodes) {
const transformed = {
defaultConfig: undefined,
configCacheTTL: validatedCfg.configCacheTTL,
Expand All @@ -213,23 +212,22 @@ function transformClassConfig(resourceClass, validatedCfg, clusters, nodes) {
if (validatedCfg.defaultConfig?.requestsPerSecond) {
const { limit, burstCapacity } = validatedCfg.defaultConfig.requestsPerSecond;

// Validate limit against nodes AND workers (business rule)
const minLimit = nodes * clusters;
if (limit > 0 && limit < minLimit) {
// Validate limit against nodes (business rule)
if (limit > 0 && limit < nodes) {
throw new Error(
`rateLimiting.${resourceClass}.defaultConfig.` +
`requestsPerSecond.limit (${limit}) must be >= ` +
`(nodes x workers = ${nodes} x ${clusters} = ${minLimit}) ` +
'or 0 (unlimited). Each worker enforces limit/nodes/workers locally. ' +
`With limit < ${minLimit}, per-worker rate would be < 1 req/s, effectively blocking traffic.`
`nodes (${nodes}) ` +
'or 0 (unlimited). Each node enforces limit/nodes locally. ' +
`With limit < ${nodes}, per-node rate would be < 1 req/s, effectively blocking traffic.`
);
}

// Use provided burstCapacity or fall back to default
const effectiveBurstCapacity = burstCapacity || transformed.defaultBurstCapacity;

// Calculate per-worker interval using distributed architecture
const interval = calculateInterval(limit, nodes, clusters);
// Calculate per-node interval using distributed architecture
const interval = calculateInterval(limit, nodes);

// Store both the original limit and the calculated values
transformed.defaultConfig = {
Expand All @@ -248,11 +246,10 @@ function transformClassConfig(resourceClass, validatedCfg, clusters, nodes) {
* Parse and validate the complete rate limiting configuration
*
* @param {Object} rateLimitingConfig - config.rateLimiting object from config.json
* @param {number} clusters - Number of worker clusters (must be numeric)
* @returns {Object} Fully parsed and validated rate limiting configuration
* @throws {Error} If configuration is invalid
*/
function parseRateLimitConfig(rateLimitingConfig, clusters) {
function parseRateLimitConfig(rateLimitingConfig) {
// Validate configuration using Joi schema
const { error: validationError, value: validated } = rateLimitConfigSchema.validate(
rateLimitingConfig,
Expand Down Expand Up @@ -282,8 +279,8 @@ function parseRateLimitConfig(rateLimitingConfig, clusters) {
),
};

parsed.bucket = transformClassConfig('bucket', validated.bucket, clusters, parsed.nodes);
parsed.account = transformClassConfig('account', validated.account, clusters, parsed.nodes);
parsed.bucket = transformClassConfig('bucket', validated.bucket, parsed.nodes);
parsed.account = transformClassConfig('account', validated.account, parsed.nodes);

return parsed;
}
Expand Down
29 changes: 13 additions & 16 deletions lib/api/apiUtils/rateLimit/gcra.js
Original file line number Diff line number Diff line change
@@ -1,38 +1,35 @@
/**
* Calculate per-worker interval based on distributed architecture
* Calculate per-node interval based on distributed architecture
*
* In a distributed setup with N nodes and W workers per node:
* In a distributed setup with N nodes:
* - Global limit: R requests per second
* - Per-worker limit: R / N / W
* - Interval = 1000ms / (R / N / W)
* - Per-node limit: R / N
* - Interval = 1000ms / (R / N)
*
* The interval represents milliseconds between requests. We divide 1000 (milliseconds
* in a second) by the rate to convert "requests per second" to "milliseconds per request".
*
* Examples:
* - 100 req/s ÷ 1 node ÷ 10 workers = 10 req/s per worker → interval = 100ms
* - 600 req/s ÷ 6 nodes ÷ 10 workers = 10 req/s per worker → interval = 100ms
* - 100 req/s ÷ 1 node = 100 req/s per node → interval = 100ms
* - 600 req/s ÷ 6 nodes = 100 req/s per node → interval = 100ms
*
* Dynamic work-stealing is achieved through Redis sync reconciliation:
* - Each worker evaluates locally at its fixed per-worker quota
* - Workers report consumed / workers to Redis
* - Redis sums all workers' shares
* - Workers overwrite local counters with Redis values
* - Each worker evaluates locally using preallocated tokens
* - Workers report processed requests to Redis
* - Redis sums all workers' requests
* - Idle workers' unused capacity accumulates in Redis
* - Busy workers pull back higher emptyAt values and throttle proportionally
*
* IMPORTANT: Limit must be >= N * W, otherwise per-worker rate < 1 req/s
* IMPORTANT: Limit must be >= N, otherwise per-node rate < 1 req/s
* which results in intervals > 1000ms and effectively blocks traffic.
*
* @param {number} limit - Global requests per second
* @param {number} nodes - Total number of nodes
* @param {number} _workers - Number of workers per node (unused in token reservation)
* @returns {number} Interval in milliseconds between requests
*/
// eslint-disable-next-line no-unused-vars
function calculateInterval(limit, nodes, _workers) {
// Per-node rate = limit / nodes (workers NOT divided)
// This allows dynamic work-stealing - workers evaluate at node quota

function calculateInterval(limit, nodes) {
// Per-node rate = limit / nodes
const perNodeRate = limit / nodes;

// Interval = 1000ms / rate
Expand Down
Loading
Loading