crew (version 0.8.0)

crew_class_controller_group: Controller group class

Description

R6 class for controller groups.

Active bindings

controllers

List of R6 controller objects.

relay

Relay object for event-driven programming on a downstream condition variable.

Methods


Method new()

Multi-controller constructor.

Usage

crew_class_controller_group$new(controllers = NULL, relay = NULL)

Arguments

controllers

List of R6 controller objects.

relay

Relay object for event-driven programming on a downstream condition variable.

Returns

An R6 object with the controller group object.

Examples

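if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
  # Two local controllers: one long-lived, one whose workers exit after one task.
  persistent <- crew_controller_local(name = "persistent")
  transient <- crew_controller_local(
    name = "transient",
    tasks_max = 1L
  )
  group <- crew_controller_group(persistent, transient)
  group$start()
  # Send the task to the "transient" controller by name.
  group$push(name = "task", command = sqrt(4), controller = "transient")
  group$wait() # block until the task completes
  group$pop() # retrieve the result as a one-row tibble
  group$terminate()
}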


Method validate()

Validate the controller group.

Usage

crew_class_controller_group$validate()

Returns

NULL (invisibly).


Method empty()

See if the controllers are empty.

Usage

crew_class_controller_group$empty(controllers = NULL)

Arguments

controllers

Character vector of controller names. Set to NULL to select all controllers.

Details

A controller is empty if it has no running tasks and no completed tasks waiting to be retrieved with pop().

Returns

TRUE if all the selected controllers are empty, FALSE otherwise.
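As a rough illustration of the definition above, the following sketch checks empty() before a task is pushed, while it is pending, and after its result has been popped. It follows the same CREW_EXAMPLES guard as the package examples; the controller names "a" and "b" and the variable run_examples are arbitrary choices made for this sketch.

```r
# Only run when the CREW_EXAMPLES environment variable is set to "true".
run_examples <- identical(Sys.getenv("CREW_EXAMPLES"), "true")

if (run_examples) {
  library(crew)
  group <- crew_controller_group(
    crew_controller_local(name = "a"),
    crew_controller_local(name = "b")
  )
  group$start()
  print(group$empty()) # checked before any task is pushed
  group$push(name = "task", command = Sys.sleep(1), controller = "a")
  print(group$empty(controllers = "a")) # "a" now holds an unfinished task
  print(group$empty(controllers = "b")) # "b" was never given a task
  group$wait()
  group$pop() # retrieve the completed task
  print(group$empty()) # checked again after the result was retrieved
  group$terminate()
}
```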


Method saturated()

Check if a controller is saturated.

Usage

crew_class_controller_group$saturated(
  collect = NULL,
  throttle = NULL,
  controller = NULL
)

Arguments

collect

Deprecated in version 0.5.0.9003 (2023-10-02). Not used.

throttle

Deprecated in version 0.5.0.9003 (2023-10-02). Not used.

controller

Character vector of length 1 with the controller name. Set to NULL to select the default controller that push() would choose.

Details

A controller is saturated if the number of unresolved tasks is greater than or equal to the maximum number of workers. In other words, in a saturated controller, every available worker has a task. You can still push tasks to a saturated controller, but tools that use crew such as targets may choose not to.

Returns

TRUE if the selected controller is saturated, FALSE otherwise.
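The saturation rule in the Details above reduces to a single comparison. The following is a minimal sketch in plain R, not crew's implementation; is_saturated() and its arguments are hypothetical names introduced only for illustration.

```r
# Hypothetical helper (not part of crew): restates the rule above.
# unresolved: number of tasks pushed but not yet resolved.
# workers: maximum number of workers the controller may run.
is_saturated <- function(unresolved, workers) {
  unresolved >= workers
}

is_saturated(unresolved = 1L, workers = 2L) # one worker is still free
is_saturated(unresolved = 2L, workers = 2L) # every worker has a task
is_saturated(unresolved = 5L, workers = 2L) # backlog beyond capacity
```

A scheduler built on crew could use a check like this to decide whether to hold back new tasks, which is the choice the Details attribute to tools such as targets.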


Method start()

Start one or more controllers.

Usage

crew_class_controller_group$start(controllers = NULL)

Arguments

controllers

Character vector of controller names. Set to NULL to select all controllers.

Returns

NULL (invisibly).


Method launch()

Launch one or more workers on one or more controllers.

Usage

crew_class_controller_group$launch(n = 1L, controllers = NULL)

Arguments

n

Number of workers to launch in each controller selected.

controllers

Character vector of controller names. Set to NULL to select all controllers.

Returns

NULL (invisibly).


Method scale()

Automatically scale up the number of workers if needed in one or more controller objects.

Usage

crew_class_controller_group$scale(throttle = TRUE, controllers = NULL)

Arguments

throttle

TRUE to skip auto-scaling if it already happened within the last seconds_interval seconds. FALSE to auto-scale every time scale() is called. Throttling avoids overburdening the mirai dispatcher and other resources.

controllers

Character vector of controller names. Set to NULL to select all controllers.

Details

See the scale() method in individual controller classes.

Returns

NULL (invisibly).


Method push()

Push a task to the head of the task list.

Usage

crew_class_controller_group$push(
  command,
  data = list(),
  globals = list(),
  substitute = TRUE,
  seed = NULL,
  algorithm = NULL,
  packages = character(0),
  library = NULL,
  seconds_timeout = NULL,
  scale = TRUE,
  throttle = TRUE,
  name = NULL,
  save_command = FALSE,
  controller = NULL
)

Arguments

command

Language object with R code to run.

data

Named list of local data objects in the evaluation environment.

globals

Named list of objects to temporarily assign to the global environment for the task. See the reset_globals argument of crew_controller_local().

substitute

Logical of length 1, whether to call base::substitute() on the supplied value of the command argument. If TRUE (default) then command is quoted literally as you write it, e.g. push(command = your_function_call()). If FALSE, then crew assumes command is a language object and you are passing its value, e.g. push(command = quote(your_function_call())). substitute = TRUE is appropriate for interactive use, whereas substitute = FALSE is meant for automated R programs that invoke crew controllers.

seed

Integer of length 1 with the pseudo-random number generator seed to set for the evaluation of the task. Passed to the seed argument of set.seed() if not NULL. If algorithm and seed are both NULL, then the random number generator defaults to the widely spaced worker-specific L'Ecuyer streams as supported by mirai::nextstream(). See vignette("parallel", package = "parallel") for details.

algorithm

Character of length 1 naming the pseudo-random number generator algorithm to set for the evaluation of the task. Passed to the kind argument of RNGkind() if not NULL. If algorithm and seed are both NULL, then the random number generator defaults to the recommended widely spaced worker-specific L'Ecuyer streams as supported by mirai::nextstream(). See vignette("parallel", package = "parallel") for details.

packages

Character vector of packages to load for the task.

library

Library path to load the packages. See the lib.loc argument of require().

seconds_timeout

Optional task timeout passed to the .timeout argument of mirai::mirai() (after converting to milliseconds).

scale

Logical, whether to automatically scale workers to meet demand. See the scale argument of the push() method of ordinary single controllers.

throttle

TRUE to skip auto-scaling if it already happened within the last seconds_interval seconds. FALSE to auto-scale every time scale() is called. Throttling avoids overburdening the mirai dispatcher and other resources.

name

Optional name of the task. Replaced with a random name if NULL or in conflict with an existing name in the task list.

save_command

Logical of length 1. If TRUE, the controller deparses the command and returns it with the output on pop(). If FALSE (default), the controller skips this step to increase speed.

controller

Character of length 1, name of the controller to submit the task. If NULL, the controller defaults to the first controller in the list.

Returns

NULL (invisibly).
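To make the substitute argument concrete, here is a small base R sketch of the two calling conventions. capture_command() is a hypothetical stand-in written for this illustration, not crew's internal code. It only mimics the documented behavior: quote the command when substitute = TRUE, and take it as a ready-made language object when substitute = FALSE.

```r
# Hypothetical stand-in for how a push()-like function could treat `command`.
capture_command <- function(command, substitute = TRUE) {
  if (substitute) {
    # Quote the expression exactly as the caller typed it.
    command <- base::substitute(command)
  }
  # Otherwise `command` is evaluated as an ordinary argument and is
  # expected to already be a language object.
  command
}

# Interactive style: write the code directly.
interactive_style <- capture_command(sqrt(4))

# Programmatic style: build the language object first, then pass its value.
expr <- quote(sqrt(4))
programmatic_style <- capture_command(expr, substitute = FALSE)

# Both routes yield the same unevaluated call.
identical(interactive_style, programmatic_style)
eval(programmatic_style)
```

The programmatic style matters when the command is assembled by other code: with substitute = TRUE, passing expr would capture the symbol expr itself rather than the call it holds.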


Method map()

Apply a single command to multiple inputs.

Usage

crew_class_controller_group$map(
  command,
  iterate,
  data = list(),
  globals = list(),
  substitute = TRUE,
  seed = NULL,
  algorithm = NULL,
  packages = character(0),
  library = NULL,
  seconds_interval = 0.5,
  seconds_timeout = NULL,
  names = NULL,
  save_command = FALSE,
  error = "stop",
  warnings = TRUE,
  verbose = interactive(),
  scale = TRUE,
  throttle = TRUE,
  controller = NULL
)

Arguments

command

Language object with R code to run.

iterate

Named list of vectors or lists to iterate over. For example, to run function calls f(x = 1, y = "a") and f(x = 2, y = "b"), set command to f(x, y), and set iterate to list(x = c(1, 2), y = c("a", "b")). The individual function calls are evaluated as f(x = iterate$x[[1]], y = iterate$y[[1]]) and f(x = iterate$x[[2]], y = iterate$y[[2]]). All the elements of iterate must have the same length. If there are any name conflicts between iterate and data, iterate takes precedence.

data

Named list of constant local data objects in the evaluation environment. Objects in this list are treated as single values and are held constant for each iteration of the map.

globals

Named list of constant objects to temporarily assign to the global environment for each task. This list should include any functions you previously defined in the global environment which are required to run tasks. See the reset_globals argument of crew_controller_local(). Objects in this list are treated as single values and are held constant for each iteration of the map.

substitute

Logical of length 1, whether to call base::substitute() on the supplied value of the command argument. If TRUE (default) then command is quoted literally as you write it, e.g. push(command = your_function_call()). If FALSE, then crew assumes command is a language object and you are passing its value, e.g. push(command = quote(your_function_call())). substitute = TRUE is appropriate for interactive use, whereas substitute = FALSE is meant for automated R programs that invoke crew controllers.

seed

Integer of length 1 with the pseudo-random number generator seed to set for the evaluation of the task. Passed to the seed argument of set.seed() if not NULL. If algorithm and seed are both NULL, then the random number generator defaults to the recommended widely spaced worker-specific L'Ecuyer streams as supported by mirai::nextstream(). See vignette("parallel", package = "parallel") for details.

algorithm

Character of length 1 naming the pseudo-random number generator algorithm to set for the evaluation of the task. Passed to the kind argument of RNGkind() if not NULL. If algorithm and seed are both NULL, then the random number generator defaults to the recommended widely spaced worker-specific L'Ecuyer streams as supported by mirai::nextstream(). See vignette("parallel", package = "parallel") for details.

packages

Character vector of packages to load for the task.

library

Library path to load the packages. See the lib.loc argument of require().

seconds_interval

Number of seconds to wait between intervals polling the tasks for completion.

seconds_timeout

Optional task timeout passed to the .timeout argument of mirai::mirai() (after converting to milliseconds).

names

Optional character of length 1, name of the element of iterate with names for the tasks. If names is supplied, then iterate[[names]] must be a character vector.

save_command

Logical of length 1, whether to store a text string version of the R command in the output.

error

Character vector of length 1, choice of action if a task has an error. Possible values:

  • "stop": throw an error in the main R session instead of returning a value. In case of an error, the results from the last errored map() are in the error field of the controller, e.g. controller_object$error. To reduce memory consumption, set controller_object$error <- NULL after you are finished troubleshooting.

  • "warn": throw a warning. This allows the return value with all the error messages and tracebacks to be generated.

  • "silent": do nothing special.

warnings

Logical of length 1, whether to throw a warning in the interactive session if at least one task encounters an error.

verbose

Logical of length 1, whether to print progress messages.

scale

Logical, whether to automatically scale workers to meet demand. See also the throttle argument.

throttle

TRUE to skip auto-scaling if it already happened within the last seconds_interval seconds. FALSE to auto-scale every time scale() is called. Throttling avoids overburdening the mirai dispatcher and other resources.

controller

Character of length 1, name of the controller to submit the task. If NULL, the controller defaults to the first controller in the list.

Details

The idea comes from functional programming: for example, the map() function from the purrr package.

Returns

A tibble of results and metadata: one row per task and columns corresponding to the output of pop().
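The way iterate and data combine can be sketched in base R. This only illustrates the semantics described for the iterate and data arguments, evaluated locally and sequentially; it is not how crew dispatches tasks to workers. The names f, expand_iterate(), and results are hypothetical.

```r
# Hypothetical function standing in for the user's code.
f <- function(x, y) paste0(y, x)

# Evaluate `command` once per index of `iterate`, with `data` held constant.
expand_iterate <- function(command, iterate, data = list()) {
  n <- unique(lengths(iterate))
  stopifnot(length(n) == 1L) # all elements of iterate must share a length
  lapply(seq_len(n), function(i) {
    # Take the i-th element of each iterate entry.
    slice <- lapply(iterate, function(values) values[[i]])
    # iterate takes precedence over data when names collide.
    env <- utils::modifyList(data, slice)
    eval(command, envir = env)
  })
}

results <- expand_iterate(
  command = quote(f(x, y)),
  iterate = list(x = c(1, 2), y = c("a", "b")),
  data = list(x = 99) # shadowed by iterate$x
)
results
```

Each element of results corresponds to one call, f(x = 1, y = "a") and f(x = 2, y = "b"), and the value x = 99 from data is never used because iterate supplies x.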


Method collect()

Deprecated in version 0.5.0.9003 (2023-10-02).

Usage

crew_class_controller_group$collect(throttle = NULL, controllers = NULL)

Arguments

throttle

Deprecated in version 0.5.0.9003 (2023-10-02).

controllers

Deprecated in version 0.5.0.9003 (2023-10-02).

Returns

NULL.


Method pop()

Pop a completed task from the results data frame.

Usage

crew_class_controller_group$pop(
  scale = TRUE,
  collect = NULL,
  throttle = TRUE,
  controllers = NULL
)

Arguments

scale

Logical, whether to automatically scale workers to meet demand. See the scale argument of the pop() method of ordinary single controllers.

collect

Deprecated in version 0.5.0.9003 (2023-10-02). Not used.

throttle

TRUE to skip auto-scaling if it already happened within the last seconds_interval seconds. FALSE to auto-scale every time scale() is called. Throttling avoids overburdening the mirai dispatcher and other resources.

controllers

Character vector of controller names. Set to NULL to select all controllers.

Returns

If there is no task to collect, return NULL. Otherwise, return a one-row tibble with the same columns as pop() for ordinary controllers.
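Because pop() returns NULL once nothing is left to collect, a common pattern is to pop in a loop. The sketch below assumes the one-row tibble has name and result columns, as for ordinary controllers; the controller names, task names, and the variable run_examples are arbitrary choices for this illustration.

```r
# Only run when the CREW_EXAMPLES environment variable is set to "true".
run_examples <- identical(Sys.getenv("CREW_EXAMPLES"), "true")

if (run_examples) {
  library(crew)
  group <- crew_controller_group(
    crew_controller_local(name = "a"),
    crew_controller_local(name = "b")
  )
  group$start()
  group$push(name = "first", command = 1 + 1, controller = "a")
  group$push(name = "second", command = 2 + 2, controller = "b")
  group$wait(mode = "all")
  # Keep popping until pop() returns NULL, i.e. nothing is left to collect.
  while (!is.null(out <- group$pop())) {
    cat(out$name, "->", out$result[[1]], "\n")
  }
  group$terminate()
}
```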


Method wait()

Wait for tasks.

Usage

crew_class_controller_group$wait(
  mode = "all",
  seconds_interval = 0.5,
  seconds_timeout = Inf,
  scale = TRUE,
  throttle = TRUE,
  controllers = NULL
)

Arguments

mode

Character of length 1: "all" to wait for all tasks in all controllers to complete, "one" to wait for a single task in a single controller to complete. In this scheme, the timeout limit is applied to each controller sequentially, and a timeout is treated the same as a completed controller.

seconds_interval

Number of seconds between interruptions of the wait, at which point workers are scaled up as needed.

seconds_timeout

Timeout length in seconds waiting for results to become available.

scale

Logical of length 1, whether to call scale_later() on each selected controller to schedule auto-scaling. See the scale argument of the wait() method of ordinary single controllers.

throttle

TRUE to skip auto-scaling if it already happened within the last seconds_interval seconds. FALSE to auto-scale every time scale() is called. Throttling avoids overburdening the mirai dispatcher and other resources.

controllers

Character vector of controller names. Set to NULL to select all controllers.

Details

The wait() method blocks the calling R session and repeatedly auto-scales workers for tasks that need them. The function runs until it either times out or the condition in mode is met.

Returns

A logical of length 1, invisibly. TRUE if the condition in mode was met, FALSE otherwise.
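A short sketch of how the return value of wait() might be used with mode = "one" and a finite timeout. Since the value is returned invisibly, it has to be assigned to be inspected. The controller names, task names, timeout, and the variables run_examples and finished are arbitrary choices for this illustration.

```r
# Only run when the CREW_EXAMPLES environment variable is set to "true".
run_examples <- identical(Sys.getenv("CREW_EXAMPLES"), "true")

if (run_examples) {
  library(crew)
  group <- crew_controller_group(
    crew_controller_local(name = "a"),
    crew_controller_local(name = "b")
  )
  group$start()
  group$push(name = "slow", command = Sys.sleep(2), controller = "a")
  group$push(name = "fast", command = "done", controller = "b")
  # Block until at least one task finishes or 10 seconds pass.
  finished <- group$wait(mode = "one", seconds_timeout = 10)
  if (isTRUE(finished)) {
    print(group$pop()$name)
  }
  # Then wait for everything that remains before shutting down.
  group$wait(mode = "all")
  group$terminate()
}
```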


Method summary()

Summarize the workers of one or more controllers.

Usage

crew_class_controller_group$summary(controllers = NULL)

Arguments

controllers

Character vector of controller names. Set to NULL to select all controllers.

Returns

A data frame of aggregated worker summary statistics of all the selected controllers. It has one row per worker, and the rows are grouped by controller. See the documentation of the summary() method of the controller class for specific information about the columns in the output.
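For example, summary() could be called once for the whole group and once for a single controller after some work has been done. This is a sketch under the same CREW_EXAMPLES guard as the package examples. The controller names and the variable run_examples are arbitrary, and the columns of the output are not shown here because they are documented with the controller class.

```r
# Only run when the CREW_EXAMPLES environment variable is set to "true".
run_examples <- identical(Sys.getenv("CREW_EXAMPLES"), "true")

if (run_examples) {
  library(crew)
  group <- crew_controller_group(
    crew_controller_local(name = "a"),
    crew_controller_local(name = "b")
  )
  group$start()
  group$push(name = "task", command = sqrt(4), controller = "a")
  group$wait()
  group$pop()
  print(group$summary()) # workers of every controller in the group
  print(group$summary(controllers = "a")) # workers of controller "a" only
  group$terminate()
}
```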


Method terminate()

Terminate the workers and disconnect the client for one or more controllers.

Usage

crew_class_controller_group$terminate(controllers = NULL)

Arguments

controllers

Character vector of controller names. Set to NULL to select all controllers.

Returns

NULL (invisibly).

Details

See crew_controller_group().

See Also

Other controller_group: crew_controller_group()

Examples

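if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
  # Two local controllers: one long-lived, one whose workers exit after one task.
  persistent <- crew_controller_local(name = "persistent")
  transient <- crew_controller_local(
    name = "transient",
    tasks_max = 1L
  )
  group <- crew_controller_group(persistent, transient)
  group$start()
  # Send the task to the "transient" controller by name.
  group$push(name = "task", command = sqrt(4), controller = "transient")
  group$wait() # block until the task completes
  group$pop() # retrieve the result as a one-row tibble
  group$terminate()
}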
