R6 abstract class for building subclasses that launch and manage workers.
workers
Data frame of worker information.
name
Name of the launcher.
seconds_interval
See crew_launcher().
seconds_launch
See crew_launcher().
seconds_idle
See crew_launcher().
seconds_wall
See crew_launcher().
seconds_exit
See crew_launcher().
tasks_max
See crew_launcher().
tasks_timers
See crew_launcher().
reset_globals
See crew_launcher().
reset_packages
See crew_launcher().
reset_options
See crew_launcher().
garbage_collection
See crew_launcher().
launch_max
See crew_launcher().
tls
See crew_launcher().
until
Numeric of length 1, time point when the throttle unlocks.
new()
Launcher constructor.
crew_class_launcher$new(
name = NULL,
seconds_interval = NULL,
seconds_launch = NULL,
seconds_idle = NULL,
seconds_wall = NULL,
seconds_exit = NULL,
tasks_max = NULL,
tasks_timers = NULL,
reset_globals = NULL,
reset_packages = NULL,
reset_options = NULL,
garbage_collection = NULL,
launch_max = NULL,
tls = NULL
)
name
See crew_launcher().
seconds_interval
See crew_launcher().
seconds_launch
See crew_launcher().
seconds_idle
See crew_launcher().
seconds_wall
See crew_launcher().
seconds_exit
See crew_launcher().
tasks_max
See crew_launcher().
tasks_timers
See crew_launcher().
reset_globals
See crew_launcher().
reset_packages
See crew_launcher().
reset_options
See crew_launcher().
garbage_collection
See crew_launcher().
launch_max
See crew_launcher().
tls
See crew_launcher().
An R6 object with the launcher.
if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
client <- crew_client()
client$start()
launcher <- crew_launcher_local(name = client$name)
launcher$start(workers = client$workers)
launcher$launch(index = 1L)
m <- mirai::mirai("result", .compute = client$name)
Sys.sleep(0.25)
m$data
client$terminate()
}
validate()
Validate the launcher.
crew_class_launcher$validate()
NULL (invisibly).
settings()
List of arguments for mirai::daemon().
crew_class_launcher$settings(socket)
socket
Character of length 1, websocket address of the worker to launch.
List of arguments for mirai::daemon().
call()
Create a call to crew_worker() to help create custom launchers.
crew_class_launcher$call(socket, launcher, worker, instance)
socket
Socket where the worker will receive tasks.
launcher
Character of length 1, name of the launcher.
worker
Positive integer of length 1, index of the worker. This worker index remains the same even when the current instance of the worker exits and a new instance launches.
instance
Character of length 1 to uniquely identify the instance of the worker.
Character of length 1 with a call to crew_worker().
launcher <- crew_launcher_local()
launcher$call(
socket = "ws://127.0.0.1:5000/3/cba033e58",
launcher = "launcher_a",
worker = 3L,
instance = "cba033e58"
)
sockets
For testing purposes only.
Creates the workers data frame. Meant to be called once at the beginning of the launcher life cycle, after the client has started.
NULL (invisibly).
NULL if the launcher is not started. Otherwise, a tibble with one row per crew worker and the following columns:
worker: integer index of the worker.
launches: number of times the worker was launched. Each launch occurs at a different websocket because the token at the end of the URL is rotated before each new launch.
assigned: cumulative number of tasks assigned, reported by mirai::daemons() and summed over all completed instances of the worker. Does not reflect the activity of the currently running instance of the worker.
complete: cumulative number of tasks completed, reported by mirai::daemons() and summed over all completed instances of the worker. Does not reflect the activity of the currently running instance of the worker.
socket: current websocket URL of the worker.
done()
Get done workers.
crew_class_launcher$done(daemons = NULL)
daemons
mirai daemons matrix. For testing only. Users should not set this.
A worker is "done" if it is launched and inactive.
A worker is "launched" if launch() was called and the worker websocket has not been rotated since.
If a worker is currently online, then it is not inactive.
If a worker is not currently online, then it is inactive if and only if either (1) it connected to the current websocket at some point in the past, or (2) seconds_launch seconds have elapsed since launch.
Integer index of inactive workers.
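The rule above can be sketched in base R. This is only an illustration under stated assumptions: the helper done_workers() and the vectors launched, online, discovered, and launch_time are hypothetical names chosen for this sketch and are not part of the crew API.

```r
# Hypothetical helper: indexes of workers that are launched and inactive.
done_workers <- function(launched, online, discovered, launch_time,
                         now, seconds_launch) {
  # The startup window has expired once seconds_launch seconds have passed.
  expired <- (now - launch_time) >= seconds_launch
  # An offline worker is inactive if it already connected to its current
  # websocket at some point, or if its startup window has expired.
  inactive <- !online & (discovered | expired)
  which(launched & inactive)
}

launched    <- c(TRUE, TRUE, TRUE, FALSE)   # was launch() called?
online      <- c(TRUE, FALSE, FALSE, FALSE) # currently connected?
discovered  <- c(TRUE, TRUE, FALSE, FALSE)  # ever connected to this websocket?
launch_time <- c(0, 0, 95, 0)               # launch times in seconds

done_workers(launched, online, discovered, launch_time,
             now = 100, seconds_launch = 30)
```

In this made-up state, worker 1 is still online, worker 3 is still inside its startup window, and worker 4 was never launched, so only worker 2 counts as done.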
rotate()
Rotate a websocket.
crew_class_launcher$rotate(index)
index
Integer index of a worker.
NULL (invisibly).
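To make the idea of rotation concrete, here is a minimal base R sketch that swaps the trailing token of a websocket URL shaped like the one in the call() example. The helper rotate_token() and the replacement token are hypothetical. The real rotation is handled by the launcher and is not shown here.

```r
# Hypothetical helper: replace the token at the end of a websocket URL.
rotate_token <- function(socket, token) {
  # The pattern matches the final run of non-slash characters,
  # which is the token at the end of the URL.
  sub("[^/]+$", token, socket)
}

old_socket <- "ws://127.0.0.1:5000/3/cba033e58"
new_socket <- rotate_token(old_socket, token = "f2a91c07d")
new_socket
```

The worker index (3 in this URL) stays the same while the trailing token changes, which is why each launch of the same worker occurs at a different websocket.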
tally()
Update the cumulative assigned and complete statistics.
Used to detect backlogged workers with more assigned than complete tasks. If terminated, these workers need to be relaunched until the backlog of assigned tasks is complete.
crew_class_launcher$tally(daemons = NULL)
daemons
mirai daemons matrix. For testing only. Users should not set this.
NULL (invisibly).
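The bookkeeping behind a tally can be sketched in base R as follows. The data frames, their values, and the variable names are hypothetical. Only the column names worker, assigned, and complete come from the worker summary described earlier.

```r
# Hypothetical cumulative totals for three workers.
workers <- data.frame(
  worker   = 1:3,
  assigned = c(4L, 0L, 2L),
  complete = c(4L, 0L, 1L)
)

# Hypothetical counts reported for worker instances that just finished.
finished <- data.frame(
  worker   = c(1L, 3L),
  assigned = c(2L, 3L),
  complete = c(2L, 1L)
)

# Fold the finished instances into the cumulative totals.
rows <- match(finished$worker, workers$worker)
workers$assigned[rows] <- workers$assigned[rows] + finished$assigned
workers$complete[rows] <- workers$complete[rows] + finished$complete

# Workers with more assigned than complete tasks still carry a backlog.
which(workers$assigned > workers$complete)
```

Here worker 3 ends up with 5 assigned and 2 complete tasks, so it is the one that would need to be relaunched until its backlog clears.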
unlaunched()
Get workers available for launch.
crew_class_launcher$unlaunched(n = Inf)
n
Maximum number of worker indexes to return.
Integer index of workers available for launch.
backlogged()
List non-launched backlogged workers.
crew_class_launcher$backlogged()
Integer vector of worker indexes.
resolved()
List non-launched non-backlogged workers.
crew_class_launcher$resolved()
n
Maximum number of worker indexes to return.
Integer vector of worker indexes.
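The three listings above (unlaunched, backlogged, resolved) partition the non-launched workers. A minimal base R sketch of that partition, using hypothetical state vectors rather than the launcher's internal data:

```r
# Hypothetical state for four workers.
launched <- c(TRUE, FALSE, FALSE, FALSE)
assigned <- c(3L, 5L, 2L, 0L)
complete <- c(1L, 2L, 2L, 0L)

# Workers that are not currently launched are available for launch.
unlaunched <- which(!launched)

# Backlogged: not launched, with assigned tasks still outstanding.
backlogged <- which(!launched & assigned > complete)

# Resolved: not launched, with nothing outstanding.
resolved <- which(!launched & assigned <= complete)

# Cap the number of returned indexes, in the spirit of the n argument.
first_two <- head(unlaunched, n = 2)
```

Every unlaunched worker falls into exactly one of the two groups, so backlogged and resolved together reproduce unlaunched.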
launch()
Launch a worker.
crew_class_launcher$launch(index)
index
Positive integer of length 1, index of the worker to launch.
NULL (invisibly).
throttle()
Throttle repeated calls.
crew_class_launcher$throttle()
TRUE to throttle, FALSE to continue.
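One way to picture this behavior is a closure that remembers a time point like the until field. This is a sketch under assumptions, not the package's implementation: new_throttle() is a hypothetical helper, and times are passed in explicitly as plain seconds so the example is deterministic.

```r
# Hypothetical constructor for a throttle with a fixed interval in seconds.
new_throttle <- function(seconds_interval) {
  until <- NULL # time point when the throttle unlocks
  function(now) {
    # Start a new window if none is open.
    if (is.null(until)) {
      until <<- now + seconds_interval
    }
    # Inside the window: tell the caller to throttle.
    if (now < until) {
      return(TRUE)
    }
    # Window elapsed: unlock and let the caller continue.
    until <<- NULL
    FALSE
  }
}

throttle <- new_throttle(seconds_interval = 0.5)
first  <- throttle(now = 0)    # opens a window ending at 0.5
second <- throttle(now = 0.25) # still inside the window
third  <- throttle(now = 0.5)  # window elapsed
```

The first two calls return TRUE and the third returns FALSE, after which the next call opens a fresh window.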
scale()
Auto-scale workers out to meet the demand of tasks.
crew_class_launcher$scale(demand, throttle = FALSE)
demand
Number of unresolved tasks.
throttle
Logical of length 1, whether to delay auto-scaling until the next auto-scaling request at least self$client$seconds_interval seconds from the original request. The idea is similar to shiny::throttle() except that crew does not accumulate a backlog of requests. The technique improves robustness and efficiency.
NULL (invisibly).
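As a rough illustration of scaling out to meet demand, the number of new launches can be thought of as the shortfall between unresolved tasks and active workers, capped by the workers available for launch. The helper launch_count() and its arguments active and available are hypothetical, and the package's actual scaling logic may differ.

```r
# Hypothetical helper: how many extra workers to launch for a given demand.
launch_count <- function(demand, active, available) {
  # Never launch a negative number of workers,
  # and never more than are available for launch.
  min(available, max(0L, demand - active))
}

launch_count(demand = 5L, active = 2L, available = 10L) # shortfall of 3
launch_count(demand = 1L, active = 2L, available = 10L) # demand already met
launch_count(demand = 9L, active = 2L, available = 4L)  # capped by availability
```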
terminate()
Terminate one or more workers.
crew_class_launcher$terminate(index = NULL)
index
Integer vector of the indexes of the workers to terminate. If NULL, all current workers are terminated.
NULL (invisibly).
terminate_worker()
Abstract method.
crew_class_launcher$terminate_worker(handle)
handle
A handle object previously returned by launch_worker() which allows the termination of the worker.
Does not actually terminate a worker. This method is a placeholder, and its presence allows manual worker termination to be optional.
NULL (invisibly).
Other class: crew_class_client, crew_class_controller_group, crew_class_controller, crew_class_schedule, crew_class_tls
if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
client <- crew_client()
client$start()
launcher <- crew_launcher_local(name = client$name)
launcher$start(workers = client$workers)
launcher$launch(index = 1L)
m <- mirai::mirai("result", .compute = client$name)
Sys.sleep(0.25)
m$data
client$terminate()
}