Learn R Programming

async (version 0.3.2)

stream: Create an asynchronous iterator by writing sequential code.

Description

(Experimental as of async 0.3) stream(...) constructs a channel object, i.e. an asynchronous iterator, which will compute and return values according to sequential code written in expr. A stream is a coroutine wearing a channel interface in the same way that async is a coroutine wearing a promise interface, and a gen is a coroutine sitting behind an iteror interface.

Usage

stream(
  expr,
  ...,
  split_pipes = TRUE,
  lazy = TRUE,
  compileLevel = getOption("async.compileLevel"),
  debugR = FALSE,
  debugInternal = FALSE,
  trace = getOption("async.verbose")
)

Value

An object with (at least) classes "stream", "channel", "coroutine", "iteror", "iter".

Arguments

expr

A coroutine expression, using some combination of yield, await, awaitNext, yieldFrom, standard control flow operators and other calls.

...

Undocumented.

split_pipes

See description under async; defaults to TRUE.

lazy

If TRUE, start paused, and pause after yield() (see above.)

compileLevel

Compilation level.

debugR

Set TRUE to single-step debug at R level. Use debugAsync() to enable or disable debugging on a stream after it has been created.

debugInternal

Set TRUE to single-step debug at coroutine implementation level.

trace

An optional tracing function.

Author

Peter Meilstrup

Details

In a stream expression, you can call yield() to emit a value, and await() to wait for a value from a promise. To have your stream wait for values from another stream or channel, call awaitNext(); you can also use awaitNext when you are writing an async. You can also use a simple for loop to consume all future values from a stream or channel.

The lower-level interface to consume values from a stream is by using nextThen from the channel interface.

Streams come in both "lazy" and "eager" varieties. If lazy=TRUE, a stream starts idle, and does not process anything until it is woken up by a call to its channel's nextThen. It will pause after reaching yield if there are no more outstanding requests. If lazy=FALSE, a stream will begin executing immediately, not pausing on yield, possibly queuing up emitted values until it needs to await something.

(For comparison, in this package, gen are lazy in that they do not start executing until a call to nextOr and pause immediately after yield, while async blocks are eager, starting at construction and running until they hit an await.)

Like its coroutine counterparts, if stream is given a function expression, like stream(function(...) ...), it will return a "stream function" i.e. a function that constructs a stream object.

Examples

Run this code

# emit values _no more than_ once per second
count_to <- stream(function(n, interval=1) {
  for (i in 1:n) {
    await(delay(interval))
    yield(i)
  }
})

accumulate <- stream(function(st, sum=0) {
  for (i in st) {sum <- sum + i; yield(sum)}
})

print_each <- async(function(st) for (i in st) print(i))

count_to(10) |> accumulate() |> print_each()

Run the code above in your browser using DataLab