R6 class for controllers.
client: Router object.
launcher: Launcher object.
tasks: A list of mirai::mirai() task objects.
pushed: Number of tasks pushed since the controller was started.
popped: Number of tasks popped since the controller was started.
log: Tibble with per-worker metadata about tasks.
error: Tibble of task results (with one result per row)
from the last call to map(error = "stop").
new(): mirai controller constructor.
crew_class_controller$new(client = NULL, launcher = NULL)
client: Router object. See crew_controller().
launcher: Launcher object. See crew_controller().
An R6 controller object.
if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
client <- crew_client()
launcher <- crew_launcher_local()
controller <- crew_controller(client = client, launcher = launcher)
controller$start()
controller$push(name = "task", command = sqrt(4))
controller$wait()
controller$pop()
controller$terminate()
}
validate(): Validate the client.
crew_class_controller$validate()
NULL (invisibly).
empty(): Check if the controller is empty.
crew_class_controller$empty(controllers = NULL)
controllers: Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
A controller is empty if it has no running tasks
and no completed tasks waiting to be retrieved with pop().
TRUE if the controller is empty, FALSE otherwise.
nonempty(): Check if the controller is nonempty.
crew_class_controller$nonempty(controllers = NULL)
controllers: Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
A controller is nonempty if it has running tasks
or completed tasks waiting to be retrieved with pop().
TRUE if the controller is nonempty, FALSE otherwise.
resolved(): Number of resolved mirai() tasks.
crew_class_controller$resolved()
resolved() is cumulative: it counts all the resolved
tasks over the entire lifetime of the controller session.
Non-negative integer of length 1,
number of resolved mirai() tasks.
The return value is 0 if the condition variable does not exist
(i.e. if the client is not running).
unresolved(): Number of unresolved mirai() tasks.
crew_class_controller$unresolved()
Non-negative integer of length 1,
number of unresolved mirai() tasks.
unpopped(): Number of resolved mirai() tasks available via pop().
crew_class_controller$unpopped()
Non-negative integer of length 1,
number of resolved mirai() tasks available via pop().
saturated(): Check if the controller is saturated.
crew_class_controller$saturated(
collect = NULL,
throttle = NULL,
controller = NULL
)
collect: Deprecated in version 0.5.0.9003 (2023-10-02). Not used.
throttle: Deprecated in version 0.5.0.9003 (2023-10-02). Not used.
controller: Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
A controller is saturated if the number of unresolved tasks
is greater than or equal to the maximum number of workers.
In other words, in a saturated controller, every available worker
has a task.
You can still push tasks to a saturated controller, but
tools that use crew such as targets may choose not to.
TRUE if the controller is saturated, FALSE otherwise.
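The saturation rule above reduces to a single comparison. The sketch below is a plain R illustration, not the crew implementation: is_saturated() and its arguments are hypothetical names, with unresolved standing in for the value of unresolved() and workers for the "workers" argument of crew_controller().

```r
# Hypothetical helper, not part of crew: restates the documented rule that
# a controller is saturated when unresolved tasks >= maximum workers.
is_saturated <- function(unresolved, workers) {
  stopifnot(unresolved >= 0L, workers >= 0L)
  unresolved >= workers
}

free_worker <- is_saturated(unresolved = 1L, workers = 2L) # one worker is idle
all_busy <- is_saturated(unresolved = 2L, workers = 2L)    # every worker has a task
backlog <- is_saturated(unresolved = 5L, workers = 2L)     # tasks are also queued
```

Under these assumptions only the first call returns FALSE. A scheduler built on crew could use such a check to hold back new tasks until a worker frees up.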
start(): Start the controller if it is not already started.
crew_class_controller$start(controllers = NULL)
controllers: Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
Register the mirai client and register worker websockets with the launcher.
NULL (invisibly).
launch(): Launch one or more workers.
crew_class_controller$launch(n = 1L, controllers = NULL)
n: Number of workers to try to launch. The actual
number launched is capped so that no more than "workers"
workers are running at a given time, where "workers"
is an argument of crew_controller(). The
actual cap is the "workers" argument minus the number of connected
workers minus the number of starting workers. A "connected"
worker has an active websocket connection to the mirai client,
and "starting" means that the worker was launched at most
seconds_start seconds ago, where seconds_start is
also an argument of crew_controller().
controllers: Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
NULL (invisibly).
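The cap described for the n argument is simple arithmetic, sketched here in plain R. This is an illustration of the documented formula only: workers_to_launch() is a hypothetical helper, and clamping the result at zero is an assumption rather than confirmed crew behavior.

```r
# Hypothetical helper, not part of crew: applies the documented cap
# (workers minus connected workers minus starting workers) to a request
# for n new workers.
workers_to_launch <- function(n, workers, connected, starting) {
  cap <- workers - connected - starting
  # Assumption: never report a negative number of launches.
  max(0L, min(n, cap))
}

capped <- workers_to_launch(n = 5L, workers = 4L, connected = 1L, starting = 1L)
uncapped <- workers_to_launch(n = 1L, workers = 4L, connected = 1L, starting = 1L)
none <- workers_to_launch(n = 3L, workers = 2L, connected = 2L, starting = 0L)
```

In the first call the request for 5 workers is cut to the 2 remaining slots. In the last call no slots remain, so nothing would be launched.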
scale(): Auto-scale workers out to meet the demand of tasks.
crew_class_controller$scale(throttle = TRUE, controllers = NULL)
throttle: TRUE to skip auto-scaling if it already happened
within the last seconds_interval seconds. FALSE to auto-scale
every time scale() is called. Throttling avoids
overburdening the mirai dispatcher and other resources.
controllers: Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
Methods push(), pop(), and wait() already invoke
scale() if the scale argument is TRUE.
For finer control of the number of workers launched,
call launch() on the controller with the exact desired
number of workers.
NULL (invisibly).
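The throttle argument can be pictured as a timestamp check. The sketch below is a plain R stand-in written under that assumption and is not how crew itself tracks time: new_throttle() and its clock argument are hypothetical, and the clock is injectable only so the example runs without sleeping.

```r
# Hypothetical closure, not part of crew: the returned function reports
# whether auto-scaling should run now.
new_throttle <- function(seconds_interval,
                         clock = function() as.numeric(Sys.time())) {
  last <- -Inf # time of the most recent scaling, none yet
  function(throttle = TRUE) {
    now <- clock()
    if (throttle && (now - last) < seconds_interval) {
      return(FALSE) # scaled too recently: skip this round
    }
    last <<- now
    TRUE
  }
}

fake_time <- 0
should_scale <- new_throttle(seconds_interval = 0.5, clock = function() fake_time)
first <- should_scale()                  # nothing has scaled yet
fake_time <- 0.2
skipped <- should_scale()                # only 0.2 seconds have passed
forced <- should_scale(throttle = FALSE) # throttle = FALSE always scales
fake_time <- 0.8
later <- should_scale()                  # 0.6 seconds since the forced scaling
```

Only the second call is skipped. The design choice illustrated is that throttling bounds how often the dispatcher is queried, no matter how frequently push(), pop(), or wait() trigger scaling.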
push(): Push a task to the head of the task list.
crew_class_controller$push(
command,
data = list(),
globals = list(),
substitute = TRUE,
seed = NULL,
algorithm = NULL,
packages = character(0),
library = NULL,
seconds_timeout = NULL,
scale = TRUE,
throttle = TRUE,
name = NA_character_,
save_command = FALSE,
controller = NULL
)
command: Language object with R code to run.
data: Named list of local data objects in the evaluation environment.
globals: Named list of objects to temporarily assign to the
global environment for the task.
This list should
include any functions you previously defined in the global
environment which are required to run tasks.
See the reset_globals argument
of crew_controller_local().
substitute: Logical of length 1, whether to call
base::substitute() on the supplied value of the
command argument. If TRUE (default) then command is quoted
literally as you write it, e.g.
push(command = your_function_call()). If FALSE, then crew
assumes command is a language object and you are passing its
value, e.g. push(command = quote(your_function_call())).
substitute = TRUE is appropriate for interactive use,
whereas substitute = FALSE is meant for automated R programs
that invoke crew controllers.
seed: Integer of length 1 with the pseudo-random number generator
seed to set for the evaluation of the task. Passed to the
seed argument of set.seed() if not NULL.
If algorithm and seed are both NULL,
then the random number generator defaults to the
widely spaced worker-specific
L'Ecuyer streams as supported by mirai::nextstream().
See vignette("parallel", package = "parallel") for details.
algorithm: Integer of length 1 with the pseudo-random number
generator algorithm to set for the evaluation of the task.
Passed to the kind argument of RNGkind() if not NULL.
If algorithm and seed are both NULL,
then the random number generator defaults to the
recommended widely spaced worker-specific
L'Ecuyer streams as supported by mirai::nextstream().
See vignette("parallel", package = "parallel") for details.
packages: Character vector of packages to load for the task.
library: Library path to load the packages. See the lib.loc
argument of require().
seconds_timeout: Optional task timeout passed to the .timeout
argument of mirai::mirai() (after converting to milliseconds).
scale: Logical, whether to automatically call scale()
to auto-scale workers to meet the demand of the task load. Also
see the throttle argument.
throttle: TRUE to skip auto-scaling if it already happened
within the last seconds_interval seconds. FALSE to auto-scale
every time scale() is called. Throttling avoids
overburdening the mirai dispatcher and other resources.
name: Optional name of the task.
save_command: Logical of length 1. If TRUE, the controller
deparses the command and returns it with the output on pop().
If FALSE (default), the controller skips this step to
increase speed.
controller: Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
NULL (invisibly).
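The difference between substitute = TRUE and substitute = FALSE can be seen without a controller. The sketch below mimics only the capture step in plain R: capture_command() is a hypothetical function, not part of crew, and it returns the language object that would be handed on to a worker.

```r
# Hypothetical stand-in for the first step of push(), not part of crew.
capture_command <- function(command, substitute = TRUE) {
  if (substitute) {
    # Quote the argument exactly as the caller typed it.
    command <- base::substitute(command)
  }
  stopifnot(is.language(command))
  command
}

# Interactive style: write the code directly.
typed <- capture_command(sqrt(4))

# Programmatic style: the language object already exists in a variable.
stored <- quote(sqrt(4))
passed <- capture_command(stored, substitute = FALSE)

# Pitfall: with substitute = TRUE the variable name itself is captured.
pitfall <- capture_command(stored)
```

Here typed and passed are both the call sqrt(4), while pitfall is just the symbol stored. That is why substitute = FALSE suits automated programs that build commands ahead of time.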
map(): Apply a single command to multiple inputs.
crew_class_controller$map(
command,
iterate,
data = list(),
globals = list(),
substitute = TRUE,
seed = NULL,
algorithm = NULL,
packages = character(0),
library = NULL,
seconds_interval = 0.5,
seconds_timeout = NULL,
names = NULL,
save_command = FALSE,
error = "stop",
warnings = TRUE,
verbose = interactive(),
scale = TRUE,
throttle = TRUE,
controller = NULL
)
command: Language object with R code to run.
iterate: Named list of vectors or lists to iterate over.
For example, to run function calls
f(x = 1, y = "a") and f(x = 2, y = "b"),
set command to f(x, y), and set iterate to
list(x = c(1, 2), y = c("a", "b")). The individual
function calls are evaluated as
f(x = iterate$x[[1]], y = iterate$y[[1]]) and
f(x = iterate$x[[2]], y = iterate$y[[2]]).
All the elements of iterate must have the same length.
If there are any name conflicts between iterate and data,
iterate takes precedence.
data: Named list of constant local data objects in the evaluation environment. Objects in this list are treated as single values and are held constant for each iteration of the map.
globals: Named list of constant objects to temporarily
assign to the global environment for each task. This list should
include any functions you previously defined in the global
environment which are required to run tasks.
See the reset_globals argument of crew_controller_local().
Objects in this list are treated as single
values and are held constant for each iteration of the map.
substitute: Logical of length 1, whether to call
base::substitute() on the supplied value of the
command argument. If TRUE (default) then command is quoted
literally as you write it, e.g.
push(command = your_function_call()). If FALSE, then crew
assumes command is a language object and you are passing its
value, e.g. push(command = quote(your_function_call())).
substitute = TRUE is appropriate for interactive use,
whereas substitute = FALSE is meant for automated R programs
that invoke crew controllers.
seed: Integer of length 1 with the pseudo-random number generator
seed to set for the evaluation of the task. Passed to the
seed argument of set.seed() if not NULL.
If algorithm and seed are both NULL,
then the random number generator defaults to the
recommended widely spaced worker-specific
L'Ecuyer streams as supported by mirai::nextstream().
See vignette("parallel", package = "parallel") for details.
algorithm: Integer of length 1 with the pseudo-random number
generator algorithm to set for the evaluation of the task.
Passed to the kind argument of RNGkind() if not NULL.
If algorithm and seed are both NULL,
then the random number generator defaults to the
recommended widely spaced worker-specific
L'Ecuyer streams as supported by mirai::nextstream().
See vignette("parallel", package = "parallel") for details.
packages: Character vector of packages to load for the task.
library: Library path to load the packages. See the lib.loc
argument of require().
seconds_interval: Number of seconds to wait between successive polls of the tasks for completion.
seconds_timeout: Optional task timeout passed to the .timeout
argument of mirai::mirai() (after converting to milliseconds).
names: Optional character of length 1, name of the element of
iterate with names for the tasks. If names is supplied,
then iterate[[names]] must be a character vector.
save_command: Logical of length 1, whether to store a text string version of the R command in the output.
error: Character vector of length 1, choice of action if a task has an error. Possible values:
"stop": throw an error in the main R session instead of returning
a value. In case of an error, the results from the last errored
map() are in the error field
of the controller, e.g. controller_object$error. To reduce
memory consumption, set controller_object$error <- NULL after
you are finished troubleshooting.
"warn": throw a warning. This allows the return value with
all the error messages and tracebacks to be generated.
"silent": do nothing special.
warnings: Logical of length 1, whether to throw a warning in the interactive session if at least one task encounters an error.
verbose: Logical of length 1, whether to print progress messages.
scale: Logical, whether to automatically scale workers to meet
demand. See also the throttle argument.
throttle: TRUE to skip auto-scaling if it already happened
within the last seconds_interval seconds. FALSE to auto-scale
every time scale() is called. Throttling avoids
overburdening the mirai dispatcher and other resources.
controller: Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
map() cannot be used unless all prior tasks are
completed and popped. You may need to wait and then pop them
manually. Alternatively, you can start over: either call
terminate() on the current controller object to reset it, or
create a new controller object entirely.
A tibble of results and metadata: one row per task
and columns corresponding to the output of pop().
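How iterate and data combine for each task can be sketched in plain R. The function below is a hypothetical local stand-in, not the crew implementation: map_sketch() evaluates every call in the current session instead of on workers and returns a bare list rather than a tibble, but it follows the documented rules that all elements of iterate share one length and that iterate wins name conflicts with data.

```r
# Hypothetical local stand-in for map(), not part of crew.
map_sketch <- function(command, iterate, data = list()) {
  enclos <- parent.frame() # where functions named in the command are found
  sizes <- vapply(iterate, length, integer(1L))
  stopifnot(length(iterate) > 0L, length(unique(sizes)) == 1L)
  lapply(seq_len(sizes[[1L]]), function(index) {
    # Take the index-th element of every entry of iterate.
    inputs <- lapply(iterate, function(x) x[[index]])
    # Keep only the data objects whose names iterate does not override.
    constants <- data[setdiff(names(data), names(inputs))]
    eval(command, envir = c(inputs, constants), enclos = enclos)
  })
}

# Evaluates paste(x = 1, y = "a") and then paste(x = 2, y = "b").
out <- map_sketch(
  command = quote(paste(x, y)),
  iterate = list(x = c(1, 2), y = c("a", "b"))
)

# x appears in both lists, so the value from iterate is used.
conflict <- map_sketch(
  command = quote(paste(x, y)),
  iterate = list(x = c(1, 2)),
  data = list(x = 99, y = "fixed")
)
```

The first call yields "1 a" and "2 b". The second yields "1 fixed" and "2 fixed", with the constant x = 99 ignored.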
collect(): Deprecated in version 0.5.0.9003 (2023-10-02).
crew_class_controller$collect(throttle = NULL, controllers = NULL)
throttle: Deprecated in version 0.5.0.9003 (2023-10-02).
controllers: Deprecated in version 0.5.0.9003 (2023-10-02).
NULL.
pop(): Pop a completed task from the results data frame.
crew_class_controller$pop(
scale = TRUE,
collect = NULL,
throttle = TRUE,
controllers = NULL
)
scale: Logical of length 1,
whether to automatically call scale()
to auto-scale workers to meet the demand of the task load.
Scaling up on pop() may be important
for transient or nearly transient workers that tend to drop off
quickly after doing little work.
See also the throttle argument.
collect: Deprecated in version 0.5.0.9003 (2023-10-02).
throttle: TRUE to skip auto-scaling if it already happened
within the last seconds_interval seconds. FALSE to auto-scale
every time scale() is called. Throttling avoids
overburdening the mirai dispatcher and other resources.
controllers: Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
If no task is currently completed, pop()
will attempt to auto-scale workers as needed.
If there is no task to collect, return NULL. Otherwise,
return a one-row tibble with the following columns.
name: the task name if given.
command: a character string with the R command if save_command
was set to TRUE in push().
result: a list containing the return value of the R command.
seconds: number of seconds that the task ran.
seed: the single integer originally supplied to push(),
NA otherwise. The pseudo-random number generator state
just prior to the task can be restored using
set.seed(seed = seed, kind = algorithm), where seed and
algorithm are part of this output.
algorithm: name of the pseudo-random number generator algorithm
originally supplied to push(),
NA otherwise. The pseudo-random number generator state
just prior to the task can be restored using
set.seed(seed = seed, kind = algorithm), where seed and
algorithm are part of this output.
error: the first 2048 characters of the error message if
the task threw an error, NA otherwise.
trace: the first 2048 characters of the text of the traceback
if the task threw an error, NA otherwise.
warnings: the first 2048 characters of the text of
warning messages that the task may have generated, NA otherwise.
launcher: name of the crew launcher where the task ran.
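Restoring the generator state from the seed and algorithm columns uses only base R. In the sketch below, task is a hypothetical stand-in for one row of pop() output with made-up values. It assumes both seed and algorithm were supplied to push(), so that neither column is NA.

```r
# Hypothetical stand-in for the relevant columns of a pop() result.
task <- list(seed = 123L, algorithm = "Mersenne-Twister")

old_kind <- RNGkind()[[1L]] # remember the session's current generator

# Restore the state just prior to the task, then draw some numbers.
set.seed(seed = task$seed, kind = task$algorithm)
first <- runif(3L)

# Restoring again reproduces exactly the same draws.
set.seed(seed = task$seed, kind = task$algorithm)
second <- runif(3L)

RNGkind(kind = old_kind) # put the original generator back
reproduced <- identical(first, second)
```

Because set.seed() with a kind argument changes the generator for the whole session, the sketch saves and restores the previous kind around the replay.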
wait(): Wait for tasks.
crew_class_controller$wait(
mode = "all",
seconds_interval = 0.5,
seconds_timeout = Inf,
scale = TRUE,
throttle = TRUE,
controllers = NULL
)
mode: Character of length 1: "all" to wait for all tasks to
complete, "one" to wait for a single task to complete.
seconds_interval: Number of seconds between interruptions of the wait, each of which gives the controller a chance to scale up workers as needed.
seconds_timeout: Timeout length in seconds waiting for tasks.
scale: Logical, whether to automatically call scale()
to auto-scale workers to meet the demand of the task load.
See also the throttle argument.
throttle: TRUE to skip auto-scaling if it already happened
within the last seconds_interval seconds. FALSE to auto-scale
every time scale() is called. Throttling avoids
overburdening the mirai dispatcher and other resources.
controllers: Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
The wait() method blocks the calling R session and
repeatedly auto-scales workers for tasks that need them.
The function runs until it either times out or the condition
in mode is met.
A logical of length 1, invisibly. TRUE if the condition
in mode was met, FALSE otherwise.
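The blocking behavior and return value described above follow a common poll-until-done pattern, sketched here in plain R. wait_sketch() and its done argument are hypothetical and not part of crew: done stands in for the condition selected by mode, and the sleep marks the point where a real controller would have the chance to auto-scale.

```r
# Hypothetical stand-in for wait(), not part of crew.
wait_sketch <- function(done, seconds_interval = 0.5, seconds_timeout = Inf) {
  start <- proc.time()[["elapsed"]]
  repeat {
    if (done()) {
      return(invisible(TRUE)) # condition met
    }
    if (proc.time()[["elapsed"]] - start >= seconds_timeout) {
      return(invisible(FALSE)) # timed out first
    }
    Sys.sleep(seconds_interval) # a controller could auto-scale here
  }
}

# A condition that becomes TRUE on the third poll.
polls <- 0L
finished <- wait_sketch(
  done = function() {
    polls <<- polls + 1L
    polls >= 3L
  },
  seconds_interval = 0.01
)

# A condition that never holds, so the timeout decides the result.
timed_out <- wait_sketch(
  done = function() FALSE,
  seconds_interval = 0.01,
  seconds_timeout = 0.05
)
```

In this sketch finished is TRUE and timed_out is FALSE, mirroring the documented invisible logical result.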
summary(): Summarize the workers and tasks of the controller.
crew_class_controller$summary(controllers = NULL)
controllers: Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
A data frame of summary statistics on the workers and tasks. It has one row per worker websocket and the following columns:
controller: name of the controller.
worker: integer index of the worker.
tasks: number of tasks which were completed by
a worker at the websocket and then returned by calling
pop() on the controller object.
seconds: total runtime in seconds of
all the tasks that ran on a worker connected to this websocket
and then were retrieved by calling pop() on the controller
object.
errors: total number of tasks which ran on a worker
at the websocket, encountered an error in R, and were then retrieved
with pop().
warnings: total number of tasks which ran on a worker
at the websocket, encountered one or more warnings in R,
and were then retrieved with pop(). Note: warnings
is actually the number of tasks, not the number of warnings.
(A task could throw more than one warning.)
terminate(): Terminate the workers and the mirai client.
crew_class_controller$terminate(controllers = NULL)
controllers: Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
NULL (invisibly).
See crew_controller().
Other controller:
crew_controller()