Applies an R function to a Spark object (typically, a Spark DataFrame).
spark_apply(
x,
f,
columns = NULL,
memory = TRUE,
group_by = NULL,
packages = NULL,
context = NULL,
name = NULL,
barrier = NULL,
fetch_result_as_sdf = TRUE,
partition_index_param = "",
arrow_max_records_per_batch = NULL,
auto_deps = FALSE,
...
)
An object (usually a spark_tbl) coercible to a Spark DataFrame.
A function that transforms a data frame partition into a data frame. The function f has signature f(df, context, group1, group2, ...), where df is a data frame with the data to be processed, context is an optional object passed as the context parameter, and group1 to groupN contain the values of the group_by columns. When group_by is not specified, f takes only one argument.
Can also be an rlang anonymous function. For example, ~ .x + 1 defines an expression that adds one to the given .x data frame.
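The signature described above can be sketched as follows. This is a minimal illustration, not the package's own example: the function name `mean_mpg_by_group`, the `scale` field inside the context object, and the wrapper `apply_by_cyl` are hypothetical, and the built-in `mtcars` data set stands in for real data. The function is first checked on a plain R data frame, which is a cheap way to catch errors before shipping it to Spark.

```r
# Transformation with the documented signature f(df, context, group1).
# `context$scale` is a hypothetical field chosen for this sketch.
mean_mpg_by_group <- function(df, context, cyl) {
  data.frame(n = nrow(df), mean_mpg = mean(df$mpg) * context$scale)
}

# Local check on a plain data frame, mimicking one group (cyl == 4).
local_result <- mean_mpg_by_group(
  mtcars[mtcars$cyl == 4, ],
  list(scale = 1),
  4
)

# Hypothetical wrapper: applies the function once per `cyl` group
# on an existing sparklyr connection `sc`.
apply_by_cyl <- function(sc) {
  mtcars_tbl <- sparklyr::sdf_copy_to(sc, mtcars, name = "mtcars_tbl", overwrite = TRUE)
  sparklyr::spark_apply(
    mtcars_tbl,
    mean_mpg_by_group,
    group_by = "cyl",
    context = list(scale = 1)
  )
}
```

With a live connection, for instance one created by spark_connect(master = "local"), calling apply_by_cyl(sc) would run the function on the workers.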
A vector of column names or a named vector of column types for the transformed object. When not specified, a sample of 10 rows is taken to infer the output columns automatically; to avoid this performance penalty, specify the column types. The sample size is configurable using the sparklyr.apply.schema.infer configuration option.
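As a rough sketch of supplying the output schema up front, the named vector below declares a single double column so that no sample has to be taken. The names `times_ten`, `output_columns`, and `apply_times_ten` are hypothetical; the type string "double" is assumed to match what the function actually returns.

```r
# Transformation whose output is a single numeric column named `id`.
times_ten <- function(df) data.frame(id = df$id * 10)

# Named vector of output column types, so the schema is not inferred.
output_columns <- c(id = "double")

# Local check: integer input multiplied by 10 yields doubles.
local_out <- times_ten(data.frame(id = 1:3))

# Hypothetical wrapper for an existing sparklyr connection `sc`.
apply_times_ten <- function(sc) {
  sparklyr::spark_apply(
    sparklyr::sdf_len(sc, 10),
    times_ten,
    columns = output_columns
  )
}
```

If the declared types do not match what f returns, the job is likely to fail or coerce values, so the local check is worth keeping next to the declaration.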
Boolean; should the table be cached into memory?
Column name used to group by data frame partitions.
Boolean to distribute .libPaths() packages to each node, a list of packages to distribute, or a package bundle created with spark_apply_bundle().
Defaults to TRUE or the sparklyr.apply.packages value set in spark_config().
For clusters using Yarn cluster mode, packages can point to a package bundle created using spark_apply_bundle() and made available as a Spark file using config$sparklyr.shell.files. For clusters using Livy, packages can be manually installed on the driver node.
For offline clusters where available.packages() is not available, manually download the packages database from https://cran.r-project.org/web/packages/packages.rds and set Sys.setenv(sparklyr.apply.packagesdb = "<path-to-rds>"). Otherwise, all packages will be used by default.
For clusters where R packages are already installed on every worker node, the spark.r.libpaths config entry can be set in spark_config() to the local packages library. To specify multiple paths, collapse them (without spaces) with a comma delimiter (e.g., "/lib/path/one,/lib/path/two").
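For the case where packages are already installed on every worker, the configuration might look like the sketch below. The library paths are the placeholder paths from the text, the local master is only an assumption for illustration, and passing packages = FALSE to skip distribution is an assumption based on the Boolean meaning described above.

```r
library(sparklyr)

# Placeholder library locations; replace with the real paths on the workers.
lib_paths <- c("/lib/path/one", "/lib/path/two")

config <- spark_config()
# Multiple paths are collapsed with a comma and no spaces.
config$spark.r.libpaths <- paste(lib_paths, collapse = ",")

# Assumption: a local master is used here only to keep the sketch short.
sc <- spark_connect(master = "local", config = config)

# Assumption: packages = FALSE skips distribution because the workers
# already have the packages installed.
sdf_len(sc, 3) %>% spark_apply(function(df) df, packages = FALSE)
```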
Optional object to be serialized and passed back to f().
Optional table name while registering the resulting data frame.
Optional; whether to use Barrier Execution Mode in the scheduler.
Whether to return the transformed results in a Spark DataFrame (defaults to TRUE). When set to FALSE, results will be returned as a list of R objects instead.
NOTE: fetch_result_as_sdf must be set to FALSE when the transformation function being applied returns R objects that cannot be stored in a Spark DataFrame (e.g., complex numbers or any other R data type that does not have an equivalent representation among Spark SQL data types).
Optional; if non-empty, f also receives the index of the partition being processed as a named argument with this name, in addition to all positional argument(s) it receives.
NOTE: when fetch_result_as_sdf is set to FALSE, the object returned from the transformation function must also be serializable by the base::serialize function in R.
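A small sketch of receiving the partition index by name is shown below. The argument name `partition_index`, the function `tag_partition`, and the wrapper `apply_with_partition_index` are hypothetical choices; the text does not say whether the index starts at 0 or 1, so the sketch makes no assumption about that.

```r
# f takes the partition index as a named argument; the default value lets
# the function also be called locally without an index.
tag_partition <- function(df, partition_index = NA_integer_) {
  df$partition <- partition_index
  df
}

# Local check with an arbitrary index value.
local_tagged <- tag_partition(data.frame(id = 1:2), partition_index = 0L)

# Hypothetical wrapper for an existing sparklyr connection `sc`. The value
# of partition_index_param must match the argument name used by f.
apply_with_partition_index <- function(sc) {
  sparklyr::spark_apply(
    sparklyr::sdf_len(sc, 6, repartition = 3),
    tag_partition,
    partition_index_param = "partition_index"
  )
}
```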
Maximum size of each Arrow record batch, ignored if Arrow serialization is not enabled.
[Experimental] Whether to infer all required R packages by examining the closure f() and distribute only the required R packages and their transitive dependencies to Spark worker nodes (default: FALSE).
NOTE: this option will only take effect if packages is set to TRUE or is a character vector of R package names. If packages is a character vector of R package names, then both the set of packages specified by packages and the set of inferred packages will be distributed to Spark workers.
Optional arguments; currently unused.
spark_config() settings can be specified to change the worker environment.
For instance, to set additional environment variables on each worker node, use the sparklyr.apply.env.* config; to launch workers without --vanilla, set sparklyr.apply.options.vanilla to FALSE; to run a custom script before launching Rscript, use sparklyr.apply.options.rscript.before.
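The worker settings above could be combined as in this sketch. The environment variable name `MY_SETTING` and its value are hypothetical, and a local master is assumed only to keep the example short.

```r
library(sparklyr)

config <- spark_config()
# Hypothetical environment variable made available to each worker R process.
config$sparklyr.apply.env.MY_SETTING <- "some-value"
# Launch worker R sessions without the --vanilla flag.
config$sparklyr.apply.options.vanilla <- FALSE

# Assumption: a local master, used here for illustration only.
sc <- spark_connect(master = "local", config = config)

# The worker reads the variable back and returns it as a one-row data frame.
sdf_len(sc, 1) %>%
  spark_apply(function(df) {
    data.frame(value = Sys.getenv("MY_SETTING"), stringsAsFactors = FALSE)
  })
```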
# NOT RUN {
library(sparklyr)
sc <- spark_connect(master = "local[3]")
# create a Spark data frame with 10 elements, then multiply each by 10 in R
sdf_len(sc, 10) %>% spark_apply(function(df) df * 10)
# using barrier mode
sdf_len(sc, 3, repartition = 3) %>%
  spark_apply(nrow, barrier = TRUE, columns = c(id = "integer")) %>%
  collect()
# }