diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7456359..5c8f7ca 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -10,6 +10,11 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
 ### Added

 - Method truncate() to clean tubes (#78).
+- `release_limit` configuration option: a taken task is removed from the queue
+  if the task has been released (by `release` or `ttr`) `release_limit` times (#8).
+
+- `release_limit_policy` configuration option: controls the behavior when a task
+  reaches the `release_limit` (#9).

 ### Changed

diff --git a/README.md b/README.md
index f305038..3ef7b23 100644
--- a/README.md
+++ b/README.md
@@ -293,5 +293,86 @@ installed and the feature is not disabled by the configuration.
 * You can not create or drop tubes by API calls with Tarantool 3. You need to
   update the role configuration instead.

+* The `sharded-queue` drivers have an additional configuration option
+  `release_limit`. The option is an integer:
+
+  * `release_limit` == nil or `release_limit` <= 0 - a driver works as before.
+  * `release_limit` > 0 - a taken task is removed from the queue if the task
+    has been released (by `release` or `ttr`) `release_limit` times.
+
+  Example:
+
+  ```lua
+  conn:call('queue.create_tube', { 'mytube', {
+      release_limit = 2, -- Delete the task after the 2nd release.
+  }})
+  ```
+
+* The `sharded-queue` drivers have an additional configuration option
+  `release_limit_policy` that controls the behavior when a task reaches
+  the `release_limit`:
+
+  * `release_limit_policy` = `nil` or `DELETE` - the task is deleted.
+  * `release_limit_policy` = `DLQ` - the task is moved to the dead letter queue
+    and deleted from the main one. The dead letter queue is a copy of the main
+    queue with the name suffix `_dlq`.
+
+  Example:
+
+  ```lua
+  conn:call('queue.create_tube', { 'mytube', {
+      release_limit = 2,
+      release_limit_policy = 'DLQ', -- A task will be moved to the DLQ after the 2nd release.
+  }})
+  ```
+
+  The same options can be set in the role configuration:
+
+  ```yaml
+  groups:
+    storages:
+      roles: ['roles.sharded-queue-storage']
+      roles_cfg:
+        roles.sharded-queue-storage:
+          cfg:
+            metrics: true
+          tubes:
+            mytube:
+              driver: sharded_queue.drivers.fifottl
+              release_limit: 2
+              release_limit_policy: 'DLQ'
+  ```
+
+  Usage example:
+
+  ```lua
+  tarantool@user:~/sharded_queue$ tarantool
+  Tarantool 1.10.3-6-gfbf53b9
+  type 'help' for interactive help
+  tarantool> netbox = require('net.box')
+  ---
+  ...
+  tarantool> queue_conn = netbox.connect('localhost:3301', {user = 'admin', password = 'secret-cluster-cookie'})
+  ---
+  ...
+  tarantool> queue_conn:call('queue.create_tube', { 'test_tube', {
+      release_limit = 1,
+      release_limit_policy = 'DLQ', -- A task will be moved to the DLQ after the first release.
+  }})
+  ---
+  - {'wait_factor': 2, 'release_limit_policy': 'DLQ', 'log_request': false, 'tube_name': 'test_tube'}
+  ...
+  tarantool> queue_conn:call('queue.tube.test_tube:put', { 'task_1' })
+  ---
+  - [3653, 'r', 'task_1']
+  ...
+  tarantool> queue_conn:call('queue.tube.test_tube:take')
+  ---
+  - [3653, 't', 'task_1']
+  ...
+  tarantool> queue_conn:call('queue.tube.test_tube:release', { 3653 }) -- The task is erased from 'test_tube'.
+  ---
+  - [3653, '-', 'task_1']
+  ...
+  tarantool> queue_conn:call('queue.tube.test_tube_dlq:take') -- The task is now in the 'DLQ' tube.
+  ---
+  - [3653, 't', 'task_1']
+  ...
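+  tarantool> queue_conn:call('queue.statistics', { 'test_tube' }).calls.delete -- Illustrative follow-up (not from the original session): the removal is counted as a delete.
+  ---
+  - 1
+  ...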
+  ```

 [metrics-summary]: https://www.tarantool.io/en/doc/latest/book/monitoring/api_reference/#summary
 [queue-statistics]: https://github.com/tarantool/queue?tab=readme-ov-file#getting-statistics
diff --git a/roles/sharded-queue-router.lua b/roles/sharded-queue-router.lua
index 1be651e..7eced11 100644
--- a/roles/sharded-queue-router.lua
+++ b/roles/sharded-queue-router.lua
@@ -6,6 +6,7 @@ local metrics = require('sharded_queue.router.metrics')
 local roles = require('sharded_queue.roles')
 local utils = require('sharded_queue.utils')
 local queue = require('sharded_queue.router.queue')
+local consts = require('sharded_queue.consts')

 local role_name = roles.get_tarantool_role_name('router')
 local storage_role_name = roles.get_tarantool_role_name('storage')
@@ -52,6 +53,11 @@ local function validate(conf)
     if not ok then
         error(role_name .. ": " .. err)
     end
+
+    ok, err = utils.validate_dlq(tubes)
+    if not ok then
+        error(role_name .. ": " .. err)
+    end

     return true
 end
@@ -71,12 +77,28 @@ local function apply(conf)
     for tube_name, options in pairs(conf_tubes) do
         if queue.map()[tube_name] == nil then
             queue.add(tube_name, metrics, options)
+            if options.release_limit_policy == consts.RELEASE_LIMIT_POLICY.DLQ then
+                local dlq_options = table.deepcopy(options or {})
+                dlq_options.release_limit_policy = nil
+                dlq_options.release_limit = -1
+                queue.add(tube_name .. consts.DLQ_SUFFIX, metrics, dlq_options)
+            end
         end
     end

     for tube_name, _ in pairs(queue.map()) do
         if conf_tubes[tube_name] == nil then
-            queue.remove(tube_name)
+            -- A dead letter queue is never present in the config,
+            -- so instead we check that its main queue is in it.
+            local remove = true
+            local tube = conf_tubes[tube_name:sub(1, -(#consts.DLQ_SUFFIX+1))]
+            if tube_name:sub(-#consts.DLQ_SUFFIX) == consts.DLQ_SUFFIX and tube
+                and tube.release_limit_policy == consts.RELEASE_LIMIT_POLICY.DLQ then
+                remove = false
+            end
+            if remove then
+                queue.remove(tube_name)
+            end
         end
     end

diff --git a/roles/sharded-queue-storage.lua b/roles/sharded-queue-storage.lua
index 09840d5..ba43a6b 100644
--- a/roles/sharded-queue-storage.lua
+++ b/roles/sharded-queue-storage.lua
@@ -27,6 +27,11 @@ local function validate(conf)
     if not ok then
         error(role_name .. ": " .. err)
     end
+
+    ok, err = utils.validate_dlq(conf.tubes or {})
+    if not ok then
+        error(role_name .. ": " .. err)
+    end

     return true
 end

diff --git a/sharded-queue-scm-1.rockspec b/sharded-queue-scm-1.rockspec
index a87bb09..3abf1bb 100755
--- a/sharded-queue-scm-1.rockspec
+++ b/sharded-queue-scm-1.rockspec
@@ -32,6 +32,7 @@ build = {
         ['sharded_queue.roles'] = 'sharded_queue/roles.lua',
         ['sharded_queue.stash'] = 'sharded_queue/stash.lua',
         ['sharded_queue.state'] = 'sharded_queue/state.lua',
+        ['sharded_queue.consts'] = 'sharded_queue/consts.lua',
         ['sharded_queue.stats.storage'] = 'sharded_queue/stats/storage.lua',
         ['sharded_queue.router.config'] = 'sharded_queue/router/config.lua',
         ['sharded_queue.router.metrics'] = 'sharded_queue/router/metrics.lua',
diff --git a/sharded_queue/api.lua b/sharded_queue/api.lua
index dfd6c13..5ec3a2e 100644
--- a/sharded_queue/api.lua
+++ b/sharded_queue/api.lua
@@ -4,6 +4,7 @@ local config = require('sharded_queue.router.config')
 local metrics = require('sharded_queue.router.metrics')
 local utils = require('sharded_queue.utils')
 local queue = require('sharded_queue.router.queue')
+local consts = require('sharded_queue.consts')

 function cfg_call(_, options)
     options = options or {}
@@ -38,6 +39,12 @@ local function validate_config(cfg)
     if not ok then
         return ok, err
     end
+
+    ok, err = utils.validate_dlq(cfg_tubes or {})
+    if not ok then
+        return ok, err
+    end
+
     return utils.validate_cfg(cfg_tubes['cfg'])
 end

@@ -51,12 +58,25 @@ local function apply_config(cfg, opts)
             end
         elseif queue.map()[tube_name] == nil then
             queue.add(tube_name, metrics, options)
+            if options.release_limit_policy == consts.RELEASE_LIMIT_POLICY.DLQ then
+                local dlq_options = table.deepcopy(options or {})
+                dlq_options.release_limit_policy = nil
+                dlq_options.release_limit = -1
+                queue.add(tube_name .. consts.DLQ_SUFFIX, metrics, dlq_options)
+            end
         end
     end

     -- try drop tubes --
     for tube_name, _ in pairs(queue.map()) do
         if tube_name ~= 'cfg' and cfg_tubes[tube_name] == nil then
+            -- A dead letter queue is never present in the config,
+            -- so instead we check that its main queue is in it.
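+            -- For example, the 'mytube_dlq' tube is kept here as long as
+            -- 'mytube' is configured with release_limit_policy = 'DLQ'.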
+            local remove = true
+            local tube = cfg_tubes[tube_name:sub(1, -(#consts.DLQ_SUFFIX+1))]
+            if tube_name:sub(-#consts.DLQ_SUFFIX) == consts.DLQ_SUFFIX and tube
+                and tube.release_limit_policy == consts.RELEASE_LIMIT_POLICY.DLQ then
+                remove = false
+            end
-            queue.remove(tube_name)
+            if remove then
+                queue.remove(tube_name)
+            end
         end
     end
diff --git a/sharded_queue/consts.lua b/sharded_queue/consts.lua
new file mode 100644
index 0000000..8b72699
--- /dev/null
+++ b/sharded_queue/consts.lua
@@ -0,0 +1,8 @@
+return {
+    RELEASE_LIMIT_POLICY = {
+        DELETE = 'DELETE',
+        DLQ = 'DLQ',
+    },
+
+    DLQ_SUFFIX = '_dlq',
+}
diff --git a/sharded_queue/drivers/fifo.lua b/sharded_queue/drivers/fifo.lua
index 1afd706..4bacc52 100644
--- a/sharded_queue/drivers/fifo.lua
+++ b/sharded_queue/drivers/fifo.lua
@@ -3,11 +3,21 @@ local utils = require('sharded_queue.utils')
 local log = require('log') -- luacheck: ignore
 local stats = require('sharded_queue.stats.storage')
 local vshard_utils = require('sharded_queue.storage.vshard_utils')
+local consts = require('sharded_queue.consts')

 local function update_stat(tube_name, name)
     stats.update(tube_name, name, '+', 1)
 end

+local index = {
+    task_id = 1,
+    bucket_id = 2,
+    status = 3,
+    data = 4,
+    index = 5,
+    release_count = 6,
+}
+
 local method = {}

 local function tube_create(args)
@@ -22,7 +32,8 @@
         { name = 'bucket_id', type = 'unsigned' },
         { name = 'status', type = 'string' },
         { name = 'data', type = '*' },
-        { name = 'index', type = 'unsigned' }
+        { name = 'index', type = 'unsigned' },
+        { name = 'release_count', type = 'unsigned' }
     }

     local space = box.schema.create_space(args.name, space_options)
@@ -77,22 +88,43 @@
     return { task.task_id, task.status, task.data }
 end

--- put task in space
-function method.put(args)
-    local task = utils.atomic(function()
-        local idx = get_index(args)
-        local task_id = utils.pack_task_id(
+local function put_in_tube(args, is_dlq)
+    local idx = get_index(args)
+
+    if is_dlq == false then
+        args.task_id = utils.pack_task_id(
             args.bucket_id,
             args.bucket_count,
             idx)
+    end

-        return get_space(args):insert {
-            task_id,
-            args.bucket_id,
-            state.READY,
-            args.data,
-            idx
-        }
+    return get_space(args):insert {
+        args.task_id,
+        args.bucket_id,
+        state.READY,
+        args.data,
+        idx,
+        0
+    }
+end
+
+local function put_in_dlq(args, task)
+    -- setup dead letter queue args
+    local dlq_args = {}
+    dlq_args.tube_name = args.tube_name .. 
consts.DLQ_SUFFIX + dlq_args.data = task.data + dlq_args.bucket_id = task.bucket_id + dlq_args.task_id = task.task_id + + put_in_tube(dlq_args, true) + + update_stat(dlq_args.tube_name, 'put') +end + +-- put task in space +function method.put(args) + local task = utils.atomic(function() + return put_in_tube(args, false) end) update_stat(args.tube_name, 'put') @@ -103,10 +135,10 @@ end function method.take(args) local task = utils.atomic(function() local task = get_space(args).index.status:min { state.READY } - if task == nil or task[3] ~= state.READY then + if task == nil or task[index.status] ~= state.READY then return end - return get_space(args):update(task.task_id, { { '=', 3, state.TAKEN } }) + return get_space(args):update(task.task_id, { { '=', index.status, state.TAKEN } }) end) if task == nil then return end @@ -147,14 +179,43 @@ end -- release task function method.release(args) - local task = get_space(args):update(args.task_id, { { '=', 3, state.READY } }) + local release_limit = args.options.release_limit or -1 + local deleted = false + local release_limit_policy = + args.options.release_limit_policy or consts.RELEASE_LIMIT_POLICY.DELETE + + local task = utils.atomic(function() + local task = get_space(args):update(args.task_id, { + { '=', index.status, state.READY }, + { '+', index.release_count, 1 }, + }) + if task ~= nil and release_limit > 0 then + if task.release_count >= release_limit then + get_space(args):delete(args.task_id) + if release_limit_policy == consts.RELEASE_LIMIT_POLICY.DLQ then + put_in_dlq(args, task) + end + deleted = true + end + end + return task + end) + update_stat(args.tube_name, 'release') + + if deleted then + task = task:tomap() + task.status = state.DONE + update_stat(args.tube_name, 'delete') + update_stat(args.tube_name, 'done') + end + return normalize_task(task) end -- bury task function method.bury(args) - local task = get_space(args):update(args.task_id, { { '=', 3, state.BURIED } }) + local task = get_space(args):update(args.task_id, { { '=', index.status, state.BURIED } }) update_stat(args.tube_name, 'bury') return normalize_task(task) end @@ -166,11 +227,11 @@ function method.kick(args) if task == nil then return i - 1 end - if task[2] ~= state.BURIED then + if task[index.status] ~= state.BURIED then return i - 1 end - task = get_space(args):update(task[1], { { '=', 3, state.READY } }) + task = get_space(args):update(task[index.task_id], { { '=', index.status, state.READY } }) update_stat(args.tube_name, 'kick') end return args.count diff --git a/sharded_queue/drivers/fifottl.lua b/sharded_queue/drivers/fifottl.lua index c148d09..cb1b537 100644 --- a/sharded_queue/drivers/fifottl.lua +++ b/sharded_queue/drivers/fifottl.lua @@ -4,19 +4,21 @@ local utils = require('sharded_queue.utils') local stats = require('sharded_queue.stats.storage') local time = require('sharded_queue.time') local vshard_utils = require('sharded_queue.storage.vshard_utils') +local consts = require('sharded_queue.consts') local log = require('log') -- luacheck: ignore local index = { - task_id = 1, - bucket_id = 2, - status = 3, - created = 4, - priority = 5, - ttl = 6, - ttr = 7, - next_event = 8, - data = 9, - index = 10 + task_id = 1, + bucket_id = 2, + status = 3, + created = 4, + priority = 5, + ttl = 6, + ttr = 7, + next_event = 8, + data = 9, + index = 10, + release_count = 11, } local function update_stat(tube_name, name) @@ -49,10 +51,84 @@ local function wc_wait(tube_name, time) end end +local function get_index(tube_name, bucket_id) + local task = 
box.space[tube_name].index.idx:max { bucket_id } + if not task or task[index.bucket_id] ~= bucket_id then + return 1 + else + return task[index.index] + 1 + end +end + +local function put_in_tube(args, is_dlq, delay, ttl, ttr, priority) + local idx = get_index(args.tube_name, args.bucket_id) + + local next_event = time.event(ttl) + local status = state.READY + if is_dlq == false then + args.task_id = utils.pack_task_id( + args.bucket_id, + args.bucket_count, + idx) + if delay > 0 then + status = state.DELAYED + ttl = ttl + delay + next_event = time.event(delay) + else + next_event = time.event(ttl) + end + end + + return box.space[args.tube_name]:insert { + args.task_id, -- task_id + args.bucket_id, -- bucket_id + status, -- state + time.cur(), -- created + priority, -- priority + time.nano(ttl), -- ttl + time.nano(ttr), -- ttr + next_event, -- next_event + args.data, -- data + idx, -- index + 0 -- release_count + } +end + +local function put_in_dlq(options, task, tube_name, extra) + -- setup dead letter queue args + local dlq_args = {} + dlq_args.tube_name = tube_name .. consts.DLQ_SUFFIX + dlq_args.data = task.data + dlq_args.bucket_id = task.bucket_id + dlq_args.task_id = task.task_id + dlq_args.options = table.deepcopy(options or {}) + dlq_args.options.release_limit = -1 + dlq_args.options.release_limit_policy = consts.RELEASE_LIMIT_POLICY.DELETE + dlq_args.extra = table.deepcopy(extra or {}) + + -- setup params -- + + local ttl = dlq_args.options.ttl or time.MAX_TIMEOUT + local ttr = dlq_args.options.ttr or ttl + local priority = dlq_args.options.priority or 0 + + put_in_tube(dlq_args, true, nil, ttl, ttr, priority) + + if dlq_args.extra and dlq_args.extra.log_request then + log_operation("put", task) + end + + update_stat(dlq_args.tube_name, 'put') + wc_signal(dlq_args.tube_name) +end + -- FIBERS METHODS -- -local function fiber_iteration(tube_name, processed) +local function fiber_iteration(tube_name, processed, options) local cur = time.cur() local estimated = time.MAX_TIMEOUT + local release_limit = options.release_limit or -1 + local release_limit_policy = + options.release_limit_policy or consts.RELEASE_LIMIT_POLICY.DELETE -- delayed tasks @@ -93,10 +169,20 @@ local function fiber_iteration(tube_name, processed) task = box.space[tube_name].index.watch:min { state.TAKEN } if task and task[index.status] == state.TAKEN then if cur >= task[index.next_event] then - box.space[tube_name]:update(task[index.task_id], { - { '=', index.status, state.READY }, - { '=', index.next_event, task[index.created] + task[index.ttl] } - }) + if release_limit > 0 and task.release_count + 1 >= release_limit then + box.space[tube_name]:delete(task[index.task_id]) + update_stat(tube_name, 'delete') + update_stat(tube_name, 'done') + if release_limit_policy == consts.RELEASE_LIMIT_POLICY.DLQ then + put_in_dlq(options, task, tube_name) + end + else + box.space[tube_name]:update(task[index.task_id], { + { '=', index.status, state.READY }, + { '=', index.next_event, task[index.created] + task[index.ttl] }, + { '+', index.release_count, 1} + }) + end estimated = 0 processed = processed + 1 else @@ -114,7 +200,7 @@ local function fiber_iteration(tube_name, processed) return processed end -local function fiber_common(tube_name) +local function fiber_common(tube_name, options) fiber.name(tube_name) wait_cond_map[tube_name] = fiber.cond() @@ -122,7 +208,7 @@ local function fiber_common(tube_name) while true do if not box.cfg.read_only then - local ok, ret = pcall(fiber_iteration, tube_name, processed) + local ok, 
ret = pcall(fiber_iteration, tube_name, processed, options) if not ok and not (ret.code == box.error.READONLY) then return 1 elseif ok then @@ -144,16 +230,17 @@ local function tube_create(args) space_options.temporary = args.options.temporary or false space_options.engine = args.options.engine or 'memtx' space_options.format = { - { 'task_id', 'unsigned' }, - { 'bucket_id', 'unsigned' }, - { 'status', 'string' }, - { 'created', 'unsigned' }, - { 'priority', 'unsigned' }, - { 'ttl', 'unsigned' }, - { 'ttr', 'unsigned' }, - { 'next_event', 'unsigned' }, - { 'data', '*' }, - { 'index', 'unsigned' } + { 'task_id', 'unsigned' }, + { 'bucket_id', 'unsigned' }, + { 'status', 'string' }, + { 'created', 'unsigned' }, + { 'priority', 'unsigned' }, + { 'ttl', 'unsigned' }, + { 'ttr', 'unsigned' }, + { 'next_event', 'unsigned' }, + { 'data', '*' }, + { 'index', 'unsigned' }, + { 'release_count', 'unsigned'} } local space = box.schema.space.create(args.name, space_options) @@ -207,22 +294,13 @@ local function tube_create(args) {if_not_exists = true}) -- run fiber for tracking event - fiber.create(fiber_common, args.name) + fiber.create(fiber_common, args.name, args.options) end local function tube_drop(tube_name) box.space[tube_name]:drop() end -local function get_index(tube_name, bucket_id) - local task = box.space[tube_name].index.idx:max { bucket_id } - if not task or task[index.bucket_id] ~= bucket_id then - return 1 - else - return task[index.index] + 1 - end -end - local function normalize_task(task) if task == nil then return nil end return { task.task_id, task.status, task.data } @@ -239,35 +317,7 @@ function method.put(args) local priority = args.priority or args.options.priority or 0 local task = utils.atomic(function() - local idx = get_index(args.tube_name, args.bucket_id) - - local next_event - local task_id = utils.pack_task_id( - args.bucket_id, - args.bucket_count, - idx) - - local status = state.READY - if delay > 0 then - status = state.DELAYED - ttl = ttl + delay - next_event = time.event(delay) - else - next_event = time.event(ttl) - end - - return box.space[args.tube_name]:insert { - task_id, -- task_id - args.bucket_id, -- bucket_id - status, -- state - time.cur(), -- created - priority, -- priority - time.nano(ttl), -- ttl - time.nano(ttr), -- ttr - next_event, -- next_event - args.data, -- data - idx -- index - } + return put_in_tube(args, false, delay, ttl, ttr, priority) end) if args.extra and args.extra.log_request then @@ -397,23 +447,56 @@ function method.peek(args) end function method.release(args) + local release_limit = args.options.release_limit or -1 + local release_limit_policy = + args.options.release_limit_policy or consts.RELEASE_LIMIT_POLICY.DELETE + local deleted = false + local updated = false + local task = utils.atomic(function() local task = box.space[args.tube_name]:get(args.task_id) - if task ~= nil then + if task == nil then + return nil + end + + if release_limit > 0 and task.release_count + 1 >= release_limit then + box.space[args.tube_name]:delete(args.task_id) + if release_limit_policy == consts.RELEASE_LIMIT_POLICY.DLQ then + put_in_dlq(args.options, task, args.tube_name, args.extra) + end + deleted = true + else task = box.space[args.tube_name]:update(args.task_id, { {'=', index.status, state.READY}, {'=', index.next_event, task[index.created] + task[index.ttl]}, + {'+', index.release_count, 1}, }) + updated = true end return task end) - if args.extra and args.extra.log_request then - log_operation("release", task) + if updated or deleted then + if 
args.extra and args.extra.log_request then + log_operation("release", task) + end + update_stat(args.tube_name, 'release') + end + + if deleted then + task = task:tomap() + task.status = state.DONE + + if args.extra and args.extra.log_request then + log_operation("delete", task) + end + + update_stat(args.tube_name, 'delete') + update_stat(args.tube_name, 'done') end - update_stat(args.tube_name, 'release') wc_signal(args.tube_name) + return normalize_task(task) end diff --git a/sharded_queue/router/queue.lua b/sharded_queue/router/queue.lua index accccb7..d555dac 100644 --- a/sharded_queue/router/queue.lua +++ b/sharded_queue/router/queue.lua @@ -2,6 +2,7 @@ local is_cartridge_package, cartridge = pcall(require, 'cartridge') local vshard = require('vshard') local tube = require('sharded_queue.router.tube') local utils = require('sharded_queue.utils') +local consts = require('sharded_queue.consts') local queue_global = { tube = {}, @@ -66,6 +67,7 @@ if is_cartridge_package then end tubes[tube_name] = options + ok, err = cartridge.config_patch_clusterwide({ tubes = tubes }) if not ok then error(err) diff --git a/sharded_queue/router/tube.lua b/sharded_queue/router/tube.lua index ff3e5e5..919d5ea 100644 --- a/sharded_queue/router/tube.lua +++ b/sharded_queue/router/tube.lua @@ -421,6 +421,7 @@ local function new(name, metrics, options) wait_max = options.wait_max, wait_factor = options.wait_factor or time.DEFAULT_WAIT_FACTOR, log_request = utils.normalize.log_request(options.log_request), + release_limit_policy = options.release_limit_policy, }, new_metrics_metatable(metrics)) end diff --git a/sharded_queue/storage.lua b/sharded_queue/storage.lua index f33d03f..8bc81e6 100644 --- a/sharded_queue/storage.lua +++ b/sharded_queue/storage.lua @@ -16,6 +16,12 @@ local function validate_config(cfg) if not ok then return ok, err end + + ok, err = utils.validate_dlq(cfg_tubes or {}) + if not ok then + return ok, err + end + return utils.validate_cfg(cfg_tubes['cfg']) end diff --git a/sharded_queue/storage/tubes.lua b/sharded_queue/storage/tubes.lua index f29417c..12ee801 100644 --- a/sharded_queue/storage/tubes.lua +++ b/sharded_queue/storage/tubes.lua @@ -1,4 +1,5 @@ local drivers = require('sharded_queue.storage.drivers') +local consts = require('sharded_queue.consts') local tubes = {} @@ -9,6 +10,9 @@ local function map_tubes(cfg_tubes) for tube_name, tube_opts in pairs(cfg_tubes) do if tube_opts.enable == nil or tube_opts.enable == true then result[tube_name] = drivers.get(tube_opts.driver) + if tube_opts.release_limit_policy == consts.RELEASE_LIMIT_POLICY.DLQ then + result[tube_name .. 
consts.DLQ_SUFFIX] = drivers.get(tube_opts.driver)
+            end
         end
     end
     return result
 end
@@ -42,10 +46,21 @@ local new = {}
     for tube_name, driver in pairs(self.tubes) do
         if existing_tubes[tube_name] == nil then
-            self.tubes[tube_name].create({
-                name = tube_name,
-                options = cfg_tubes[tube_name]
-            })
+            local tube = cfg_tubes[tube_name:sub(1, -(#consts.DLQ_SUFFIX+1))]
+            if tube_name:sub(-#consts.DLQ_SUFFIX) == consts.DLQ_SUFFIX and tube then
+                local dlq_options = table.deepcopy(tube or {})
+                dlq_options.release_limit = -1
+                dlq_options.release_limit_policy = nil
+                self.tubes[tube_name].create({
+                    name = tube_name,
+                    options = dlq_options
+                })
+            else
+                self.tubes[tube_name].create({
+                    name = tube_name,
+                    options = cfg_tubes[tube_name]
+                })
+            end
             table.insert(new, tube_name)
         end
     end
diff --git a/sharded_queue/utils.lua b/sharded_queue/utils.lua
index b6c31da..1f42bd0 100644
--- a/sharded_queue/utils.lua
+++ b/sharded_queue/utils.lua
@@ -1,6 +1,7 @@
 local fiber = require('fiber')

 local metrics = require('sharded_queue.metrics')
+local consts = require('sharded_queue.consts')

 local utils = {}

@@ -98,6 +99,29 @@ function utils.validate_options(options)
         end
     end

+    if options.release_limit then
+        if type(options.release_limit) ~= 'number' then
+            return false, "release_limit must be number"
+        end
+    end
+
+    if options.release_limit_policy then
+        if type(options.release_limit_policy) ~= 'string' then
+            return false, "release_limit_policy must be string"
+        end
+
+        local found = false
+        for _, v in pairs(consts.RELEASE_LIMIT_POLICY) do
+            if options.release_limit_policy == v then
+                found = true
+            end
+        end
+
+        if not found then
+            return false, "unknown release_limit_policy"
+        end
+    end
+
     return true
 end

@@ -145,6 +169,18 @@ function utils.validate_cfg(cfg)
     return true
 end

+function utils.validate_dlq(tubes)
+    tubes = tubes or {}
+    for tube_name, tube_opts in pairs(tubes) do
+        if tube_opts.release_limit_policy == consts.RELEASE_LIMIT_POLICY.DLQ
+            and tubes[tube_name .. consts.DLQ_SUFFIX] ~= nil then
+            local msg = 'dead letter queue %s could not be created for tube %s'
+            return nil, msg:format(tube_name .. consts.DLQ_SUFFIX, tube_name)
+        end
+    end
+    return true
+end
+
 function utils.get_tarantool_version()
     local version_parts = rawget(_G, '_TARANTOOL'):split('-', 3)

diff --git a/test/dlq_test.lua b/test/dlq_test.lua
new file mode 100644
index 0000000..e7f6735
--- /dev/null
+++ b/test/dlq_test.lua
@@ -0,0 +1,202 @@
+local t = require('luatest')
+local g = t.group('dlq_test')
+
+local helper = require('test.helper')
+local utils = require('test.helper.utils')
+
+local consts = require('sharded_queue.consts')
+
+g.before_all(function()
+    g.queue_conn = helper.get_evaler('queue-router')
+end)
+
+for test_name, options in pairs({
+    fifottl = {
+        driver = 'sharded_queue.drivers.fifottl',
+        release_limit = 5,
+        release_limit_policy = consts.RELEASE_LIMIT_POLICY.DELETE,
+    },
+    fifo = {
+        driver = 'sharded_queue.drivers.fifo',
+        release_limit = 5,
+        release_limit_policy = consts.RELEASE_LIMIT_POLICY.DELETE,
+    },
+    fifo_release_policy_nil = {
+        driver = 'sharded_queue.drivers.fifo',
+        release_limit = 5,
+    },
+}) do
+    g['test_dlq_delete_' .. test_name] = function()
+        local tube_name = 'test_dlq_delete_' .. test_name
+        helper.create_tube(tube_name, options)
+
+        local task_count = 10
+        local tasks_data = {}
+
+        for i = 1, task_count do
+            table.insert(tasks_data, {
+                name = 'task_' .. 
i,
+                raw = '*'
+            })
+        end
+
+        local task_ids = {}
+        for _, data in pairs(tasks_data) do
+            local task = g.queue_conn:call(utils.shape_cmd(tube_name, 'put'), { data })
+            t.assert_equals(task[utils.index.status], utils.state.READY)
+            task_ids[task[utils.index.task_id]] = true
+        end
+
+        -- Release tasks several times to reach 'release_limit'.
+        for _ = 1, options.release_limit do
+            local taken_task_ids = {}
+
+            for _ = 1, task_count do
+                local task = g.queue_conn:call(utils.shape_cmd(tube_name, 'take'))
+                t.assert_equals(task[utils.index.status], utils.state.TAKEN)
+                taken_task_ids[task[utils.index.task_id]] = true
+            end
+            t.assert_equals(task_ids, taken_task_ids)
+
+            for task_id, _ in pairs(taken_task_ids) do
+                g.queue_conn:call(utils.shape_cmd(tube_name, 'release'), { task_id })
+            end
+        end
+
+        -- All tasks should be deleted.
+        local stat = g.queue_conn:call('queue.statistics', { tube_name })
+
+        t.assert_equals(stat.calls.delete, task_count)
+    end
+end
+
+for test_name, options in pairs({
+    fifottl = {
+        driver = 'sharded_queue.drivers.fifottl',
+        release_limit = 2,
+        release_limit_policy = consts.RELEASE_LIMIT_POLICY.DLQ,
+    },
+    fifo = {
+        driver = 'sharded_queue.drivers.fifo',
+        release_limit = 2,
+        release_limit_policy = consts.RELEASE_LIMIT_POLICY.DLQ,
+    }
+}) do
+    g['test_dlq_move_' .. test_name] = function()
+        local tube_name = 'test_dlq_move_' .. test_name
+        helper.create_tube(tube_name, options)
+
+        local task_count = 10
+        local tasks_data = {}
+
+        for i = 1, task_count do
+            table.insert(tasks_data, {
+                name = 'task_' .. i,
+                raw = '*'
+            })
+        end
+
+        local task_ids = {}
+        for _, data in pairs(tasks_data) do
+            local task = g.queue_conn:call(utils.shape_cmd(tube_name, 'put'), { data })
+            t.assert_equals(task[utils.index.status], utils.state.READY)
+            task_ids[task[utils.index.task_id]] = true
+        end
+
+        -- Release tasks several times to reach 'release_limit'.
+        for _ = 1, options.release_limit do
+            local taken_task_ids = {}
+
+            for _ = 1, task_count do
+                local task = g.queue_conn:call(utils.shape_cmd(tube_name, 'take'))
+                t.assert_equals(task[utils.index.status], utils.state.TAKEN)
+                taken_task_ids[task[utils.index.task_id]] = true
+            end
+            t.assert_equals(task_ids, taken_task_ids)
+
+            for task_id, _ in pairs(taken_task_ids) do
+                g.queue_conn:call(utils.shape_cmd(tube_name, 'release'), { task_id })
+            end
+        end
+
+        -- All tasks should be deleted and put into the DLQ.
+        local stat = g.queue_conn:call('queue.statistics', { tube_name })
+        t.assert_equals(stat.calls.delete, task_count)
+        local dlq_stat = g.queue_conn:call('queue.statistics', { tube_name .. consts.DLQ_SUFFIX })
+        t.assert_equals(dlq_stat.calls.put, task_count)
+    end
+end
+
+function g.test_ttr_release_move_dlq()
+    local tube_name = 'test_ttr_release_move_dlq'
+    helper.create_tube(
+        tube_name,
+        {
+            driver = 'sharded_queue.drivers.fifottl',
+            release_limit = 1,
+            release_limit_policy = consts.RELEASE_LIMIT_POLICY.DLQ
+        }
+    )
+
+    g.queue_conn:call(utils.shape_cmd(tube_name, 'put'), {
+        'simple data',
+        { ttl = 60, ttr = 0.1 }
+    })
+    local task = g.queue_conn:call(utils.shape_cmd(tube_name, 'take'))
+    t.assert_equals(task[utils.index.data], 'simple data')
+    t.assert_equals(task[utils.index.status], utils.state.TAKEN)
+
+    -- Wait for the ttr.
+    t.helpers.retrying({}, function()
+        -- Task should be deleted because of release_limit = 1 and put into the DLQ.
+        local stat = g.queue_conn:call('queue.statistics', { tube_name })
+        t.assert_equals(stat.calls.delete, 1)
+        local dlq_stat = g.queue_conn:call('queue.statistics', { tube_name .. 
consts.DLQ_SUFFIX }) + t.assert_equals(dlq_stat.calls.put, 1) + end) +end + +function g.test_dlq_validation() + t.skip_if(utils.is_tarantool_3(), 'the role is available only for Cartridge') + local tube_name = 'test_dlq_validation' + + utils.try(function() + local options = { + driver = 'sharded_queue.drivers.fifottl', + release_limit = 1, + release_limit_policy = 'DLQfoo' + } + g.queue_conn:eval('queue.create_tube(...)', {tube_name, options}) + end, function(err) + t.assert_str_contains(tostring(err), 'unknown release_limit_policy') + end) + + utils.try(function() + local options = { + driver = 'sharded_queue.drivers.fifottl', + release_limit = 1, + release_limit_policy = 123 + } + g.queue_conn:eval('queue.create_tube(...)', {tube_name, options}) + end, function(err) + t.assert_str_contains(tostring(err), 'release_limit_policy must be string') + end) +end + +function g.test_dlq_validation_already_exists() + t.skip_if(utils.is_tarantool_3(), 'the role is available only for Cartridge') + local tube_name = 'test_dlq_validation_already_exists' + local dlq_tube_name = tube_name .. consts.DLQ_SUFFIX + + utils.try(function() + local options = { + driver = 'sharded_queue.drivers.fifottl', + release_limit = 1, + release_limit_policy = consts.RELEASE_LIMIT_POLICY.DLQ + } + g.queue_conn:eval('queue.create_tube(...)', {dlq_tube_name, options}) + g.queue_conn:eval('queue.create_tube(...)', {tube_name, options}) + end, function(err) + t.assert_str_contains(tostring(err), 'test_dlq_validation_already_exists_dlq could not be created') + end) +end diff --git a/test/helper/utils.lua b/test/helper/utils.lua index 7a55387..908b621 100644 --- a/test/helper/utils.lua +++ b/test/helper/utils.lua @@ -64,4 +64,11 @@ function utils.is_tarantool_3() return major == 3 end +function utils.try(f, catch_f) + local status, exception = pcall(f) + if not status then + catch_f(exception) + end +end + return utils diff --git a/test/release_limit_test.lua b/test/release_limit_test.lua new file mode 100644 index 0000000..9bef8f7 --- /dev/null +++ b/test/release_limit_test.lua @@ -0,0 +1,106 @@ +local t = require('luatest') +local g = t.group('release_limit_test') + +local fiber = require('fiber') + +local helper = require('test.helper') +local utils = require('test.helper.utils') + +g.before_all(function() + g.queue_conn = helper.get_evaler('queue-router') +end) + +for test_name, options in pairs({ + fifottl = { + driver = 'sharded_queue.drivers.fifottl', + release_limit = 5, + }, + fifo = { + driver = 'sharded_queue.drivers.fifo', + release_limit = 5, + } +}) do + g['test_release_limit_' .. test_name] = function() + local tube_name = 'test_release_limit_' .. test_name + helper.create_tube(tube_name, options) + + local task_count = 10 + local tasks_data = {} + + for i = 1, task_count do + table.insert(tasks_data, { + name = 'task_' .. 
i,
+                raw = '*'
+            })
+        end
+
+        local task_ids = {}
+        for _, data in pairs(tasks_data) do
+            local task = g.queue_conn:call(utils.shape_cmd(tube_name, 'put'), { data })
+            t.assert_equals(task[utils.index.status], utils.state.READY)
+            task_ids[task[utils.index.task_id]] = true
+        end
+
+        -- Release tasks several times to reach 'release_limit'
+        for _ = 1, options.release_limit do
+            local taken_task_ids = {}
+
+            for _ = 1, task_count do
+                local task = g.queue_conn:call(utils.shape_cmd(tube_name, 'take'))
+                t.assert_equals(task[utils.index.status], utils.state.TAKEN)
+                taken_task_ids[task[utils.index.task_id]] = true
+            end
+            t.assert_equals(task_ids, taken_task_ids)
+
+            for task_id, _ in pairs(taken_task_ids) do
+                g.queue_conn:call(utils.shape_cmd(tube_name, 'release'), { task_id })
+            end
+        end
+
+        -- All tasks should be deleted
+        local stat = g.queue_conn:call('queue.statistics', { tube_name })
+
+        t.assert_equals(stat.calls.delete, task_count)
+    end
+end
+
+function g.test_ttr_release_limit_test()
+    local tube_name = 'test_ttr_release_limit_test'
+    helper.create_tube(
+        tube_name,
+        {
+            driver = 'sharded_queue.drivers.fifottl',
+            release_limit = 1,
+        }
+    )
+
+    g.queue_conn:call(utils.shape_cmd(tube_name, 'put'), {
+        'simple data',
+        { ttl = 60, ttr = 0.1 }
+    })
+    local task = g.queue_conn:call(utils.shape_cmd(tube_name, 'take'))
+    t.assert_equals(task[utils.index.data], 'simple data')
+    t.assert_equals(task[utils.index.status], utils.state.TAKEN)
+
+    -- Wait for the ttr
+    fiber.sleep(0.2)
+
+    -- Task should be deleted because of release_limit = 1
+    local stat = g.queue_conn:call('queue.statistics', { tube_name })
+    t.assert_equals(stat.calls.delete, 1)
+end
+
+function g.test_release_limit_validation()
+    t.skip_if(utils.is_tarantool_3(), 'the role is available only for Cartridge')
+    local tube_name = 'test_release_limit_validation'
+    local options = {
+        driver = 'sharded_queue.drivers.fifottl',
+        release_limit = 'foo',
+    }
+
+    utils.try(function()
+        g.queue_conn:eval('queue.create_tube(...)', {tube_name, options})
+    end, function(err)
+        t.assert_str_contains(tostring(err), 'release_limit must be number')
+    end)
+end
diff --git a/test/timeout_test.lua b/test/timeout_test.lua
index 837f56f..261fda4 100644
--- a/test/timeout_test.lua
+++ b/test/timeout_test.lua
@@ -37,7 +37,7 @@ function g.test_try_waiting()
     local waiting_time = tonumber(channel:get()) / 1e6
     local task = channel:get()

-    t.assert_almost_equals(waiting_time, 3, 0.1)
+    t.assert_almost_equals(waiting_time, 3, 0.3)
     t.assert_equals(task, nil)

     channel:close()
@@ -64,7 +64,7 @@ function g.test_wait_put_taking()
     local waiting_time = tonumber(channel:get()) / 1e6
     local task = channel:get()

-    t.assert_almost_equals(waiting_time, timeout / 2, 0.1)
+    t.assert_almost_equals(waiting_time, timeout / 2, 0.3)
     t.assert_equals(task[utils.index.data], 'simple_task')

     channel:close()