6 The Future API
Previously, we saw several examples of how to use future assignments (%<-%) for performing multiple tasks concurrently, e.g.
y1 %<-% slow_sum(1:10)
y2 %<-% slow_sum(11:20)
y1
[1] 55
y2
[1] 155
The %<-% assignment operator is doing a lot of things under the hood. A more explicit way of implementing this is to use the future() and value() functions, as in:
f1 <- future(slow_sum(1:10))
f2 <- future(slow_sum(11:20))
y1 <- value(f1)
y1
[1] 55
y2 <- value(f2)
y2
[1] 155
future() and value() are two of the three core functions of the Future API, which we will go into in detail next.
6.1 Three atomic building blocks
The Future API has three atomic building blocks that do everything we need for performing tasks concurrently:
f <- future(expr): evaluates an expression via a future (non-blocking, if possible)
r <- resolved(f): TRUE if the future is resolved, otherwise FALSE (non-blocking)
v <- value(f): the value of the future expression expr (blocking until resolved)
where expr is an R expression. Here are three examples:
f <- future(1 + 2)

f <- future(slow_sum(1:10))

f <- future({
  x <- rnorm(10)
  sum(x)
})
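As a minimal sketch of how the three functions fit together, the following creates one future, checks on it once, and then collects its value. The two-worker multisession plan and the one-second stand-in expression are assumptions made for illustration only.

```r
library(future)
plan(multisession, workers = 2)  # assumption: two local background workers

# Create the future; with a free worker this returns right away
f <- future({
  Sys.sleep(1.0)  # stand-in for a slow computation
  1 + 2
})

# Non-blocking check; most likely FALSE this early on
r <- resolved(f)

# Blocks until the future is resolved, then returns its value
v <- value(f)
v

plan(sequential)  # switch back to sequential processing
```

Here r only reports the state of the future at the moment resolved() was called, whereas value() is the call that actually waits.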
6.1.1 Mental model: The Future API decouples a regular R assignment into two parts
Let’s consider a regular assignment in R:
v <- expr
To us, this assignment is a single operation, but internally it’s done in two steps:
1. R evaluates the expression expr on the right-hand side (RHS), and
2. assigns the resulting value to the variable v on the left-hand side (LHS).
We can think of the Future API as decoupling these two steps and giving us full access to them:
f <- future(expr)
v <- value(f)
This decoupling is the key feature of all parallel processing!
Let’s break this down using our example of a very slow implementation of sum():
slow_sum <- function(x) {
  sum <- 0
  for (value in x) {
    Sys.sleep(1.0)  ## one-second slowdown per value
    sum <- sum + value
  }
  sum
}
For example, if we call:
x <- 1:10
v <- slow_sum(x)
v
[1] 55
it takes ten seconds to complete.
We can evaluate this via a future running in the background as:
library(future)
plan(multisession) # evaluate futures in parallel
x <- 1:10
f <- future(slow_sum(x))
v <- value(f)
When we call:
f <- future(slow_sum(x))
then:
1. a future is created, comprising:
   - the R expression slow_sum(x),
   - function slow_sum(), and
   - integer vector x
2. These future components are sent to a parallel worker, which starts evaluating the R expression
3. The future() function immediately returns a reference f to the future, before the future evaluation is completed
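One consequence of the steps above is that the objects the expression needs travel with the future when it is created. The sketch below illustrates this under the assumption of a two-worker multisession plan; it uses the built-in sum() rather than slow_sum() only to keep the run time short.

```r
library(future)
plan(multisession, workers = 2)  # assumption: two local background workers

x <- 1:10
f <- future(sum(x))  # 'x' is picked up as part of the future at creation

x <- 1:3             # reassigning 'x' afterwards does not affect the future
v <- value(f)
v                    # the sum of 1:10, not of 1:3

plan(sequential)     # switch back to sequential processing
```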
When we call:
v <- value(f)
then:
1. the future asks the worker if it’s ready or not (using resolved() internally)
2. if it is not ready, then it waits until it’s ready (blocking)
3. when ready, the results are collected from the worker
4. the value of the expression is returned
As we saw before, there is nothing preventing us from doing other things in between creating the future and asking for its value, e.g.
x <- 1:10

## Create future
f <- future(slow_sum(x))

## We are free to do whatever we want while future is running, e.g.
z <- sd(x)

## Wait for future to be done
v <- value(f)
6.1.2 Keep doing other things while waiting
We can use the resolved() function to check whether the future is resolved or not. If not, we can choose to do other things, e.g. output a message:
f <- future(slow_sum(x))

while (!resolved(f)) {
  message("Waiting ...")
  Sys.sleep(1.0)
}
Waiting ...
Waiting ...
Waiting ...
Waiting ...
Waiting ...
Waiting ...
Waiting ...
Waiting ...
message("Done!")
Done!
v <- value(f)
v
[1] 55
We can of course do other things than outputting messages, e.g. calculations and checking in on other futures.
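As a rough sketch of "checking in on other futures", the loop below polls a small list of futures and reports progress until all of them are resolved. The list fs, the shortened 0.2-second delay inside slow_sum(), and the half-second polling interval are assumptions chosen to keep the example quick.

```r
library(future)
plan(multisession, workers = 2)  # assumption: two local background workers

slow_sum <- function(x) {
  sum <- 0
  for (value in x) {
    Sys.sleep(0.2)  # shortened delay to keep the example quick
    sum <- sum + value
  }
  sum
}

fs <- list(
  head = future(slow_sum(1:5)),
  tail = future(slow_sum(6:10))
)

# Poll all futures without blocking on any single one of them
repeat {
  done <- vapply(fs, resolved, logical(1))
  message(sprintf("%d of %d futures resolved", sum(done), length(fs)))
  if (all(done)) break
  Sys.sleep(0.5)
}

# All futures are resolved, so value() returns without waiting
vs <- vapply(fs, value, numeric(1))
v <- sum(vs)
v

plan(sequential)  # switch back to sequential processing
```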
6.1.3 Evaluate several things in parallel
There’s nothing preventing us from launching more than one future in the background. For example, we can split the summation of x into two parts, calculate the sum of each part, and then combine the results at the end:
x_head <- head(x, 5)
x_tail <- tail(x, 5)

v1 <- slow_sum(x_head)         ## ~5 secs (blocking)
v2 <- slow_sum(x_tail)         ## ~5 secs (blocking)
v <- v1 + v2
v
[1] 55
We can do the same in parallel:
f1 <- future(slow_sum(x_head)) ## ~5 secs (in parallel)
f2 <- future(slow_sum(x_tail)) ## ~5 secs (in parallel)

## Do other things
z <- sd(x)

v <- value(f1) + value(f2)     ## ready after ~5 secs
v
[1] 55
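The same pattern extends from two hand-written futures to any number of chunks. The following is one possible sketch, assuming a two-worker multisession plan; the variable names chunks, fs and vs, and the shortened delay in slow_sum(), are illustrative choices rather than part of the Future API.

```r
library(future)
plan(multisession, workers = 2)  # assumption: two local background workers

slow_sum <- function(x) {
  sum <- 0
  for (value in x) {
    Sys.sleep(0.2)  # shortened delay to keep the example quick
    sum <- sum + value
  }
  sum
}

x <- 1:10

# Split 'x' into chunks of five elements: 1:5 and 6:10
chunks <- split(x, ceiling(seq_along(x) / 5))

# Launch one future per chunk, then collect the values in order
fs <- lapply(chunks, function(chunk) future(slow_sum(chunk)))
vs <- lapply(fs, value)

v <- Reduce(`+`, vs)
v

plan(sequential)  # switch back to sequential processing
```

With more chunks than workers, the calls to future() inside lapply() would start to block once all workers are busy.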
We can launch as many parallel futures as we have parallel workers, e.g.
plan(multisession, workers = 8)
nbrOfWorkers()
[1] 8
plan(multisession, workers = 2)
nbrOfWorkers()
[1] 2
If we launch more than this, then the call to future() will block until one of the workers is free again. For example,
plan(multisession, workers = 2)
nbrOfWorkers()
[1] 2
f1 <- future(slow_sum(x_head))
f2 <- future(slow_sum(x_tail))
Immediately after these lines have been completed by R, both these futures are still unresolved:
resolved(f1)
[1] FALSE
resolved(f2)
[1] FALSE
This is because they need about 5 seconds to complete. If we try to launch another future at this point:
f3 <- future(slow_sum(rev(x)))  ## <= blocks here
it will not return instantly, because it has to wait for one of the parallel workers to become available, i.e. until either f1 or f2, or both, are resolved.
Immediately after f3 is created, we can see which it was:
resolved(f1)
[1] TRUE
resolved(f2)
[1] TRUE
resolved(f3)
[1] FALSE
If we then call:
value(f1) + value(f2)
[1] 55
value(f3)
[1] 55
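To make the blocking visible, one option is to time the call that creates the third future. This is only a sketch: nap() is a hypothetical helper introduced here, the two-second and 0.1-second durations are arbitrary, and the measured time will vary between runs and machines.

```r
library(future)
plan(multisession, workers = 2)  # assumption: two local background workers

# Hypothetical helper: sleep for a while, then return the duration
nap <- function(seconds) {
  Sys.sleep(seconds)
  seconds
}

f1 <- future(nap(2))
f2 <- future(nap(2))

# Both workers are busy, so this call has to wait for one of them
timing <- system.time(f3 <- future(nap(0.1)))
timing[["elapsed"]]  # roughly the time f1 or f2 still needed to finish

vs <- c(value(f1), value(f2), value(f3))
vs

plan(sequential)  # switch back to sequential processing
```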
6.2 Choosing a parallel backend
Above I showed how the developer can use future(), value() and sometimes resolved() to implement tasks in parallel.
There was also the plan() function, which controls how and where these tasks are processed:
plan() - set how and where futures are evaluated
We should leave the control of this to the end user.
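A small sketch of this division of labor: the developer writes a function that uses future() and value() but never calls plan(), and the end user picks the backend before calling it. The function name split_sum() is hypothetical and exists only for this illustration.

```r
library(future)

# Developer code (hypothetical function): uses futures, never calls plan()
split_sum <- function(x) {
  half <- length(x) %/% 2
  f1 <- future(sum(head(x, half)))
  f2 <- future(sum(tail(x, length(x) - half)))
  value(f1) + value(f2)
}

# End-user code: chooses the backend; the function above is unchanged
plan(sequential)
v_seq <- split_sum(1:10)

plan(multisession, workers = 2)
v_par <- split_sum(1:10)

plan(sequential)  # switch back to sequential processing

c(v_seq, v_par)   # same result under both backends
```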
Next, let’s look at a few different parallel backends we can set.
6.2.1 sequential (default)
plan(sequential)
f1 <- future(slow_sum(x_head)) # blocks until done
f2 <- future(slow_sum(x_tail)) # blocks until done

v <- value(f1) + value(f2)
6.2.2 multisession: in parallel on local computer
plan(multisession, workers = 2)
f1 <- future(slow_sum(x_head)) # in the background
f2 <- future(slow_sum(x_tail)) # in the background

v <- value(f1) + value(f2)
What’s happening under the hood is:
workers <- parallelly::makeClusterPSOCK(2)
plan(cluster, workers = workers)
which is very similar to:
workers <- parallel::makeCluster(2)
plan(cluster, workers = workers)
if you happen to have used the parallel package before.
6.2.3 cluster: in parallel on multiple computers
If we have SSH access to other machines with R installed, we can do:
hostnames <- c("pi", "remote.server.org")
plan(cluster, workers = hostnames)
f1 <- future(slow_sum(x_head)) # on either 'pi' or 'remote.server.org'
f2 <- future(slow_sum(x_tail)) # on either 'pi' or 'remote.server.org'

v <- value(f1) + value(f2)
What’s happening under the hood is:
hostnames <- c("pi", "remote.server.org")
workers <- parallelly::makeClusterPSOCK(hostnames)
plan(cluster, workers = workers)
where makeClusterPSOCK() connects to the different machines over SSH using pre-configured SSH keys and reverse tunneling of ports.
The parallelly package is a utility package, part of the Futureverse.
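For trying out the cluster backend without access to remote machines, one possibility is to list "localhost" as the hostname, in which case the workers are launched on the local computer. This is a sketch under that assumption; it asks each worker for its process ID simply to show that the futures run outside the main R session.

```r
library(future)

# Assumption: "localhost" stands in for real hostnames, so no SSH is involved
workers <- parallelly::makeClusterPSOCK(c("localhost", "localhost"))
plan(cluster, workers = workers)

f1 <- future(Sys.getpid())
f2 <- future(Sys.getpid())

pids <- c(value(f1), value(f2))
pids  # process IDs of the workers, not of the main R session

plan(sequential)                # switch back to sequential processing
parallel::stopCluster(workers)  # shut down the workers we created ourselves
```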
6.2.4 There are other parallel backends and more to come
6.2.4.1 future.callr - parallelize locally using callr
The callr package can evaluate R expressions in the background on your local computer. The future.callr package implements a future backend on top of callr, e.g.
plan(future.callr::callr, workers = 4)
This works similarly to plan(multisession, workers = 4), but is not limited to 125 background workers, a limit that multisession inherits from R itself.
6.2.4.2 future.batchtools - parallelize using batchtools
The batchtools package is designed to evaluate R expressions via a so-called job scheduler. Job schedulers are commonly used on high-performance compute (HPC) clusters, where many users run jobs at the same time. The job scheduler allows them to request slots on the system, which often has tens or hundreds of compute nodes. Common job schedulers are Slurm, SGE, and Torque.
The future.batchtools package implements a future backend on top of batchtools.
For example, if you are a user of the Sherlock HPC cluster at Stanford University, you can use:
plan(future.batchtools::batchtools_slurm)
This will cause each future created by future() to be submitted to the Slurm job-scheduler queue. When a slot is available, the job is processed on one of the many compute nodes, and when done, the results are stored to file.
Calling resolved() will query Slurm whether the job is completed or not. Internally, the batchtools_slurm backend calls the squeue command to check if the job is done or not.
Calling value() will read the results back into R. The batchtools_slurm backend relies on the file system for this. At the end of each job, the future framework saves the results to file, which value() then reads back.
This future backend has a greater latency, because everything has to be queued on a shared job queue, and data and results are communicated via the file system. It is useful for long-running futures and for the huge throughput that an HPC environment can provide.
6.3 Revisiting the future assignment operator (%<-%)
When we do:
v %<-% expr
the following is done under the hood¹:
f <- future(expr)
delayedAssign("v", value(f))
where delayedAssign(name, value) is a function part of R that assigns the value value to variable name, but not until the variable is used. In our case, this means that value(f) is called if, and only if, we “touch” variable v. This is why:
v %<-% expr
can create a future without blocking.
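The delayed evaluation can be observed directly with a hand-written approximation of these two steps. This is a sketch, not the operator's actual implementation: the message() calls are added only to show when value(f) runs, and the two-worker multisession plan is an assumption.

```r
library(future)
plan(multisession, workers = 2)  # assumption: two local background workers

f <- future({
  Sys.sleep(1.0)  # stand-in for a slow computation
  1 + 2
})

# The promise is set up immediately; value(f) has not been called yet
delayedAssign("v", {
  message("Collecting value ...")
  value(f)
})

message("Doing other things ...")

v  # the first use of 'v' triggers value(f), which blocks until resolved

plan(sequential)  # switch back to sequential processing
```

The message "Doing other things ..." appears before "Collecting value ...", because the expression given to delayedAssign() is not evaluated until v is first used.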
¹ There is no variable f created; instead it is hidden away using the name ...future.v.