This wrapper for parallel::mclapply adds the following features:
- reliably detect if a child process failed with a fatal error or if it was killed.
- get tracebacks after non-fatal errors in child processes.
- retry on fatal and non-fatal errors.
- fail early after non-fatal errors in child processes.
- get crash dumps from failed child processes.
- capture output from child processes.
- track warnings, messages and other conditions signaled in the child processes.
- return results from child processes using POSIX shared memory to improve performance.
- compress character vectors in results to improve performance.
- reproducibly seed all function calls.
- display a progress bar.
mclapply(
X,
FUN,
...,
mc.preschedule = TRUE,
mc.set.seed = NA,
mc.silent = FALSE,
mc.cores = getOption("mc.cores", 2L),
mc.cleanup = TRUE,
mc.allow.recursive = TRUE,
affinity.list = NULL,
mc.use.names = TRUE,
mc.allow.fatal = FALSE,
mc.allow.error = FALSE,
mc.retry = 0L,
mc.retry.silent = FALSE,
mc.retry.fixed.seed = FALSE,
mc.fail.early = isFALSE(mc.allow.error) && mc.retry == 0L,
mc.dump.frames = c("partial", "full", "full_global", "no"),
mc.dumpto = ifelse(interactive(), "last.dump", "file://last.dump.rds"),
mc.stdout = c("capture", "output", "ignore"),
mc.warnings = c("m_signal", "signal", "m_output", "output", "m_ignore", "ignore",
"stop"),
mc.messages = c("m_signal", "signal", "m_output", "output", "m_ignore", "ignore"),
mc.conditions = c("signal", "ignore"),
mc.system.time = FALSE,
mc.compress.chars = TRUE,
mc.compress.altreps = c("if_allocated", "yes", "no"),
mc.share.vectors = getOption("bettermc.use_shm", TRUE),
mc.share.altreps = c("no", "yes", "if_allocated"),
mc.share.copy = TRUE,
mc.shm.ipc = getOption("bettermc.use_shm", TRUE),
mc.force.fork = FALSE,
mc.progress = interactive()
)

crash_dumps # environment with crash dumps created by mclapply (cf. mc.dumpto)
mclapply returns a list of the same length as X and named by X. In case of fatal/non-fatal errors and depending on mc.allow.fatal/mc.allow.error/mc.fail.early, some of the elements might inherit from "fatal-error"/"etry-error"/"fail-early-error" and "try-error" or be NULL.
crash_dumps is an initially empty environment used to store the return values of mclapply (see below), including crash dumps in case of non-fatal errors and if mc.dump.frames != "no" & mc.allow.error == FALSE.
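A result list that mixes successes and failures can be triaged by class. The sketch below uses hand-built stand-in objects (the element values and the helper name failure_kind are hypothetical) that carry the documented classes; it only relies on base R's inherits().

```r
# Stand-in result list (hypothetical values) shaped like what mclapply may
# return when mc.allow.error / mc.allow.fatal are TRUE or NA.
res <- list(
  a = 1,
  b = structure("Error in FUN(X[[i]], ...) : boom\n",
                class = c("etry-error", "try-error")),
  c = structure("fatal error in child\n",
                class = c("fatal-error", "try-error")),
  d = NULL
)

# Hypothetical helper: report which documented failure class an element has.
failure_kind <- function(x) {
  if (is.null(x)) return("null")
  for (cls in c("fatal-error", "etry-error", "fail-early-error")) {
    if (inherits(x, cls)) return(cls)
  }
  if (inherits(x, "try-error")) "try-error" else "ok"
}

kinds <- vapply(res, failure_kind, character(1))
print(kinds)
ok <- res[kinds == "ok"]  # keep only the successful results
```

Note that a NULL element is ambiguous: FUN may legitimately return NULL, so it is reported separately rather than counted as a failure.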
X: a vector (atomic or list) or an expression vector. Other objects (including classed objects) will be coerced by as.list.
FUN: the function to be applied to each element of X (mclapply) or in parallel to ... (mcmapply).
...: for mclapply, optional arguments to FUN. For mcmapply and mcMap, vector or list inputs: see mapply.
mc.preschedule: if set to TRUE, the computation is first divided into (at most) as many jobs as there are cores and then the jobs are started, each job possibly covering more than one value. If set to FALSE, one job is forked for each value of X. The former is better for short computations or a large number of values in X; the latter is better for jobs that have a high variance of completion time and not too many values of X compared to mc.cores.
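To make the prescheduling trade-off concrete, here is a minimal sketch of how length(X) elements could be split into at most mc.cores jobs. It assumes a round-robin assignment, which is my understanding of what parallel::mclapply does, but treat that as an assumption; preschedule_jobs is a hypothetical helper, not part of either package.

```r
# Hypothetical illustration of mc.preschedule = TRUE: split n elements into
# at most `cores` jobs, assigning element indices round-robin (assumption).
preschedule_jobs <- function(n, cores) {
  cores <- min(cores, n)  # never more jobs than elements
  lapply(seq_len(cores), function(i) seq(i, n, by = cores))
}

# 10 elements on 3 cores: job 1 gets 1, 4, 7, 10; job 2 gets 2, 5, 8; ...
str(preschedule_jobs(10, 3))
```

With mc.preschedule = FALSE there would instead be one job per element, so a single slow element does not hold up the other elements that share its job.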
mc.set.seed: TRUE or FALSE are directly handled by parallel::mclapply. bettermc also supports two additional values:
- NA (the default): seed every invocation of FUN differently but in a reproducible way based on the current state of the random number generator in the parent process.
- integerish value: call set.seed(mc.set.seed) in the parent and then continue as if mc.set.seed was NA.
In both (NA- and integerish-) cases, the state of the random number generator, i.e. the object .Random.seed in the global environment, is restored at the end of the function to what it was when mclapply was called. If the random number generator is not yet initialized in the current session, it is initialized internally (by calling runif(1)) and the resulting state is what gets restored later. In particular, this means that the seed supplied as mc.set.seed does not seed the code following the call to mclapply. All this ensures that arguments like mc.cores, mc.force.fork etc. can be adjusted without affecting the state of the RNG outside of mclapply.
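The save-and-restore behavior described above can be reproduced in plain R. This is a minimal sketch of the pattern, not bettermc's implementation; with_restored_rng is a hypothetical helper that seeds, evaluates an expression, and then puts .Random.seed back.

```r
# Hypothetical helper (not part of bettermc): evaluate `expr` after seeding,
# then restore the caller's RNG state, mirroring the behavior described above.
with_restored_rng <- function(seed, expr) {
  # Initialize the RNG if this session has not used it yet, so that there is
  # a state to restore afterwards (the documentation mentions runif(1)).
  if (!exists(".Random.seed", envir = globalenv(), inherits = FALSE)) {
    runif(1)
  }
  old_seed <- get(".Random.seed", envir = globalenv(), inherits = FALSE)
  on.exit(assign(".Random.seed", old_seed, envir = globalenv()), add = TRUE)
  set.seed(seed)
  expr  # the promise is forced here, i.e. after set.seed()
}

set.seed(1)
before <- .Random.seed
x <- with_restored_rng(42, runif(3))
identical(.Random.seed, before)  # the outer RNG stream is untouched
```

Because the outer state is restored, code after the call draws the same numbers it would have drawn had the call never happened, which is the property that lets mc.cores and similar arguments change without side effects.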
mc.silent: if set to TRUE, all output on stdout will be suppressed for all parallel processes forked (stderr is not affected).
mc.cores: the number of cores to use, i.e. at most how many child processes will be run simultaneously. The option is initialized from the environment variable MC_CORES if set. Must be at least one, and parallelization requires at least two cores.
mc.cleanup: if set to TRUE, all children that have been forked by this function will be killed (by sending SIGTERM) before this function returns. Under normal circumstances mclapply waits for the children to deliver results, so this option usually only has an effect when mclapply is interrupted. If set to FALSE, child processes are collected, but not forcefully terminated. As a special case, this argument can be set to the number of the signal that should be used to kill the children instead of SIGTERM.
mc.allow.recursive: unless true, calling mclapply in a child process will use the child and not fork again.
affinity.list: a vector (atomic or list) containing the CPU affinity mask for each element of X. The CPU affinity mask describes on which CPU (core or hyperthread unit) a given item is allowed to run, see mcaffinity. To use this parameter, prescheduling has to be deactivated (mc.preschedule = FALSE).
mc.use.names: if TRUE and if X is character, use X as names for the result unless it had names already.
mc.allow.fatal: should fatal errors in child processes make mclapply fail (FALSE, default) or merely trigger a warning (TRUE)?
TRUE returns objects of classes c("fatal-error", "try-error") for failed invocations. Hence, in contrast to parallel::mclapply, it is OK for FUN to return NULL.
NA returns the same as TRUE, but without a warning.
mc.allow.fatal can also be NULL. In this case NULL is returned (and a warning is signaled), which corresponds to the behavior of parallel::mclapply.
mc.allow.error: should non-fatal errors in FUN make mclapply fail (FALSE, default) or merely trigger a warning (TRUE)? In the latter case, errors are stored as class c("etry-error", "try-error") objects, which contain full tracebacks and potentially crash dumps (cf. mc.dump.frames and etry). NA returns the same as TRUE, but without a warning.
mc.retry: abs(mc.retry) is the maximum number of retries of failed applications of FUN in case of both fatal and non-fatal errors. This is useful if we expect FUN to fail either randomly (e.g. non-convergence of a model) or temporarily (e.g. database connections). Additionally, if mc.retry <= -1, the value of mc.cores is gradually decreased with each retry to a minimum of 1 (2 if mc.force.fork = TRUE). This is useful if we expect failures due to too many parallel processes, e.g. the Linux Out Of Memory Killer sacrificing some of the child processes.
The environment variable "BMC_RETRY" indicates the current retry. A value of "0" means first try, a value of "1" first retry, etc.
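Since FUN can read BMC_RETRY, it can behave differently on retries, e.g. to use a fallback strategy. The sketch below uses a hypothetical FUN, flaky_square, that fails on the first try for even inputs to simulate a transient error. The actual bettermc::mclapply call is only attempted if the package is installed, and the arguments it uses are the ones listed in the usage above.

```r
# Hypothetical FUN: fails on the first try for even inputs and succeeds on
# retries. It relies only on the BMC_RETRY environment variable described
# above ("0" = first try, "1" = first retry, ...).
flaky_square <- function(x) {
  attempt <- as.integer(Sys.getenv("BMC_RETRY", unset = "0"))
  if (attempt == 0L && x %% 2 == 0) stop("simulated transient failure")
  x^2
}

# Run through bettermc only if it is installed; mc.retry = 1L allows one
# retry of each failed element.
if (requireNamespace("bettermc", quietly = TRUE)) {
  res <- bettermc::mclapply(1:4, flaky_square, mc.cores = 2L, mc.retry = 1L)
  print(unlist(res))
}
```

Outside of mclapply, BMC_RETRY is normally unset, which the Sys.getenv default treats as a first try.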
mc.retry.silent: should the messages indicating both fatal and non-fatal failures during all but the last retry be suppressed (TRUE) or not (FALSE, default)?
mc.retry.fixed.seed: should FUN for a particular element of X always be invoked with the same fixed seed (TRUE) or should a different seed be used on each try (FALSE, default)? Only effective if mc.set.seed is NA or a number.
mc.fail.early: should we try to fail fast after encountering the first (non-fatal) error in FUN? Such errors will be recorded as objects of classes c("fail-early-error", "try-error").
mc.dump.frames: should we dump.frames on non-fatal errors in FUN? The default "partial" omits the frames (roughly) up to the call of FUN. See etry for the other options.
mc.dumpto: where to save the result including the dumped frames if mc.dump.frames != "no" & mc.allow.error == FALSE? Either the name of the variable to create in the environment bettermc::crash_dumps or a path (prefixed with "file://") where to save the object.
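Because mc.dumpto accepts two forms, retrieving a dump means handling both. The helper below, load_crash_dump, is hypothetical (bettermc does not necessarily provide it); it reads from an environment such as bettermc::crash_dumps or from the file behind a "file://" path.

```r
# Hypothetical helper (not a bettermc function): fetch what mclapply saved
# according to a given mc.dumpto value.
load_crash_dump <- function(dumpto, envir) {
  if (startsWith(dumpto, "file://")) {
    # strip the "file://" prefix and read the serialized object
    readRDS(substring(dumpto, nchar("file://") + 1L))
  } else {
    # otherwise mc.dumpto names a variable in the given environment
    get(dumpto, envir = envir, inherits = FALSE)
  }
}

# Intended usage after a failed interactive call (default mc.dumpto):
#   dump <- load_crash_dump("last.dump", bettermc::crash_dumps)
# and after a non-interactive one:
#   dump <- load_crash_dump("file://last.dump.rds", NULL)
```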
mc.stdout: how should standard output from FUN be handled? "capture" captures the output (in the child processes) and prints it in the parent process after all calls of FUN of the current try (cf. mc.retry), such that it can be captured, sinked etc. there. "output" immediately forwards the output to stdout of the parent; it cannot be captured, sinked etc. there. "ignore" means that the output is not forwarded in any way to the parent process. For consistency, all of this also applies if FUN is called directly from the main process, e.g. because mc.cores = 1.
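The "capture" mode boils down to collecting output as text where FUN runs and replaying it later. The following is a single-process sketch of that idea using utils::capture.output; run_capturing is a hypothetical helper and says nothing about how bettermc transfers the text between processes.

```r
# Sketch of the "capture" idea in plain base R (not bettermc internals):
# run FUN, keep its stdout as a character vector, replay it afterwards.
run_capturing <- function(X, FUN) {
  lapply(X, function(x) {
    value <- NULL
    # the assignment happens in this function's frame; only stdout is kept
    out <- utils::capture.output(value <- FUN(x))
    list(value = value, stdout = out)
  })
}

res <- run_capturing(1:2, function(x) {
  cat("processing", x, "\n")
  x * 10
})

# "Parent" side: forward the captured output after all calls have finished,
# where it could itself be captured or sinked.
for (r in res) writeLines(r$stdout)
```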
mc.warnings, mc.messages, mc.conditions: how should warnings, messages and other conditions signaled by FUN be handled? "signal" records all warnings/messages/conditions (in the child processes) and signals them in the parent process after all calls of FUN of the current try (cf. mc.retry). "stop" converts warnings (only) into non-fatal errors in the child processes directly. "output" immediately forwards the messages to stderr of the parent; no condition is signaled in the parent process nor is the output capturable/sinkable. "ignore" means that the conditions are not forwarded in any way to the parent process. Options prefixed with "m_" additionally try to invoke the "muffleWarning"/"muffleMessage" restart in the child process. Note that, if FUN is called directly from the main process, conditions might be signaled twice in the main process, depending on these arguments.
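Recording a warning, muffling it at its origin and re-signaling it later maps onto base R's calling handlers and restarts. The sketch below shows that pattern for warnings in a single process, roughly corresponding to "m_signal"; call_recording_warnings is a hypothetical name, and this is not a description of bettermc's internals.

```r
# Sketch (base R, not bettermc internals) of "m_signal" for warnings:
# record each warning, muffle it where it occurs, re-signal it later.
call_recording_warnings <- function(FUN, x) {
  recorded <- list()
  value <- withCallingHandlers(
    FUN(x),
    warning = function(w) {
      recorded[[length(recorded) + 1L]] <<- w
      invokeRestart("muffleWarning")  # the "m_" part: suppress at the source
    }
  )
  list(value = value, warnings = recorded)
}

res <- call_recording_warnings(function(x) { warning("x is ", x); x + 1 }, 41)

# Later, e.g. in the parent process: re-signal the recorded warnings.
for (w in res$warnings) warning(w)
```

Without the invokeRestart() call the warning would also surface where it was raised, which is the difference between the "m_"-prefixed and plain options.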
mc.system.time: should system.time be used to measure CPU (and other) times used by the invocations of FUN? If TRUE, the list returned will have an attribute "system_times", which itself is a list of the same length as X containing the time measurements.
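Conceptually, the "system_times" attribute holds one system.time() result per element. The sketch below builds the same shape sequentially in base R (timed_lapply is hypothetical), then, only if bettermc is installed, requests the real attribute via mc.system.time = TRUE.

```r
# Conceptual base R equivalent of the "system_times" attribute (not bettermc
# internals): one system.time() measurement per element of X.
timed_lapply <- function(X, FUN) {
  res <- vector("list", length(X))
  times <- vector("list", length(X))
  for (i in seq_along(X)) {
    # res[i] <- list(...) keeps the slot even if FUN returns NULL
    times[[i]] <- system.time(res[i] <- list(FUN(X[[i]])))
  }
  attr(res, "system_times") <- times
  res
}

r <- timed_lapply(1:3, sqrt)
print(attr(r, "system_times")[[1]])

if (requireNamespace("bettermc", quietly = TRUE)) {
  r2 <- bettermc::mclapply(1:3, sqrt, mc.cores = 2L, mc.system.time = TRUE)
  str(attr(r2, "system_times"))
}
```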
mc.compress.chars: should character vectors be compressed using char_map before returning them from the child process? Can also be the minimum length of character vectors for which to enable compression. This generally increases performance because (de)serialization of character vectors is particularly expensive.
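The reason compression helps is that a character vector with many repeated strings can be sent as its unique strings plus an integer index, and integers are cheap to serialize. The sketch below illustrates only that idea with base R's unique() and match(); it is not char_map, whose actual interface and output format are documented separately, and compress_chars/decompress_chars are hypothetical names.

```r
# Sketch of the idea behind compressing character vectors (base R only; this
# is NOT char_map): send the unique strings once plus an index per element.
compress_chars <- function(x) {
  levels <- unique(x)
  list(levels = levels, idx = match(x, levels))
}
decompress_chars <- function(z) z$levels[z$idx]

x <- rep(c("alpha", "beta", NA), times = 1000)
z <- compress_chars(x)
identical(decompress_chars(z), x)  # round trip is lossless, NA included
length(z$levels)                   # 3 unique strings instead of 3000
```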
mc.compress.altreps: should a character vector be compressed if it is an ALTREP? The default "if_allocated" only does so if the regular representation was already created. This was chosen as the default because in this case it is the regular representation which would be serialized.
mc.share.vectors: should non-character atomic vectors, S3 objects based on them and factors be returned from the child processes using POSIX shared memory (cf. copy2shm)? Can also be the minimum length of vectors for which to use shared memory. This generally increases performance because shared memory is a much faster form of inter-process communication than pipes and we do not need to serialize the vectors.
mc.share.altreps: should a non-character vector be returned from the child process using POSIX shared memory if it is an ALTREP?
mc.share.copy: should the parent process use a vector placed in shared memory due to mc.share.vectors directly (FALSE) or rather a copy of it (TRUE)? See copy2shm for the implications.
mc.shm.ipc: should the results be returned from the child processes using POSIX shared memory (cf. copy2shm)?
mc.force.fork: should it be ensured that FUN is always called in a forked child process, even if length(X) == 1? This is useful if we use forking to protect the main R process from fatal errors, memory corruption, memory leaks etc. occurring in FUN. This feature requires that mc.cores >= 2 and also ensures that the effective value for mc.cores never drops below 2 as a result of mc.retry being negative.
mc.progress: should a progress bar be printed to stderr of the parent process (package progress must be installed)?
The shared memory objects created by mclapply are named as follows (this may be subject to change): /bmc_ppid_timestamp_idx_cntr (e.g. /bmc_21479_1601366973201_16_10), with
- ppid: the process ID of the parent process.
- timestamp: the time at which mclapply was invoked (in milliseconds since epoch; on macOS: seconds since epoch, due to its 31-character limit w.r.t. POSIX names).
- idx: the index of the current element of X (1-based).
- cntr: an internal counter (1-based) referring to all the objects created due to mc.share.vectors for the current value of X; a value of 0 is used for the object created due to mc.shm.ipc.
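Given this naming scheme, a leftover object can be traced back to the call that created it. parse_bmc_shm_name is a hypothetical helper that splits a name into the four documented fields; since the scheme "may be subject to change", it should be treated as a debugging aid only.

```r
# Hypothetical helper (not part of bettermc): split a shared memory object
# name of the documented form /bmc_ppid_timestamp_idx_cntr into its fields.
parse_bmc_shm_name <- function(name) {
  parts <- strsplit(sub("^/bmc_", "", name), "_", fixed = TRUE)[[1]]
  stopifnot(length(parts) == 4L)
  fields <- as.numeric(parts)  # doubles represent the ms timestamp exactly
  names(fields) <- c("ppid", "timestamp", "idx", "cntr")
  fields
}

p <- parse_bmc_shm_name("/bmc_21479_1601366973201_16_10")
print(p)
# On Linux the timestamp is in milliseconds (on macOS it would be seconds):
as.POSIXct(p[["timestamp"]] / 1000, origin = "1970-01-01", tz = "UTC")
```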
bettermc::mclapply does not err if copying data to shared memory fails. Instead, it only prints a message and returns results the usual way.
POSIX shared memory has (at least) kernel persistence, i.e. it is not automatically freed due to process termination, except if the object is/was unlinked. bettermc tries hard not to leave any byte behind, but it could happen that unlinking is incomplete if the parent process is terminated while bettermc::mclapply is running.
On Linux you can generally inspect the (not-unlinked) objects currently stored in shared memory by listing the files under /dev/shm.
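From R itself, the same inspection can be done by listing /dev/shm and filtering on the "bmc_" prefix from the naming scheme above. This is a minimal sketch assuming the usual Linux mount point; on systems without /dev/shm it simply returns an empty vector.

```r
# List bettermc shared memory objects that are still present (Linux only).
# The "bmc_" prefix follows the naming scheme described above.
leftover <- if (dir.exists("/dev/shm")) {
  list.files("/dev/shm", pattern = "^bmc_", full.names = TRUE)
} else {
  character(0)
}
print(leftover)

# Only after making sure that no bettermc::mclapply call (in any R session)
# is still running, stale objects could be removed with: unlink(leftover)
```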
On Linux, POSIX shared memory is implemented using a tmpfs typically mounted under /dev/shm. If not changed by the distribution, its default size is 50% of physical RAM. It can be changed (temporarily) by remounting it with a different value for the size option, e.g. mount -o "remount,size=90%" /dev/shm.
When allocating a shared memory object of at least getOption("bettermc.hugepage_limit", 104857600) bytes (default: 100 MiB), we use madvise(..., MADV_HUGEPAGE) to request the allocation of (transparent) huge pages. For this to have any effect, the tmpfs used to implement POSIX shared memory on Linux (typically mounted under /dev/shm) must be (re)mounted with option huge=advise, i.e. mount -o remount,huge=advise /dev/shm. (The default is huge=never, but this might be distribution-specific.)
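A quick calculation shows which vectors cross the huge page threshold. The sketch assumes that the shared memory object for a double vector is about 8 bytes per element; any header bytes bettermc may add are ignored here, which is an assumption.

```r
# The documented default threshold, 104857600 bytes, is exactly 100 MiB:
stopifnot(104857600 == 100 * 1024^2)

limit <- getOption("bettermc.hugepage_limit", 104857600)

# A double vector needs 8 bytes per element (header bytes ignored, an
# assumption), so huge pages would be requested from about this length on:
limit / 8

# The threshold can be changed for the current session, e.g. to 1 GiB:
# options(bettermc.hugepage_limit = 1024^3)
```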
On Windows, otherwise valid values for various arguments are silently replaced as follows:
mc.cores <- 1L
mc.share.vectors <- Inf
mc.shm.ipc <- FALSE
mc.force.fork <- FALSE
mc.progress <- FALSE
if (mc.stdout == "output") mc.stdout <- "ignore"
if (mc.warnings == "output") mc.warnings <- "ignore"
if (mc.messages == "output") mc.messages <- "ignore"
Note: parallel::mclapply demands mc.cores to be exactly 1 on Windows; bettermc::mclapply sets it to 1 on Windows.
Furthermore, parallel::mclapply ignores the following arguments on Windows: mc.preschedule, mc.silent, mc.cleanup, mc.allow.recursive, affinity.list. For mc.set.seed, only the values TRUE and FALSE are ignored (by parallel::mclapply); the other values are handled by bettermc::mclapply as documented above.
See also: copy2shm, char_map, parallel::mclapply.