Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
### Added

- Method truncate() to clean tubes (#78).
- `release_limit` configuration option: a taken task is removed from the queue
if the task has been released (by `release` or `ttr`) `release_limit` times (#8).

- `release_limit_policy` configuration option: control the behavior when a task
reached the `release_limit` (#9).

### Changed

Expand Down
81 changes: 81 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -293,5 +293,86 @@ installed and the feature is not disabled by the configuration.
* You can not create or drop tubes by API calls with Tarantool 3. You need
to update the role configuration instead.

* The `sharded-queue` drivers has additional configuration option `release_limit`. The option is an integer number:

* `release_limit` == nil or `release_limit` <= 0 - a driver works as before.
* `release_limit` > 0 - a taken task is removed from the queue if the task has been released (by `release` or `ttr`) `release_limit` times.

Example:

```lua
conn:call('queue.create_tube', { mytube, {
release_limit = 2, -- Delete task after 2nd release.
}})
```

* The `sharded-queue` drivers has additional configuration option `release_limit_policy` to control the behavior
when a task reached the `release_limit`:

* `release_limit_policy` = `nil` or `DELETE` - the task is deleted.
* `release_limit_policy` = `DLQ` - the task is moved to the dead letter queue and deleted from main.
Dead letter queue is a copy of the main queue with a name suffix `_dlq`.

Example:

```lua
conn:call('queue.create_tube', { mytube, {
release_limit = 2,
release_limit_policy = 'DLQ', -- A task will be moved to DLQ after 2nd release.
}})
```

```yaml
groups:
storages:
roles: ['roles.sharded-queue-storage']
roles_cfg:
roles.sharded-queue-storage:
cfg:
metrics: true
tubes:
mytube:
driver: sharded_queue.drivers.fifottl
release_limit: 2
release_limit_policy: 'DLQ'
```

Usage example:

```lua
tarantool@user:~/sharded_queue$ tarantool
Tarantool 1.10.3-6-gfbf53b9
type 'help' for interactive help
tarantool> netbox = require('net.box')
---
...
tarantool> queue_conn = netbox.connect('localhost:3301', {user = 'admin',password = 'secret-cluster-cookie'})
---
...
tarantool> queue_conn:call('queue.create_tube', { 'test_tube', {
release_limit = 1,
release_limit_policy = 'DLQ', -- A task will be moved to DLQ after first release.
}})
---
- {'wait_factor': 2, 'release_limit_policy': 'DLQ', 'log_request': false, 'tube_name': 'test_tube'}
...
tarantool> queue_conn:call('queue.tube.test_tube:put', { 'task_1' })
---
- [3653, 'r', 'task_1']
...
tarantool> queue_conn:call('queue.tube.test_tube:take')
---
- [3653, 't', 'task_1']
...
tarantool> queue_conn:call('queue.tube.test_tube:release', { 3653 }) -- task was erased from 'test_tube'.
---
- [3653, '-', 'task_1']
...
tarantool> queue_conn:call('queue.tube.test_tube_dlq:take') -- task is now in 'DLQ' tube.
---
- [3653, 't', 'task_1']
...
```

[metrics-summary]: https://www.tarantool.io/en/doc/latest/book/monitoring/api_reference/#summary
[queue-statistics]: https://github.com/tarantool/queue?tab=readme-ov-file#getting-statistics
24 changes: 23 additions & 1 deletion roles/sharded-queue-router.lua
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ local metrics = require('sharded_queue.router.metrics')
local roles = require('sharded_queue.roles')
local utils = require('sharded_queue.utils')
local queue = require('sharded_queue.router.queue')
local consts = require('sharded_queue.consts')

local role_name = roles.get_tarantool_role_name('router')
local storage_role_name = roles.get_tarantool_role_name('storage')
Expand Down Expand Up @@ -52,6 +53,11 @@ local function validate(conf)
if not ok then
error(role_name .. ": " .. err)
end

ok, err = utils.validate_dlq(tubes)
if not ok then
error(role_name .. ": " .. err)
end
return true
end

Expand All @@ -71,12 +77,28 @@ local function apply(conf)
for tube_name, options in pairs(conf_tubes) do
if queue.map()[tube_name] == nil then
queue.add(tube_name, metrics, options)
if options.release_limit_policy == consts.RELEASE_LIMIT_POLICY.DLQ then
local dlq_options = table.deepcopy(options or {})
dlq_options.release_limit_policy = nil
dlq_options.release_limit = -1
queue.add(tube_name .. consts.DLQ_SUFFIX, metrics, dlq_options)
end
end
end

for tube_name, _ in pairs(queue.map()) do
if conf_tubes[tube_name] == nil then
queue.remove(tube_name)
-- Dead letter queue is not in config,
-- so instead we check that main queue is in it.
local remove = true
local tube = conf_tubes[tube_name:sub(1, -(#consts.DLQ_SUFFIX+1))]
if tube_name:sub(-#consts.DLQ_SUFFIX) == consts.DLQ_SUFFIX and tube
and tube.release_limit_policy == consts.RELEASE_LIMIT_POLICY.DLQ then
remove = false
end
if remove then
queue.remove(tube_name)
end
end
end

Expand Down
5 changes: 5 additions & 0 deletions roles/sharded-queue-storage.lua
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ local function validate(conf)
if not ok then
error(role_name .. ": " .. err)
end

ok, err = utils.validate_dlq(conf.tubes or {})
if not ok then
error(role_name .. ": " .. err)
end
return true
end

Expand Down
1 change: 1 addition & 0 deletions sharded-queue-scm-1.rockspec
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ build = {
['sharded_queue.roles'] = 'sharded_queue/roles.lua',
['sharded_queue.stash'] = 'sharded_queue/stash.lua',
['sharded_queue.state'] = 'sharded_queue/state.lua',
['sharded_queue.consts'] = 'sharded_queue/consts.lua',
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new file is missed.

Suggested change
['sharded_queue.consts'] = 'sharded_queue/consts.lua',
['sharded_queue.consts'] = 'sharded_queue/consts.lua',
['sharded_queue.dlq'] = 'sharded_queue/dlq.lua',

['sharded_queue.stats.storage'] = 'sharded_queue/stats/storage.lua',
['sharded_queue.router.config'] = 'sharded_queue/router/config.lua',
['sharded_queue.router.metrics'] = 'sharded_queue/router/metrics.lua',
Expand Down
20 changes: 20 additions & 0 deletions sharded_queue/api.lua
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ local config = require('sharded_queue.router.config')
local metrics = require('sharded_queue.router.metrics')
local utils = require('sharded_queue.utils')
local queue = require('sharded_queue.router.queue')
local consts = require('sharded_queue.consts')

function cfg_call(_, options)
options = options or {}
Expand Down Expand Up @@ -38,6 +39,12 @@ local function validate_config(cfg)
if not ok then
return ok, err
end

ok, err = utils.validate_dlq(cfg_tubes or {})
if not ok then
return ok, err
end

return utils.validate_cfg(cfg_tubes['cfg'])
end

Expand All @@ -51,12 +58,25 @@ local function apply_config(cfg, opts)
end
elseif queue.map()[tube_name] == nil then
queue.add(tube_name, metrics, options)
if options.release_limit_policy == consts.RELEASE_LIMIT_POLICY.DLQ then
local dlq_options = table.deepcopy(options or {})
dlq_options.release_limit_policy = nil
dlq_options.release_limit = -1
queue.add(tube_name .. consts.DLQ_SUFFIX, metrics, dlq_options)
end
end
end

-- try drop tubes --
for tube_name, _ in pairs(queue.map()) do
if tube_name ~= 'cfg' and cfg_tubes[tube_name] == nil then
-- Dead letter queue is not in config,
-- so instead we check that main queue is in it.
local tube = cfg_tubes[tube_name:sub(1, -(#consts.DLQ_SUFFIX+1))]
if tube_name:sub(-#consts.DLQ_SUFFIX) == consts.DLQ_SUFFIX and tube
and tube.release_limit_policy == consts.RELEASE_LIMIT_POLICY.DLQ then
break
end
queue.remove(tube_name)
end
end
Expand Down
8 changes: 8 additions & 0 deletions sharded_queue/consts.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
return {
RELEASE_LIMIT_POLICY = {
DELETE = 'DELETE',
DLQ = 'DLQ',
},

DLQ_SUFFIX = '_dlq',
}
5 changes: 5 additions & 0 deletions sharded_queue/dlq.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
return {
DELETE = 'DELETE',
DLQ = 'DLQ',
DLQ_SUFFIX = '_dlq'
}
99 changes: 80 additions & 19 deletions sharded_queue/drivers/fifo.lua
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,21 @@ local utils = require('sharded_queue.utils')
local log = require('log') -- luacheck: ignore
local stats = require('sharded_queue.stats.storage')
local vshard_utils = require('sharded_queue.storage.vshard_utils')
local consts = require('sharded_queue.consts')

local function update_stat(tube_name, name)
stats.update(tube_name, name, '+', 1)
end

local index = {
task_id = 1,
bucket_id = 2,
status = 3,
data = 4,
index = 5,
release_count = 6,
}

local method = {}

local function tube_create(args)
Expand All @@ -22,7 +32,8 @@ local function tube_create(args)
{ name = 'bucket_id', type = 'unsigned' },
{ name = 'status', type = 'string' },
{ name = 'data', type = '*' },
{ name = 'index', type = 'unsigned' }
{ name = 'index', type = 'unsigned' },
{ name = 'release_count', type = 'unsigned' }
}

local space = box.schema.create_space(args.name, space_options)
Expand Down Expand Up @@ -77,22 +88,43 @@ local function normalize_task(task)
return { task.task_id, task.status, task.data }
end

-- put task in space
function method.put(args)
local task = utils.atomic(function()
local idx = get_index(args)
local task_id = utils.pack_task_id(
local function put_in_tube(args, is_dlq)
local idx = get_index(args)

if is_dlq == false then
args.task_id = utils.pack_task_id(
args.bucket_id,
args.bucket_count,
idx)
end

return get_space(args):insert {
task_id,
args.bucket_id,
state.READY,
args.data,
idx
}
return get_space(args):insert {
args.task_id,
args.bucket_id,
state.READY,
args.data,
idx,
0
}
end

local function put_in_dlq(args, task)
-- setup dead letter queue args
local dlq_args = {}
dlq_args.tube_name = args.tube_name .. consts.DLQ_SUFFIX
dlq_args.data = task.data
dlq_args.bucket_id = task.bucket_id
dlq_args.task_id = task.task_id

put_in_tube(dlq_args, true)

update_stat(dlq_args.tube_name, 'put')
end

-- put task in space
function method.put(args)
local task = utils.atomic(function()
return put_in_tube(args, false)
end)

update_stat(args.tube_name, 'put')
Expand All @@ -103,10 +135,10 @@ end
function method.take(args)
local task = utils.atomic(function()
local task = get_space(args).index.status:min { state.READY }
if task == nil or task[3] ~= state.READY then
if task == nil or task[index.status] ~= state.READY then
return
end
return get_space(args):update(task.task_id, { { '=', 3, state.TAKEN } })
return get_space(args):update(task.task_id, { { '=', index.status, state.TAKEN } })
end)
if task == nil then return end

Expand Down Expand Up @@ -147,14 +179,43 @@ end

-- release task
function method.release(args)
local task = get_space(args):update(args.task_id, { { '=', 3, state.READY } })
local release_limit = args.options.release_limit or -1
local deleted = false
local release_limit_policy =
args.options.release_limit_policy or consts.RELEASE_LIMIT_POLICY.DELETE

local task = utils.atomic(function()
local task = get_space(args):update(args.task_id, {
{ '=', index.status, state.READY },
{ '+', index.release_count, 1 },
})
if task ~= nil and release_limit > 0 then
if task.release_count >= release_limit then
get_space(args):delete(args.task_id)
if release_limit_policy == consts.RELEASE_LIMIT_POLICY.DLQ then
put_in_dlq(args, task)
end
deleted = true
end
end
return task
end)

update_stat(args.tube_name, 'release')

if deleted then
task = task:tomap()
task.status = state.DONE
update_stat(args.tube_name, 'delete')
update_stat(args.tube_name, 'done')
end

return normalize_task(task)
end

-- bury task
function method.bury(args)
local task = get_space(args):update(args.task_id, { { '=', 3, state.BURIED } })
local task = get_space(args):update(args.task_id, { { '=', index.status, state.BURIED } })
update_stat(args.tube_name, 'bury')
return normalize_task(task)
end
Expand All @@ -166,11 +227,11 @@ function method.kick(args)
if task == nil then
return i - 1
end
if task[2] ~= state.BURIED then
if task[index.status] ~= state.BURIED then
return i - 1
end

task = get_space(args):update(task[1], { { '=', 3, state.READY } })
task = get_space(args):update(task[index.task_id], { { '=', index.status, state.READY } })
update_stat(args.tube_name, 'kick')
end
return args.count
Expand Down
Loading
Loading