R6 class for controller groups.
controllers
List of R6 controller objects.
new()
Multi-controller constructor.
crew_class_controller_group$new(controllers = NULL)
controllers
List of R6 controller objects.
An R6 controller group object.
if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
persistent <- crew_controller_local(name = "persistent")
transient <- crew_controller_local(
name = "transient",
tasks_max = 1L
)
group <- crew_controller_group(persistent, transient)
group$start()
group$push(name = "task", command = sqrt(4), controller = "transient")
group$wait()
group$pop()
group$terminate()
}
validate()
Validate the controller group.
crew_class_controller_group$validate()
NULL (invisibly).
empty()
See if the controllers are empty.
crew_class_controller_group$empty(controllers = NULL)
controllers
Character vector of controller names. Set to NULL to select all controllers.
A controller is empty if it has no running tasks and no completed tasks waiting to be retrieved with pop().
TRUE if all the selected controllers are empty, FALSE otherwise.
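The emptiness rule above can be illustrated with a small base R sketch. The `state` list and the `group_is_empty()` helper are hypothetical stand-ins for the controllers' internal bookkeeping, not part of the crew API; the real method inspects the controllers directly.

```r
# Hypothetical per-controller state: tasks still running and
# completed tasks not yet retrieved with pop().
state <- list(
  persistent = list(running = 0L, unpopped = 0L),
  transient = list(running = 1L, unpopped = 0L)
)

# A controller is empty when both counts are zero; the group is empty
# only if every selected controller is empty.
group_is_empty <- function(state, controllers = NULL) {
  if (is.null(controllers)) {
    controllers <- names(state)
  }
  all(vapply(
    state[controllers],
    function(x) x$running == 0L && x$unpopped == 0L,
    logical(1)
  ))
}

group_is_empty(state)
group_is_empty(state, controllers = "persistent")
```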
saturated()
Check if a controller is saturated.
crew_class_controller_group$saturated(
collect = TRUE,
throttle = TRUE,
controller = NULL
)
collect
Logical of length 1, whether to collect the results of any newly resolved tasks before determining saturation.
throttle
Logical of length 1, whether to delay task collection until the next request at least self$client$seconds_interval seconds from the original request. The idea is similar to shiny::throttle() except that crew does not accumulate a backlog of requests. The technique improves robustness and efficiency.
controller
Character vector of length 1 with the controller name. Set to NULL to select the default controller that push() would choose.
A controller is saturated if the number of unresolved tasks is greater than or equal to the maximum number of workers. In other words, in a saturated controller, every available worker has a task. You can still push tasks to a saturated controller, but tools that use crew, such as targets, may choose not to.
TRUE if the selected controller is saturated, FALSE otherwise.
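As a rough illustration of the saturation rule, assuming you already know the number of unresolved tasks and the maximum number of workers, the check reduces to a single comparison. `is_saturated()` is a hypothetical helper, not a crew function.

```r
# Hypothetical sketch of the saturation rule: a controller is saturated
# when its unresolved tasks fill every available worker slot.
is_saturated <- function(unresolved, workers) {
  unresolved >= workers
}

is_saturated(unresolved = 3L, workers = 4L) # one worker is still free
is_saturated(unresolved = 4L, workers = 4L) # every worker has a task
is_saturated(unresolved = 6L, workers = 4L) # extra tasks wait in line
```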
start()
Start one or more controllers.
crew_class_controller_group$start(controllers = NULL)
controllers
Character vector of controller names. Set to NULL to select all controllers.
NULL (invisibly).
launch()
Launch one or more workers on one or more controllers.
crew_class_controller_group$launch(n = 1L, controllers = NULL)
n
Number of workers to launch in each controller selected.
controllers
Character vector of controller names. Set to NULL to select all controllers.
NULL (invisibly).
scale()
Automatically scale up the number of workers if needed in one or more controller objects.
crew_class_controller_group$scale(throttle = FALSE, controllers = NULL)
throttle
Logical of length 1, whether to delay auto-scaling until the next auto-scaling request at least self$client$seconds_interval seconds from the original request. The idea is similar to shiny::throttle() except that crew does not accumulate a backlog of requests. The technique improves robustness and efficiency.
controllers
Character vector of controller names. Set to NULL to select all controllers.
See the scale() method in individual controller classes.
NULL (invisibly).
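The throttle arguments describe rate limiting that drops early requests instead of queuing them. A minimal base R sketch of that idea follows; `make_throttle()` and its injectable `now` clock are hypothetical and only approximate what crew does internally.

```r
# Hypothetical throttle without a backlog: the returned function answers
# TRUE (do the work now) only if at least `seconds_interval` seconds have
# passed since the last accepted request. Other requests are dropped.
make_throttle <- function(seconds_interval,
                          now = function() as.numeric(Sys.time())) {
  last <- -Inf
  function() {
    current <- now()
    if (current - last >= seconds_interval) {
      last <<- current
      TRUE
    } else {
      FALSE
    }
  }
}

# A simulated clock keeps the demonstration deterministic.
clock <- 0
throttle <- make_throttle(seconds_interval = 0.5, now = function() clock)
throttle() # accepted: first request
clock <- 0.2
throttle() # dropped: only 0.2 seconds since the last accepted request
clock <- 0.6
throttle() # accepted: 0.6 seconds since the last accepted request
```

Rejected requests are discarded rather than replayed later, so a burst of calls triggers at most one unit of work per interval.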
push()
Push a task to the head of the task list.
crew_class_controller_group$push(
command,
data = list(),
globals = list(),
substitute = TRUE,
seed = NULL,
algorithm = NULL,
packages = character(0),
library = NULL,
seconds_timeout = NULL,
scale = TRUE,
throttle = TRUE,
name = NULL,
save_command = FALSE,
controller = NULL
)
command
Language object with R code to run.
data
Named list of local data objects in the evaluation environment.
globals
Named list of objects to temporarily assign to the global environment for the task. See the reset_globals argument of crew_controller_local().
substitute
Logical of length 1, whether to call base::substitute() on the supplied value of the command argument. If TRUE (default), then command is quoted literally as you write it, e.g. push(command = your_function_call()). If FALSE, then crew assumes command is a language object and you are passing its value, e.g. push(command = quote(your_function_call())). substitute = TRUE is appropriate for interactive use, whereas substitute = FALSE is meant for automated R programs that invoke crew controllers.
seed
Integer of length 1 with the pseudo-random number generator seed to set for the evaluation of the task. Passed to the seed argument of set.seed() if not NULL. If algorithm and seed are both NULL, then the random number generator defaults to the recommended widely spaced worker-specific L'Ecuyer streams as supported by mirai::nextstream(). See vignette("parallel", package = "parallel") for details.
algorithm
Character of length 1 with the pseudo-random number generator algorithm to set for the evaluation of the task. Passed to the kind argument of RNGkind() if not NULL. If algorithm and seed are both NULL, then the random number generator defaults to the recommended widely spaced worker-specific L'Ecuyer streams as supported by mirai::nextstream(). See vignette("parallel", package = "parallel") for details.
packages
Character vector of packages to load for the task.
library
Library path to load the packages. See the lib.loc argument of require().
seconds_timeout
Optional task timeout passed to the .timeout argument of mirai::mirai() (after converting to milliseconds).
scale
Logical, whether to automatically scale workers to meet demand. If TRUE, then collect() runs first so demand can be properly assessed before scaling and the number of workers is not too high.
throttle
If scale is TRUE, whether to defer auto-scaling until the next auto-scaling request at least self$client$seconds_interval seconds from the original request. The idea is similar to shiny::throttle() except that crew does not accumulate a backlog of requests. The technique improves robustness and efficiency.
name
Optional name of the task. Replaced with a random name if NULL or in conflict with an existing name in the task list.
save_command
Logical of length 1. If TRUE, the controller deparses the command and returns it with the output on pop(). If FALSE (default), the controller skips this step to increase speed.
controller
Character of length 1, name of the controller to submit the task. If NULL, the controller defaults to the first controller in the list.
NULL (invisibly).
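The difference between substitute = TRUE and substitute = FALSE can be seen with plain base R. `capture_command()` below is a hypothetical helper that mimics how the command argument is captured; it does not submit anything to a controller.

```r
# Hypothetical helper mimicking the `substitute` argument of push().
capture_command <- function(command, substitute = TRUE) {
  if (substitute) {
    # Quote the unevaluated argument exactly as the caller wrote it.
    command <- substitute(command)
  }
  command
}

# Interactive style: the call is captured literally, not evaluated here.
interactive_cmd <- capture_command(sqrt(4))

# Programmatic style: build the language object first, then pass its value.
expr <- quote(sqrt(4))
programmatic_cmd <- capture_command(expr, substitute = FALSE)

identical(interactive_cmd, programmatic_cmd)
eval(interactive_cmd)

# Pitfall: with substitute = TRUE, passing a variable captures its name.
capture_command(expr)
```

The last call shows why automated programs should use substitute = FALSE: with substitute = TRUE, passing a variable captures the variable's name rather than the command it stores.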
map()
Apply a single command to multiple inputs.
crew_class_controller_group$map(
command,
iterate,
data = list(),
globals = list(),
substitute = TRUE,
seed = NULL,
algorithm = NULL,
packages = character(0),
library = NULL,
seconds_interval = NULL,
seconds_timeout = NULL,
names = NULL,
save_command = FALSE,
error = "stop",
verbose = interactive(),
controller = NULL
)
command
Language object with R code to run.
iterate
Named list of vectors or lists to iterate over. For example, to run function calls f(x = 1, y = "a") and f(x = 2, y = "b"), set command to f(x, y), and set iterate to list(x = c(1, 2), y = c("a", "b")). The individual function calls are evaluated as f(x = iterate$x[[1]], y = iterate$y[[1]]) and f(x = iterate$x[[2]], y = iterate$y[[2]]). All the elements of iterate must have the same length. If there are any name conflicts between iterate and data, iterate takes precedence.
data
Named list of constant local data objects in the evaluation environment. Objects in this list are treated as single values and are held constant for each iteration of the map.
globals
Named list of constant objects to temporarily assign to the global environment for each task. This list should include any functions you previously defined in the global environment which are required to run tasks. See the reset_globals argument of crew_controller_local(). Objects in this list are treated as single values and are held constant for each iteration of the map.
substitute
Logical of length 1, whether to call base::substitute() on the supplied value of the command argument. If TRUE (default), then command is quoted literally as you write it, e.g. push(command = your_function_call()). If FALSE, then crew assumes command is a language object and you are passing its value, e.g. push(command = quote(your_function_call())). substitute = TRUE is appropriate for interactive use, whereas substitute = FALSE is meant for automated R programs that invoke crew controllers.
seed
Integer of length 1 with the pseudo-random number generator seed to set for the evaluation of the task. Passed to the seed argument of set.seed() if not NULL. If algorithm and seed are both NULL, then the random number generator defaults to the recommended widely spaced worker-specific L'Ecuyer streams as supported by mirai::nextstream(). See vignette("parallel", package = "parallel") for details.
algorithm
Character of length 1 with the pseudo-random number generator algorithm to set for the evaluation of the task. Passed to the kind argument of RNGkind() if not NULL. If algorithm and seed are both NULL, then the random number generator defaults to the recommended widely spaced worker-specific L'Ecuyer streams as supported by mirai::nextstream(). See vignette("parallel", package = "parallel") for details.
packages
Character vector of packages to load for the task.
library
Library path to load the packages. See the lib.loc argument of require().
seconds_interval
Number of seconds to wait between polling intervals while checking tasks for completion. Defaults to the seconds_interval field of the client object.
seconds_timeout
Optional task timeout passed to the .timeout argument of mirai::mirai() (after converting to milliseconds).
names
Optional character of length 1, name of the element of iterate with names for the tasks. If names is supplied, then iterate[[names]] must be a character vector.
save_command
Logical of length 1, whether to store a text string version of the R command in the output.
error
Character vector of length 1, choice of action if a task has an error. Possible values:
"stop": throw an error in the main R session instead of returning a value. In case of an error, the results from the last errored map() are in the error field of the controller, e.g. controller_object$error. To reduce memory consumption, set controller_object$error <- NULL after you are finished troubleshooting.
"warn": throw a warning. This allows the return value with all the error messages and tracebacks to be generated.
"silent": do nothing special.
verbose
Logical of length 1, whether to print progress messages.
controller
Character of length 1, name of the controller to submit the tasks. If NULL, the controller defaults to the first controller in the list.
The idea comes from functional programming: for example, the map() function from the purrr package.
A tibble of results and metadata: one row per task and columns corresponding to the output of pop().
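The pairing of iterate elements described above can be emulated sequentially in base R. `emulate_map()` is a hypothetical sketch: it evaluates each call locally and returns a named list, whereas map() sends each call to a worker as a separate task and returns a tibble.

```r
# Hypothetical sequential emulation of how map() pairs up the elements
# of `iterate`, with `iterate` taking precedence over `data`.
emulate_map <- function(command, iterate, data = list(), names = NULL,
                        envir = parent.frame()) {
  force(envir)
  n_each <- unique(lengths(iterate))
  # All elements of `iterate` must have the same length.
  stopifnot(length(n_each) == 1L)
  results <- lapply(seq_len(n_each), function(i) {
    # Element i of every vector or list in `iterate`.
    current <- lapply(iterate, function(values) values[[i]])
    task_env <- list2env(data, parent = envir)
    # Assigning `current` second lets `iterate` override `data`.
    list2env(current, envir = task_env)
    eval(command, envir = task_env)
  })
  if (!is.null(names)) {
    results <- stats::setNames(results, iterate[[names]])
  }
  results
}

f <- function(x, y) paste0(x, "-", y)
emulate_map(
  command = quote(f(x, y)),
  iterate = list(x = c(1, 2), y = c("a", "b"), id = c("task_1", "task_2")),
  data = list(y = "ignored"),
  names = "id"
)
```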
collect()
Check for done tasks and move the results to the results list.
crew_class_controller_group$collect(throttle = FALSE, controllers = NULL)
throttle
Logical of length 1, whether to defer task collection until the next task collection request at least self$client$seconds_interval seconds from the original request. The idea is similar to shiny::throttle() except that crew does not accumulate a backlog of requests. The technique improves robustness and efficiency.
controllers
Character vector of controller names. Set to NULL to select all controllers.
NULL (invisibly). Removes elements from the queue list as applicable and moves them to the results list.
pop()
Pop a completed task from the results data frame.
crew_class_controller_group$pop(
scale = TRUE,
collect = TRUE,
throttle = TRUE,
controllers = NULL
)
scale
Logical, whether to automatically scale workers to meet demand. If TRUE, then collect() runs first so demand can be properly assessed before scaling and the number of workers is not too high. Scaling up on pop() may be important for transient or nearly transient workers that tend to drop off quickly after doing little work.
collect
Logical of length 1. If scale is FALSE, whether to call collect() to pick up the results of completed tasks. This task collection step always happens (with throttling) when scale is TRUE.
throttle
If scale is TRUE, whether to defer auto-scaling until the next auto-scaling request at least self$client$seconds_interval seconds from the original request. The idea is similar to shiny::throttle() except that crew does not accumulate a backlog of requests. The technique improves robustness and efficiency.
controllers
Character vector of controller names. Set to NULL to select all controllers.
If there is no task to collect, return NULL. Otherwise, return a one-row tibble with the same columns as pop() for ordinary controllers.
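The interplay between collect() and pop() can be modeled with an in-memory queue. The `controller` environment, `collect_sketch()`, and `pop_sketch()` are hypothetical and ignore scaling, throttling, and workers; they only show how finished tasks move from the queue to the results list and come out one row at a time.

```r
# Hypothetical in-memory model: `queue` holds submitted tasks and
# `results` holds finished tasks waiting to be popped.
controller <- new.env()
controller$queue <- list(
  list(name = "a", done = TRUE, result = 1),
  list(name = "b", done = FALSE, result = NULL)
)
controller$results <- list()

collect_sketch <- function(controller) {
  done <- vapply(controller$queue, function(task) task$done, logical(1))
  # Move finished tasks from the queue to the results list.
  controller$results <- c(controller$results, controller$queue[done])
  controller$queue <- controller$queue[!done]
  invisible(NULL)
}

pop_sketch <- function(controller) {
  collect_sketch(controller)
  if (length(controller$results) == 0L) {
    return(NULL) # nothing to retrieve yet
  }
  task <- controller$results[[1L]]
  controller$results <- controller$results[-1L]
  # One row per retrieved task.
  data.frame(name = task$name, result = task$result)
}

pop_sketch(controller) # one-row data frame for task "a"
pop_sketch(controller) # NULL: task "b" is still running
```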
wait()
Wait for tasks.
crew_class_controller_group$wait(
mode = "all",
seconds_interval = 0.01,
seconds_timeout = Inf,
scale = TRUE,
throttle = TRUE,
controllers = NULL
)
mode
If mode is "all", then the method waits for all tasks to complete. If mode is "one", then it waits until at least one task is complete.
seconds_interval
Number of seconds to wait between polling intervals while checking for results. Defaults to a fixed value because the internal seconds_interval field may be different from controller to controller.
seconds_timeout
Timeout length in seconds waiting for results to become available.
scale
Logical of length 1, whether to call scale_later() on each selected controller to schedule auto-scaling. By design, auto-scaling might not actually happen on every iteration if throttle is TRUE.
throttle
If scale is TRUE, whether to defer auto-scaling and task collection until the next request at least self$client$seconds_interval seconds from the original request. The idea is similar to shiny::throttle() except that crew does not accumulate a backlog of requests. The technique improves robustness and efficiency. Keeping the default throttle = TRUE is highly recommended for wait().
controllers
Character vector of controller names. Set to NULL to select all controllers.
The wait() method blocks the calling R session, repeatedly auto-scales workers for tasks that need them, and repeatedly collects results. The function runs until it either times out or one of the controllers reaches the stopping condition based on the mode argument.
NULL (invisibly). Call pop() to get the result.
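Conceptually, wait() is a polling loop. The sketch below assumes a hypothetical `is_done()` function that reports one logical per task; `wait_sketch()` is likewise hypothetical, and the real method also scales workers and collects results on every iteration, which is omitted here.

```r
# Hypothetical polling loop in the spirit of wait(). `is_done` returns one
# logical per task. Scaling and result collection are omitted.
wait_sketch <- function(is_done, mode = "all", seconds_interval = 0.01,
                        seconds_timeout = Inf) {
  mode <- match.arg(mode, c("all", "one"))
  start <- proc.time()[["elapsed"]]
  repeat {
    done <- is_done()
    # "all" stops when every task is done, "one" when any task is done.
    finished <- if (mode == "all") all(done) else any(done)
    if (finished) {
      return(invisible(NULL))
    }
    if (proc.time()[["elapsed"]] - start > seconds_timeout) {
      stop("timed out waiting for tasks")
    }
    Sys.sleep(seconds_interval)
  }
}

# Simulated tasks: the first finishes on poll 1, the second on poll 3.
polls <- 0
is_done <- function() {
  polls <<- polls + 1
  polls >= c(1, 3)
}

wait_sketch(is_done, mode = "one") # returns after the first poll
wait_sketch(is_done, mode = "all") # polls until both tasks are done
polls
```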
summary()
Summarize the workers of one or more controllers.
crew_class_controller_group$summary(controllers = NULL)
controllers
Character vector of controller names. Set to NULL to select all controllers.
A data frame of aggregated worker summary statistics of all the selected controllers. It has one row per worker, and the rows are grouped by controller. See the documentation of the summary() method of the controller class for specific information about the columns in the output.
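Stacking per-controller summaries into one data frame grouped by controller might look like the following base R sketch. The `worker` and `tasks` columns are hypothetical placeholders; see the summary() method of the controller class for the real columns.

```r
# Hypothetical per-controller worker summaries; the column names are
# illustrative only, not the columns crew actually reports.
summaries <- list(
  persistent = data.frame(worker = 1:2, tasks = c(5L, 3L)),
  transient = data.frame(worker = 1L, tasks = 1L)
)

# Prefix each summary with its controller name, then stack the rows:
# one row per worker, grouped by controller.
combined <- do.call(rbind, Map(
  function(name, rows) cbind(controller = name, rows),
  names(summaries),
  summaries
))
rownames(combined) <- NULL
combined
```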
terminate()
Terminate the workers and disconnect the client for one or more controllers.
crew_class_controller_group$terminate(controllers = NULL)
controllers
Character vector of controller names. Set to NULL to select all controllers.
NULL (invisibly).
See crew_controller_group().
Other class: crew_class_client, crew_class_controller, crew_class_launcher, crew_class_schedule, crew_class_tls.