Part 2 The core Future API

2.1 Three atomic building blocks

There are three atomic building blocks that do everything we need:

  • f <- future(expr) : evaluates an expression via a future (non-blocking, if possible)

  • r <- resolved(f) : TRUE if future is resolved, otherwise FALSE (non-blocking)

  • v <- value(f) : the value of the future expression expr (blocking until resolved)
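Put together, a minimal round trip through all three functions might look like this (a sketch; it assumes the future package is installed and that a backend has been set via plan()):

```r
library(future)
plan(multisession)    # evaluate futures in a background R session

f <- future(42 * 2)   # create the future (returns immediately)
resolved(f)           # TRUE or FALSE, depending on timing (non-blocking)
value(f)              # blocks until resolved, then returns 84
```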

2.1.1 Mental model: The Future API decouples a regular R assignment into two parts

Let’s consider a regular assignment in R:

v <- expr

To us, this assignment looks like a single operation, but internally it is done in two steps:

  1. R evaluates the expression expr on the right-hand side (RHS), and
  2. assigns the resulting value to the variable v on the left-hand side (LHS).

We can think of the Future API as decoupling these two steps and giving us full access to them:

f <- future(expr)
v <- value(f)

This decoupling is the key feature of all parallel processing!

Let’s break this down using a simple example. Consider the following, very slow, implementation of sum():

slow_sum <- function(x) {
  sum <- 0
  for (kk in seq_along(x)) {
    sum <- sum + x[kk]
    Sys.sleep(0.1)  # emulate 0.1 second cost per addition
  }
  sum
}

For example, if we call:

x <- 1:100
v <- slow_sum(x)
v
#> [1] 5050

it takes ten seconds to complete.

We can evaluate this via a future running in the background as:

library(future)
plan(multisession) # evaluate futures in parallel

x <- 1:100
f <- future(slow_sum(x))
v <- value(f)

When we call:

f <- future(slow_sum(x))

then:

  1. a future is created, comprising:
    • the R expression slow_sum(x),
    • the function slow_sum(), and
    • the integer vector x;
  2. these future components are sent to a parallel worker, which starts evaluating the R expression;
  3. future() returns immediately, giving us a reference f to the future, before the future evaluation has completed.

When we call:

v <- value(f)

then:

  1. the future asks the worker if it’s ready or not (using resolved() internally)
  2. if it is not ready, then it waits until it’s ready (blocking)
  3. when ready, the results are collected from the worker
  4. the value of the expression is returned

As we saw before, there is nothing preventing us from doing other things in-between creating the future and asking for its value, e.g.

## Create future
f <- future(slow_sum(x))

## We are free to do whatever we want while future is running, e.g.
z <- sd(x)

## Wait for future to be done
v <- value(f)

2.1.2 Keep doing other things while waiting

We can use the resolved() function to check whether the future is resolved or not. If not, we can choose to do other things, e.g. output a message:

f <- future(slow_sum(x))

while (!resolved(f)) {
  message("Waiting ...")
  Sys.sleep(1.0)
}
message("Done!")

#> Waiting ...
#> Waiting ...
#> Waiting ...
#> ...
#> Waiting ...
#> Done!

v <- value(f)
v
#> [1] 5050

We can of course do other things than outputting messages, e.g. calculations and checking in on other futures.
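For instance, a sketch of polling several futures at once while reporting progress (assuming a parallel plan such as multisession is active):

```r
library(future)
plan(multisession, workers = 2)

## Launch two futures that sum each half of 1:100
fs <- list(
  future(slow_sum(1:50)),
  future(slow_sum(51:100))
)

## Poll all of them until every one is resolved
while (!all(vapply(fs, resolved, logical(1)))) {
  n_left <- sum(!vapply(fs, resolved, logical(1)))
  message("Still waiting on ", n_left, " future(s) ...")
  Sys.sleep(0.5)
}

## Collect and combine the values
vs <- lapply(fs, value)
do.call(sum, vs)  # 5050
```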

2.1.3 Evaluate several things in parallel

There’s nothing preventing us from launching more than one future in the background. For example, we can split the summation of x into two parts, calculate the sum of each part, and then combine the results at the end:

x_head <- head(x, 50)
x_tail <- tail(x, 50)

v1 <- slow_sum(x_head)         ## ~5 secs (blocking)
v2 <- slow_sum(x_tail)         ## ~5 secs (blocking)
v <- v1 + v2

We can do the same in parallel:

f1 <- future(slow_sum(x_head)) ## ~5 secs (in parallel)
f2 <- future(slow_sum(x_tail)) ## ~5 secs (in parallel)

## Do other things
z <- sd(x)

v <- value(f1) + value(f2)     ## ready after ~5 secs

We can launch as many parallel futures as we have parallel workers, e.g.

plan(multisession, workers = 8)
nbrOfWorkers()
#> [1] 8

plan(multisession, workers = 2)
nbrOfWorkers()
#> [1] 2

If we launch more than this, then the call to future() will block until one of the workers is free again, e.g.

plan(multisession, workers = 2)
nbrOfWorkers()
#> [1] 2

f1 <- future(slow_sum(x_head))
f2 <- future(slow_sum(x_tail))
f3 <- future(slow_sum(1:200))   ## <= blocks here

resolved(f1)
#> [1] TRUE
resolved(f2)
#> [1] TRUE
resolved(f3)
#> [1] FALSE

2.2 Choosing parallel backend

  • plan() - set how and where futures are evaluated

2.2.1 sequential (default)

plan(sequential)

f1 <- future(slow_sum(x_head)) # blocks until done
f2 <- future(slow_sum(x_tail)) # blocks until done

v <- value(f1) + value(f2)

2.2.2 multisession: in parallel on local computer

plan(multisession, workers = 2)

f1 <- future(slow_sum(x_head)) # in the background
f2 <- future(slow_sum(x_tail)) # in the background

v <- value(f1) + value(f2)

What’s happening under the hood is(*):

workers <- parallelly::makeClusterPSOCK(2)
plan(cluster, workers = workers)

which is very similar to:

workers <- parallel::makePSOCKcluster(2)
plan(cluster, workers = workers)

(*) It actually does makeClusterPSOCK(2, rscript_libs = .libPaths()), which gives a smoother ride in some R setups, e.g. RStudio Connect.

2.2.3 cluster: in parallel on multiple computers

If we have SSH access to other machines with R installed, we can do:

hostnames <- c("pi", "remote.server.org")
plan(cluster, workers = hostnames)

f1 <- future(slow_sum(x_head)) # on either 'pi' or 'remote.server.org'
f2 <- future(slow_sum(x_tail)) # on either 'pi' or 'remote.server.org'

v <- value(f1) + value(f2)

What’s happening under the hood is:

hostnames <- c("pi", "remote.server.org")
workers <- parallelly::makeClusterPSOCK(hostnames)
plan(cluster, workers = workers)

where makeClusterPSOCK() connects to the different machines over SSH using pre-configured SSH keys and reverse tunneling of ports.

FYI, if you were to try to do this with the parallel package, you would need to:

  1. know your public IP number
  2. open up your incoming firewall (requires admin rights + security risk)
  3. configure port forwarding from public IP number to your local machine (requires admin rights)

e.g.

workers <- parallel::makePSOCKcluster(hostnames, master = my_public_ip)

This is one reason why the parallelly package was created; with it, things "just work":

workers <- parallelly::makeClusterPSOCK(hostnames)
The ‘parallelly’ hexlogo

The parallelly package is a utility package, part of the futureverse.

2.2.4 There are other parallel backends and more to come

2.2.4.1 future.callr - parallelize locally using callr

The callr package can evaluate R expressions in the background on your local computer. The future.callr package implements a future backend on top of callr, e.g.

plan(future.callr::callr, workers = 4)

This works similarly to plan(multisession, workers = 4), but with the benefit of being able to run more than 125 background workers; the multisession backend is capped at roughly 125 workers by R's built-in limit on the number of open connections.
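As with any backend, the rest of the code is unchanged; a sketch, assuming the future.callr package is installed:

```r
library(future)
plan(future.callr::callr, workers = 4)

f <- future(Sys.getpid())
value(f)  # PID of a background callr R process, not of your main R session
```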

2.2.4.2 future.batchtools - parallelize using batchtools

The batchtools package is designed to evaluate R expressions via a so-called job scheduler. Job schedulers are commonly used on high-performance compute (HPC) clusters, where many users run jobs at the same time. The job scheduler allows them to request slots on the system, which often has tens or hundreds of compute nodes. Common job schedulers are Slurm, SGE, and Torque.

The future.batchtools package implements a future backend on top of batchtools, e.g.

plan(future.batchtools::batchtools_slurm)

This will cause each call to future() to submit a job to the job scheduler’s queue. When a slot is available, the job is processed on one of the many compute nodes, and when done, the results are stored to file. Calling value() will read the results back into R.

This future backend has greater latency, because every job has to wait in a shared queue. It is best suited for long-running futures and for the huge throughput that an HPC environment can provide.
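A hedged sketch of what this can look like in practice; the template file name and the resource fields below are examples only and must match what your site's Slurm template file expects:

```r
library(future.batchtools)

## 'slurm.tmpl' and the resource names are site-specific assumptions
plan(batchtools_slurm,
     template  = "slurm.tmpl",
     resources = list(ncpus = 1, memory = "2GB", walltime = 3600))

f <- future(slow_sum(1:100))  # submitted as a job to the Slurm queue
v <- value(f)                 # blocks until the job has finished
```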


2.3 Motto and design philosophy

Maximize both the developer’s and the end-user’s control:

  • Developer decides what to parallelize, e.g. future()

  • User decides what parallel backend to use, e.g. plan()

Rule of thumb for developers: Don’t make assumptions about the user’s R environment, e.g. you might have a fast machine with 96 CPU cores, but they might have access to a large multi-machine compute cluster with thousands of cores. So, let the user decide on the plan() and do not set it inside your functions or packages.
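In practice, this means a package function launches futures but never calls plan() itself; a minimal sketch (the function name my_par_sum is made up for illustration):

```r
library(future)

## Developer side: decides *what* to parallelize, not *where* it runs
my_par_sum <- function(x) {
  half <- length(x) %/% 2
  f1 <- future(sum(x[seq_len(half)]))
  f2 <- future(sum(x[-seq_len(half)]))
  value(f1) + value(f2)
}

## User side: decides the backend before calling the function
plan(multisession, workers = 2)
my_par_sum(1:100)  # 5050
```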

2.4 Demo: ggplot2 remotely

library(ggplot2)
library(future)
plan(cluster, workers = "remote.server.org")

f <- future({
  ggplot(mpg, aes(displ, hwy, colour = class)) + 
  geom_point()
})
 
gg <- value(f)
print(gg)