
crew (version 0.5.0)

crew_class_controller: Controller class

Description

R6 class for controllers.

Public fields

client

Router object.

launcher

Launcher object.

schedule

Schedule object.

log

Tibble with per-worker metadata about tasks.

until

Numeric of length 1, time point when throttled auto-scaling unlocks.

error

Tibble of monads from the last call to map(error = "stop").

Methods


Method new()

mirai controller constructor.

Usage

crew_class_controller$new(client = NULL, launcher = NULL, schedule = NULL)

Arguments

client

Router object. See crew_controller().

launcher

Launcher object. See crew_controller().

schedule

Schedule object from crew_schedule().

Returns

An R6 controller object.

Examples

if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
  client <- crew_client()
  launcher <- crew_launcher_local()
  controller <- crew_controller(client = client, launcher = launcher)
  controller$start()
  controller$push(name = "task", command = sqrt(4))
  controller$wait()
  controller$pop()
  controller$terminate()
}


Method validate()

Validate the client.

Usage

crew_class_controller$validate()

Returns

NULL (invisibly).


Method empty()

Check if the controller is empty.

Usage

crew_class_controller$empty(controllers = NULL)

Arguments

controllers

Not used. Included to ensure the signature is compatible with the analogous method of controller groups.

Details

A controller is empty if it has no running tasks and no completed tasks waiting to be retrieved with pop().

Returns

TRUE if the controller is empty, FALSE otherwise.


Method nonempty()

Check if the controller is nonempty.

Usage

crew_class_controller$nonempty(controllers = NULL)

Arguments

controllers

Not used. Included to ensure the signature is compatible with the analogous method of controller groups.

Details

A controller is empty if it has no running tasks and no completed tasks waiting to be retrieved with pop(). A controller is nonempty if it has at least one such task.

Returns

TRUE if the controller is nonempty, FALSE otherwise.
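
The sketch below shows where empty() and nonempty() might be called during the life of a single task. It reuses the constructor calls from the examples on this page and is guarded by the same CREW_EXAMPLES environment variable; the variable run_examples is an assumption introduced for illustration.

```r
# Only run when crew examples are explicitly enabled, as elsewhere on this page.
run_examples <- identical(Sys.getenv("CREW_EXAMPLES"), "true")
if (run_examples) {
  library(crew)
  client <- crew_client()
  launcher <- crew_launcher_local()
  controller <- crew_controller(client = client, launcher = launcher)
  controller$start()
  controller$empty() # nothing has been pushed yet
  controller$push(name = "task", command = sqrt(4))
  controller$nonempty() # the task is running or waiting to be retrieved
  controller$wait()
  controller$pop()
  controller$empty() # the only task has been retrieved
  controller$terminate()
}
```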


Method saturated()

Check if the controller is saturated.

Usage

crew_class_controller$saturated(
  collect = TRUE,
  throttle = TRUE,
  controller = NULL
)

Arguments

collect

Logical of length 1, whether to collect the results of any newly resolved tasks before determining saturation.

throttle

Logical of length 1, whether to delay task collection until the next request at least self$client$seconds_interval seconds from the original request. The idea is similar to shiny::throttle() except that crew does not accumulate a backlog of requests. The technique improves robustness and efficiency.

controller

Not used. Included to ensure the signature is compatible with the analogous method of controller groups.

Details

A controller is saturated if the number of unresolved tasks is greater than or equal to the maximum number of workers. In other words, in a saturated controller, every available worker has a task. You can still push tasks to a saturated controller, but tools that use crew such as targets may choose not to.

Returns

TRUE if the controller is saturated, FALSE otherwise.
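
The rule in Details reduces to a single comparison. The helper below is a hypothetical base R illustration, not a crew function; unresolved and workers are assumed names for the number of unresolved tasks and the maximum number of workers.

```r
# Hypothetical helper (not part of crew): restates the saturation rule.
is_saturated <- function(unresolved, workers) {
  unresolved >= workers
}

is_saturated(unresolved = 1L, workers = 2L) # one worker still has no task
is_saturated(unresolved = 2L, workers = 2L) # every worker has a task
is_saturated(unresolved = 5L, workers = 2L) # more tasks than workers
```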


Method start()

Start the controller if it is not already started.

Usage

crew_class_controller$start(controllers = NULL)

Arguments

controllers

Not used. Included to ensure the signature is compatible with the analogous method of controller groups.

Details

Register the mirai client and register worker websockets with the launcher.

Returns

NULL (invisibly).


Method launch()

Launch one or more workers.

Usage

crew_class_controller$launch(n = 1L, controllers = NULL)

Arguments

n

Number of workers to try to launch. The actual number launched is capped so that no more than "workers" workers are running at a given time, where "workers" is an argument of crew_controller(). The actual cap is the "workers" argument minus the number of connected workers minus the number of starting workers. A "connected" worker has an active websocket connection to the mirai client, and "starting" means that the worker was launched at most seconds_start seconds ago, where seconds_start is also an argument of crew_controller().

controllers

Not used. Included to ensure the signature is compatible with the analogous method of controller groups.

Returns

NULL (invisibly).
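
The cap described for the n argument is simple arithmetic. The helper below is a hypothetical base R sketch, not crew's implementation; clamping the cap at zero is an assumption made so that the result is never negative.

```r
# Hypothetical helper (not part of crew): how many of the n requested
# workers fit under the cap workers - connected - starting.
launch_count <- function(n, workers, connected, starting) {
  cap <- max(workers - connected - starting, 0L) # assumed clamp at zero
  min(n, cap)
}

# Request 5 workers when 4 are allowed, 1 is connected, and 1 is starting.
launch_count(n = 5L, workers = 4L, connected = 1L, starting = 1L)
```

In this call the cap is 4 - 1 - 1 = 2, so only 2 of the 5 requested workers would be launched.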


Method scale()

Auto-scale workers out to meet the demand of tasks.

Usage

crew_class_controller$scale(throttle = FALSE, controllers = NULL)

Arguments

throttle

Logical of length 1, whether to delay auto-scaling until the next auto-scaling request at least self$client$seconds_interval seconds from the original request. The idea is similar to shiny::throttle() except that crew does not accumulate a backlog of requests. The technique improves robustness and efficiency.

controllers

Not used. Included to ensure the signature is compatible with the analogous method of controller groups.

Details

Methods push(), pop(), and wait() already invoke scale() if the scale argument is TRUE. For finer control of the number of workers launched, call launch() on the controller with the exact desired number of workers.

Returns

NULL (invisibly).
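
The throttling behavior described for the throttle argument can be pictured with a small closure. This is a base R sketch of the documented idea only, not crew's code: new_throttle, ready, and the explicit now argument are hypothetical names, and time is passed in as a plain number so the example is deterministic.

```r
# Hypothetical sketch: the first request starts a waiting period, requests
# inside that period are skipped without being queued, and the first request
# at or after the unlock time goes through.
new_throttle <- function(seconds_interval) {
  until <- NULL # time point when the throttle unlocks
  function(now) {
    if (is.null(until)) {
      until <<- now + seconds_interval
    }
    if (now < until) {
      return(FALSE) # still locked: skip this request, keep no backlog
    }
    until <<- NULL # unlocked: reset so the next request starts a new period
    TRUE
  }
}

ready <- new_throttle(seconds_interval = 0.5)
decisions <- c(ready(0), ready(0.2), ready(0.6), ready(0.7))
decisions
```

Only the third request (at time 0.6) proceeds; the fourth starts a new waiting period.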


Method push()

Push a task to the head of the task list.

Usage

crew_class_controller$push(
  command,
  data = list(),
  globals = list(),
  substitute = TRUE,
  seed = NULL,
  algorithm = NULL,
  packages = character(0),
  library = NULL,
  seconds_timeout = NULL,
  scale = TRUE,
  throttle = TRUE,
  name = NA_character_,
  save_command = FALSE,
  controller = NULL
)

Arguments

command

Language object with R code to run.

data

Named list of local data objects in the evaluation environment.

globals

Named list of objects to temporarily assign to the global environment for the task. This list should include any functions you previously defined in the global environment which are required to run tasks. See the reset_globals argument of crew_controller_local().

substitute

Logical of length 1, whether to call base::substitute() on the supplied value of the command argument. If TRUE (default) then command is quoted literally as you write it, e.g. push(command = your_function_call()). If FALSE, then crew assumes command is a language object and you are passing its value, e.g. push(command = quote(your_function_call())). substitute = TRUE is appropriate for interactive use, whereas substitute = FALSE is meant for automated R programs that invoke crew controllers.

seed

Integer of length 1 with the pseudo-random number generator seed to set for the evaluation of the task. Passed to the seed argument of set.seed() if not NULL. If algorithm and seed are both NULL, then the random number generator defaults to the recommended widely spaced worker-specific L'Ecuyer streams as supported by mirai::nextstream(). See vignette("parallel", package = "parallel") for details.

algorithm

Integer of length 1 with the pseudo-random number generator algorithm to set for the evaluation of the task. Passed to the kind argument of RNGkind() if not NULL. If algorithm and seed are both NULL, then the random number generator defaults to the recommended widely spaced worker-specific L'Ecuyer streams as supported by mirai::nextstream(). See vignette("parallel", package = "parallel") for details.

packages

Character vector of packages to load for the task.

library

Library path to load the packages. See the lib.loc argument of require().

seconds_timeout

Optional task timeout passed to the .timeout argument of mirai::mirai() (after converting to milliseconds).

scale

Logical, whether to automatically call scale() to auto-scale workers to meet the demand of the task load. By design, auto-scaling might not actually happen if throttle = TRUE.

throttle

If scale is TRUE, whether to defer auto-scaling until the next request at least self$client$seconds_interval seconds from the original request. The idea is similar to shiny::throttle() except that crew does not accumulate a backlog of requests. The technique improves robustness and efficiency.

name

Optional name of the task.

save_command

Logical of length 1. If TRUE, the controller deparses the command and returns it with the output on pop(). If FALSE (default), the controller skips this step to increase speed.

controller

Not used. Included to ensure the signature is compatible with the analogous method of controller groups.

Returns

NULL (invisibly).
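
The difference between substitute = TRUE and substitute = FALSE can be reproduced in base R. The function capture_command below is a hypothetical stand-in for how a method might capture its command argument; it is not crew's implementation.

```r
# Hypothetical stand-in (not part of crew) for capturing a command argument.
capture_command <- function(command, substitute = TRUE) {
  if (substitute) {
    # Quote the code exactly as the caller typed it.
    command <- base::substitute(command)
  }
  # Otherwise command is assumed to already be a language object.
  command
}

interactive_style <- capture_command(sqrt(4))
programmatic_style <- capture_command(quote(sqrt(4)), substitute = FALSE)
identical(interactive_style, programmatic_style)
```

Both calls yield the same unevaluated call sqrt(4), which is why substitute = FALSE suits programs that build commands with quote() ahead of time.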


Method shove()

Quickly push a task to the head of the task list.

Usage

crew_class_controller$shove(
  command,
  data = list(),
  globals = list(),
  seed = NULL,
  algorithm = NULL,
  packages = character(0),
  library = NULL,
  .timeout = NULL,
  name = NA_character_,
  string = NA_character_
)

Arguments

command

Language object with R code to run.

data

Named list of local data objects in the evaluation environment.

globals

Named list of objects to temporarily assign to the global environment for the task. This list should include any functions you previously defined in the global environment which are required to run tasks. See the reset_globals argument of crew_controller_local().

seed

Integer of length 1 with the pseudo-random number generator seed to set for the evaluation of the task. Passed to the seed argument of set.seed() if not NULL. If algorithm and seed are both NULL, then the random number generator defaults to the recommended widely spaced worker-specific L'Ecuyer streams as supported by mirai::nextstream(). See vignette("parallel", package = "parallel") for details.

algorithm

Integer of length 1 with the pseudo-random number generator algorithm to set for the evaluation of the task. Passed to the kind argument of RNGkind() if not NULL. If algorithm and seed are both NULL, then the random number generator defaults to the recommended widely spaced worker-specific L'Ecuyer streams as supported by mirai::nextstream(). See vignette("parallel", package = "parallel") for details.

packages

Character vector of packages to load for the task.

library

Library path to load the packages. See the lib.loc argument of require().

.timeout

Optional task timeout passed to the .timeout argument of mirai::mirai() (after converting to milliseconds).

name

Optional name of the task.

string

Optional character string with the deparsed command.

scale

Logical, whether to automatically call scale() to auto-scale workers to meet the demand of the task load. By design, auto-scaling might not actually happen if throttle = TRUE.

throttle

If scale is TRUE, whether to defer auto-scaling until the next request at least self$client$seconds_interval seconds from the original request. The idea is similar to shiny::throttle() except that crew does not accumulate a backlog of requests. The technique improves robustness and efficiency.

Details

Exists to support map(). For developers only and not supported for controller groups. Relative to push(), shove() skips user options and guardrails to aggressively optimize performance.

Returns

NULL (invisibly).


Method map()

Apply a single command to multiple inputs.

Usage

crew_class_controller$map(
  command,
  iterate,
  data = list(),
  globals = list(),
  substitute = TRUE,
  seed = NULL,
  algorithm = NULL,
  packages = character(0),
  library = NULL,
  seconds_interval = NULL,
  seconds_timeout = NULL,
  names = NULL,
  save_command = FALSE,
  error = "stop",
  verbose = interactive(),
  controller = NULL
)

Arguments

command

Language object with R code to run.

iterate

Named list of vectors or lists to iterate over. For example, to run function calls f(x = 1, y = "a") and f(x = 2, y = "b"), set command to f(x, y), and set iterate to list(x = c(1, 2), y = c("a", "b")). The individual function calls are evaluated as f(x = iterate$x[[1]], y = iterate$y[[1]]) and f(x = iterate$x[[2]], y = iterate$y[[2]]). All the elements of iterate must have the same length. If there are any name conflicts between iterate and data, iterate takes precedence.

data

Named list of constant local data objects in the evaluation environment. Objects in this list are treated as single values and are held constant for each iteration of the map.

globals

Named list of constant objects to temporarily assign to the global environment for each task. This list should include any functions you previously defined in the global environment which are required to run tasks. See the reset_globals argument of crew_controller_local(). Objects in this list are treated as single values and are held constant for each iteration of the map.

substitute

Logical of length 1, whether to call base::substitute() on the supplied value of the command argument. If TRUE (default) then command is quoted literally as you write it, e.g. push(command = your_function_call()). If FALSE, then crew assumes command is a language object and you are passing its value, e.g. push(command = quote(your_function_call())). substitute = TRUE is appropriate for interactive use, whereas substitute = FALSE is meant for automated R programs that invoke crew controllers.

seed

Integer of length 1 with the pseudo-random number generator seed to set for the evaluation of the task. Passed to the seed argument of set.seed() if not NULL. If algorithm and seed are both NULL, then the random number generator defaults to the recommended widely spaced worker-specific L'Ecuyer streams as supported by mirai::nextstream(). See vignette("parallel", package = "parallel") for details.

algorithm

Integer of length 1 with the pseudo-random number generator algorithm to set for the evaluation of the task. Passed to the kind argument of RNGkind() if not NULL. If algorithm and seed are both NULL, then the random number generator defaults to the recommended widely spaced worker-specific L'Ecuyer streams as supported by mirai::nextstream(). See vignette("parallel", package = "parallel") for details.

packages

Character vector of packages to load for the task.

library

Library path to load the packages. See the lib.loc argument of require().

seconds_interval

Number of seconds to wait between intervals polling the tasks for completion. Defaults to the seconds_interval field in the client object.

seconds_timeout

Optional task timeout passed to the .timeout argument of mirai::mirai() (after converting to milliseconds).

names

Optional character of length 1, name of the element of iterate with names for the tasks. If names is supplied, then iterate[[names]] must be a character vector.

save_command

Logical of length 1, whether to store a text string version of the R command in the output.

error

Character vector of length 1, choice of action if a task has an error. Possible values:

  • "stop": throw an error in the main R session instead of returning a value. In case of an error, the results from the last errored map() are in the error field of the controller, e.g. controller_object$error. To reduce memory consumption, set controller_object$error <- NULL after you are finished troubleshooting.

  • "warn": throw a warning. This allows the return value with all the error messages and tracebacks to be generated.

  • "silent": do nothing special.

verbose

Logical of length 1, whether to print progress messages.

controller

Not used. Included to ensure the signature is compatible with the analogous method of controller groups.

Details

The idea comes from functional programming: for example, the map() function from the purrr package.

Returns

A tibble of results and metadata: one row per task and columns corresponding to the output of pop().
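
The pairing of elements described for the iterate argument can be mimicked locally in base R. The sketch below is not how crew dispatches tasks; it only shows which values each call would see, with iterate taking precedence over a conflicting name in data. The function f and the value of data are hypothetical.

```r
f <- function(x, y) paste0(x, y) # hypothetical function to map over
command <- quote(f(x, y))
iterate <- list(x = c(1, 2), y = c("a", "b"))
data <- list(y = "constant") # conflicts with iterate$y, so iterate wins

results <- lapply(seq_along(iterate[[1L]]), function(index) {
  # Take the index-th element of every entry of iterate.
  arguments <- lapply(iterate, function(element) element[[index]])
  # Values from iterate overwrite same-named values from data.
  values <- utils::modifyList(data, arguments)
  eval(command, envir = values, enclos = environment(f))
})
results
```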


Method collect()

Check for done tasks and move the results to the results list.

Usage

crew_class_controller$collect(throttle = FALSE, controllers = NULL)

Arguments

throttle

Logical of length 1, whether to defer task collection until the next task collection request at least seconds_interval seconds from the original request. The idea is similar to shiny::throttle() except that crew does not accumulate a backlog of requests. The technique improves robustness and efficiency.

controllers

Not used. Included to ensure the signature is compatible with the analogous method of controller groups.

Returns

NULL (invisibly). Removes elements from the queue list as applicable and moves them to the results list.


Method pop()

Pop a completed task from the results data frame.

Usage

crew_class_controller$pop(
  scale = TRUE,
  collect = TRUE,
  throttle = TRUE,
  controllers = NULL
)

Arguments

scale

Logical of length 1, whether to automatically call scale() to auto-scale workers to meet the demand of the task load. Auto-scaling might not actually happen if throttle is TRUE. Scaling up on pop() may be important for transient or nearly transient workers that tend to drop off quickly after doing little work.

collect

Logical of length 1, whether to collect the results of completed tasks.

throttle

Whether to defer auto-scaling and task collection until the next request at least self$client$seconds_interval seconds from the original request. The idea is similar to shiny::throttle() except that crew does not accumulate a backlog of requests. The technique improves robustness and efficiency.

controllers

Not used. Included to ensure the signature is compatible with the analogous method of controller groups.

Details

If no task is currently completed and collected, pop() will attempt to auto-scale workers as needed and collect any newly completed results.

Returns

If there is no task to collect, return NULL. Otherwise, return a one-row tibble with the following columns.

  • name: the task name if given.

  • command: a character string with the R command if save_command was set to TRUE in push().

  • result: a list containing the return value of the R command.

  • seconds: number of seconds that the task ran.

  • seed: the single integer originally supplied to push(), NA otherwise. The pseudo-random number generator state just prior to the task can be restored using set.seed(seed = seed, kind = algorithm), where seed and algorithm are part of this output.

  • algorithm: name of the pseudo-random number generator algorithm originally supplied to push(), NA otherwise. The pseudo-random number generator state just prior to the task can be restored using set.seed(seed = seed, kind = algorithm), where seed and algorithm are part of this output.

  • error: the first 2048 characters of the error message if the task threw an error, NA otherwise.

  • trace: the first 2048 characters of the text of the traceback if the task threw an error, NA otherwise.

  • warnings: the first 2048 characters of the text of warning messages that the task may have generated, NA otherwise.

  • launcher: name of the crew launcher where the task ran.
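
The seed and algorithm columns can be used to replay the random draws of a task. The base R sketch below uses illustrative values for seed and algorithm; in practice they would come from the output of pop().

```r
# Illustrative values; in practice, read these from the output of pop().
seed <- 123L
algorithm <- "Mersenne-Twister"

set.seed(seed = seed, kind = algorithm)
first <- runif(3)

# Restoring the same seed and algorithm reproduces the same draws.
set.seed(seed = seed, kind = algorithm)
second <- runif(3)

identical(first, second)
```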


Method wait()

Wait for tasks.

Usage

crew_class_controller$wait(
  mode = "all",
  seconds_interval = NULL,
  seconds_timeout = Inf,
  scale = TRUE,
  throttle = TRUE,
  controllers = NULL
)

Arguments

mode

If mode is "all", then the method waits for all tasks to complete. If mode is "one", then it waits until one task is complete.

seconds_interval

Number of seconds to wait between polling intervals waiting for tasks. Defaults to the seconds_interval field of the client object.

seconds_timeout

Timeout length in seconds waiting for tasks.

scale

Logical, whether to automatically call scale() to auto-scale workers to meet the demand of the task load. Might not actually auto-scale on every iteration if throttle is TRUE.

throttle

Whether to defer auto-scaling and task collection until the next request at least self$client$seconds_interval seconds from the original request. The idea is similar to shiny::throttle() except that crew does not accumulate a backlog of requests. The technique improves robustness and efficiency. Highly recommended to keep throttle = TRUE in wait() to ensure efficiency and robustness.

controllers

Not used. Included to ensure the signature is compatible with the analogous method of controller groups.

Details

The wait() method blocks the calling R session, repeatedly auto-scales workers for tasks that need them, and repeatedly collects results. The function runs until it either times out or reaches its stopping condition based on the mode argument.

Returns

NULL (invisibly). Call pop() to get the result.
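
A typical pattern is to wait for all tasks and then call pop() until it returns NULL. The sketch below reuses the constructor calls from the examples on this page and the same CREW_EXAMPLES guard; the variable run_examples, the task names, and the 30-second timeout are assumptions chosen for illustration.

```r
# Only run when crew examples are explicitly enabled, as elsewhere on this page.
run_examples <- identical(Sys.getenv("CREW_EXAMPLES"), "true")
if (run_examples) {
  library(crew)
  client <- crew_client()
  launcher <- crew_launcher_local()
  controller <- crew_controller(client = client, launcher = launcher)
  controller$start()
  controller$push(name = "first", command = sqrt(4))
  controller$push(name = "second", command = sqrt(9))
  # Block until every task resolves or 30 seconds elapse.
  controller$wait(mode = "all", seconds_timeout = 30)
  # pop() returns NULL when there is no task left to collect.
  results <- list()
  while (!is.null(out <- controller$pop())) {
    results[[out$name]] <- out$result[[1L]]
  }
  controller$terminate()
}
```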


Method summary()

Summarize the workers and tasks of the controller.

Usage

crew_class_controller$summary(controllers = NULL)

Arguments

controllers

Not used. Included to ensure the signature is compatible with the analogous method of controller groups.

Returns

A data frame of summary statistics on the workers and tasks. It has one row per worker websocket and the following columns:

  • controller: name of the controller.

  • worker: integer index of the worker.

  • tasks: number of tasks which were completed by a worker at the websocket and then returned by calling pop() on the controller object.

  • seconds: total runtime in seconds of all the tasks that ran on a worker connected to this websocket and then were retrieved by calling pop() on the controller object.

  • errors: total number of tasks which ran on a worker at the websocket, encountered an error in R, and were then retrieved with pop().

  • warnings: total number of tasks which ran on a worker at the websocket, encountered one or more warnings in R, and were then retrieved with pop(). Note: warnings is actually the number of tasks, not the number of warnings. (A task could throw more than one warning.)


Method terminate()

Terminate the workers and the mirai client.

Usage

crew_class_controller$terminate(controllers = NULL)

Arguments

controllers

Not used. Included to ensure the signature is compatible with the analogous method of controller groups.

Returns

NULL (invisibly).

Details

See crew_controller().

See Also

Other class: crew_class_client, crew_class_controller_group, crew_class_launcher, crew_class_schedule, crew_class_tls

Examples

if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
  client <- crew_client()
  launcher <- crew_launcher_local()
  controller <- crew_controller(client = client, launcher = launcher)
  controller$start()
  controller$push(name = "task", command = sqrt(4))
  controller$wait()
  controller$pop()
  controller$terminate()
}

