Lab: Futureverse 2

Speed up your code through parallel computing
Author

Henrik Bengtsson

Note

This is the second of two parallelization labs. It will take you through Futureverse functions that you and others are likely to use to parallelize R code. We will cover the future.apply package, furrr package, and foreach with the doFuture package. In your R career, you can pick whichever you prefer - they are all equally good.

You are highly encouraged to test things out yourself and tweak things to figure out how these methods behave.

Slides: ‘Futureverse: A Unifying Parallelization Framework in R for Everyone - Part 2’

Install

It is assumed that you have already installed common Futureverse packages in Lab 1. If not, install them now as:

install.packages("futureverse")

We will also use a less-slow version of slow_sum();

library(future)
library(progressr)

slow_sum <- function(x) {
  sum <- 0
  for (value in x) {
    Sys.sleep(0.1)     ## 0.1 second slowdown per value
    sum <- sum + value
  }
  sum
}

Exercises

Recap from Lab 1

In the first part - Lab 1 - we learned about the future() and value() functions part of the future package. They allow us to run independent tasks like:

xs <- list(1:25, 26:50, 51:75, 76:100)
a <- slow_sum(xs[[1]])
b <- slow_sum(xs[[2]])
c <- slow_sum(xs[[3]])
d <- slow_sum(xs[[4]])
y <- a + b + c + d
y
[1] 5050

in parallel. We learned how to do:

library(future)
plan(multisession, workers = 4)

xs <- list(1:25, 26:50, 51:75, 76:100)
fa <- future(slow_sum(xs[[1]]))
fb <- future(slow_sum(xs[[2]]))
fc <- future(slow_sum(xs[[3]]))
fd <- future(slow_sum(xs[[4]]))
y <- value(fa) + value(fb) + value(fc) + value(fd)
y
[1] 5050

We then learned how to generalize this to a for-loop, by realizing we can do:

library(future)
plan(multisession, workers = 4)

xs <- list(1:25, 26:50, 51:75, 76:100)

fs <- list()
fs[[1]] <- future(slow_sum(xs[[1]]))
fs[[2]] <- future(slow_sum(xs[[2]]))
fs[[3]] <- future(slow_sum(xs[[3]]))
fs[[4]] <- future(slow_sum(xs[[4]]))

ys <- list()
ys[[1]] <- value(fs[[1]])
ys[[2]] <- value(fs[[2]])
ys[[3]] <- value(fs[[3]])
ys[[4]] <- value(fs[[4]])

ys <- unlist(ys)
y <- sum(ys)
y
[1] 5050

and then simplify as:

library(future)
plan(multisession, workers = 4)

xs <- list(1:25, 26:50, 51:75, 76:100)

fs <- list()
for (ii in seq_along(xs)) {
  fs[[ii]] <- future(slow_sum(xs[[ii]]))
}

ys <- list()
for (ii in seq_along(fs)) {
  ys[[ii]] <- value(fs[[ii]])
}

ys <- unlist(ys)
y <- sum(ys)
y
[1] 5050

We then got rid of the for-loops in the auxillary index ii, by using lapply():

library(future)
plan(multisession, workers = 4)

xs <- list(1:25, 26:50, 51:75, 76:100)

fs <- lapply(xs, function(x) { future(slow_sum(x)) })
ys <- lapply(fs, value)

ys <- unlist(ys)
y <- sum(ys)
y
[1] 5050

Finally, we turned this into a utility function:

parallel_lapply <- function(X, FUN) {
  fs <- lapply(X, function(x) {
    future(FUN(x))
  })
  lapply(fs, value)
}

such that we can do:

library(future)
plan(multisession, workers = 4)

xs <- list(1:25, 26:50, 51:75, 76:100)
ys <- parallel_lapply(xs, slow_sum)
ys <- unlist(ys)
y <- sum(ys)
y
[1] 5050

Parallel versions of purrr::map()

Task 1:

Write a parallel_map() function that emulates what the map() function of the purrr package does, while at the same time running in parallel using futures. We want to create a parallel version of:

library(purrr)
xs <- list(1:25, 26:50, 51:75, 76:100)
ys <- map(xs, slow_sum)
ys <- unlist(ys)
y <- sum(ys)
y
[1] 5050

We want to use the same argument names as map();

