bpiterate iterates over an indeterminate number of data chunks
(e.g., records in a file). Each chunk is processed by parallel workers
asynchronously; as each worker finishes, it receives a
new chunk. Data are traversed a single time.
bpiterate(ITER, FUN, ..., BPPARAM=bpparam())
"bpiterate"(ITER, FUN, ..., BPPARAM=bpparam())
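ITER: A function that takes no arguments and returns the next chunk of data, or NULL once the data are exhausted.
FUN: A function applied to each object returned by ITER; run on parallel workers separate from the master. When
BPPARAM is a MulticoreParam, FUN is 'decorated' with additional
arguments and therefore must have ... in its signature.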
BPPARAM: A BiocParallelParam instance
determining the parallel back-end to be used during evaluation, or a
list of BiocParallelParam instances, to be applied in
sequence for nested calls to bpiterate.
...: Additional named arguments passed to FUN, or the arguments REDUCE, init, and reduce.in.order described below.
REDUCE: Optional function that combines (reduces) output from FUN. As each worker returns,
its output is combined with the current result using REDUCE.
REDUCE takes two arguments: the current
result and the output of FUN from
the worker that just finished.
init: Optional initial value for REDUCE;
must be of the same type as the object returned from FUN.
When supplied, reduce.in.order is set to TRUE.
reduce.in.order: Logical. When TRUE, REDUCE
is applied to the results from the workers in the same order
the tasks were sent out.
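The contract for REDUCE, init, and reduce.in.order can be illustrated without any parallel back-end. The sketch below is a sequential stand-in that uses base R's Reduce(); it is not how bpiterate is implemented. The names chunk_results, combine, and labels are hypothetical and exist only for this illustration.

```r
# Hypothetical outputs of FUN for three chunks, in the order the chunks were sent.
chunk_results <- list(c(a = 1, b = 2), c(a = 10, b = 20), c(a = 100, b = 200))

# A REDUCE-style function: the first argument is the current result,
# the second is the output of FUN from the worker that just finished.
combine <- function(current, new) current + new

# Without an initial value, the first chunk result seeds the reduction.
no_init <- Reduce(combine, chunk_results)

# With an initial value of the same type as the output of FUN.
with_init <- Reduce(combine, chunk_results, c(a = 0, b = 0))

# Order matters when the reduction is not commutative. Here the second
# chunk is assumed to finish before the first.
labels <- list("A", "B", "C")
in_order <- Reduce(paste0, labels)              # order the tasks were sent
arrival <- Reduce(paste0, labels[c(2, 1, 3)])   # one possible completion order
```

With an addition-like reduction the two orders give the same answer, whereas a concatenation-like reduction does not; the latter is the situation in which reduce.in.order = TRUE matters.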
bpiterate returns a list the same length as the number of chunks returned by
ITER(). When REDUCE is used, the return value is consistent
with application of the reduction. Supported for SnowParam
and MulticoreParam.
bpiterate iterates through an unknown number of data
chunks, dispatching chunks to parallel workers as they
become available. In contrast, other bp*apply functions
such as bplapply or bpmapply require the number of
data chunks to be specified ahead of time. This quality makes
bpiterate useful for iterating through files of unknown length.
ITER serves up chunks of data until the end of the file
is reached, at which point it returns NULL. Note that ITER
should continue to return NULL regardless of the number of times
it is invoked after reaching the end of the file. FUN
is applied to each object (data chunk) returned by ITER.
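The behaviour required of ITER can be sketched with a closure over an in-memory vector rather than a file. This is a minimal illustration under stated assumptions: make_chunk_iterator and chunk_size are hypothetical names, not part of BiocParallel, and a vector stands in for the file.

```r
# Hypothetical helper: build an ITER-style function over an in-memory vector.
make_chunk_iterator <- function(x, chunk_size) {
    pos <- 0L    # number of elements already served
    function() {
        # Once the data are exhausted, return NULL on this and every later call.
        if (pos >= length(x))
            return(NULL)
        idx <- seq(pos + 1L, min(pos + chunk_size, length(x)))
        pos <<- pos + length(idx)
        x[idx]
    }
}

ITER <- make_chunk_iterator(1:10, chunk_size = 4L)
chunks <- list(ITER(), ITER(), ITER())    # 1:4, 5:8, 9:10
after_end <- list(ITER(), ITER())         # NULL, NULL
```

Keeping the position in the enclosing environment is what lets the returned function take no arguments. A function built this way could be supplied as the ITER argument, with FUN receiving one chunk per call.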
bpvec for parallel, vectorized calculations.
bplapply for parallel, lapply-like calculations.
BiocParallelParam for details of BPPARAM.
## Not run:
# if (require(Rsamtools) && require(RNAseqData.HNRNPC.bam.chr14) &&
# require(GenomicAlignments) && require(ShortRead)) {
#
# ## ----------------------------------------------------------------------
# ## Iterate through a BAM file
# ## ----------------------------------------------------------------------
#
# ## Select a single file and set 'yieldSize' in the BamFile object.
# fl <- RNAseqData.HNRNPC.bam.chr14_BAMFILES[[1]]
# bf <- BamFile(fl, yieldSize = 300000)
#
# ## bamIterator() is initialized with a BAM file and returns a function.
# ## The return function requires no arguments and iterates through the
# ## file returning data chunks the size of yieldSize.
# bamIterator <- function(bf) {
#     done <- FALSE
#     if (!isOpen(bf))
#         open(bf)
#
#     function() {
#         if (done)
#             return(NULL)
#         yld <- readGAlignments(bf)
#         if (length(yld) == 0L) {
#             close(bf)
#             done <<- TRUE
#             NULL
#         } else yld
#     }
# }
#
# ## FUN counts reads in a region of interest.
# roi <- GRanges("chr14", IRanges(seq(19e6, 107e6, by = 10e6), width = 10e6))
# counter <- function(reads, roi, ...) {
#     countOverlaps(query = roi, subject = reads)
# }
#
# ## Initialize the iterator.
# ITER <- bamIterator(bf)
#
# ## The number of chunks returned by ITER() determines the result length.
# bpparam <- MulticoreParam(workers = 3)
# bpiterate(ITER, counter, roi = roi, BPPARAM = bpparam)
#
# ## Re-initialize the iterator and combine on the fly with REDUCE:
# ITER <- bamIterator(bf)
# bpparam <- MulticoreParam(workers = 3)
# bpiterate(ITER, counter, REDUCE = sum, roi = roi, BPPARAM = bpparam)
#
# ## ----------------------------------------------------------------------
# ## Iterate through a FASTQ file
# ## ----------------------------------------------------------------------
#
# ## Set data chunk size with 'n' in the FastqStreamer object.
# sp <- SolexaPath(system.file('extdata', package = 'ShortRead'))
# fl <- file.path(analysisPath(sp), "s_1_sequence.txt")
#
# ## Create an iterator that returns data chunks the size of 'n'.
# fastqIterator <- function(fqs) {
#     done <- FALSE
#     if (!isOpen(fqs))
#         open(fqs)
#
#     function() {
#         if (done)
#             return(NULL)
#         yld <- yield(fqs)
#         if (length(yld) == 0L) {
#             close(fqs)
#             done <<- TRUE
#             NULL
#         } else yld
#     }
# }
#
# ## The process function summarizes the number of times each sequence occurs.
# summary <- function(reads, ...) {
#     ShortRead::tables(reads, n = 0)$distribution
# }
#
# ## Create a param.
# bpparam <- SnowParam(workers = 2)
#
# ## Initialize the streamer and iterator.
# fqs <- FastqStreamer(fl, n = 100)
# ITER <- fastqIterator(fqs)
# bpiterate(ITER, summary, BPPARAM = bpparam)
#
# ## Results from the workers are combined on the fly when REDUCE is used.
# ## Collapsing the data in this way can substantially reduce memory
# ## requirements.
# fqs <- FastqStreamer(fl, n = 100)
# ITER <- fastqIterator(fqs)
# bpiterate(ITER, summary, REDUCE = merge, all = TRUE, BPPARAM = bpparam)
#
# }
# ## End(Not run)