A channel is an object that represents a sequence of values yet to be determined. It is something like a combination of a promise and an iteror.
channel(obj, ...)

# S3 method for `function`
channel(
obj,
...,
max_queue = 500L,
max_awaiting = 500L,
wakeup = function(...) NULL
)
is.channel(x)
A channel object, supporting methods "nextThen" and "nextOr".
is.channel(x) returns TRUE if its argument is a channel object.
obj: A user-provided function; it will receive three callback functions as arguments, in order: emit(val), reject(err) and close().
...: Specialized channel methods may take other arguments.
max_queue: The maximum number of outgoing values to store if there are no listeners. Beyond this, calling emit will return an error.
max_awaiting: The maximum number of pending requests. If there are this many outstanding requests for values, calling nextThen(ch, ...) or nextOr(ch) will raise an error.
wakeup: You may optionally provide a callback function here. It will be called when the queue is empty and there is at least one listener/outstanding promise.
x: An object.
Peter Meilstrup
The channel interface is intended to represent and work with asynchronous, live data sources, for instance event logs, non-blocking connections, paginated query results, reactive values, and other processes that yield a sequence of values over time.
channel is an S3 generic and will attempt to convert the argument obj into a channel object according to its class.
The friendly way to obtain values from a channel is to use awaitNext or for loops within an async or stream coroutine.
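As a minimal sketch of the friendly interface, the following reads one value with awaitNext and the rest with a for loop inside async. The names src, done and result are illustrative only, and the explicit later::run_now() loop is an assumption made so the example completes in a non-interactive script; in an interactive session the event loop normally runs by itself.

```r
library(async)

# A small stream to consume; a stream is itself a channel.
src <- stream({
  for (i in 1:3) yield(i)
})

done <- async({
  first <- awaitNext(src)          # wait for a single value
  rest <- 0
  for (x in src) rest <- rest + x  # then loop until the channel closes
  c(first = first, rest = rest)
})

# Capture the value the async block resolves to.
result <- NULL
promises::then(done, function(v) result <<- v)

# Demo only: drain the 'later' event loop so pending callbacks run now.
while (!later::loop_empty()) later::run_now()
```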
The low-level interface to obtain values from a channel is to call nextThen(ch, onNext=, onError=, onClose=, ...), providing callback functions for at least onNext(val). Those callbacks will be appended to an internal queue, and will be called as soon as data is available, in the order that requests were received.
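The callback interface might be used as sketched below. Two requests are queued before any data exists, and each receives one value in the order the requests were made. The variable names are illustrative, all three callbacks are supplied explicitly rather than relying on defaults, and the later::run_now() loop is only there so the example finishes in a script.

```r
library(async)

# A stream serves as the channel to read from.
ch <- stream({
  yield("first")
  yield("second")
})

received <- list()

# Queue two requests; each callback set is used for exactly one value.
for (i in 1:2) {
  nextThen(ch,
    onNext  = function(val) received[[length(received) + 1]] <<- val,
    onError = function(err) warning("channel error"),
    onClose = function() NULL)
}

# Demo only: drain the 'later' event loop so pending callbacks run now.
while (!later::loop_empty()) later::run_now()
```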
You can also treat a channel as an iteror over promises, calling nextOr(ch) to return a promise representing the next available value. Each promise created this way will be resolved in the order that data come in. Note that this way there is no special signal for end of iteration; instead, a promise will reject with a condition message "StopIteration" to signal end of iteration.
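A sketch of the promise-based interface follows, assuming nextOr is the generic from the iterors package (called here with an explicit namespace). The stream yields a single value, so the first promise should resolve and the second should reject once the channel closes. The names p1, p2 and outcome are illustrative.

```r
library(async)

ch <- stream({
  yield("only value")
})

# Each call returns a promise for the next value, in request order.
p1 <- iterors::nextOr(ch)
p2 <- iterors::nextOr(ch)

outcome <- character()
promises::then(p1,
  onFulfilled = function(v) outcome <<- c(outcome, paste("value:", v)),
  onRejected  = function(e) outcome <<- c(outcome, "closed"))
promises::then(p2,
  onFulfilled = function(v) outcome <<- c(outcome, paste("value:", v)),
  onRejected  = function(e) outcome <<- c(outcome, "closed"))

# Demo only: drain the 'later' event loop so pending callbacks run now.
while (!later::loop_empty()) later::run_now()
```

End of iteration arrives through onRejected, so a consumer has to treat that rejection as a normal termination rather than a failure.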
Be careful with the iterator-over-promises interface though: if you call as.list.iteror(ch) you may get stuck in an infinite loop, as as.list keeps calling nextOr and receives more promises to represent values that exist only hypothetically. This is one reason for the max_awaiting limit.
The friendly way to create a channel with custom behavior is to use a stream coroutine. Inside of stream(), call await to wait on promises, awaitNext to wait on other streams, and yield to yield values. To signal end of iteration use return() (which will discard its value) and to signal an error use stop().
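For illustration, a stream that awaits a promise and then yields a derived value could look like the sketch below. The helper soon() is hypothetical (it just builds a promise that resolves on the next turn of the event loop), and the consumer uses an async for loop only to collect the output.

```r
library(async)

# Hypothetical helper: a promise that resolves with `value` shortly.
soon <- function(value) {
  promises::promise(function(resolve, reject) {
    later::later(function() resolve(value), delay = 0)
  })
}

squares <- stream({
  for (i in 1:3) {
    x <- await(soon(i))  # wait on a promise
    yield(x^2)           # hand a value to whoever is listening
  }
})

# Collect everything the stream yields.
finished <- async({
  collected <- c()
  for (v in squares) collected <- c(collected, v)
  collected
})

result <- NULL
promises::then(finished, function(v) result <<- v)

# Demo only: drain the 'later' event loop so pending callbacks run now.
while (!later::loop_empty()) later::run_now()
```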
The low-level interface to create a channel with custom behavior is to call channel(function(emit, reject, close) {...}), providing your own function definition; your function will receive those three callback methods as arguments. Then arrange, by whatever means, to call emit(val) at some time in the future as data comes in. When you are done emitting values, call the close() callback. To report an error call reject(err); the next requestor will receive the error. If there is more than one listener, other queued listeners will get a close signal.
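A minimal sketch of the low-level constructor is shown here. It schedules the emissions with later::later, which is just one possible way to "arrange to call emit(val) some time in the future"; a real source would call emit from its own event handlers. The names seen, closed and ask are illustrative.

```r
library(async)

ch <- channel(function(emit, reject, close) {
  # Emit two values and then close, on the next turn of the event loop.
  later::later(function() {
    emit("a")
    emit("b")
    close()
  }, delay = 0)
})

seen <- character()
closed <- FALSE

# Request one value at a time, asking again after each arrives.
ask <- function() {
  nextThen(ch,
    onNext  = function(val) { seen <<- c(seen, val); ask() },
    onError = function(err) warning("channel error"),
    onClose = function() closed <<- TRUE)
}
ask()

# Demo only: drain the 'later' event loop so pending callbacks run now.
while (!later::loop_empty()) later::run_now()
```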