args(map)
function (.x, .f, ..., .progress = FALSE) 
NULL

so that users of our parallel_map() will feel at home. For simplicity, you can ignore arguments ... and .progress. So, let’s create a function:

parallel_map <- function(.x, .f) {
  ## something here
}

I recommend that you modify the existing parallel_lapply(). Verify that it works with:

library(future)
plan(multisession, workers = 4)

xs <- list(1:25, 26:50, 51:75, 76:100)
ys <- parallel_map(xs, slow_sum)
ys <- unlist(ys)
y <- sum(ys)
y
Solution
library(purrr)

parallel_map <- function(.x, .f) {
  fs <- map(.x, function(x) {
    future(.f(x))
  })
  map(fs, value)
}

Task 2:

Just like lapply() and map() return list, parallel_lapply() and parallel_map() return lists. But, as in our example, it’s common that one wants the atomic vector version of it, which is why we do:

ys <- unlist(ys)
ys
[1]  325  950 1575 2200

Having to call this each time is tedious and adds friction and noise to our code. When not parallelizing, we can use purrr’s map_dbl() to achieve the same in a one go;

library(purrr)
xs <- list(1:25, 26:50, 51:75, 76:100)
ys <- map_dbl(xs, slow_sum)
y <- sum(ys)
y
[1] 5050

Write your own parallel_map_dbl() that achieves the same, but via futures, so that you can run:

library(purrr)
xs <- list(1:25, 26:50, 51:75, 76:100)
ys <- parallel_map_dbl(xs, slow_sum)
y <- sum(ys)
y

Hint: Don’t use unlist() - instead make use of map_dbl(). But think carefully where in your function you want to use map_dbl().

Solution
library(purrr)

parallel_map_dbl <- function(.x, .f) {
  fs <- map(.x, function(x) {
    future(.f(x))
  })
  map_dbl(fs, value)
}

By now, you probably have one map() and one map_dbl() inside your function. It is helpful to point out that it is the map_dbl() one that makes parallel_map_dbl() emulate what purrr::map_dbl() does. The other map() is just used to create our futures and put them in a list. We could equally well use lapply() for that. We could even use a for loop as we used in Lab 1. Because of this, all of the following alternative solutions work equally well:

Solution 1
parallel_map_dbl <- function(.x, .f) {
  fs <- purrr::map(.x, function(x) {
    future(.f(x))
  })
  purrr::map_dbl(fs, value)
}
Solution 2
parallel_map_dbl <- function(.x, .f) {
  fs <- lapply(.x, function(x) {
    future(.f(x))
  })
  purrr::map_dbl(fs, value)
}
Solution 3
parallel_map_dbl <- function(.x, .f) {
  fs <- list()
  for (ii in seq_along(X)) {
    x <- .x[[ii]]
    fs[[ii]] <- future(.f(x))
  }
  purrr::map_dbl(fs, value)
}

Things that are problematic

Task 3:

Run the following:

xs <- list(1:25, 26:50, 51:75, 76:100)
ys <- list()
purrr::map(seq_along(xs), function(ii) {
  ys[[ii]] <- slow_sum(xs[[ii]])
})
ys

Why doesn’t it work?

Task 4:

Do you think the following can be parallelized?

ys <- list(0)   ## initialize with zero
for (ii in 2:length(xs)) {
  x <- xs[[ii]]
  y <- ys[[ii - 1]]
  ys[[ii]] <- slow_sum(x + y)
}

Errors and parallel processing

The Futureverse has been designed such that your experience running parallel code will be as close as possible to when you run regular, sequential code. For example, if we call:

x <- "1.2"
y <- log(x)
Error in log(x): non-numeric argument to mathematical function

we get an error.

Task 5:

Try the with a future() call and a value() call. Start by calling:

f <- future(log(x))

Did you get an error or not? What could be the reason for that?

Next, ask for the value of the future;

y <- value(f)

What happens?

Task 6:

Ask for the value one more time;

y <- value(f)

What happens now? What if you keep calling value(f) over and over?

Task 7:

If we use purrr as in:

library(purrr)

xs <- list("1.2", 42, 3.14)
y <- map_dbl(xs, log)
Error in `map_dbl()`:
ℹ In index: 1.
Caused by error:
! non-numeric argument to mathematical function

we get an error, because the first element of the xs list holds a string instead of a numeric value. That is what the error message tries to explain to us.

