R6 abstract class to build other subclasses which launch and manage workers.
workers
Data frame of worker information.
name
Name of the launcher.
seconds_interval
See crew_launcher().
seconds_timeout
See crew_launcher().
seconds_launch
See crew_launcher().
seconds_idle
See crew_launcher().
seconds_wall
See crew_launcher().
tasks_max
See crew_launcher().
tasks_timers
See crew_launcher().
reset_globals
See crew_launcher().
reset_packages
See crew_launcher().
reset_options
See crew_launcher().
garbage_collection
See crew_launcher().
launch_max
See crew_launcher().
tls
See crew_launcher().
processes
See crew_launcher().
async
A crew_async() object to run low-level launcher tasks asynchronously.
throttle
A crew_throttle() object to throttle scaling.
new()
Launcher constructor.
crew_class_launcher$new(
  name = NULL,
  seconds_interval = NULL,
  seconds_timeout = NULL,
  seconds_launch = NULL,
  seconds_idle = NULL,
  seconds_wall = NULL,
  seconds_exit = NULL,
  tasks_max = NULL,
  tasks_timers = NULL,
  reset_globals = NULL,
  reset_packages = NULL,
  reset_options = NULL,
  garbage_collection = NULL,
  launch_max = NULL,
  tls = NULL,
  processes = NULL
)
name
See crew_launcher().
seconds_interval
See crew_launcher().
seconds_timeout
See crew_launcher().
seconds_launch
See crew_launcher().
seconds_idle
See crew_launcher().
seconds_wall
See crew_launcher().
seconds_exit
See crew_launcher().
tasks_max
See crew_launcher().
tasks_timers
See crew_launcher().
reset_globals
See crew_launcher().
reset_packages
See crew_launcher().
reset_options
See crew_launcher().
garbage_collection
See crew_launcher().
launch_max
See crew_launcher().
tls
See crew_launcher().
processes
See crew_launcher().
An R6 object with the launcher.
if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
  client <- crew_client()
  client$start()
  launcher <- crew_launcher_local(name = client$name)
  launcher$start(workers = client$workers)
  launcher$launch(index = 1L)
  m <- mirai::mirai("result", .compute = client$name)
  Sys.sleep(0.25)
  m$data
  client$terminate()
}
validate()
Validate the launcher.
crew_class_launcher$validate()
NULL (invisibly).
set_name()
Set the name of the launcher.
crew_class_launcher$set_name(name)
name
Character of length 1, name to set for the launcher.
NULL (invisibly).
settings()
List of arguments for mirai::daemon().
crew_class_launcher$settings(socket)
socket
Character of length 1, websocket address of the worker to launch.
List of arguments for mirai::daemon().
call()
Create a call to crew_worker() to help create custom launchers.
crew_class_launcher$call(socket, launcher, worker, instance)
socket
Socket where the worker will receive tasks.
launcher
Character of length 1, name of the launcher.
worker
Positive integer of length 1, index of the worker. This worker index remains the same even when the current instance of the worker exits and a new instance launches.
instance
Character of length 1 to uniquely identify the instance of the worker.
Character of length 1 with a call to crew_worker().
launcher <- crew_launcher_local()
launcher$call(
  socket = "ws://127.0.0.1:5000/3/cba033e58",
  launcher = "launcher_a",
  worker = 3L,
  instance = "cba033e58"
)
sockets
For testing purposes only.
Creates the workers data frame. Meant to be called once at the beginning of the launcher life cycle, after the client has started.
NULL (invisibly).
terminate()
Terminate the whole launcher, including all workers.
crew_class_launcher$terminate()
NULL (invisibly).
NULL if the launcher is not started. Otherwise, a tibble with one row per crew worker and the following columns:
worker: integer index of the worker.
launches: number of times the worker was launched. Each launch occurs at a different websocket because the token at the end of the URL is rotated before each new launch.
online: logical vector, whether the current instance of each worker was actively connected to its NNG socket during the time of the last call to tally().
discovered: logical vector, whether the current instance of each worker had connected to its NNG socket at some point (and then possibly disconnected) during the time of the last call to tally().
assigned: cumulative number of tasks assigned, reported by mirai::daemons() and summed over all completed instances of the worker. Does not reflect the activity of the currently running instance of the worker.
complete: cumulative number of tasks completed, reported by mirai::daemons() and summed over all completed instances of the worker. Does not reflect the activity of the currently running instance of the worker.
socket: current websocket URL of the worker.
tally()
Update the daemons-related columns of the internal workers data frame.
crew_class_launcher$tally(daemons = NULL)
daemons
mirai daemons matrix. For testing only. Users should not set this.
NULL (invisibly).
unlaunched()
Get indexes of unlaunched workers.
crew_class_launcher$unlaunched(n = Inf)
n
Maximum number of worker indexes to return.
A worker is "unlaunched" if it has never connected to the current instance of its websocket. Once a worker launches with the launch() method, it is considered "launched" until it disconnects and its websocket is rotated with rotate().
Integer index of workers available for launch.
booting()
Get workers that may still be booting up.
crew_class_launcher$booting()
A worker is "booting" if its launch time is within the last seconds_launch seconds. seconds_launch is a configurable grace period during which crew allows a worker to start up and connect to the mirai dispatcher. The booting() method does not know the actual connection status of a worker. It only knows launch times, so it may return TRUE for workers that have already connected and started doing tasks.
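The launch-time check described above can be sketched in base R. This is only an illustration under stated assumptions, not crew's implementation: the `launched_at` vector and the `now` value are hypothetical stand-ins for the launch timestamps the launcher tracks internally.

```r
# Hypothetical launch times in seconds. NA means the worker never launched.
launched_at <- c(100, 125, NA)
now <- 130            # hypothetical current time in seconds
seconds_launch <- 10  # grace period for a worker to start up and connect

# A worker counts as booting if it launched within the last
# seconds_launch seconds, regardless of whether it has connected.
booting <- !is.na(launched_at) & (now - launched_at) <= seconds_launch
print(booting)
```

Note that the check never consults connection status, which is why a worker that is already running tasks can still count as booting.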
active()
Get active workers.
crew_class_launcher$active()
A worker is "active" if its current instance is online and connected, or if it is within its booting time window and has never connected. In other words, "active" means online | (!discovered & booting).
Logical vector with TRUE for active workers and FALSE for inactive ones.
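The rule online | (!discovered & booting) can be tried out on a small table. The sketch below uses a hypothetical `workers` data frame with made-up values. The real launcher derives these columns from its internal state.

```r
# Hypothetical worker states, one row per worker.
workers <- data.frame(
  worker = 1:4,
  online = c(TRUE, FALSE, FALSE, FALSE),
  discovered = c(TRUE, TRUE, FALSE, FALSE),
  booting = c(FALSE, TRUE, TRUE, FALSE)
)

# Active: currently connected, or never connected but still inside
# the booting time window.
active <- workers$online | (!workers$discovered & workers$booting)
print(active)
```

Worker 2 is inactive even though it is inside its booting window, because it connected once and then disconnected. Worker 3 is active because it has not connected yet and may still be starting up.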
done()
Get done workers.
crew_class_launcher$done()
A worker is "done" if it is launched and inactive. A worker is "launched" if launch() was called and the worker websocket has not been rotated since.
Integer index of inactive workers.
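As a rough sketch of the "launched and inactive" definition, the following base R snippet computes done-worker indexes from two hypothetical logical vectors. The vector names and values are assumptions for illustration only.

```r
# Hypothetical state: whether launch() was called since the last rotation,
# and whether each worker is currently active.
launched <- c(TRUE, TRUE, FALSE, TRUE)
active <- c(TRUE, FALSE, FALSE, FALSE)

# Done workers are launched but no longer active.
done <- which(launched & !active)
print(done)
```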
rotate()
Rotate websockets at all unlaunched workers.
crew_class_launcher$rotate()
NULL (invisibly).
launch()
Launch a worker.
crew_class_launcher$launch(index)
index
Positive integer of length 1, index of the worker to launch.
NULL (invisibly).
forward()
Forward an asynchronous launch/termination error condition of a worker.
crew_class_launcher$forward(index, condition = "error")
index
Integer of length 1, index of the worker to inspect.
condition
Character of length 1 indicating what to do with an error if found: "error" to throw an error, "warning" to throw a warning, "message" to print a message, and "character" to return a character vector of specific task-level error messages. The return value is NULL if no error is found.
Throw an error, throw a warning, print a message, or return a character string, depending on the condition argument.
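The four values of condition map naturally onto base R condition functions. The helper below, `forward_sketch()`, is a hypothetical stand-in that shows one way such a dispatch could look. It is not the method's actual source.

```r
# Hypothetical helper: handle an error message according to `condition`.
forward_sketch <- function(message_text, condition = "error") {
  if (is.null(message_text)) {
    return(NULL)  # no error found
  }
  switch(
    condition,
    error = stop(message_text, call. = FALSE),
    warning = warning(message_text, call. = FALSE),
    message = message(message_text),
    character = message_text
  )
}

# Return the message as a character string instead of signaling.
as_text <- forward_sketch("worker failed to launch", condition = "character")

# Capture the thrown error and keep its message.
caught <- tryCatch(
  forward_sketch("worker failed to launch", condition = "error"),
  error = function(e) conditionMessage(e)
)
```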
errors()
Collect and return the most recent error messages from asynchronous worker launching and termination.
crew_class_launcher$errors()
Character vector of all the most recent error messages from asynchronous worker launching and termination. NULL if there are no errors.
wait()
Wait for any local asynchronous launch or termination tasks to complete.
crew_class_launcher$wait()
Only relevant if processes is a positive integer.
NULL (invisibly).
scale()
Auto-scale workers out to meet the demand of tasks.
crew_class_launcher$scale(demand, throttle = TRUE)
demand
Number of unresolved tasks.
throttle
TRUE to skip auto-scaling if it already happened within the last seconds_interval seconds. FALSE to auto-scale every time scale() is called. Throttling avoids overburdening the mirai dispatcher and other resources.
NULL (invisibly).
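To illustrate the throttling idea, here is a minimal base R sketch of an interval-based gate. `make_throttle()` is a hypothetical helper written for this example and is unrelated to the crew_throttle() object. Times are passed in explicitly so the behavior is easy to follow.

```r
# Hypothetical throttle: allow an action only if at least
# seconds_interval seconds have passed since the last allowed action.
make_throttle <- function(seconds_interval) {
  last <- -Inf
  function(now) {
    if (now - last < seconds_interval) {
      return(FALSE)  # too soon, skip this round of auto-scaling
    }
    last <<- now
    TRUE
  }
}

poll <- make_throttle(seconds_interval = 0.5)
# Simulated calls at 0, 0.2, 0.6, and 0.9 seconds.
results <- c(poll(0), poll(0.2), poll(0.6), poll(0.9))
print(results)
```

Only the calls at 0 and 0.6 seconds pass the gate, which mirrors how throttle = TRUE skips scaling requests that arrive too soon after the previous one.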
launch_worker()
Abstract worker launch method.
crew_class_launcher$launch_worker(call, name, launcher, worker, instance)
call
Character of length 1 with a namespaced call to crew_worker() which will run in the worker and accept tasks.
name
Character of length 1 with an informative worker name.
launcher
Character of length 1, name of the launcher.
worker
Positive integer of length 1, index of the worker. This worker index remains the same even when the current instance of the worker exits and a new instance launches. It is always between 1 and the maximum number of concurrent workers.
instance
Character of length 1 to uniquely identify the current instance of the worker at the index in the launcher.
Launcher plugins will overwrite this method.
A handle to mock the worker launch.
terminate_worker()
Abstract worker termination method.
crew_class_launcher$terminate_worker(handle)
handle
A handle object previously returned by launch_worker() which allows the termination of the worker.
Launcher plugins will overwrite this method.
A handle to mock worker termination.
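Because launcher plugins overwrite launch_worker() and terminate_worker(), a plugin is essentially a subclass that supplies those two methods. The sketch below shows the general shape under several assumptions: the class name `custom_launcher_class` is hypothetical, the worker is started as a local R process through the processx package, and the R6, crew, and processx packages must be installed before a launcher object is created and a worker is launched. The definition is guarded so the snippet is harmless when R6 is unavailable.

```r
# Minimal sketch of a launcher plugin. The class name is hypothetical.
custom_launcher_class <- NULL
if (requireNamespace("R6", quietly = TRUE)) {
  custom_launcher_class <- R6::R6Class(
    classname = "custom_launcher_class",
    inherit = crew::crew_class_launcher,
    public = list(
      # Start a local R process that runs the crew_worker() call.
      # The returned process object serves as the worker handle.
      launch_worker = function(call, name, launcher, worker, instance) {
        bin <- file.path(R.home("bin"), "R")
        processx::process$new(
          command = bin,
          args = c("-e", call),
          cleanup = FALSE
        )
      },
      # Use the handle from launch_worker() to stop the process.
      terminate_worker = function(handle) {
        handle$kill()
      }
    )
  )
}
```

Returning the process object from launch_worker() is what lets terminate_worker() shut the worker down later, since that object is passed back as the handle argument.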
terminate_workers()
Terminate one or more workers.
crew_class_launcher$terminate_workers(index = NULL)
index
Integer vector of the indexes of the workers to terminate. If NULL, all current workers are terminated.
NULL (invisibly).
Other launcher:
crew_launcher()