Functions that extend the collection of cluster-level functions of the parallel/snow package while providing additional features, including result reproducibility and dynamic cluster resizing. The heart of the package is the function performParallel.
performParallel(count, x, fun, initfun = NULL, initexpr = NULL,
                export = NULL, exitfun = NULL,
                printfun = NULL, printargs = NULL,
                printrepl = max(length(x)/10,1),
                cltype = getClusterOption("type"),
                cluster.args = NULL,
                gentype = "RNGstream", seed = sample(1:9999999,6),
                prngkind = "default", para = 0,
                mngtfiles = c(".clustersize",".proc",".proc_fail"),
                ft_verbose = FALSE, ...)
clusterApplyFT(cl, x, fun, initfun = NULL, initexpr = NULL,
               export = NULL, exitfun = NULL,
               printfun = NULL, printargs = NULL,
               printrepl = max(length(x)/10,1), gentype = "None",
               seed = rep(123456,6), prngkind = "default", para = 0,
               mngtfiles = c(".clustersize",".proc",".proc_fail"),
               ft_verbose = FALSE, ...)
clusterCallpart(cl, nodes, fun, ...)
clusterEvalQpart(cl, nodes, expr)
printClusterInfo(cl)
clusterApplyFT returns a list of two elements. The first is a list (of length |x|) of results; the second is the (possibly updated) cluster object.
performParallel returns a list of results.
count: Number of cluster nodes. If count=0, the process runs sequentially.
cl: Cluster object.
x: Vector of values to be passed to function fun. Its length determines how many times fun is called: x[i] is passed to fun (as its first argument) in the i-th call.
fun: Function or character string naming a function.
initfun: Function or character string naming a function with no arguments that is called on each node before the computation starts. It is passed to the workers using clusterCall. It can be used, for example, to load required libraries or source data files.
initexpr: Expression evaluated on the workers at node initialization. It corresponds to what would be passed to clusterEvalQ before the computation. initfun and initexpr can serve the same purpose, but initexpr does not need to be wrapped in a function.
export: Character vector naming objects to be exported to the workers.
exitfun: Function or character string naming a function with no arguments that is called on each node after the computation is completed.
printfun, printargs, printrepl: printfun is a function or character string naming a function that is called on the master node after every printrepl completed replicates, so it can be used to access intermediate results. The arguments passed to printfun are: a list (of length |x|) of results (including the unfinished ones), the number of finished results, and printargs.
cltype: Character string that specifies the cluster type (see makeClusterFT). Possible values are 'MPI' and 'SOCK' ('PVM' is currently not available).
cluster.args: List of arguments passed to the function makeClusterFT. For the 'SOCK' layer, the most useful argument in this list is names, which can contain a vector of host names or a list containing a specification for each host (see the example in makeCluster). Because of the dynamic resizing feature, the length of this vector (or list) does not need to match the size of the cluster: it is used as a pool from which hosts are taken as they are needed. Another useful argument is outfile, which specifies the name of a file to which worker node output is directed.
gentype: Character string that specifies the type of the random number generator (RNG). Possible values: "RNGstream" (L'Ecuyer's RNG), "SPRNG", or "None"; see clusterSetupRNG.FT. If gentype="None", no RNG action is taken.
seed, prngkind, para: Seed, kind and parameters for the RNG (see clusterSetupRNG.FT). The seed can be a single integer or a vector of six integers.
mngtfiles: A character vector of length 3 containing the names of management files: mngtfiles[1] for managing the cluster size, mngtfiles[2] for monitoring replicates as they are processed, and mngtfiles[3] for monitoring failed replicates. If any of these names is an empty string, the corresponding management action (dynamic cluster resizing, output of processed replicates, or cluster repair in case of failures) is not performed. Files that already exist are overwritten. Note that cluster repair was only available for PVM, which is currently switched off. Furthermore, dynamic cluster resizing is not available for MPI.
ft_verbose: If TRUE, debugging messages are sent to standard output.
nodes: Indices of cluster nodes.
expr: Expression to evaluate.
...: Additional arguments to pass to function fun.
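A printfun has to accept three things: the full results list, the number of finished replicates, and printargs. The example below is a minimal sketch of such a callback. progress_printfun and the label field inside printargs are hypothetical and not part of snowFT. The sketch also assumes that unfinished replicates appear as NULL slots in the results list, which is what the page's own example relies on when it filters out NULLs. The final call simulates the master invoking the callback once 3 of 5 replicates are done.

```r
# Hypothetical printfun: reports progress and a running mean of finished results.
# Assumption: unfinished replicates are NULL entries in the results list.
progress_printfun <- function(res, n, args = NULL) {
  finished <- res[!vapply(res, is.null, logical(1))]  # drop unfinished slots
  vals <- unlist(finished)
  # 'label' is a hypothetical field the caller may put into printargs
  label <- if (is.null(args$label)) "progress" else args$label
  msg <- sprintf("%s: %d of %d replicates finished, running mean %.3f",
                 label, n, length(res), mean(vals))
  message(msg)
  invisible(msg)
}

# Simulate a call the master might make once 3 of 5 replicates are done.
partial <- list(c(1, 2), NULL, c(3, 4), NULL, c(5, 6))
progress_printfun(partial, 3L, list(label = "demo"))
# demo: 3 of 5 replicates finished, running mean 3.500
```

In a real run it would be passed as printfun = progress_printfun, printargs = list(label = "demo").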
Hana Sevcikova
clusterApplyFT is a version of clusterApplyLB from the parallel/snow package with additional features, such as result reproducibility, computation transparency and dynamic cluster resizing. The master process performs these management tasks while it waits for results from the workers.
The file mngtfiles[1] (which defaults to ‘.clustersize’) is written by the master before the computation starts. It contains a single integer value: the number of cluster nodes. The user can change this value during the computation, but the file must keep the same format. The master reads the file while it waits for results. If the value in the file is larger than the current cluster size, new nodes are created and the computation is extended to them. If, on the other hand, the value is smaller, nodes are discarded one by one as they finish their current computation.
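The resizing protocol above can be pictured as a small decision step that the master repeats while it waits. This is a minimal sketch only. resize_action is a hypothetical helper, and snowFT's actual logic is internal and may differ, for example in how it treats a malformed file. Following the mngtfiles description, an empty file name switches resizing off.

```r
# Hypothetical helper: decide what the master would do after re-reading the
# cluster-size file. Not snowFT's implementation; a sketch of the protocol.
resize_action <- function(current_size, size_file) {
  # An empty file name means dynamic resizing is switched off.
  if (!nzchar(size_file) || !file.exists(size_file))
    return(list(action = "none", delta = 0L))
  line <- readLines(size_file, n = 1L, warn = FALSE)
  requested <- suppressWarnings(as.integer(line[1]))  # NA if missing/malformed
  if (is.na(requested) || requested == current_size)
    return(list(action = "none", delta = 0L))
  if (requested > current_size) {
    # Grow: create new nodes and give them pending replicates.
    list(action = "add", delta = requested - current_size)
  } else {
    # Shrink: retire nodes one by one as they finish their current replicate.
    list(action = "retire", delta = current_size - requested)
  }
}

size_file <- file.path(tempdir(), ".clustersize")
writeLines("4", size_file)            # the master writes the initial size
resize_action(4L, size_file)$action   # "none"
writeLines("6", size_file)            # the user asks for two more nodes
resize_action(4L, size_file)          # action "add", delta 2
writeLines("3", size_file)            # the user asks for one node fewer
resize_action(4L, size_file)          # action "retire", delta 1
resize_action(4L, "")$action          # "none": resizing disabled
```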
In clusterApplyFT, the arguments initfun, initexpr, export and exitfun are only used if the cluster changes, i.e., if new nodes are added or nodes are removed from the cluster.
The RNG uses the scheme 'one stream per replicate', in contrast to the 'one stream per node' scheme used by clusterApplyLB. With each replicate, the RNG is therefore reset to the corresponding stream (identified by the replicate number). As a result, the final results are reproducible regardless of how many nodes were used.
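The sketch below contrasts the two schemes by simulating the nodes sequentially on a single R process. It makes three assumptions that are not part of snowFT. First, it uses base R's L'Ecuyer-CMRG generator and parallel::nextRNGStream() in place of snowFT's own RNG setup (see clusterSetupRNG.FT), so its numbers will not match snowFT's. Second, it hands out replicates to nodes round-robin, whereas real load balancing depends on timing. Third, make_streams, draw_with, assign_nodes, run_per_replicate and run_per_node are all hypothetical helpers. With one stream per replicate, changing the node count leaves every draw unchanged. With one stream per node, it does not.

```r
library(parallel)  # nextRNGStream() ships with R's base 'parallel' package

# Derive k consecutive L'Ecuyer-CMRG stream start states from one seed.
make_streams <- function(k, seed) {
  set.seed(seed, kind = "L'Ecuyer-CMRG")
  s <- .Random.seed
  streams <- vector("list", k)
  for (j in seq_len(k)) {
    s <- nextRNGStream(s)
    streams[[j]] <- s
  }
  streams
}

# Draw n uniforms starting from a stream state; also return the advanced state.
draw_with <- function(state, n) {
  assign(".Random.seed", state, envir = globalenv())
  x <- runif(n)
  list(x = x, state = get(".Random.seed", envir = globalenv()))
}

# Hypothetical round-robin assignment of replicates to simulated nodes.
assign_nodes <- function(n_rep, n_nodes) ((seq_len(n_rep) - 1L) %% n_nodes) + 1L

# 'One stream per replicate': replicate i always starts from stream i.
run_per_replicate <- function(n_rep, n_nodes, seed) {
  streams <- make_streams(n_rep, seed)
  node_of <- assign_nodes(n_rep, n_nodes)
  res <- numeric(n_rep)
  for (node in seq_len(n_nodes))
    for (i in which(node_of == node))
      res[i] <- draw_with(streams[[i]], 1)$x   # reset to replicate i's stream
  res
}

# 'One stream per node': each node keeps drawing from its own stream.
run_per_node <- function(n_rep, n_nodes, seed) {
  streams <- make_streams(n_nodes, seed)
  node_of <- assign_nodes(n_rep, n_nodes)
  res <- numeric(n_rep)
  for (node in seq_len(n_nodes)) {
    state <- streams[[node]]
    for (i in which(node_of == node)) {
      d <- draw_with(state, 1)
      res[i] <- d$x
      state <- d$state                         # node continues its own stream
    }
  }
  res
}

identical(run_per_replicate(10, 2, seed = 1), run_per_replicate(10, 5, seed = 1))  # TRUE
identical(run_per_node(10, 2, seed = 1), run_per_node(10, 5, seed = 1))            # FALSE
```

The helpers overwrite the session's global .Random.seed, just as resetting the RNG on a worker would.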
performParallel is a wrapper function for clusterApplyFT, and we recommend using it rather than calling clusterApplyFT directly. It creates a cluster of count nodes. On all nodes it calls initfun, evaluates initexpr, exports the objects named in export, and initializes the RNG. It then calls clusterApplyFT. After the computation is finished, it calls exitfun on all nodes and stops the cluster. If count=0, function fun is invoked sequentially with the same settings (including random numbers) as it would be in parallel. This mode can be used for debugging.
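The sequence of steps that performParallel goes through can be sketched as a single sequential loop, which is roughly what the count=0 mode is documented to provide. This is a minimal sketch, not snowFT's code. perform_sequential_sketch is a hypothetical function, and it differs from performParallel in several ways:

- It omits export, because the objects already live in the master session.
- It takes initexpr as a quoted expression.
- It swaps in base R's L'Ecuyer-CMRG streams (parallel::nextRNGStream) for snowFT's RNG setup, so its numbers will not match performParallel's.

```r
library(parallel)  # nextRNGStream() from R's base 'parallel' package

# Hypothetical sequential stand-in for performParallel(count = 0, ...).
perform_sequential_sketch <- function(x, fun, initfun = NULL, initexpr = NULL,
                                      exitfun = NULL, printfun = NULL,
                                      printargs = NULL,
                                      printrepl = max(length(x) / 10, 1),
                                      seed = 1, ...) {
  fun <- match.fun(fun)                                # accept a function or its name
  if (!is.null(initfun)) initfun()                     # role of clusterCall(cl, initfun)
  if (!is.null(initexpr)) eval(initexpr, globalenv())  # role of clusterEvalQ(cl, expr)
  set.seed(seed, kind = "L'Ecuyer-CMRG")
  state <- .Random.seed
  res <- vector("list", length(x))
  for (i in seq_along(x)) {
    state <- nextRNGStream(state)                      # replicate i gets stream i
    assign(".Random.seed", state, envir = globalenv())
    res[i] <- list(fun(x[[i]], ...))                   # x[i] is fun's first argument
    if (!is.null(printfun) && i %% printrepl == 0)
      printfun(res, i, printargs)                      # progress every printrepl replicates
  }
  if (!is.null(exitfun)) exitfun()
  res
}

calls <- 0
counter <- function(res, n, args) calls <<- calls + 1
a <- perform_sequential_sketch(rep(3, 4), rnorm, seed = 42,
                               printfun = counter, printrepl = 2)
b <- perform_sequential_sketch(rep(3, 4), "rnorm", seed = 42)
identical(a, b)  # TRUE: same seed, same per-replicate streams
calls            # 2
d <- perform_sequential_sketch(1:3, function(k) k + offset,
                               initexpr = quote(offset <- 100))
```

The results are stored with res[i] <- list(...) rather than res[[i]] <- .... This way, a fun that returns NULL does not delete list elements.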
clusterCallpart calls a function fun with identical arguments ... on the nodes specified by indices nodes in the cluster cl and returns a list of the results.
clusterEvalQpart evaluates a literal expression on the nodes specified by indices nodes in the cluster cl.
printClusterInfo prints some basic information about the cluster.
if (FALSE) {
    # Generate n normally distributed random numbers in each of r replicates
    # on p nodes and print their mean after every r/10 replicates.
    printfun <- function(res, n, args = NULL) {
        # unfinished replicates are NULL; drop them before averaging
        res <- unlist(res[!vapply(res, is.null, logical(1))])
        print(paste("mean after:", n, "replicates:", mean(res),
                    "(from", length(res), "RNs)"))
    }
    r <- 1000; n <- 100; p <- 5
    res <- performParallel(p, rep(n, r), fun = rnorm, seed = 1,
                           printfun = printfun)
    # Setting the node count to 0 runs the rnorm calls above sequentially
    # and should give exactly the same results.
    res.seq <- performParallel(0, rep(n, r), fun = rnorm, seed = 1,
                               printfun = printfun)
    identical(res, res.seq)
    # Example with worker initialization
    mean <- 20
    sd <- 10
    myfun <- function(r) rdnorm(r, mean = mean, sd = sd)
    res <- unlist(performParallel(p, rep(1000, 100), fun = myfun, seed = 123,
                                  initexpr = library(extraDistr),
                                  export = c("mean", "sd")))
    hist(res)
    # See example in ?snowFT for plotting cluster usage.
}