Let’s try with furrr and future_map_dbl() function from above.

library(furrr)
plan(multisession, workers = 4)

xs <- list("1.2", 42, 3.14)
y <- future_map_dbl(xs, log)

Does it behave as you expected? Do you notice anything different? If so, let’s talk about it.

Note

At first, it might appear obvious that we should get an error in these cases and that it will look the same as when running regular sequential code. But rest assured, Futureverse is the only parallel framework that behave this way. If you use one of the traditional frameworks you will get a different type of error, or not an error at all. This is the case for parLapply() and mclapply() of parallel.

Task 8:

Next, try the same but with mclapply() of the parallel package;

library(parallel)

xs <- list("1.2", 42)
ys <- mclapply(xs, log)
print(ys)

What happened - did you get an error? With the behavior you observed, would you be able figure out what is wrong? Also, what is the risk with the current behavior?

Task 9:

Next, try the same but with parLapply() of the parallel package;

library(parallel)
workers <- makeCluster(4)

xs <- list("1.2", 42)
ys <- parLapply(xs, log, cl = workers)
print(ys)

stopCluster(workers)

What happened - did you get an error? With the behavior you observed, would you be able figure out what is wrong?

Warnings and parallel processing

Just like errors, warnings are signalled as-is when parallelizing via futures.

Task 10:

library(furrr)
plan(multisession, workers = 4)

xs <- list(42, -1.2, 3.14)
ys <- future_map(xs, log)
ys

Did you get a warning?

Task 11:

Try the same with mclapply();

library(parallel)
xs <- list(42, -1.2, 3.14)
ys <- mclapply(xs, log)
ys

Did you get a warning?

Then, try with parLapply();

library(parallel)
workers <- makeCluster(4)
xs <- list(42, -1.2, 3.14)
ys <- parLapply(xs, log, cl = workers)
ys
stopCluster(workers)

Did you get a warning?

Note

Futureverse is the only parallel framework that relays errors, warnings, messages, and output from parallel workers wherever they run in the world back to your R console.

Progress updates

You can generate progress updates using the progressr package.

Task 12:

Create the following:

library(progressr)

slow_sum <- function(x) {
  p <- progressor(along = x)  ## create progressor of length(x)
  
  sum <- 0
  for (value in x) {
    p()                       ## signal progress
    Sys.sleep(1.0)
    sum <- sum + value
  }
  
  sum
}

Then call:

y <- slow_sum(1:5)

What happened?

Task 13:

Nothing happened, because we never told progressr we, as end-users, are interested in the progress updates. To do that, we need to “subscribe” to the progress events, which we can do by calling:

progressr::handlers(global = TRUE)

once at the top of our R script.

After this, retry with:

y <- slow_sum(1:5)

Task 14:

If you run R from RStudio, the default progress bar is reported using the built-in RStudio progress bar. If you run R from the terminal or in VSCode, the default progress report uses an old-fashioned progress bar that is built-in to R. We could tweak it to be a little bit more colorful:

progressr::handlers(
  progressr::handler_txtprogressbar(char = cli::col_red(cli::symbol$heart))
)

and call

y <- slow_sum(1:5)

Task 15:

There are other ways to report on progress too. The cli package generates colorful, nice looking progress bars in the terminal. Try with:

progressr::handlers("cli")

Task 16:

Let’s try to re-customize the default cli progress bar, e.g.

progressr::handlers(
  progressr::handler_cli(format = "{cli::pb_spin} {cli::pb_bar} {cli::pb_current}/{cli::pb_total} {cli::pb_status}")
)

and call

y <- slow_sum(1:5)

Task 17:

Thus far we have done progress reporting when running sequentially, but progressr works also when running in parallel using Futureverse.

Let’s start by creating a utility function:

slow_sum_all <- function(xs) {
  p <- progressr::progressor(along = xs)
  y <- furrr::future_map_dbl(xs, function(x) {
    sum <- slow_sum(x)
    p()
    sum
  })
  sum(y)
}

that we can use as:

xs <- list(1:10, 11:40, 41:60, 61:100)
y <- slow_sum_all(xs)
y

Now, run it in parallel with two parallel workers. Pay attention to processing time and progress bar.

Task 18:

Retry with four parallel workers. Then go back to sequential processing.