Part 2 The core Future API
2.1 Three atomic building blocks
There are three atomic building blocks that do everything we need:
f <- future(expr)
: evaluates an expression via a future (non-blocking, if possible)

r <- resolved(f)
: TRUE if the future is resolved, otherwise FALSE (non-blocking)

v <- value(f)
: the value of the future expression expr (blocking until resolved)
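The three functions can be seen together in a minimal sketch (assuming the multisession backend, introduced in Section 2.2):

```r
library(future)
plan(multisession)      # resolve futures in parallel (details in Section 2.2)

f <- future(sum(1:10))  # 1. create the future (returns immediately)
resolved(f)             # 2. non-blocking check: TRUE or FALSE
value(f)                # 3. blocks until resolved, then returns 55
```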
2.1.1 Mental model: The Future API decouples a regular R assignment into two parts
Let’s consider a regular assignment in R:

v <- expr

To us, this assignment is a single operation, but internally it is done in two steps:

- R evaluates the expression expr on the right-hand side (RHS), and
- assigns the resulting value to the variable v on the left-hand side (LHS).
We can think of the Future API as decoupling these two steps and giving us full access to them:

f <- future(expr)
v <- value(f)
This decoupling is the key feature of all parallel processing!
Let’s break this down using a simple example. Consider the following, very slow, implementation of sum():
slow_sum <- function(x) {
sum <- 0
for (kk in seq_along(x)) {
sum <- sum + x[kk]
Sys.sleep(0.1) # emulate 0.1 second cost per addition
}
sum
}
For example, if we call:

v <- slow_sum(1:100)

it takes ten seconds to complete.
We can evaluate this via a future running in the background as:
library(future)
plan(multisession) # evaluate futures in parallel
x <- 1:100
f <- future(slow_sum(x))
v <- value(f)
When we call:

f <- future(slow_sum(x))

then:

- a future is created, comprising:
  - the R expression slow_sum(x),
  - the function slow_sum(), and
  - the integer vector x
- these future components are sent to a parallel worker, which starts evaluating the R expression
- future() immediately returns a reference f to the future, before the future evaluation is completed
When we call:

v <- value(f)

then:

- the future asks the worker whether it’s ready or not (using resolved() internally)
- if it is not ready, then it waits until it’s ready (blocking)
- when ready, the results are collected from the worker
- the value of the expression is returned
As we saw before, there is nothing preventing us from doing other things in-between creating the future and asking for its value.
2.1.2 Keep doing other things while waiting
We can use the resolved() function to check whether the future is resolved or not. If not, we can choose to do other things, e.g. output a message:
f <- future(slow_sum(x))
while (!resolved(f)) {
message("Waiting ...")
Sys.sleep(1.0)
}
message("Done!")
#> Waiting ...
#> Waiting ...
#> Waiting ...
#> ...
#> Waiting ...
#> Done!
v <- value(f)
v
#> [1] 5050
We can of course do other things than outputting messages, e.g. calculations and checking in on other futures.
2.1.3 Evaluate several things in parallel
There’s nothing preventing us from launching more than one future in the background. For example, we can split the summation of x
into two parts, calculate the sum of each part, and then combine the results at the end:
x_head <- head(x, 50)
x_tail <- tail(x, 50)
v1 <- slow_sum(x_head) ## ~5 secs (blocking)
v2 <- slow_sum(x_tail) ## ~5 secs (blocking)
v <- v1 + v2
We can do the same in parallel:
f1 <- future(slow_sum(x_head)) ## ~5 secs (in parallel)
f2 <- future(slow_sum(x_tail)) ## ~5 secs (in parallel)
## Do other things
z <- sd(x)
v <- value(f1) + value(f2) ## ready after ~5 secs
We can launch as many parallel futures as we have parallel workers, e.g.
plan(multisession, workers = 8)
nbrOfWorkers()
#> [1] 8
plan(multisession, workers = 2)
nbrOfWorkers()
#> [1] 2
If we launch more futures than this, then the call to future() will block until one of the workers is free again.
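As a sketch of this blocking behavior, using Sys.sleep() to emulate slow tasks on a two-worker backend:

```r
library(future)
plan(multisession, workers = 2)

f1 <- future({ Sys.sleep(5); sum(1:50) })    # dispatched to worker 1 immediately
f2 <- future({ Sys.sleep(5); sum(51:100) })  # dispatched to worker 2 immediately
f3 <- future(sum(1:100))                     # no free worker: blocks until f1 or f2 is done
value(f1) + value(f2)                        # 5050
```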
2.2 Choosing parallel backend
plan()
: sets how and where futures are evaluated
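For a sketch of some of the built-in backends that plan() accepts:

```r
library(future)

plan(sequential)             # futures are resolved sequentially in the current R session
plan(multisession)           # parallel: background R sessions on the local machine
plan(multicore)              # parallel: forked R processes (not supported on Windows)
plan(cluster, workers = 2)   # parallel: external R sessions, possibly on remote machines
```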
2.2.2 multisession: in parallel on local computer
plan(multisession, workers = 2)
f1 <- future(slow_sum(x_head)) # in the background
f2 <- future(slow_sum(x_tail)) # in the background
v <- value(f1) + value(f2)
What’s happening under the hood is(*):

workers <- parallelly::makeClusterPSOCK(2)
plan(cluster, workers = workers)

which is very similar to:

workers <- parallel::makeCluster(2)
plan(cluster, workers = workers)

(*) It actually does makeClusterPSOCK(2, rscript_libs = .libPaths()), which gives a smoother ride in some R setups, e.g. RStudio Connect.
2.2.3 cluster: in parallel on multiple computers
If we have SSH access to other machines with R installed, we can do:
hostnames <- c("pi", "remote.server.org")
plan(cluster, workers = hostnames)
f1 <- future(slow_sum(x_head)) # on either 'pi' or 'remote.server.org'
f2 <- future(slow_sum(x_tail)) # on either 'pi' or 'remote.server.org'
v <- value(f1) + value(f2)
What’s happening under the hood is:
hostnames <- c("pi", "remote.server.org")
workers <- parallelly::makeClusterPSOCK(hostnames)
plan(cluster, workers = workers)
where makeClusterPSOCK()
connects to the different machines over SSH using pre-configured SSH keys and reverse tunneling of ports.
FYI, if you would try to do this with the parallel package, you would need to:

- know your public IP number
- open up your incoming firewall (requires admin rights + security risk)
- configure port forwarding from the public IP number to your local machine (requires admin rights)
This is one reason why the parallelly package was created - “it just works”. The parallelly package is a utility package, part of the futureverse.
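As an aside, the parallelly package also provides helpers that are handy when deciding on the number of workers (a sketch; the output depends on your machine):

```r
library(parallelly)

availableCores()    # number of CPU cores available to this R session
availableWorkers()  # the workers available, typically one entry per core
```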
2.2.4 There are other parallel backends and more to come
2.2.4.1 future.callr - parallelize locally using callr
The callr package can evaluate R expressions in the background on your local computer. The future.callr package implements a future backend on top of callr, e.g.

library(future.callr)
plan(callr, workers = 4)

This works similarly to plan(multisession, workers = 4), but has the benefit of being able to run more than 125 background workers, which is a limitation of R itself.
2.2.4.2 future.batchtools - parallelize using batchtools
The batchtools package is designed to evaluate R expressions via a so-called job scheduler. Job schedulers are commonly used on high-performance compute (HPC) clusters, where many users run jobs at the same time. The job scheduler allows them to request slots on the system, which often has tens or hundreds of compute nodes. Common job schedulers are Slurm, SGE, and Torque.
The future.batchtools package implements a future backend on top of batchtools, e.g.

library(future.batchtools)
plan(batchtools_slurm)

This will cause each future() to be submitted to the job scheduler’s queue. When a slot is available, the job is processed on one of the many compute nodes, and when done, the results are stored to file. Calling value() will read the results back into R.

This future backend has a greater latency, because everything has to be queued on a shared job queue. It is useful for long-running futures and for the huge throughput that an HPC environment can provide.
2.3 Motto and design philosophy
Maximize both the developer’s and the end-user’s control:
- The developer decides what to parallelize, e.g. future()
- The user decides which parallel backend to use, e.g. plan()
Rule of thumb for developers: Don’t make assumptions about the user’s R environment, e.g. you might have a fast machine with 96 CPU cores, but they might have access to a large multi-machine compute cluster with thousands of cores. So, let the user decide on the plan()
and do not set it inside your functions or packages.
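As a sketch of this rule of thumb, a hypothetical package function (par_sum() is a made-up name) decides *what* to parallelize with future(), while the user decides *where* it runs via plan():

```r
library(future)

## Hypothetical package function: it creates futures,
## but never calls plan() itself.
par_sum <- function(x) {
  mid <- length(x) %/% 2
  f1 <- future(sum(head(x, mid)))
  f2 <- future(sum(tail(x, length(x) - mid)))
  value(f1) + value(f2)
}

## The user picks the backend:
plan(multisession, workers = 2)  # or plan(sequential), plan(cluster, ...), etc.
par_sum(1:100)                   # 5050
```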