maestro

maestro is a lightweight framework for creating and orchestrating data pipelines in R. At its core, maestro is an R script scheduler that is unique in two ways:

  1. Stateless: It does not need to be continuously running - it can be run in a serverless architecture
  2. Rounded scheduling: The timeliness of pipeline executions depends on how often you run your orchestrator

In maestro you create pipelines (functions) and schedule them using roxygen2 tags - these are special comments (decorators) above each function. Then you create an orchestrator containing maestro functions for scheduling and invoking the pipelines.

Installation

maestro is available on CRAN and can be installed via:

install.packages("maestro")

Or, try out the development version via:

devtools::install_github("https://github.com/whipson/maestro")

Project Setup

A maestro project needs at least two components:

  1. A collection of R pipelines (functions) that you want to schedule
  2. A single orchestrator script that kicks off the pipelines when they’re scheduled to run

The project file structure will look like this:

sample_project
├── orchestrator.R
└── pipelines
    ├── my_etl.R
    ├── pipe1.R
    └── pipe2.R

Use maestro::create_maestro() to easily create this project structure in a blank R project.
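
To make the layout concrete, here is a minimal base R sketch that builds the same tree by hand in a temporary directory. It is an illustration only and does not show how create_maestro() works internally; the file names are copied from the example tree above, and the files are left empty.

```r
# Illustration only: build the example layout by hand in a temporary directory.
# This mirrors the tree shown above; it is not what create_maestro() does internally.
project_dir <- file.path(tempdir(), "sample_project")
pipeline_dir <- file.path(project_dir, "pipelines")

# recursive = TRUE creates sample_project and pipelines in one call
dir.create(pipeline_dir, recursive = TRUE, showWarnings = FALSE)

# Empty placeholder files for the orchestrator and the three pipelines
project_files <- c(
  file.path(project_dir, "orchestrator.R"),
  file.path(pipeline_dir, c("my_etl.R", "pipe1.R", "pipe2.R"))
)
created <- file.create(project_files)

# List everything relative to the project root
layout <- sort(list.files(project_dir, recursive = TRUE))
print(layout)
```

In a real project, prefer create_maestro(), which is the route the package documents for setting up this structure.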

Let’s look at each of these in more detail.

Pipelines

A pipeline is a task we want to run. This task may involve retrieving data from a source, performing cleaning and computation on the data, then sending it to a destination. maestro is not concerned with what your pipeline does, but rather when you want to run it. Here’s a simple pipeline in maestro:

#' Example ETL pipeline
#' @maestroFrequency 1 day
#' @maestroStartTime 2024-03-25 12:30:00
my_etl <- function() {
  
  # Pretend we're getting data from a source
  message("Get data")
  extracted <- mtcars
  
  # Transform
  message("Transforming")
  transformed <- extracted |> 
    dplyr::mutate(hp_deviation = hp - mean(hp))
  
  # Load - write to a location
  message("Writing")
  # write.csv(transformed, file = paste0("transformed_mtcars_", Sys.Date(), ".csv"))
}

What makes this a maestro pipeline is the use of special roxygen-style comments above the function definition:

  • #' @maestroFrequency 1 day indicates that this function should execute at a daily frequency.

  • #' @maestroStartTime 2024-03-25 12:30:00 denotes the first time it should run.

In other words, we’d expect it to run every day at 12:30 starting the 25th of March 2024. maestro has more tags than these two, and all of them follow the camelCase convention established by roxygen2.
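
The schedule implied by these two tags can be sketched with base R alone. The snippet below is a rough illustration, not maestro code: it lists the first three expected run times for the example pipeline. The UTC time zone is an assumption made here so the result is reproducible.

```r
# First run, taken from the @maestroStartTime tag in the example.
# The UTC time zone is an assumption made for this illustration.
start_time <- as.POSIXct("2024-03-25 12:30:00", tz = "UTC")

# "1 day" mirrors the @maestroFrequency tag; seq() accepts the same wording
expected_runs <- seq(from = start_time, by = "1 day", length.out = 3)

print(format(expected_runs, "%Y-%m-%d %H:%M:%S"))
# "2024-03-25 12:30:00" "2024-03-26 12:30:00" "2024-03-27 12:30:00"
```

The package's function index lists get_pipeline_run_sequence() for generating run times from a real pipeline; its arguments are not covered here, so the sketch sticks to base R.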

Orchestrator

The orchestrator is a script that checks the schedules of all the pipelines in a maestro project and executes them. The orchestrator also handles global execution tasks such as collecting logs and managing shared resources like global objects and custom functions.

You have the option of using Quarto, RMarkdown, or a plain R script for the orchestrator, but the first two have some advantages with respect to deployment on Posit Connect.

A simple orchestrator looks like this:

library(maestro)

# Look through the pipelines directory for maestro pipelines to create a schedule
schedule <- build_schedule(pipeline_dir = "pipelines")

# Checks which pipelines are due to run and then executes them
output <- run_schedule(
  schedule, 
  orch_frequency = "1 day"
)

The function build_schedule() scours through all the pipelines in the project and builds a schedule. Then run_schedule() checks each pipeline’s scheduled time against the system time within some margin of rounding and calls those pipelines to run.
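
One way to picture the "margin of rounding" is as a time window the size of the orchestrator frequency. The base R sketch below is a simplified model under that assumption, not maestro's actual implementation: the helper is_due() and its arguments are hypothetical, and the real rounding rules may differ. It rounds the check time down to the start of the current window and asks whether any scheduled run falls inside it.

```r
# Hypothetical helper (not part of maestro): is a pipeline due in the
# orchestrator window that contains check_time?
is_due <- function(start_time, pipeline_every_secs, check_time, orch_every_secs) {
  start <- as.numeric(start_time)
  check <- as.numeric(check_time)

  # Round the check time down to the start of the orchestrator window
  window_start <- floor(check / orch_every_secs) * orch_every_secs
  window_end <- window_start + orch_every_secs

  # Nothing can be due before the pipeline's first scheduled run
  if (window_end <= start) {
    return(FALSE)
  }

  # First scheduled run at or after the start of the window
  n <- max(0, ceiling((window_start - start) / pipeline_every_secs))
  next_run <- start + n * pipeline_every_secs

  next_run < window_end
}

# Daily pipeline from the example, checked by an orchestrator that runs hourly
start_time <- as.POSIXct("2024-03-25 12:30:00", tz = "UTC")
day <- 24 * 60 * 60
hour <- 60 * 60

# 12:05 falls in the 12:00-13:00 window, which contains the 12:30 run
due_in_window <- is_due(start_time, day, as.POSIXct("2024-03-26 12:05:00", tz = "UTC"), hour)

# 13:05 falls in the 13:00-14:00 window, which contains no scheduled run
due_next_window <- is_due(start_time, day, as.POSIXct("2024-03-26 13:05:00", tz = "UTC"), hour)

# A check before the start time never triggers the pipeline
due_before_start <- is_due(start_time, day, as.POSIXct("2024-03-24 12:40:00", tz = "UTC"), hour)

print(c(due_in_window, due_next_window, due_before_start))
```

Under this model the orchestrator does not need to fire at exactly 12:30. It only needs to run once per window, which is why the timeliness of a pipeline depends on how often the orchestrator itself is run.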

Monthly Downloads: 508
Version: 0.5.3
License: MIT + file LICENSE
Maintainer: Will Hipson
Last Published: April 1st, 2025

Functions in maestro (0.5.3)

  • maestro: maestro package
  • last_run_warnings: Retrieve latest maestro pipeline warnings
  • invoke: Manually run a pipeline regardless of schedule
  • get_status: Get the statuses of the pipelines in a MaestroSchedule object
  • last_run_messages: Retrieve latest maestro pipeline messages
  • last_run_errors: Retrieve latest maestro pipeline errors
  • is_valid_dag: Checks whether a DAG is valid (no loops)
  • run_schedule: Run a schedule
  • parse_rounding_unit: Parse a time string
  • maestro_tags: Maestro Tags
  • get_pipeline_run_sequence: Generate a sequence of run times for a pipeline
  • get_schedule: Get the schedule from a MaestroSchedule object
  • maestro_parse_cli: cli output for generate schedule table
  • last_build_errors: Retrieve latest maestro build errors
  • show_network: Visualize the schedule as a DAG
  • suggest_orch_frequency: Suggest orchestrator frequency based on a schedule
  • MaestroSchedule: Class for a schedule of pipelines
  • create_maestro: Creates a new maestro project
  • create_pipeline: Create a new pipeline in a pipelines directory
  • build_schedule: Build a schedule table
  • MaestroPipeline: Class for an individual maestro pipeline. A pipeline is defined as a single R script with a schedule or input.
  • MaestroPipelineList: Class for a list of MaestroPipelines. A MaestroPipelineList is created when there are multiple maestro pipelines in a single script.
  • create_orchestrator: Create a new orchestrator
  • get_artifacts: Get the artifacts (return values) of the pipelines in a MaestroSchedule object.
  • convert_to_seconds: Convert a duration string to number of seconds
  • build_schedule_entry: Parse and validate tags then create and populate MaestroPipelineList