The async package: Generators, async/await, and asynchronous streams for R
This is an R package implementing generators, async blocks, and streams (collectively known as “coroutines.”)
New features in version 0.3
- Coroutines now support single-step debugging. Use debugAsync(obj, TRUE) to pause before each call at R level. You can also use debugAsync(obj, internal=TRUE) to step through at the coroutine implementation level.
- Coroutines are printed with a label indicating where in their code they are paused.
- switch supports goto() to transfer to a different branch.
- Coroutines now support on.exit().
- There is now syntax for generator functions: gen(function(x, y) ...) returns a function that constructs generators. run(...) will execute a generator expression immediately and collect the results in a list.
- There is now an experimental stream() coroutine backed by a channel class (asynchronous iterator).
- The underlying implementation now includes the back half of a compiler. As evidence of this, you can draw a graph of a coroutine’s control structures by calling graphAsync(gen) (this requires that you have the Graphviz dot command installed on your system).
For more details see NEWS.md.
Generators
g <- gen({...}) is like a function that knows how to “pause.” The code in a generator runs until it hits a yield() call, then returns that value. The next time you call the generator, it picks up where it left off and runs until the next yield.
From the outside, a generator implements the iteror interface. You extract each yielded value with nextOr(g, or), which returns the next value, or evaluates and returns or once the generator has finished. You can use generators anywhere you can use an iteror. The iteror class is cross-compatible with the iterators package.
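As a rough illustration of this protocol, the base-R sketch below hand-writes an object that behaves like an iteror: each call returns the next value, and the or argument is only evaluated once the values run out (R's lazy argument evaluation makes this possible). The names make_countdown, next_or, and drain are hypothetical and are not part of async or iterators; with a real generator you would call nextOr(g, or) instead.

```r
# Hypothetical hand-written iterator illustrating the nextOr(g, or) protocol.
# Not part of async: the state (current) lives in the closure, so each call
# "resumes" where the previous one left off.
make_countdown <- function(from) {
  current <- from
  next_or <- function(or) {
    if (current < 1) {
      return(or)  # 'or' is a lazy argument: only evaluated on exhaustion
    }
    value <- current
    current <<- current - 1
    value
  }
  next_or
}

it <- make_countdown(3)
it(NA)  # 3
it(NA)  # 2
it(NA)  # 1
it(NA)  # NA: exhausted, so the 'or' argument is returned

# Drain an iterator into a list. The 'or' expression sets a flag in this
# function's frame, and it only runs when the iterator is exhausted.
drain <- function(next_or) {
  out <- list()
  repeat {
    done <- FALSE
    value <- next_or({ done <- TRUE; NULL })
    if (done) break
    out[[length(out) + 1]] <- value
  }
  out
}

unlist(drain(make_countdown(4)))  # 4 3 2 1
```

Because or is evaluated lazily in the caller's environment, the caller can pass an expression with side effects (here, setting a flag) that runs only when there are no values left.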
Example: Collatz sequence
Consider a sequence of numbers x[i], starting with an arbitrary x[1], where each subsequent element is produced by applying the rule:
- If x[i] is even, then the next value will be x[i+1] = x[i]/2.
- If x[i] is odd, then the next value will be x[i+1] = 3*x[i]+1.
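For comparison, the rule above can be computed eagerly with an ordinary loop. This sketch (collatz_seq is a hypothetical helper, not part of async) has to build the whole sequence before returning anything, whereas a generator hands out values one at a time on demand.

```r
# Hypothetical eager (non-generator) version of the Collatz rule.
collatz_seq <- function(x) {
  out <- x
  while (x > 1) {
    # even: halve; odd: triple and add one
    x <- if (x %% 2 == 0) x / 2 else 3 * x + 1
    out <- c(out, x)
  }
  out
}

collatz_seq(12)           # 12 6 3 10 5 16 8 4 2 1
length(collatz_seq(27))   # 112
max(collatz_seq(27))      # 9232
```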
An infinite sequence of numbers will continue from each starting point x[1], but it is conjectured that all sequences will eventually reach the loop 1, 4, 2, 1, 4, 2, …. The following generator produces the Collatz sequence, starting from x, and terminating when (or if?) the sequence reaches 1.
library(async)
collatz <- gen(function(x) {
yield(x)
while (x > 1) {
if (x %% 2 == 0)
yield(x <- x / 2L)
else yield(x <- 3L * x + 1)
}
})
The call to gen produces a generator function; calling collatz(12) then constructs a generator. You can get values one at a time with nextOr().
ctz <- collatz(12)
nextOr(ctz, NA)
## [1] 12
nextOr(ctz, NA)
## [1] 6
nextOr(ctz, NA)
## [1] 3
nextOr(ctz, NA)
## [1] 10
nextOr(ctz, NA)
## [1] 5
You can also use any other method that applies to an iterator, like as.list.
collatz(27L) |> as.list() |> as.numeric()
## [1] 27 82 41 124 62 31 94 47 142 71 214 107 322 161 484
## [16] 242 121 364 182 91 274 137 412 206 103 310 155 466 233 700
## [31] 350 175 526 263 790 395 1186 593 1780 890 445 1336 668 334 167
## [46] 502 251 754 377 1132 566 283 850 425 1276 638 319 958 479 1438
## [61] 719 2158 1079 3238 1619 4858 2429 7288 3644 1822 911 2734 1367 4102 2051
## [76] 6154 3077 9232 4616 2308 1154 577 1732 866 433 1300 650 325 976 488
## [91] 244 122 61 184 92 46 23 70 35 106 53 160 80 40 20
## [106] 10 5 16 8 4 2 1
#Try collatz(63728127L) |> as.list() |> as.numeric()...
For more examples, see the “Clapping Music” vignette.
Async/await
Like gen, async({...}) takes a block of sequential code, which runs until it reaches a call to await(p). The argument p should be a promise (as defined by the promises package), which represents an unfinished external computation. In turn, async() constructs and returns a promise.
An async block runs until it reaches a call to await(p) and pauses. When the promise p resolves, the async block continues. If p rejects, that is evaluated like an error; you can wrap await(p) in a tryCatch to handle rejections. When the async block finishes or throws an error, its promise resolves or rejects, respectively.
Examples:
async doesn’t handle running concurrent tasks by itself; it builds on existing packages like future and later. The later package lets you assign tasks to be done in the event loop, when R is idle.
Ring a bell 5 times at 10-second intervals (subject to R being idle):
async({
for (i in 1:5) {
await(delay(10)) #delay() uses later::later()
cat("Beep", i, "\n")
beepr::beep(2)
}
})
Shiny apps
async() can be used in Shiny apps! For an example, here is a version of the “Cranwhales” demo app using async/await.
Web scraping
async() allows you to naturally keep track of more than one concurrent process. The web spider vignette shows how this can improve the speed of web scraping using concurrent connections.
Background processing
async can also work with future objects to run computations in parallel. Download, parse, and summarize a dataset in background processes like this:
library(async)
library(future)
library(dplyr)
library(ggplot2)
# multiprocess is deprecated in future; multisession runs background R sessions
plan(multisession, workers = 2)
url <- "http://analytics.globalsuperhypermegamart.com/2020/March.csv.gz"
dest <- "March.csv.gz"
dataset <- async({
  if (!file.exists(dest)) {
    await(future({
      cat("Downloading\n")
      download.file(url, dest)
    }))
  }
  # The value of the last expression becomes the promise's resolved value
  await(future({
    cat("Parsing\n")
    read.csv(dest) |>
      mutate(time = hms::trunc_hms(time, 60*60)) |>
      group_by(time) |>
      summarize(sales = sum(amount))
  }))
})
# When the data is ready, plot it (in the main process):
async({
  p <- await(dataset) |>
    ggplot(aes(time, sales)) +
    geom_line() +
    xlab("Time") +
    ylab("Sales")
  print(p)
})
Streams
New in version 0.3 are asynchronous streams and channels. A channel is an interface for asynchronous iteration; stream() lets you create a channel by writing ordinary sequential code that uses both await and yield. Here is an example of channels being used to “walk and chew gum concurrently”:
walk <- stream({
for (i in 1:10)
for (step in c("left", "right")) {
yield(step)
await(delay(0.5))
}
})
chewGum <- stream(for (i in 1:12) {
yield("chew")
await(delay(0.8))
})
printEach <- async(function(st) {
for (each in st) {cat(each, ", ", sep="")}
cat("\n")
})
all <- combine(walk, chewGum) |> printEach()
## left, chew, right, chew, left, right, chew, left, chew, right, left, chew, right, chew, left, right, chew, left, right, chew, left, chew, right, left, chew, right, chew, left, right, chew, left, right,
How does this work anyway?
A longer article will be forthcoming, but the basic gist is that the async package transforms your program into a state machine.
A coroutine expression is first scanned for uses of await, yield, for, break and other control flow calls. Those calls are swapped out for implementations local to the async package. Other R calls are wrapped in functions, and all these functions are linked together so that each function calls the next in sequence. The result is a graph of functions calling each other, each call corresponding to a step in the program.
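Here is a hand-written sketch of that linking idea, not the code async actually generates: a small while loop is split into step functions (loop_test, loop_body, and finish, all hypothetical names) that share state through their enclosing environment, and each step ends by calling the next one. The loop turns into a cycle in the graph of calls.

```r
# Hypothetical hand translation of this loop into linked step functions:
#   total <- 0; i <- 1
#   while (i <= n) { total <- total + i; i <- i + 1 }
#   total
sum_to <- function(n) {
  total <- 0  # shared state lives in the enclosing environment
  i <- 1
  loop_test <- function() {
    if (i <= n) loop_body() else finish()  # a branch: two outgoing edges
  }
  loop_body <- function() {
    total <<- total + i
    i <<- i + 1
    loop_test()  # edge back to the test: the loop is a cycle in the graph
  }
  finish <- function() total
  loop_test()  # entry point of the graph
}

sum_to(10)  # 55
```

This naive version nests one R call per step, so it is only suitable for small inputs; it is meant to show the shape of the graph, not an efficient way to execute it.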
As of async version 0.3 you can extract and visualize this graph with graphAsync(g). (You will need Graphviz dot installed to render these graphs.)
ctz <- collatz(23)
graphAsync(ctz, type="svg") #creates a file "ctz.svg"
Since each step in the program’s execution corresponds to a function call, when execution reaches a yield, the program’s state is just the “next function” that would have been called (that is, a continuation). To pause and resume execution, a generator saves that “next function” until the next time nextOr() is called.
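A minimal base-R sketch of pausing by saving the continuation, assuming a hand-translated Collatz generator rather than anything async itself produces: each step either returns a value together with the function to resume from, or reports that it is done. The generator object only has to remember that saved function between calls. All names here (make_collatz_gen, yield_then, saved) are hypothetical.

```r
# Hypothetical sketch, not async's implementation: a Collatz generator
# written by hand as steps that return their own continuation.
make_collatz_gen <- function(x) {
  # "Pausing" = returning a value together with the next function to call
  yield_then <- function(value, cont) list(done = FALSE, value = value, cont = cont)
  start <- function() yield_then(x, loop_test)
  loop_test <- function() {
    if (x > 1) loop_body() else list(done = TRUE)
  }
  loop_body <- function() {
    x <<- if (x %% 2 == 0) x / 2 else 3 * x + 1
    yield_then(x, loop_test)
  }

  saved <- start  # the continuation to run on the next request
  function(or) {
    if (is.null(saved)) return(or)  # already finished
    step <- saved()
    if (step$done) {
      saved <<- NULL
      return(or)
    }
    saved <<- step$cont  # remember where to resume
    step$value
  }
}

g <- make_collatz_gen(12)
g(NA)  # 12
g(NA)  # 6
g(NA)  # 3
```

Between calls, nothing runs: the only thing kept alive is the saved function, plus the variables it closes over (here, x).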
You can also enable single-stepping at the graph level by calling:
debugAsync(ctz, internal=TRUE)