6  The Future API

Previously, we saw several examples of how to use future assignments (%<-%), e.g.

y1 %<-% slow_sum(1:10)
y2 %<-% slow_sum(11:20)
y1
[1] 55
y2
[1] 155

for performing multiple tasks concurrently. That %<-% assignment operator doing a lot of things under the hood. A more explicit way of implementing this would be to use the future() and value() functions, as in:

f1 <- future(slow_sum(1:10))
f2 <- future(slow_sum(11:20))
y1 <- value(f1)
y1
[1] 55
y2 <- value(f2)
y2
[1] 155

The future() and value() are two of three core functions part of the Future API, which we will go into details next.

6.1 Three atomic building blocks

There are three atomic building blocks part of the Future API that do everything we need for performing tasks concurrently:

  • f <- future(expr) : evaluates an expression via a future (non-blocking, if possible)

  • r <- resolved(f) : TRUE if future is resolved, otherwise FALSE (non-blocking)

  • v <- value(f) : the value of the future expression expr (blocking until resolved)

where expr is an R expression. Here are three examples:

f <- future(1 + 2)

f <- future(slow_sum(1:10))

f <- future({
  x <- rnorm(10)
  sum(x)
})

6.1.1 Mental model: The Future API decouples a regular R assignment into two parts

Let’s consider a regular assignment in R:

v <- expr

To use, this assignment is single operator, but internally it’s done in two steps:

  1. R evaluates the expression expr on the right-hand side (RHS), and

  2. assigns the resulting value to the variable v on the left-hand side (LHS).

We can think of the Future API as decoupling these two steps and giving us full access to them:

f <- future(expr)
v <- value(f)

This decoupling is the key feature of all parallel processing!

Let’s break this down using our example very slow, implementation of sum();

slow_sum <- function(x) {
  sum <- 0
  
  for (value in x) {
    Sys.sleep(1.0)  ## one-second slowdown per value
    sum <- sum + value
  }
  
  sum
}

For example, if we call:

x <- 1:10
v <- slow_sum(x)
v
[1] 55

it takes ten seconds to complete.

We can evaluate this via a future running in the background as:

library(future)
plan(multisession) # evaluate futures in parallel

x <- 1:10
f <- future(slow_sum(x))
v <- value(f)

When we call:

f <- future(slow_sum(x))

then:

  1. a future is created, comprising:

    • the R expression slow_sum(x),

    • function slow_sum(), and

    • integer vector x

  2. These future components are sent to a parallel worker, which starts evaluating the R expression

  3. The future() function returns immediately a reference f to the future, and before the future evaluation is completed

When we call:

v <- value(f)

then:

  1. the future asks the worker if it’s ready or not (using resolved() internally)

  2. if it is not ready, then it waits until it’s ready (blocking)

  3. when ready, the results are collected from the worker

  4. the value of the expression is returned

As we saw before, there is nothing preventing us from doing other things in-between creating the future and asking for its value, e.g.

x <- 1:10

## Create future
f <- future(slow_sum(x))

## We are free to do whatever we want while future is running, e.g.
z <- sd(x)

## Wait for future to be done
v <- value(f)

6.1.2 Keep doing other things while waiting

We can use the resolved() function to check whether the future is resolved or not. If not, we can choose to do other things, e.g. output a message:

f <- future(slow_sum(x))

while (!resolved(f)) {
  message("Waiting ...")
  Sys.sleep(1.0)
}
Waiting ...
Waiting ...
Waiting ...
Waiting ...
Waiting ...
Waiting ...
Waiting ...
message("Done!")
Done!
v <- value(f)
v
[1] 55

We can of course do other things than outputting messages, e.g. calculations and checking in on other futures.

6.1.3 Evaluate several things in parallel

There’s nothing preventing us from launching more than one future in the background. For example, we can split the summation of x into two parts, calculate the sum of each part, and then combine the results at the end:

x_head <- head(x, 5)
x_tail <- tail(x, 5)

v1 <- slow_sum(x_head)         ## ~5 secs (blocking)
v2 <- slow_sum(x_tail)         ## ~5 secs (blocking)
v <- v1 + v2
v
[1] 55

We can do the same in parallel:

f1 <- future(slow_sum(x_head)) ## ~5 secs (in parallel)
f2 <- future(slow_sum(x_tail)) ## ~5 secs (in parallel)

## Do other things
z <- sd(x)

v <- value(f1) + value(f2)     ## ready after ~5 secs
v
[1] 55

We can launch as manual parallel futures as we have parallel workers, e.g.

plan(multisession, workers = 8)
nbrOfWorkers()
[1] 8
plan(multisession, workers = 2)
nbrOfWorkers()
[1] 2

If we launch more than this, then the call to future() will block until one of the workers are free again. For example,

