Skip to content

Commit c06aaa7

Browse files
committed
jobqueue updates
1 parent 5d3d989 commit c06aaa7

13 files changed

Lines changed: 243 additions & 209 deletions

File tree

DESCRIPTION

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
Package: webqueue
22
Type: Package
33
Title: Multicore HTTP Server
4-
Version: 1.1.0
5-
Date: 2025-04-04
4+
Version: 1.2.0
5+
Date: 2025-05-21
66
Authors@R: c(
77
person(
88
"Daniel P.", "Smith",
@@ -27,14 +27,14 @@ Depends: R (>= 4.2.0)
2727
Imports:
2828
cli,
2929
httpuv,
30+
interprocess (>= 1.2.0),
3031
jsonlite,
31-
jobqueue (>= 1.6.0),
32+
jobqueue (>= 1.6.6),
3233
later,
3334
parallelly,
3435
promises,
3536
R6,
3637
rlang,
37-
semaphore,
3838
webutils,
3939
utils
4040
Suggests:
@@ -44,3 +44,4 @@ Suggests:
4444
rmarkdown,
4545
testthat (>= 3.0.0),
4646
withr
47+
Remotes:cmmr/jobqueue

NAMESPACE

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,22 +2,22 @@
22

33
importFrom("cli", "ansi_strip", "cli_abort", "cli_text", "style_italic", "col_grey", "style_bold")
44
importFrom("httpuv", "randomPort", "startServer", "staticPath", "staticPathOptions")
5-
importFrom("jobqueue", "Queue", "Worker", "Job", "%<>%", "%>%")
5+
importFrom("jobqueue", "jobqueue", "%<>%", "%>%")
66
importFrom("jsonlite", "fromJSON", "minify", "prettify", "toJSON", "unbox")
77
importFrom("later", "later", "run_now")
88
importFrom("parallelly", "availableCores")
99
importFrom("promises", "as.promise", "then")
1010
importFrom("R6", "R6Class")
1111
importFrom("rlang", "%||%", "as_function", "catch_cnd", "cnd_signal", "env_unbind", "is_formula", "is_environment")
12-
importFrom("semaphore", "create_semaphore", "decrement_semaphore", "remove_semaphore")
12+
importFrom("interprocess", "semaphore")
1313
importFrom("webutils", "parse_http", "parse_query")
1414
importFrom("utils", "capture.output", "hasName", "str")
1515

1616
export(`cookie`)
1717
export(`header`)
1818
export(`js_obj`)
1919
export(`response`)
20-
export(`WebQueue`)
20+
export(`webqueue`)
2121

2222
S3method(print, wq_header)
2323
S3method(print, wq_response)

R/demo.r

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ demo <- function () { # nocov start
161161
}
162162

163163

164-
svr <- WebQueue$new(
164+
svr <- webqueue(
165165
handler = handler,
166166
host = '0.0.0.0',
167167
port = 8080L,

R/response.r

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11

22
#' Compile an HTTP response.
33
#'
4-
#' If your WebQueue's `handler` function returns a list, json object, character
4+
#' If your webqueue's `handler` function returns a list, json object, character
55
#' vector, or scalar integer, `response()` will be used to transform that
66
#' result into an HTTP response.\cr\cr
77
#' You may also call `response()` within your handler to better customize the

R/utils.r

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,14 @@ valid_string <- function (x, ok = '', no = '', null_ok = FALSE) {
4444

4545
}
4646

47+
is_cran_check <- function() {
48+
if (identical(Sys.getenv("NOT_CRAN"), "true")) {
49+
FALSE
50+
} else {
51+
Sys.getenv("_R_CHECK_PACKAGE_NAME_", "") != ""
52+
}
53+
}
54+
4755

4856
code_to_msg <- list(
4957
'100' = "Continue",

R/webqueue.r

Lines changed: 112 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11

22
#' Queues and Services HTTP Requests
33
#'
4-
#' @name WebQueue
5-
#'
64
#' @description
75
#'
86
#' Connects the 'httpuv' and 'jobqueue' R packages.
@@ -89,44 +87,110 @@
8987
#' static paths. If not set or NULL, then it will use the result from
9088
#' calling `httpuv::staticPathOptions()` with no arguments.
9189
#'
90+
#'
91+
#' @return
92+
#' A `webqueue` object with the following methods:
93+
#' * `$url`
94+
#' - Returns the URL where the server is available.
95+
#' * `$stop(reason = 'server stopped')`
96+
#' - Shuts down the webqueue and all associated subprocesses. Stopped Jobs
97+
#' will have their `$output` set to a object of class `<interrupt/condition>`.
98+
#' - `reason` - A brief message for the condition object.
99+
#' - Returns this webqueue, invisibly.
92100
#'
93101
#' @export
94-
#' @examples
102+
#' @examplesIf ! jobqueue:::is_cran_check()
95103
#'
96104
#' library(webqueue)
97105
#'
98-
#' wq <- WebQueue$new(function (req) 'Hello World!\n')
106+
#' wq <- webqueue(function (req) 'Hello World!\n')
99107
#' readLines(wq$url)
100108
#' wq$stop()
101109
#'
102110

103-
WebQueue <- R6Class(
104-
classname = "WebQueue",
111+
webqueue <- function (
112+
handler,
113+
host = '0.0.0.0',
114+
port = 8080L,
115+
parse = NULL,
116+
globals = list(),
117+
packages = NULL,
118+
namespace = NULL,
119+
init = NULL,
120+
max_cpus = availableCores(),
121+
workers = ceiling(max_cpus * 1.2),
122+
timeout = NULL,
123+
hooks = NULL,
124+
reformat = NULL,
125+
stop_id = NULL,
126+
copy_id = NULL,
127+
bg = TRUE,
128+
quiet = FALSE,
129+
onHeaders = NULL,
130+
staticPaths = NULL,
131+
staticPathOptions = NULL ) {
132+
133+
# Capture curly-brace expression
134+
init_subst <- substitute(init)
135+
if (isa(init_subst, '{')) init <- init_subst
136+
137+
# Forward arguments onto class constructor
138+
webqueue_class$new(
139+
handler = handler,
140+
host = host,
141+
port = port,
142+
parse = parse,
143+
globals = globals,
144+
packages = packages,
145+
namespace = namespace,
146+
init = init,
147+
max_cpus = max_cpus,
148+
workers = workers,
149+
timeout = timeout,
150+
hooks = hooks,
151+
reformat = reformat,
152+
stop_id = stop_id,
153+
copy_id = copy_id,
154+
bg = bg,
155+
quiet = quiet,
156+
onHeaders = onHeaders,
157+
staticPaths = staticPaths,
158+
staticPathOptions = staticPathOptions )
159+
160+
}
161+
162+
163+
164+
#' @noRd
165+
#' @keywords internal
166+
167+
webqueue_class <- R6Class(
168+
classname = "webqueue",
105169
cloneable = FALSE,
106170

107171
public = list(
108172

109173
#' @description
110-
#' Creates an `httpuv::WebServer` with requests handled by a `jobqueue::Queue`.
174+
#' Creates an `httpuv::WebServer` with requests handled by a `jobqueue::jobqueue`.
111175
#'
112176
#' @return A `WebQueue` object.
113177
initialize = function (
114178
handler,
115-
host = '0.0.0.0',
116-
port = 8080L,
117-
parse = NULL,
118-
globals = list(),
119-
packages = NULL,
120-
namespace = NULL,
121-
init = NULL,
122-
max_cpus = availableCores(),
123-
workers = ceiling(max_cpus * 1.2),
124-
timeout = NULL,
125-
hooks = NULL,
126-
reformat = NULL,
127-
stop_id = NULL,
128-
copy_id = NULL,
129-
bg = TRUE,
179+
host = '0.0.0.0',
180+
port = 8080L,
181+
parse = NULL,
182+
globals = list(),
183+
packages = NULL,
184+
namespace = NULL,
185+
init = NULL,
186+
max_cpus = availableCores(),
187+
workers = ceiling(max_cpus * 1.2),
188+
timeout = NULL,
189+
hooks = NULL,
190+
reformat = NULL,
191+
stop_id = NULL,
192+
copy_id = NULL,
193+
bg = TRUE,
130194
quiet = FALSE,
131195
onHeaders = NULL,
132196
staticPaths = NULL,
@@ -163,38 +227,38 @@ WebQueue <- R6Class(
163227
# Launch WebQueue on a different R process
164228
if (isTRUE(bg)) {
165229

166-
worker <- jobqueue::Worker$new()
167-
sem <- create_semaphore()
230+
worker <- jobqueue::worker_class$new()
231+
sem <- interprocess::semaphore()
168232
start_t <- Sys.time()
169233

170-
job <- jobqueue::Job$new(
234+
job <- jobqueue::job_class$new(
171235
vars = environment(),
172236
expr = { # nocov start
173237

174238
# signals an error if unable to start
175-
webqueue::WebQueue$new(
176-
handler = handler,
177-
host = host,
178-
port = port,
179-
parse = parse,
180-
globals = globals,
181-
packages = packages,
182-
namespace = namespace,
183-
init = init,
184-
max_cpus = max_cpus,
185-
workers = workers,
186-
timeout = timeout,
187-
hooks = hooks,
188-
reformat = reformat,
189-
stop_id = stop_id,
190-
copy_id = copy_id,
191-
bg = FALSE,
239+
webqueue::webqueue(
240+
handler = handler,
241+
host = host,
242+
port = port,
243+
parse = parse,
244+
globals = globals,
245+
packages = packages,
246+
namespace = namespace,
247+
init = init,
248+
max_cpus = max_cpus,
249+
workers = workers,
250+
timeout = timeout,
251+
hooks = hooks,
252+
reformat = reformat,
253+
stop_id = stop_id,
254+
copy_id = copy_id,
255+
bg = FALSE,
192256
quiet = quiet,
193257
onHeaders = onHeaders,
194258
staticPaths = staticPaths,
195259
staticPathOptions = staticPathOptions )
196260

197-
semaphore::increment_semaphore(sem)
261+
sem$post()
198262

199263
httpuv::service(timeoutMs = Inf)
200264

@@ -206,7 +270,7 @@ WebQueue <- R6Class(
206270

207271
cnd <- catch_cnd({
208272

209-
while (!decrement_semaphore(sem, wait = FALSE)) {
273+
while (!sem$wait(timeout_ms = 0)) {
210274

211275
if (job$is_done) {
212276
output <- job$output
@@ -220,7 +284,7 @@ WebQueue <- R6Class(
220284
})
221285

222286
if (!is.null(cnd)) worker$stop()
223-
remove_semaphore(sem)
287+
sem$remove()
224288
if (!is.null(cnd)) cnd_signal(cnd)
225289

226290
}
@@ -233,8 +297,8 @@ WebQueue <- R6Class(
233297

234298
cnd <- catch_cnd({
235299

236-
# Start a Queue.
237-
private$.jobqueue <- jobqueue::Queue$new(
300+
# Start a `jobqueue`.
301+
private$.jobqueue <- jobqueue::jobqueue(
238302
globals = globals,
239303
packages = packages,
240304
namespace = namespace,
@@ -250,7 +314,7 @@ WebQueue <- R6Class(
250314

251315
later::run_now()
252316
if (!identical(private$.jobqueue$state, 'idle'))
253-
stop('Unable to start jobqueue::Queue') # nocov
317+
stop('Unable to start a `jobqueue`') # nocov
254318

255319

256320
# Start a Server.

README.md

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ pak::pak("cmmr/webqueue")
3838
```r
3939
library(webqueue)
4040

41-
wq <- WebQueue$new(~{ 'Hello world!\n' })
41+
wq <- webqueue(~{ 'Hello world!\n' })
4242

4343
readLines('http://localhost:8080')
4444
#> [1] "Hello world!"
@@ -50,7 +50,7 @@ wq$stop()
5050
## Query Parameters
5151

5252
```r
53-
wq <- WebQueue$new(~{ jsonlite::toJSON(.$ARGS) })
53+
wq <- webqueue(~{ jsonlite::toJSON(.$ARGS) })
5454

5555
cat(RCurl::getURL('http://localhost:8080?myvar=123'))
5656
#> {"myvar":["123"]}
@@ -64,7 +64,7 @@ Accepts both GET and POST parameters.
6464
## Simple API
6565

6666
```r
67-
wq <- WebQueue$new(
67+
wq <- webqueue(
6868
handler = function (req) {
6969
switch(
7070
EXPR = req$PATH_INFO,
@@ -106,7 +106,7 @@ See `vignette('interrupts')` for more detailed examples.
106106
### Set a time limit
107107

108108
```r
109-
wq <- WebQueue$new(
109+
wq <- webqueue(
110110
handler = ~{ Sys.sleep(.$ARGS$s); 'Hello world!' },
111111
timeout = 1 )
112112

@@ -124,13 +124,13 @@ wq$stop()
124124
### Merge duplicate requests
125125

126126
```r
127-
wq <- WebQueue$new(
127+
wq <- webqueue(
128128
handler = function (req) { Sys.sleep(1); req$ARGS$x },
129129
copy_id = function (job) job$req$PATH_INFO )
130130
# ^^^^^^^ `copy_id` will be '/a' or '/b'
131131

132132
# Fetch two URLs at the same time. '/b' path is merged.
133-
jq <- jobqueue::Queue$new(workers = 3L)$wait() # vv
133+
jq <- jobqueue::jobqueue(workers = 3L)$wait() # vv
134134
a1 <- jq$run({ RCurl::getURL('http://localhost:8080/a?x=first') })
135135
b1 <- jq$run({ RCurl::getURL('http://localhost:8080/b?x=second') })
136136
b2 <- jq$run({ RCurl::getURL('http://localhost:8080/b?x=third') })
@@ -145,13 +145,13 @@ wq$stop()
145145

146146
### Stop duplicate requests
147147
```r
148-
wq <- WebQueue$new(
148+
wq <- webqueue(
149149
handler = function (req) { Sys.sleep(1); req$ARGS$x },
150150
stop_id = function (job) job$req$PATH_INFO )
151151
# ^^^^^^^ `stop_id` will be '/a' or '/b'
152152

153153
# Fetch three URLs at the same time. '/b' path is stopped.
154-
jq <- jobqueue::Queue$new(workers = 3L)$wait() # vv
154+
jq <- jobqueue::jobqueue(workers = 3L)$wait() # vv
155155
a1 <- jq$run({ RCurl::getURL('http://localhost:8080/a?x=first') })
156156
b1 <- jq$run({ RCurl::getURL('http://localhost:8080/b?x=second') })
157157
b2 <- jq$run({ RCurl::getURL('http://localhost:8080/b?x=third') })

0 commit comments

Comments
 (0)