bettermc (version 1.2.1)

mclapply: parallel::mclapply Wrapper for Better Performance, Error Handling, Seeding and UX

Description

This wrapper for parallel::mclapply adds the following features:

  • reliably detect if a child process failed with a fatal error or if it was killed.

  • get tracebacks after non-fatal errors in child processes.

  • retry on fatal and non-fatal errors.

  • fail early after non-fatal errors in child processes.

  • get crash dumps from failed child processes.

  • capture output from child processes.

  • track warnings, messages and other conditions signaled in the child processes.

  • return results from child processes using POSIX shared memory to improve performance.

  • compress character vectors in results to improve performance.

  • reproducibly seed all function calls.

  • display a progress bar.

Usage

mclapply(
  X,
  FUN,
  ...,
  mc.preschedule = TRUE,
  mc.set.seed = NA,
  mc.silent = FALSE,
  mc.cores = getOption("mc.cores", 2L),
  mc.cleanup = TRUE,
  mc.allow.recursive = TRUE,
  affinity.list = NULL,
  mc.use.names = TRUE,
  mc.allow.fatal = FALSE,
  mc.allow.error = FALSE,
  mc.retry = 0L,
  mc.retry.silent = FALSE,
  mc.retry.fixed.seed = FALSE,
  mc.fail.early = isFALSE(mc.allow.error) && mc.retry == 0L,
  mc.dump.frames = c("partial", "full", "full_global", "no"),
  mc.dumpto = ifelse(interactive(), "last.dump", "file://last.dump.rds"),
  mc.stdout = c("capture", "output", "ignore"),
  mc.warnings = c("m_signal", "signal", "m_output", "output", "m_ignore", "ignore",
    "stop"),
  mc.messages = c("m_signal", "signal", "m_output", "output", "m_ignore", "ignore"),
  mc.conditions = c("signal", "ignore"),
  mc.system.time = FALSE,
  mc.compress.chars = TRUE,
  mc.compress.altreps = c("if_allocated", "yes", "no"),
  mc.share.vectors = getOption("bettermc.use_shm", TRUE),
  mc.share.altreps = c("no", "yes", "if_allocated"),
  mc.share.copy = TRUE,
  mc.shm.ipc = getOption("bettermc.use_shm", TRUE),
  mc.force.fork = FALSE,
  mc.progress = interactive()
)

crash_dumps # environment with crash dumps created by mclapply (cf. mc.dumpto)

Value

mclapply returns a list of the same length as X and named by X. In case of fatal/non-fatal errors and depending on mc.allow.fatal/mc.allow.error/mc.fail.early, some of the elements might inherit from "fatal-error"/"etry-error"/"fail-early-error" and "try-error" or be NULL.
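
A minimal sketch of a call and the shape of its result, assuming a Unix-like system with bettermc installed. The squaring function and the inputs are illustrative only.

```r
# Illustrative only: square four numbers in two forked child processes.
res <- bettermc::mclapply(
  1:4,
  function(i) i^2,
  mc.cores = 2L,
  mc.progress = FALSE  # avoid depending on the 'progress' package
)

# The result is a plain list with one element per element of X.
length(res) == length(1:4)
unlist(res)
```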

Format

crash_dumps is an initially empty environment used to store the return values of mclapply (see below) including crash dumps in case of non-fatal errors and if mc.dump.frames != "no" & mc.allow.error == FALSE.

Arguments

X

a vector (atomic or list) or an expressions vector. Other objects (including classed objects) will be coerced by as.list.

FUN

the function to be applied to (mclapply) each element of X or (mcmapply) in parallel to ....

...

For mclapply, optional arguments to FUN. For mcmapply and mcMap, vector or list inputs: see mapply.

mc.preschedule

if set to TRUE then the computation is first divided into (at most) as many jobs as there are cores and then the jobs are started, each job possibly covering more than one value. If set to FALSE then one job is forked for each value of X. The former is better for short computations or a large number of values in X, the latter is better for jobs that have high variance of completion time and not too many values of X compared to mc.cores.

mc.set.seed

TRUE or FALSE are handled directly by parallel::mclapply. bettermc also supports two additional values:

  • NA (the default): seed every invocation of FUN differently but in a reproducible way based on the current state of the random number generator in the parent process.

  • integerish value: call set.seed(mc.set.seed) in the parent and then continue as if mc.set.seed was NA.

In both (NA- and integerish-) cases, the state of the random number generator, i.e. the object .Random.seed in the global environment, is restored at the end of the function to what it was when mclapply was called. If the random number generator is not yet initialized in the current session, it is initialized internally (by calling runif(1)) and the resulting state is what gets restored later. In particular, this means that the seed supplied as mc.set.seed does not seed the code following the call to mclapply. All this ensures that arguments like mc.cores, mc.force.fork etc. can be adjusted without affecting the state of the RNG outside of mclapply.
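
The seeding behavior described above can be checked with a short sketch, assuming a Unix-like system with bettermc installed. The seed values 42 and 123 and the helper draw() are arbitrary choices made for illustration.

```r
set.seed(42)                 # initialise the parent RNG (arbitrary value)
rng_before <- .Random.seed   # snapshot of the parent RNG state

draw <- function(i) runif(1) # hypothetical FUN that consumes random numbers

a <- bettermc::mclapply(1:4, draw, mc.set.seed = 123L, mc.cores = 2L,
                        mc.progress = FALSE)
b <- bettermc::mclapply(1:4, draw, mc.set.seed = 123L, mc.cores = 2L,
                        mc.progress = FALSE)

identical(a, b)                      # same mc.set.seed, same draws
identical(rng_before, .Random.seed)  # parent RNG state is restored
```

Because the parent state is restored, code following the calls continues from the stream started by set.seed(42), not from the seed passed as mc.set.seed.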

mc.silent

if set to TRUE then all output on stdout will be suppressed for all parallel processes forked (stderr is not affected).

mc.cores

The number of cores to use, i.e. at most how many child processes will be run simultaneously. The option is initialized from environment variable MC_CORES if set. Must be at least one, and parallelization requires at least two cores.

mc.cleanup

if set to TRUE then all children that have been forked by this function will be killed (by sending SIGTERM) before this function returns. Under normal circumstances mclapply waits for the children to deliver results, so this option usually only has an effect when mclapply is interrupted. If set to FALSE then child processes are collected, but not forcefully terminated. As a special case this argument can be set to the number of the signal that should be used to kill the children instead of SIGTERM.

mc.allow.recursive

Unless true, calling mclapply in a child process will use the child and not fork again.

affinity.list

a vector (atomic or list) containing the CPU affinity mask for each element of X. The CPU affinity mask describes on which CPU (core or hyperthread unit) a given item is allowed to run, see mcaffinity. To use this parameter prescheduling has to be deactivated (mc.preschedule = FALSE).

mc.use.names

if TRUE and if X is character, use X as names for the result unless it had names already.

mc.allow.fatal

should fatal errors in child processes make mclapply fail (FALSE, default) or merely trigger a warning (TRUE)?

TRUE returns objects of classes c("fatal-error", "try-error") for failed invocations. Hence, in contrast to parallel::mclapply, it is OK for FUN to return NULL.

NA returns the same as TRUE, but without a warning.

mc.allow.fatal can also be NULL. In this case NULL is returned (and a warning is signaled), which corresponds to the behavior of parallel::mclapply.

mc.allow.error

should non-fatal errors in FUN make mclapply fail (FALSE, default) or merely trigger a warning (TRUE)? In the latter case, errors are stored as class c("etry-error", "try-error") objects, which contain full tracebacks and potentially crash dumps (cf. mc.dump.frames and etry). NA returns the same as TRUE, but without a warning.
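
A minimal sketch of collecting non-fatal errors instead of failing, assuming a Unix-like system with bettermc installed. The failing function is made up for illustration.

```r
# Illustrative FUN: fails for the second element only.
sometimes_fails <- function(i) if (i == 2L) stop("boom") else i

res <- bettermc::mclapply(
  1:3,
  sometimes_fails,
  mc.allow.error = NA,   # keep going, and do not warn about the failure
  mc.cores = 2L,
  mc.progress = FALSE
)

# Failed invocations are returned as objects inheriting from "try-error".
failed <- vapply(res, function(r) inherits(r, "try-error"), logical(1))
which(failed)
class(res[[2]])
```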

mc.retry

abs(mc.retry) is the maximum number of retries of failed applications of FUN in case of both fatal and non-fatal errors. This is useful if we expect FUN to fail either randomly (e.g. non-convergence of a model) or temporarily (e.g. database connections). Additionally, if mc.retry <= -1, the value of mc.cores is gradually decreased with each retry to a minimum of 1 (2 if mc.force.fork = TRUE). This is useful if we expect failures due to too many parallel processes, e.g. the Linux Out Of Memory Killer sacrificing some of the child processes.

The environment variable "BMC_RETRY" indicates the current retry. A value of "0" means first try, a value of "1" first retry, etc.
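
As a sketch of how BMC_RETRY might be used, the hypothetical function below fails on the first try for one element and succeeds on the first retry. It assumes a Unix-like system with bettermc installed.

```r
# Hypothetical flaky FUN: element 2 fails on the first try only.
flaky <- function(i) {
  attempt <- as.integer(Sys.getenv("BMC_RETRY", "0"))  # "0" means first try
  if (i == 2L && attempt == 0L) stop("transient failure")
  i * 10
}

res <- bettermc::mclapply(
  1:3,
  flaky,
  mc.retry = 1L,            # allow at most one retry
  mc.retry.silent = TRUE,   # do not report the failure of the first try
  mc.cores = 2L,
  mc.progress = FALSE
)

unlist(res)
```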

mc.retry.silent

should the messages indicating both fatal and non-fatal failures during all but the last retry be suppressed (TRUE) or not (FALSE, default)?

mc.retry.fixed.seed

should FUN for a particular element of X always be invoked with the same fixed seed (TRUE) or should a different seed be used on each try (FALSE, default)? Only effective if mc.set.seed is NA or a number.

mc.fail.early

should we try to fail fast after encountering the first (non-fatal) error in FUN? Such errors will be recorded as objects of classes c("fail-early-error", "try-error").

mc.dump.frames

should we dump.frames on non-fatal errors in FUN? The default "partial" omits the frames (roughly) up to the call of FUN. See etry for the other options.

mc.dumpto

where to save the result including the dumped frames if mc.dump.frames != "no" & mc.allow.error == FALSE? Either the name of the variable to create in the environment bettermc::crash_dumps or a path (prefixed with "file://") where to save the object.
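
A sketch of storing a crash dump under a chosen name, assuming a Unix-like system with bettermc installed. The variable name "demo_dump" is an arbitrary choice for this example.

```r
# mc.allow.error is left at its default (FALSE), so the call itself fails.
outcome <- try(
  bettermc::mclapply(
    1:2,
    function(i) if (i == 2L) stop("boom") else i,
    mc.dumpto = "demo_dump",   # hypothetical variable name
    mc.cores = 2L,
    mc.progress = FALSE
  ),
  silent = TRUE
)

inherits(outcome, "try-error")
# The result, including dumped frames, should now be in bettermc::crash_dumps.
exists("demo_dump", envir = bettermc::crash_dumps, inherits = FALSE)
```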

mc.stdout

how should standard output from FUN be handled? "capture" captures the output (in the child processes) and prints it in the parent process after all calls of FUN of the current try (cf. mc.retry), such that it can be captured, sinked etc. there. "output" immediately forwards the output to stdout of the parent; it cannot be captured, sinked etc. there. "ignore" means that the output is not forwarded in any way to the parent process. For consistency, all of this also applies if FUN is called directly from the main process, e.g. because mc.cores = 1.
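
With mc.stdout = "capture", output from the children is printed in the parent and can therefore be collected there. The sketch below assumes a Unix-like system with bettermc installed. It makes no assumption about the exact layout of the printed lines, only that the text written by FUN appears in them.

```r
out <- capture.output(
  res <- bettermc::mclapply(
    1:2,
    function(i) {
      cat("hello from element", i, "\n")  # written in the child process
      i
    },
    mc.stdout = "capture",
    mc.cores = 2L,
    mc.progress = FALSE
  )
)

# The child output was printed in the parent, so capture.output() saw it.
any(grepl("hello from element 1", out, fixed = TRUE))
```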

mc.warnings, mc.messages, mc.conditions

how should warnings, messages and other conditions signaled by FUN be handled? "signal" records all warnings/messages/conditions (in the child processes) and signals them in the parent process after all calls of FUN of the current try (cf. mc.retry). "stop" converts warnings (only) into non-fatal errors in the child processes directly. "output" immediately forwards the messages to stderr of the parent; no condition is signaled in the parent process nor is the output capturable/sinkable. "ignore" means that the conditions are not forwarded in any way to the parent process. Options prefixed with "m" additionally try to invoke the "muffleWarning"/"muffleMessage" restart in the child process. Note that, if FUN is called directly from the main process, conditions might be signaled twice in the main process, depending on these arguments.
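
Since the "signal" variants re-signal recorded warnings in the parent, an ordinary calling handler can collect them. This is a sketch assuming a Unix-like system, R >= 4.0.0 (for tryInvokeRestart) and bettermc installed. The warning text is made up, and the exact message seen in the parent may differ from the original.

```r
seen <- character()

res <- withCallingHandlers(
  bettermc::mclapply(
    1:2,
    function(i) {
      warning("careful with ", i)  # signaled in the child process
      i
    },
    mc.warnings = "m_signal",
    mc.cores = 2L,
    mc.progress = FALSE
  ),
  warning = function(w) {
    seen <<- c(seen, conditionMessage(w))  # record the re-signaled warning
    tryInvokeRestart("muffleWarning")      # keep it off the console if possible
  }
)

seen
```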

mc.system.time

should system.time be used to measure CPU (and other) times used by the invocations of FUN? If TRUE, the list returned will have an attribute "system_times", which itself is a list of the same length as X containing the time measurements.
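
A short sketch of reading the timing attribute, assuming a Unix-like system with bettermc installed. The sleeping function is only a stand-in for real work.

```r
res <- bettermc::mclapply(
  1:3,
  function(i) {
    Sys.sleep(0.01)  # stand-in for real work
    i
  },
  mc.system.time = TRUE,
  mc.cores = 2L,
  mc.progress = FALSE
)

times <- attr(res, "system_times")
length(times) == length(res)  # one measurement per element of X
times[[1]]
```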

mc.compress.chars

should character vectors be compressed using char_map before returning them from the child process? Can also be the minimum length of character vectors for which to enable compression. This generally increases performance because (de)serialization of character vectors is particularly expensive.

mc.compress.altreps

should a character vector be compressed if it is an ALTREP? The default "if_allocated" only does so if the regular representation was already created. This was chosen as the default because in this case it is the regular representation which would be serialized.

mc.share.vectors

should non-character atomic vectors, S3 objects based on them and factors be returned from the child processes using POSIX shared memory (cf. copy2shm)? Can also be the minimum length of vectors for which to use shared memory. This generally increases performance because shared memory is a much faster form of inter-process communication compared to pipes and we do not need to serialize the vectors.

mc.share.altreps

should a non-character vector be returned from the child process using POSIX shared memory if it is an ALTREP?

mc.share.copy

should the parent process use a vector placed in shared memory due to mc.share.vectors directly (FALSE) or rather a copy of it (TRUE)? See copy2shm for the implications.

mc.shm.ipc

should the results be returned from the child processes using POSIX shared memory (cf. copy2shm)?

mc.force.fork

should it be ensured that FUN is always called in a forked child process, even if length(X) == 1? This is useful if we use forking to protect the main R process from fatal errors, memory corruption, memory leaks etc. occurring in FUN. This feature requires that mc.cores >= 2 and also ensures that the effective value for mc.cores never drops to less than 2 as a result of mc.retry being negative.
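
One way to observe the effect of mc.force.fork is to compare process IDs. This is a sketch assuming a Unix-like system with bettermc installed.

```r
parent_pid <- Sys.getpid()

res <- bettermc::mclapply(
  1,                           # a single element, i.e. length(X) == 1
  function(i) Sys.getpid(),    # report the PID of the process running FUN
  mc.force.fork = TRUE,
  mc.cores = 2L,               # mc.force.fork requires mc.cores >= 2
  mc.progress = FALSE
)

res[[1]] != parent_pid         # FUN ran in a forked child process
```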

mc.progress

should a progress bar be printed to stderr of the parent process (package progress must be installed)?

POSIX Shared Memory

The shared memory objects created by mclapply are named as follows (this may be subject to change): /bmc_ppid_timestamp_idx_cntr (e.g. /bmc_21479_1601366973201_16_10), with

ppid

the process ID of the parent process.

timestamp

the time at which mclapply was invoked (in milliseconds since epoch; on macOS: seconds since epoch, due to its 31-character limit w.r.t. POSIX names).

idx

the index of the current element of X (1-based).

cntr

an internal counter (1-based) referring to all the objects created due to mc.share.vectors for the current value of X; a value of 0 is used for the object created due to mc.shm.ipc.
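
To illustrate the naming scheme, the following base R sketch splits the example name from above into its components. It relies only on the pattern as documented here, which may be subject to change.

```r
shm_name <- "/bmc_21479_1601366973201_16_10"

# Drop the "/bmc_" prefix and split the remainder at the underscores.
parts <- strsplit(sub("^/bmc_", "", shm_name), "_", fixed = TRUE)[[1]]
fields <- setNames(as.numeric(parts), c("ppid", "timestamp", "idx", "cntr"))
fields

# The timestamp is in milliseconds since epoch (seconds on macOS).
as.POSIXct(fields[["timestamp"]] / 1000, origin = "1970-01-01", tz = "UTC")
```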

bettermc::mclapply does not err if copying data to shared memory fails. It will rather only print a message and return results the usual way.

POSIX shared memory has (at least) kernel persistence, i.e. it is not automatically freed due to process termination, except if the object is/was unlinked. bettermc tries hard to not leave any byte behind, but it could happen that unlinking is incomplete if the parent process is terminated while bettermc::mclapply is running.

On Linux you can generally inspect the (not-unlinked) objects currently stored in shared memory by listing the files under /dev/shm.
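
From within R, such a check might look like the sketch below. It assumes that the tmpfs is mounted under /dev/shm and that the objects follow the bmc_ naming scheme described above. It only lists files and does not unlink anything.

```r
shm_dir <- "/dev/shm"

# List leftover bettermc objects, if the directory exists on this system.
leftovers <- if (dir.exists(shm_dir)) {
  list.files(shm_dir, pattern = "^bmc_")
} else {
  character()
}

leftovers
```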

(Linux) Size of POSIX Shared Memory

On Linux, POSIX shared memory is implemented using a tmpfs typically mounted under /dev/shm. If not changed by the distribution, the default size of it is 50% of physical RAM. It can be changed (temporarily) by remounting it with a different value for the size option, e.g. mount -o "remount,size=90%" /dev/shm.

(Linux) POSIX Shared Memory and Transparent Hugepage Support

When allocating a shared memory object of at least getOption("bettermc.hugepage_limit", 104857600) bytes of size (default is 100 MiB), we use madvise(..., MADV_HUGEPAGE) to request the allocation of (transparent) huge pages. For this to have any effect, the tmpfs used to implement POSIX shared memory on Linux (typically mounted under /dev/shm) must be (re)mounted with option huge=advise, i.e. mount -o remount,huge=advise /dev/shm. (The default is huge=never, but this might be distribution-specific.)
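
The threshold is read from an R option and can be changed before calling mclapply. The value of 50 MiB below is an arbitrary example, not a recommendation.

```r
# Default threshold: 100 MiB expressed in bytes.
default_limit <- 100 * 1024^2
default_limit == 104857600

# Request huge pages for shared memory objects of at least 50 MiB.
options(bettermc.hugepage_limit = 50 * 1024^2)
getOption("bettermc.hugepage_limit", 104857600)
```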

Windows Support

On Windows, otherwise valid values for various arguments are silently replaced as follows:

  mc.cores <- 1L
  mc.share.vectors <- Inf
  mc.shm.ipc <- FALSE
  mc.force.fork <- FALSE
  mc.progress <- FALSE
  if (mc.stdout == "output") mc.stdout <- "ignore"
  if (mc.warnings == "output") mc.warnings <- "ignore"
  if (mc.messages == "output") mc.messages <- "ignore"

Note: parallel::mclapply demands mc.cores to be exactly 1 on Windows; bettermc::mclapply sets it to 1 on Windows.

Furthermore, parallel::mclapply ignores the following arguments on Windows: mc.preschedule, mc.silent, mc.cleanup, mc.allow.recursive, affinity.list. For mc.set.seed, only the values TRUE and FALSE are ignored (by parallel::mclapply); the other values are handled by bettermc::mclapply as documented above.

Lifecycle

[Stable]

See Also

copy2shm, char_map, parallel::mclapply