plan(multisession, workers = 2)
nbrOfWorkers()
[1] 2
f1 <- future(slow_sum(x_head))
f2 <- future(slow_sum(x_tail))

Immediately after these lines have been be completed by R, both these futures are still unresolved:

resolved(f1)
[1] TRUE
resolved(f2)
[1] FALSE

This is because they need about 5 seconds to complete. If we try to launch another future at this point;

f3 <- future(slow_sum(rev(x)))   ## <= blocks here

it will not return instantly, because it has to wait for one of the parallel workers to be available, i.e. that either of f1 and f2, or both, are resolved.

Immediately after f3 is created, we will see which it was:

resolved(f1)
[1] TRUE
resolved(f2)
[1] TRUE
resolved(f3)
[1] FALSE

If we then call:

value(f1) + value(f2)
[1] 55
value(f3)
[1] 55

6.2 Choosing a parallel backend

Above I showed how the developer can use future(), value() and sometimes resolved() to implement tasks in parallel.

There was also the plan() function, which controls how and where these tasks are processed:

  • plan() - set how and where futures are evaluated

We should the control of this to the end user.

Next, let’s look at a few different parallel backends we can set.

6.2.1 sequential (default)

plan(sequential)

f1 <- future(slow_sum(x_head)) # blocks until done
f2 <- future(slow_sum(x_tail)) # blocks until done

v <- value(f1) + value(f2)

6.2.2 multisession: in parallel on local computer

plan(multisession, workers = 2)

f1 <- future(slow_sum(x_head)) # in the background
f2 <- future(slow_sum(x_tail)) # in the background

v <- value(f1) + value(f2)

What’s happening under the hood is:

workers <- parallelly::makeClusterPSOCK(2)
plan(cluster, workers = workers)

which is very similar to:

workers <- parallel::makeCluster(2)
plan(cluster, workers = workers)

is you happened to have used the parallel package before.

6.2.3 cluster: in parallel on multiple computers

If we have SSH access to other machines with R installed, we can do:

hostnames <- c("pi", "remote.server.org")
plan(cluster, workers = hostnames)

f1 <- future(slow_sum(x_head)) # on either 'pi' or 'remote.server.org'
f2 <- future(slow_sum(x_tail)) # on either 'pi' or 'remote.server.org'

v <- value(f1) + value(f2)

What’s happening under the hood is:

hostnames <- c("pi", "remote.server.org")
workers <- parallelly::makeClusterPSOCK(hostnames)
plan(cluster, workers = workers)

where makeClusterPSOCK() connects to the different machines over SSH using pre-configured SSH keys and reverse tunneling of ports.

The parallelly package is a utility package, part of the Futureverse.

6.2.4 There are other parallel backends and more to come

6.2.4.1 future.callr - parallelize locally using callr

The callr package can evaluate R expressions in the background on your local computer. The future.callr implements a future backend on top of callr, e.g.

plan(future.callr::callr, workers = 4)

This works similarly to plan(multisession, workers = 4), but has the benefit of being able to run more than 125 background workers, which is a limitation of R itself.

6.2.4.2 future.batchtools - parallelize using batchtools

The batchtools package is designed to evaluate R expressions via a, so called, job scheduler. Job schedulers are commonly used on high-performance compute (HPC) clusters, where many users run at the same time. The job scheduler allows them to request slots on the system, which often has tens or hundreds of compute nodes. Common job schedulers are Slurm, SGE, and Torque.

The future.batchtools implements a future backend on top of batchtools.

For example, if you are a user of the Sherlock HPC cluster at Stanford University, you can use:

plan(future.batchtools::batchtools_slurm)

This will cause future() to be submitted to the Slurm job-scheduler queue. When a slot is available, the job is processed on one of the many compute nodes, and when done, the results are stored to file.

Calling resolved() will query Slurm whether the job is completed or not. Internally, the batchtools_slurm backend calls the squeue command to check if the job is done or not.

Calling value() will read the results back into R. The batchtools_slurm backend relies on the file system for this. At the end of each job, the future framework saves the results to file, which then value() reads back.

This future backend has a greater latency, because everything has to be queued on a shared job queue and data and results are communicated via the file system. This backend is useful for long running futures and for the huge throughput that an HPC environment can provide.

6.3 Revisiting the future assignment operator (%<-%)

When we do:

v %<-% expr

the following is done under the hood1:

f <- future(expr)
delayedAssign("v", value(f))

where delayedAssign(name, value) is a function part of R that assigned the value value to variable name, but not until the variable is used. In our case, this means that:

value(f)

is only called if, and only if, we “touch” variable v. This is why:

v %<-% expr

can create a future without blocking.


  1. There is no variable f created; instead it is hidden away using the name ...future.v.↩︎