Jobs go in. Results come out.
hooks
A named list of currently registered callback hooks.
jobs
Get or set - List of Jobs currently managed by this Queue.
state
The Queue's state: 'starting'
, 'idle'
, 'busy'
, 'stopped'
, or 'error.'
uid
Get or set - Unique identifier, e.g. 'Q1'
.
tmp
The Queue's temporary directory.
workers
Get or set - List of Workers used for processing Jobs.
cnd
The error that caused the Queue to stop.
new()
Creates a pool of background processes for handling $run()
and
$submit()
calls. These workers are initialized according to the
globals
, packages
, and init
arguments.
Queue$new(
globals = NULL,
packages = NULL,
init = NULL,
max_cpus = availableCores(),
workers = ceiling(max_cpus * 1.2),
timeout = NULL,
hooks = NULL,
reformat = NULL,
signal = FALSE,
cpus = 1L,
stop_id = NULL,
copy_id = NULL
)
globals
A named list of variables that all <Job>$expr
s will have
access to. Alternatively, an object that can be coerced to a named
list with as.list()
, e.g. named vector, data.frame, or environment.
packages
Character vector of package names to load on workers.
init
A call or R expression wrapped in curly braces to evaluate on
each worker just once, immediately after start-up. Will have access
to variables defined by globals
and assets from packages
.
Returned value is ignored.
max_cpus
Total number of CPU cores that can be reserved by all
running Jobs (sum(<Job>$cpus)
). Does not enforce limits on actual
CPU utilization.
workers
How many background Worker processes to start. Set to more
than max_cpus
to enable standby Workers to quickly swap out with
Workers that need to restart.
timeout, hooks, reformat, signal, cpus, stop_id, copy_id
Defaults for this Queue's $run()
method. Here only, stop_id
and copy_id
must be either a function (job)
or NULL
.
hooks
can set queue, worker, and/or job hooks - see the
"Attaching" section in vignette('hooks')
.
A Queue
object.
...
Arguments are not used currently.
run()
Creates a Job object and submits it to the queue for running.
Any NA
arguments will be replaced with their value from Queue$new()
.
Queue$run(
expr,
vars = list(),
timeout = NA,
hooks = NA,
reformat = NA,
signal = NA,
cpus = NA,
stop_id = NA,
copy_id = NA,
...
)
expr
A call or R expression wrapped in curly braces to evaluate on a
worker. Will have access to any variables defined by vars
, as well
as the Worker's globals
, packages
, and init
configuration.
See vignette('eval')
.
vars
A named list of variables to make available to expr
during
evaluation. Alternatively, an object that can be coerced to a named
list with as.list()
, e.g. named vector, data.frame, or environment.
Or a function (job)
that returns such an object.
timeout
A named numeric vector indicating the maximum number of
seconds allowed for each state the job passes through, or 'total' to
apply a single timeout from 'submitted' to 'done'. Can also limit the
'starting' state for Workers. A function (job)
can be used in place
of a number. Example: timeout = c(total = 2.5, running = 1)
.
See vignette('stops')
.
hooks
A named list of functions to run when the Job state changes,
of the form hooks = list(created = function (worker) {...})
. Or a
function (job)
that returns the same. Names of worker hooks are
typically 'created'
, 'submitted'
, 'queued'
, 'dispatched'
,
'starting'
, 'running'
, 'done'
, or '*'
(duplicates okay).
See vignette('hooks')
.
reformat
Set reformat = function (job)
to define what
<Job>$result
should return. The default, reformat = NULL
passes
<Job>$output
to <Job>$result
unchanged.
See vignette('results')
.
signal
Should calling <Job>$result
signal on condition objects?
When FALSE
, <Job>$result
will return the object without
taking additional action. Setting to TRUE
or a character vector of
condition classes, e.g. c('interrupt', 'error', 'warning')
, will
cause the equivalent of stop(<condition>)
to be called when those
conditions are produced. Alternatively, a function (job)
that
returns TRUE
or FALSE
. See vignette('results')
.
cpus
How many CPU cores to reserve for this Job. Or a
function (job)
that returns the same. Used to limit the number of
Jobs running simultaneously to respect <Queue>$max_cpus
. Does not
prevent a Job from using more CPUs than reserved.
stop_id
If an existing Job in the Queue has the same stop_id
,
that Job will be stopped and return an 'interrupt' condition object
as its result. stop_id
can also be a function (job)
that returns
the stop_id
to assign to a given Job. A stop_id
of NULL
disables this feature. See vignette('stops')
.
copy_id
If an existing Job in the Queue has the same copy_id
,
the newly submitted Job will become a "proxy" for that earlier Job,
returning whatever result the earlier Job returns. copy_id
can also
be a function (job)
that returns the copy_id
to assign to a given
Job. A copy_id
of NULL
disables this feature.
See vignette('stops')
.
...
Arbitrary named values to add to the returned Job object.
The new Job object.
submit()
Adds a Job to the Queue for running on a background process.
Queue$submit(job)
job
A Job object, as created by Job$new()
.
This Queue, invisibly.
wait()
Blocks until the Queue enters the given state.
Queue$wait(state = "idle", timeout = NULL, signal = TRUE)
state
The name of a Queue state. Typically one of:
'*'
- Every time the state changes.
'.next'
- Only one time, the next time the state changes.
'starting'
- Workers are starting.
'idle'
- All workers are ready/idle.
'busy'
- At least one worker is busy.
'stopped'
- Shutdown is complete.
timeout
Stop the Queue if it takes longer than this number of seconds, or NULL
.
signal
Raise an error if encountered (will also be recorded in <Queue>$cnd
).
This Queue, invisibly.
on()
Attach a callback function to execute when the Queue enters state
.
Queue$on(state, func)
state
The name of a Queue state. Typically one of:
'*'
- Every time the state changes.
'.next'
- Only one time, the next time the state changes.
'starting'
- Workers are starting.
'idle'
- All workers are ready/idle.
'busy'
- At least one worker is busy.
'stopped'
- Shutdown is complete.
func
A function that accepts a Queue object as input. Return value is ignored.
A function that when called removes this callback from the Queue.
stop()
Stop all jobs and workers.
Queue$stop(reason = "job queue shut down by user", cls = NULL)
reason
Passed to <Job>$stop()
for any Jobs currently managed by
this Queue.
cls
Passed to <Job>$stop()
for any Jobs currently managed by
this Queue.
This Queue, invisibly.