-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathtasks.lua
More file actions
174 lines (156 loc) · 4.91 KB
/
tasks.lua
File metadata and controls
174 lines (156 loc) · 4.91 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
local Task
do
local remove = table.remove
local unpack = table.unpack
local create = coroutine.create
local resume = coroutine.resume
local status = coroutine.status
Task = {}
local meta = {__index = Task}
--[[@
@name new
@desc Creates a new instance of Task: a function that can be run by an EventLoop
@desc If you await a Task, it will return the raw function returned values.
@desc /!\ If you safely await it, it might return nil, and you need to check its error manually.
@param fnc<function> The function that the task will execute. It can have special EventLoop calls like await, sleep, call_soon...
@param args<table> A table (with no associative members) to set as the arguments. Can have multiple items.
@param obj?<table> The table to turn into a Task.
@returns Task The task object.
@struct {
arguments = {}, -- The arguments to give the function the next time Task:run is executed.
coro = coroutine_function, -- The coroutine wrapping the task function.
futures = {}, -- A list of futures to set the result after the task is done.
futures_index = 0, -- The futures list pointer
stop_error_propagation = false, -- Whether to stop the error propagation or not
error = false or string, -- The error, if any
done = false, -- Whether the task is done or not
cancelled = false, -- Whether the task is cancelled or not
timer = nil or Timer, -- nil if the task is not scheduled, a Timer object otherwise.
ran_once = false, -- Whether the task did run (or at least partially run)
_scheduled = false, -- Whether the task is scheduled or not (in EventLoop.tasks)
_is_error_handler = false -- Whether the task is the error handler or not
}
]]
function Task.new(fnc, args, obj)
obj = obj or {}
obj.arguments = args
obj.coro = create(fnc)
obj.futures = {}
obj.futures_index = 0
return setmetatable(obj, meta)
end
--[[@
@name _can_await
@desc Throws an error if the Task can't be awaited.
@param loop<EventLoop> The EventLoop executing await
]]
function Task:_can_await(loop)
if self.cancelled or self.done then
error("Can't await a cancelled or done Task.", 3)
end
end
--[[@
@name _pause_await
@desc Returns whether the task awaiting this one needs to be paused or not
@param loop<EventLoop> The EventLoop executing await
@returns boolean If the task awaiting needs to be paused or not
]]
function Task:_pause_await(loop)
return true
end
--[[@
@name _await
@desc Schedules this task, pauses the awaiting one, and returns once the result is done.
@param loop<EventLoop> The EventLoop executing await
@returns mixed The returned value.
]]
function Task:_await(loop)
if self._next_task then
error("Can't await a Task more than once. Use Futures instead.", 3)
end
self.paused = false
self._next_task = loop.current_task
loop:add_task(self)
return loop:stop_task_execution()
end
--[[@
@name cancel
@desc Cancels the task, and if it is awaiting something, cancels the awaiting object too.
]]
function Task:cancel()
if self.timer then
self.timer.list:remove(self.timer)
self.timer.event_loop:add_task(self)
elseif self.awaiting then
self.awaiting:cancel()
end
self.cancelled = true
end
--[[@
@name run
@desc Runs the task function
@param loop<EventLoop> The loop that will run this part of the task
]]
function Task:run(loop)
self.ran_once = true
local data
if self.arguments then
data = {resume(self.coro, unpack(self.arguments))}
self.arguments = nil
else
data = {resume(self.coro)}
end
while data[2] == "get_event_loop" do
data = {resume(self.coro, loop)}
end
if not self.cancelled then
if status(self.coro) == "dead" then
self.done = true
if data[1] then
if self.futures_index > 0 or self._next_task then
remove(data, 1)
else
return
end
local future
for index = 1, self.futures_index do
future = self.futures[index]
future.obj:set_result(data, true, future.index)
end
if self._next_task then
self._next_task.arguments = data
self._next_task.awaiting = nil
loop:add_task(self._next_task)
end
else
self.error = debug.traceback(self.coro, data[2])
end
end
end
end
--[[@
@name add_future
@desc Adds a future that will be set after the task runs.
@param future<Future> The future object. Can be a variant too.
@param index?<int> The index given to the future object (used only with FutureSemaphore)
]]
function Task:add_future(future, index)
self.futures_index = self.futures_index + 1
self.futures[self.futures_index] = {obj=future, index=index}
end
end
--[[@
@name async
@desc A decorator function that will create a new task object with the function passed it everytime it is called.
@param fnc<function> The function
@returns function The wrapper.
]]
local function async(fnc)
return function(...)
return Task.new(fnc, {...}, {})
end
end
return {
Task = Task,
async = async
}