Part 8 Appendix

8.1 Appendix: Exception handling by other parallel map-reduce APIs

The regular sqrt() function gives a warning if we call it with a negative number, e.g.

sqrt(-1)
#> [1] NaN
#> Warning message:
#> In sqrt(-1) : NaNs produced

If we want to be more strict, we can define:

strict_sqrt <- function(x) {
  if (x < 0) {
    stop(errorCondition(
      paste("sqrt(x) with x < 0 not allowed:", x), 
      call = sys.call(),
      class = "strict_error")
    )
  }
  sqrt(x)
}

which gives:

strict_sqrt(-1)
#> Error in strict_sqrt(-1) : sqrt(x) with x < 0 not allowed: -1

Let’s see how this behaves with different map-reduce functions.

Our reference, base-R lapply():

X <- -1:2
y <- lapply(X, strict_sqrt)
#> Error in FUN(X[[i]], ...) : sqrt(x) with x < 0 not allowed: -1

Details:

tryCatch(y <- lapply(X, strict_sqrt), error = str)
#> List of 2
#>  $ message: chr "sqrt(x) with x < 0 not allowed: -1"
#>  $ call   : language FUN(X[[i]], ...)
#>  - attr(*, "class")= chr [1:3] "strict_error" "error" "condition"

future.apply (as expected):

library(future.apply)
plan(multisession, workers = 2)

X <- -1:2
y <- future_lapply(X, strict_sqrt)
#> Error in ...future.FUN(...future.X_jj, ...) : 
#>   sqrt(x) with x < 0 not allowed: -1

Details:

tryCatch(y <- future_lapply(X, strict_sqrt), error = str)
#> List of 2
#>  $ message: chr "sqrt(x) with x < 0 not allowed: -1"
#>  $ call   : language ...future.FUN(...future.X_jj, ...)
#>  - attr(*, "class")= chr [1:3] "strict_error" "error" "condition"

Note how the error message is preserved and also the error class (strict_error).
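Preserving the class matters because it lets the caller handle strict_error specifically and leave all other errors alone. Here is a minimal sketch using base-R lapply(); since future_lapply() preserves the class, as the output above shows, the same handler should work there too. The NA fallback is only an illustrative choice, not something the frameworks prescribe.

```r
strict_sqrt <- function(x) {
  if (x < 0) {
    stop(errorCondition(
      paste("sqrt(x) with x < 0 not allowed:", x),
      class = "strict_error"
    ))
  }
  sqrt(x)
}

X <- -1:2

## Handle only 'strict_error' conditions; any other error still propagates
y <- tryCatch(
  lapply(X, strict_sqrt),
  strict_error = function(e) {
    message("Falling back to NA: ", conditionMessage(e))
    ## Illustrative fallback: one NA per input element
    rep(list(NA_real_), length(X))
  }
)
str(y)
```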

furrr (as expected):

library(furrr)
plan(multisession, workers = 2)
y <- future_map(X, strict_sqrt)
#> Error in ...furrr_fn(...) : sqrt(x) with x < 0 not allowed: -1

Details:

tryCatch(y <- future_map(X, strict_sqrt), error = str)
#> List of 2
#>  $ message: chr "sqrt(x) with x < 0 not allowed: -1"
#>  $ call   : language ...furrr_fn(...)
#>  - attr(*, "class")= chr [1:3] "strict_error" "error" "condition"

Note how the error message is preserved and also the error class (strict_error).

In contrast, parallel and parLapply():

library(parallel)
cl <- makePSOCKcluster(2)
clusterExport(cl, "strict_sqrt")

y <- parLapply(X, strict_sqrt, cl = cl)
#> Error in checkForRemoteErrors(val) : 
#>   one node produced an error: sqrt(x) with x < 0 not allowed: -1

Details:

tryCatch(y <- parLapply(X, strict_sqrt, cl = cl), error = str)
#> List of 2
#>  $ message: chr "one node produced an error: sqrt(x) with x < 0 not allowed: -1"
#>  $ call   : language checkForRemoteErrors(val)
#>  - attr(*, "class")= chr [1:3] "simpleError" "error" "condition"

Note how:

  1. the error message has changed
  2. we lost the information on error class, i.e. strict_error
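One possible workaround, sketched here under the assumption that you control the function being mapped, is to catch the error on the worker and return the condition object as a regular value. The main session can then re-signal it with its class intact. The wrapper name safe_strict_sqrt() is hypothetical.

```r
library(parallel)

strict_sqrt <- function(x) {
  if (x < 0) {
    stop(errorCondition(
      paste("sqrt(x) with x < 0 not allowed:", x),
      class = "strict_error"
    ))
  }
  sqrt(x)
}

## Hypothetical wrapper: return the condition object instead of signalling it
safe_strict_sqrt <- function(x) tryCatch(strict_sqrt(x), error = identity)

cl <- makePSOCKcluster(2)
clusterExport(cl, "strict_sqrt", envir = environment())
y <- parLapply(cl, -1:2, safe_strict_sqrt)
stopCluster(cl)

## Re-signal the first captured error in the main session, class intact
is_error <- vapply(y, inherits, "error", FUN.VALUE = NA)
res <- tryCatch(
  if (any(is_error)) stop(y[is_error][[1]]) else y,
  strict_error = function(e) class(e)
)
res
```

The cost is that every caller has to remember to wrap the function and check the results, which is the kind of friction the future framework removes.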

parallel and mclapply():

library(parallel)
options(mc.cores = 2)

y <- parallel::mclapply(X, strict_sqrt)
#> Warning message:
#> In parallel::mclapply(X, strict_sqrt) :
#>   all scheduled cores encountered errors in user code

Here we didn’t even get an error - only a warning. We have to inspect the results to detect errors, e.g.

str(y)
#> List of 4
#>  $ : 'try-error' chr "Error in FUN(X[[i]], ...) : sqrt(x) with x < 0 not allowed: -1\n"
#>   ..- attr(*, "condition")=List of 2
#>   .. ..$ message: chr "sqrt(x) with x < 0 not allowed: -1"
#>   .. ..$ call   : language FUN(X[[i]], ...)
#>   .. ..- attr(*, "class")= chr [1:3] "strict_error" "error" "condition"
#>  $ : num 0
#>  $ : 'try-error' chr "Error in FUN(X[[i]], ...) : sqrt(x) with x < 0 not allowed: -1\n"
#>   ..- attr(*, "condition")=List of 2
#>   .. ..$ message: chr "sqrt(x) with x < 0 not allowed: -1"
#>   .. ..$ call   : language FUN(X[[i]], ...)
#>   .. ..- attr(*, "class")= chr [1:3] "strict_error" "error" "condition"
#>  $ : num 1.41

To detect the errors, we have to scan the results:

## Check for errors
is_error <- vapply(y, inherits, "try-error", FUN.VALUE = NA)
if (any(is_error)) {
  ## error objects are stored in attributes
  first_error <- attr(y[is_error][[1]], "condition")
  stop("Detected one or more errors: ", conditionMessage(first_error))
}

This is a low-level, powerful feature, but it adds lots of friction and requires much more work to make sure things are correct. We cannot just use the value of mclapply(...) as-is; we need to postprocess it to make sure we catch any errors and handle them correctly.
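Note that the check above creates a new error via stop() with a message string, so the strict_error class is lost again. A sketch of a variant that re-signals the captured condition object instead is given below. The helper name rethrow_first_error() is hypothetical, and try() is used as a stand-in for mclapply() so that the sketch runs on any operating system; it assumes the 'try-error' elements carry a "condition" attribute, as the str(y) output above shows.

```r
strict_sqrt <- function(x) {
  if (x < 0) {
    stop(errorCondition(
      paste("sqrt(x) with x < 0 not allowed:", x),
      class = "strict_error"
    ))
  }
  sqrt(x)
}

## Hypothetical helper: re-signal the first captured error, if any
rethrow_first_error <- function(y) {
  is_error <- vapply(y, inherits, "try-error", FUN.VALUE = NA)
  if (any(is_error)) {
    ## stop() on a condition object re-signals it with its classes preserved
    stop(attr(y[is_error][[1]], "condition"))
  }
  y
}

## Stand-in for mclapply(): try() produces the same kind of 'try-error' objects
y <- lapply(-1:2, function(x) try(strict_sqrt(x), silent = TRUE))

res <- tryCatch(rethrow_first_error(y), strict_error = function(e) e)
class(res)
```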

Also:

  1. Note how elements X[2] and X[4] were processed successfully. This is because they were processed together on one of the parallel workers.
  2. In contrast, X[1] and X[3] were processed by another worker, both together as lapply(X[c(1,3)], strict_sqrt), which results in a “combined” error for both.

Confusing? Yes!

foreach:

library(foreach)
doFuture::registerDoFuture()
plan(multisession, workers = 2)

y <- foreach(x = X) %dopar% strict_sqrt(x)
#> Error in { : task 1 failed - "sqrt(x) with x < 0 not allowed: -1"

Just like with parallel::parLapply(), we lose important information on the error:

tryCatch(y <- foreach(x = X) %dopar% strict_sqrt(x), error = str)
#> List of 2
#>  $ message: chr "task 1 failed - \"sqrt(x) with x < 0 not allowed: -1\""
#>  $ call   : language {     doFuture::registerDoFuture() ...
#>  - attr(*, "class")= chr [1:3] "simpleError" "error" "condition"

Note how:

  1. the error message has changed
  2. we lost the information on error class, i.e. strict_error

This is by design of %dopar% in the foreach package.

8.2 Appendix: Condition handling by other parallel map-reduce APIs

Here we will use warnings to illustrate how conditions are handled by the future framework, and how none of the other parallel frameworks handle them. However, everything in this section also applies to messages and any other non-error condition type signalled by R.

The regular sqrt() function gives a warning if we try with a negative number. Let’s see how this behaves with different map-reduce functions.

Our reference, base-R lapply():

X <- -1:2
y <- lapply(X, sqrt)
#> Warning message:
#> In FUN(X[[i]], ...) : NaNs produced

Details:

tryCatch(y <- lapply(X, sqrt), warning = str)
#> List of 2
#>  $ message: chr "NaNs produced"
#>  $ call   : language FUN(X[[i]], ...)
#>  - attr(*, "class")= chr [1:3] "simpleWarning" "warning" "condition"

future.apply (works as expected):

library(future.apply)
plan(multisession, workers = 2)

X <- -1:2
y <- future_lapply(X, sqrt)
#> Warning message:
#> In ...future.FUN(...future.X_jj, ...) : NaNs produced

Details:

tryCatch(y <- future_lapply(X, sqrt), warning = str)
#> List of 2
#>  $ message: chr "NaNs produced"
#>  $ call   : language ...future.FUN(...future.X_jj, ...)
#>  - attr(*, "class")= chr [1:3] "simpleWarning" "warning" "condition"

furrr (works as expected):

library(furrr)
plan(multisession, workers = 2)

X <- -1:2
y <- future_map(X, sqrt)
#> Warning message:
#> In .Primitive("sqrt")(x) : NaNs produced

Details:

tryCatch(y <- future_map(X, sqrt), warning = str)
#> List of 2
#>  $ message: chr "NaNs produced"
#>  $ call   : language .Primitive("sqrt")(x)
#>  - attr(*, "class")= chr [1:3] "simpleWarning" "warning" "condition"

foreach with doFuture (works as expected):

library(foreach)
doFuture::registerDoFuture()
plan(multisession, workers = 2)

X <- -1:2
y <- foreach(x = X) %dopar% sqrt(x)
#> Warning message:
#> In sqrt(x) : NaNs produced
tryCatch(y <- foreach(x = X) %dopar% sqrt(x), warning = str)
#> List of 2
#>  $ message: chr "NaNs produced"
#>  $ call   : language sqrt(x)
#>  - attr(*, "class")= chr [1:3] "simpleWarning" "warning" "condition"

foreach with doParallel (does not work):

library(doParallel)
cl <- parallel::makeCluster(2)
registerDoParallel(cl)

X <- -1:2
y <- foreach(x = X) %dopar% sqrt(x)

Note, warnings are not signalled.

foreach with doMC (does not work):

library(doMC)
registerDoMC(2)

X <- -1:2
y <- foreach(x = X) %dopar% sqrt(x)

Note, warnings are not signalled.

parallel and parLapply() (does not work):

library(parallel)
cl <- makePSOCKcluster(2)

X <- -1:2
y <- parLapply(X, sqrt, cl = cl)

Note, warnings are not signalled.

parallel and mclapply() (does not work):

library(parallel)
options(mc.cores = 2)

X <- -1:2
y <- mclapply(X, sqrt)

Note, warnings are not signalled.
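With the frameworks that drop warnings, one manual workaround is to collect the warnings where the function runs and return them together with the value. The sketch below shows the idea with base-R lapply(); the helper with_warnings() is hypothetical, and with parLapply() it would also have to be exported to the workers.

```r
## Hypothetical helper: evaluate 'expr' and collect warnings instead of
## letting them be signalled (and lost) on the worker
with_warnings <- function(expr) {
  warnings <- list()
  value <- withCallingHandlers(expr, warning = function(w) {
    ## '<<-' updates the local 'warnings' of with_warnings(), not a global
    warnings[[length(warnings) + 1L]] <<- w
    invokeRestart("muffleWarning")
  })
  list(value = value, warnings = warnings)
}

X <- -1:2
res <- lapply(X, function(x) with_warnings(sqrt(x)))

## Re-signal the collected warnings in the main R session
for (r in res) for (w in r$warnings) warning(w)

y <- lapply(res, `[[`, "value")
```

This is roughly the bookkeeping that the future framework does for you.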

8.3 Appendix: Standard output by other parallel map-reduce APIs

TL;DR: It’s only the future framework that captures and relays standard output in the main R session.

Our reference, base-R lapply():

X <- 1:3
void <- lapply(X, print)
#> [1] 1
#> [1] 2
#> [1] 3
output <- capture.output(void <- lapply(X, print))
output
#> [1] "[1] 1" "[1] 2" "[1] 3"

future.apply (works as expected):

library(future.apply)
plan(multisession, workers = 2)

X <- 1:3
void <- future_lapply(X, print)
#> [1] 1
#> [1] 2
#> [1] 3
output <- capture.output(void <- future_lapply(X, print))
output
#> [1] "[1] 1" "[1] 2" "[1] 3"

furrr (works as expected):

library(furrr)
plan(multisession, workers = 2)

X <- 1:3
void <- future_map(X, print)
#> [1] 1
#> [1] 2
#> [1] 3
output <- capture.output(void <- future_map(X, print))
output
#> [1] "[1] 1" "[1] 2" "[1] 3"

foreach w/ doFuture (works as expected):

library(doFuture)
registerDoFuture()
plan(multisession, workers = 2)

X <- 1:3
void <- foreach(x = X) %dopar% print(x)
#> [1] 1
#> [1] 2
#> [1] 3
output <- capture.output({
  void <- foreach(x = X) %dopar% print(x)
})
output
#> [1] "[1] 1" "[1] 2" "[1] 3"

foreach w/ doParallel (doesn’t work):

library(doParallel)
cl <- parallel::makeCluster(2)
registerDoParallel(cl)

X <- 1:3
void <- foreach(x = X) %dopar% print(x)
output <- capture.output({
  void <- foreach(x = X) %dopar% print(x)
})
output
#> character(0)

foreach w/ doMC (doesn’t work):

library(doMC)
registerDoMC(2)

X <- 1:3
void <- foreach(x = X) %dopar% print(x)
#> [1] 1
#> [1] 3
#> [1] 2

We did get some output, but not in order. As we will see next, it’s actually only output to the same terminal but not to the R main session. If you run this in RStudio, you may not see anything. This means, there is nothing to capture;

output <- capture.output({
  void <- foreach(x = X) %dopar% print(x)
})
output
#> character(0)

Thus, the output we see above does not end up in our main R session.

parallel and mclapply() (doesn’t work):

library(parallel)
options(mc.cores = 2)

X <- 1:3
void <- mclapply(X, print)
#> [1] 1
#> [1] 3
#> [1] 2

We did get some output, but not in order. As we will see next, it’s actually only output to the same terminal but not to the R main session. If you run this in RStudio, you may not see anything. This means, there is nothing to capture;

output <- capture.output({
  void <- mclapply(X, print)
})
output
#> character(0)

Thus, the output we see above does not end up in our main R session.

parallel and parLapply() (doesn’t work):

library(parallel)
cl <- makePSOCKcluster(2)

X <- 1:3
void <- parLapply(X, print, cl = cl)

No output, and nothing to capture;

output <- capture.output({
  void <- parLapply(X, print, cl = cl)
})
output
#> character(0)

What about the outfile = "" trick?

library(parallel)
cl <- makePSOCKcluster(2, outfile = "")

X <- 1:3
void <- parLapply(X, print, cl = cl)
#> [1] 1
#> [1] 2
#> [1] 3

It turns out that this, too, only outputs to the terminal in which R is running. If you run this in RStudio, you may not see anything. This means there is nothing to capture:

output <- capture.output({
  void <- parLapply(X, print, cl = cl)
})
output
#> character(0)
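For the frameworks that do not relay standard output, a manual workaround is to capture the output where the function runs and return it as part of the result. A minimal sketch with base-R lapply() follows; the list layout with 'value' and 'output' is an assumption made for illustration.

```r
X <- 1:3
res <- lapply(X, function(x) {
  ## capture.output() runs where the function runs, i.e. on the worker
  output <- capture.output(value <- print(x))
  list(value = value, output = output)
})

## Relay the captured output, in order, in the main R session
output <- unlist(lapply(res, `[[`, "output"))
writeLines(output)
```

Because the output travels back as ordinary data, it arrives in input order and can be captured again in the main session.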

8.4 Appendix: Not everything can be parallelized

As explained in https://future.futureverse.org/articles/future-4-non-exportable-objects.html, not all types of objects can be sent to parallel workers. Some objects only work in the R process they were first created in. Below are examples of object types from different R packages that cannot be exported to, or returned from, parallel workers.

Package     Examples of non-exportable types or classes
base        connection (externalptr)
DBI         DBIConnection (externalptr)
inline      CFunc (externalptr of class DLLHandle)
keras       keras.engine.sequential.Sequential (externalptr)
magick      magick-image (externalptr)
ncdf4       ncdf4 (custom reference; non-detectable)
parallel    cluster and cluster nodes (connection)
raster      RasterLayer (externalptr; not all)
Rcpp        NativeSymbol (externalptr)
reticulate  python.builtin.function (externalptr), python.builtin.module (externalptr)
rJava       jclassName (externalptr)
ShortRead   FastqFile, FastqStreamer, FastqStreamerList (connection)
sparklyr    tbl_spark (externalptr)
terra       SpatRaster, SpatVector (externalptr)
udpipe      udpipe_model (externalptr)
xgboost     xgb.DMatrix (externalptr)
xml2        xml_document (externalptr)

8.4.1 Example: R connections cannot be exported to parallel workers

library(future)
plan(multisession, workers = 2)

file <- tempfile()
con <- file(file, open = "wb")
cat("hello\n", file = con)
readLines(file)
#> [1] "hello"

f <- future({ cat("world\n", file = con); 42 })
v <- value(f)
readLines(file)
#> [1] "hello"    # <= Huh, where did 'world' end up?!?

It turns out that we are actually silently writing to another R connection on the parallel worker with the same connection index as our temporary file:

as.integer(con)
#> [1] 3
showConnections()
#>   description                             class      mode  text     isopen   can read can write
#> 3 "/tmp/hb/RtmpZMAQG0/file1ec6b03b87be50" "file"     "wb"  "binary" "opened" "no"     "yes"
#> 4 "<-localhost:11362"                     "sockconn" "a+b" "binary" "opened" "yes"    "yes"
#> 5 "<-localhost:11362"                     "sockconn" "a+b" "binary" "opened" "yes"    "yes"

This is really bad, because we might end up overwriting another file. This is a limitation in R. Here, R should ideally detect this and give an error.

If we create yet another connection, with a higher connection index:

con2 <- file(file, open = "wb")
as.integer(con2)
#> [1] 6

and try to use that in parallel, we get:

f <- future({ cat("world\n", file = con2); 42 })
v <- value(f)
#> Error in cat("world\n", file = con2) : invalid connection

This is because there is no connection on the worker with index 6.

Either way, this is not good. This is a problem for all parallelization frameworks in R. There’s no solution to this.

However, for troubleshooting, we can ask the future framework to look for non-exportable objects:

options(future.globals.onReference = "error")

f <- future({ cat("world", file = con2); 42 })
#> Error: Detected a non-exportable reference ('externalptr') in one of
#> the globals ('con2' of class 'file') used in the future expression

Note how the problem was detected when creating the future. This prevents damage from happening.

This option is disabled by default, because:

  1. there are some false positives, and
  2. the check is expensive
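Although the connection itself cannot be exported, the file path can. A common way to sidestep the problem is to send the path and open the connection on the worker. This is sketched below with a PSOCK cluster from the parallel package; the same idea should carry over to future(). The helper append_line() is hypothetical.

```r
library(parallel)

## Hypothetical helper: open, write to, and close the connection on the worker
append_line <- function(text, path) {
  con <- file(path, open = "a")
  on.exit(close(con))
  cat(text, "\n", sep = "", file = con)
  NULL
}

path <- tempfile()
cat("hello\n", file = path)

cl <- makePSOCKcluster(1)
## Only the character string 'path' is sent to the worker, not a connection
void <- parLapply(cl, "world", append_line, path = path)
stopCluster(cl)

readLines(path)
```

This only works when the worker can see the same file system as the main R session, which holds for local workers but not necessarily for remote ones.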

8.4.2 Example: xml2 objects cannot be exported

library(xml2)
xml <- read_xml("<body></body>")

f <- future({ xml_children(xml) })
value(f)
## Error: external pointer is not valid

str(xml)
## List of 2
##  $ node:<externalptr> 
##  $ doc :<externalptr> 
##  - attr(*, "class")= chr [1:2] "xml_document" "xml_node"

As before, we can set an R option for future to look for and report on these problems before trying to parallelize:

library(future)
options(future.globals.onReference = "error")

library(xml2)
xml <- read_xml("<body></body>")

f <- future({ xml_children(xml) })
#> Error: Detected a non-exportable reference ('externalptr') in one of the
#> globals ('xml' of class 'xml_document') used in the future expression

8.5 Appendix: Careful with forked parallelization

The parallel::mclapply() function relies on forked parallel processing provided by the operating system;

  1. It works only on Linux and macOS
  2. It does not work on MS Windows, where it falls back to a regular lapply() call (and gives an error if mc.cores is greater than one)

Because it uses forks, parallel::mclapply() is extremely easy to use. For example, you never have to worry about global variables. “It just works!”

The corresponding future backend is multicore, which uses the same underlying code base as mclapply(). In other words, using:

library(future.apply)
plan(multicore, workers = 4)

X <- 1:100
z <- future_lapply(X, FUN = slow_sqrt)

is almost the same as using:

library(parallel)
options(mc.cores = 4)

X <- 1:100
z <- mclapply(X, FUN = slow_sqrt)

but with all the other benefits that come with the future framework, e.g. errors, warnings, output, and random number generation.

However, it is not always safe to use forked parallelization; you really need to know when and when not to use it, which is complicated.

Simon Urbanek, author of mclapply() and R Core member, wrote:

“Do NOT use mcparallel() in packages except as a non-default option that user can set … Multicore is intended for HPC applications that need to use many cores for computing-heavy jobs, but it does not play well with RStudio and more importantly you [as the developer] don’t know the resource available so only the user can tell you when it’s safe to use.”

In other words, you can only use it reliably in code that you have 100% control over, which is rarely the case, especially not for package authors.
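In the spirit of the quote above, a cautious pattern is to run sequentially by default and fork only when the user has opted in and the platform allows it. The sketch below is one way to express that with mclapply(); the rule used for can_fork is an assumption for illustration (it relies on RStudio setting the environment variable RSTUDIO to "1") and is not a complete safety check.

```r
library(parallel)

## Assumption: forking is acceptable only on Unix-alikes outside RStudio
can_fork <- .Platform$OS.type == "unix" &&
            !identical(Sys.getenv("RSTUDIO"), "1")

## Let the user opt in via options(mc.cores = ...); default is sequential
cores <- if (can_fork) getOption("mc.cores", 1L) else 1L

X <- 1:4
z <- mclapply(X, FUN = sqrt, mc.cores = cores)
```

With cores equal to one, mclapply() is a plain lapply() call, so the code behaves the same everywhere unless the user asks for more.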

8.6 Appendix: Missing globals

Objects that need to be exported to parallel workers are called “globals”. They are identified by automatic code inspection. This works most of the time, but there are cases where it might fail. For example, consider:

a <- 1:3
b <- 4:7
c <- 3:5

my_sum <- function(var) { sum(get(var)) }
z <- my_sum(var = "a")

In this case, it is impossible for R to know upfront that var = "a" is going to be used to retrieve the value of a global variable. Because of this, calling:

f <- future(my_sum(var = "a"))
z <- value(f)
#> Error in get(var) : object 'a' not found

fails.

The solution is to guide the future framework to identify a as a global variable. We can do this by adding a dummy use of a, e.g.

f <- future({
  a  ## fake use of 'a'
  
  my_sum(var = "a")
})
z <- value(f)
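An alternative to the dummy use is to avoid get() altogether, so that every object the code depends on appears as a symbol in the expression. A minimal sketch, where collecting the candidates in a list called 'data' is an assumption made for illustration:

```r
a <- 1:3
b <- 4:7
c <- 3:5

## Collect the candidates in one object that the code refers to by symbol
data <- list(a = a, b = b, c = c)

## Look up by name in 'data' rather than via get() in the calling environment
my_sum <- function(var, data) { sum(data[[var]]) }

z <- my_sum(var = "a", data = data)
z
#> [1] 6
```

In future(my_sum(var = "a", data = data)), both my_sum and data are referred to by symbol, so the automatic code inspection should be able to identify them as globals.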

8.6.1 Example: glue::glue() - object not found

Here’s another, more common example:

library(glue)
a <- 42
s <- glue("The value of a is {a}.")
s
#> The value of a is 42.

If we run this in parallel as-is, the future framework won’t be able to identify a as a needed object;

library(glue)
library(future)
plan(multisession)

a <- 42
f <- future(glue("The value of a is {a}."))
s <- value(f)
#> Error in eval(parse(text = text, keep.source = FALSE), envir) : 
#>   object 'a' not found

As before, we can work around it by:

f <- future({
  a  ## fake use of 'a'
  
  glue("The value of a is {a}.")
})

s <- value(f)
s
#> The value of a is 42.

8.6.2 Example: do.call()

Function do.call() can be used to call a function with a set of arguments. For example,

fcn <- sum
z <- do.call(fcn, args = list(1:10))
z
#> [1] 55

calls fcn(1:10) == sum(1:10). This works in parallel too:

library(future)
plan(multisession)

fcn <- sum
f <- future(do.call(fcn, args = list(1:10)))
z <- value(f)
z
#> [1] 55

As an alternative to a function, do.call() also takes the name of a function as input. For example, we can also do:

fcn <- sum
z <- do.call("fcn", args = list(1:10))
z
#> [1] 55

However, this is like the problem of using get(), as explained above. If we try this in parallel, we get:

library(future)
plan(multisession)

fcn <- sum
f <- future(do.call("fcn", args = list(1:10)))
z <- value(f)
#> Error in fcn(1:10) : could not find function "fcn"

We could work around this by adding a dummy use of fcn, but it’s much better to never pass the name of a function ("fcn") to do.call(); always pass the function object (fcn) itself.

The same is true for apply functions, e.g. use:

z <- lapply(1:10, FUN = sum)

but avoid:

z <- lapply(1:10, FUN = "sum")
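If all you have is the name of a function as a string, for instance from a configuration file, one option is to resolve it to a function object once in the main R session and pass that object on. A small sketch using base-R match.fun(); the variable 'name' is just an illustration:

```r
name <- "sum"

## Resolve the name to a function object once, in the main R session
fcn <- match.fun(name)

z <- do.call(fcn, args = list(1:10))
z
#> [1] 55

## The function object can be passed to apply functions as well
y <- lapply(list(1:3, 4:6), FUN = fcn)
```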

8.7 Appendix: Don’t assign to global environment

Assigning to a variable outside of a function or in the global environment (e.g. with <<- or assign()) does not work when running in parallel. However, it is extremely rare that you need to do that. Instead,

  • If you find yourself using <<-, it’s a strong hint that you should approach your problem in a different way!

For example, if you find yourself turning:

res <- list()
for (ii in 1:3) {
  res[[ii]] <- letters[ii]
}

into:

res <- list()
lapply(1:3, FUN = function(ii) {
  res[[ii]] <<- letters[ii]
})

then you should stop and think. The correct solution is:

res <- lapply(1:3, FUN = function(ii) {
  letters[ii]
})

This can easily be parallelized by replacing lapply() with future_lapply() from the future.apply package.
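The same reasoning applies when a loop accumulates a single value rather than filling a list. A sketch of the rewrite, using a sum of squares as an illustrative example: each iteration returns its contribution (the map step), and the contributions are combined afterwards in the main R session (the reduce step).

```r
## For-loop that updates a variable outside the loop body
total <- 0
for (ii in 1:3) {
  total <- total + ii^2
}

## Map step: each iteration returns its contribution, no side effects
parts <- lapply(1:3, FUN = function(ii) ii^2)

## Reduce step: combine the contributions in the main R session
total2 <- Reduce(`+`, parts)
```

Only the map step needs to run in parallel, so lapply() is again the call to replace.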

8.8 Appendix: foreach() is not a for-loop

A common example of the problem explained in Appendix: Don’t assign to global environment happens when using foreach to turn a for-loop into a foreach() call. As before, if you find yourself needing to use <<- in order to turn:

res <- list()
for (ii in 1:3) {
  res[[ii]] <- letters[ii]
}

into

library(doFuture)
registerDoFuture()
plan(multisession)

res <- list()
foreach(ii = 1:3) %dopar% {
  res[[ii]] <<- letters[ii]
}

then the <<- is a strong indication that this should not be done and it won’t work, especially when running in parallel. If you try to run the above in parallel, you will get:

Error in { : task 1 failed - "object 'res' not found"

If you try to export res, e.g.

res <- list()
foreach(ii = 1:3, .export = "res") %dopar% {
  res[[ii]] <<- letters[ii]
}

you’ll find that res is not populated;

res
#> list()

The mistake is believing that foreach() is a replacement for a for-loop. It is not. Repeat after me:

  • foreach() %dopar% { ... } is not a for-loop!
  • foreach() %dopar% { ... } is not a for-loop!
  • foreach() %dopar% { ... } is not a for-loop!

Don’t feel bad if you thought this - you’re not alone, not the first and not the last person to think this. It’s a very common misconception and it’s the name that makes it so tempting to believe it.

Instead, foreach() is much closer to an lapply() call;

  • foreach() %dopar% { ... } is just like lapply() or future_lapply()
  • foreach() %dopar% { ... } is just like lapply() or future_lapply()
  • foreach() %dopar% { ... } is just like lapply() or future_lapply()

What tricks us is the %dopar% infix operator. It makes foreach() look like a for-loop, although it isn’t one. If the author of foreach hadn’t invented %dopar%, they would probably have written foreach() to work like:

res <- foreach(ii = 1:3, FUN = function(ii) {
  letters[ii]
})

which would make it clear that foreach() is just another map-reduce function very similar to lapply().

To further bring the message home, it wouldn’t be hard to imagine an implementation of lapply() that could be written as:

res <- lapply(ii = 1:3) %dopar% {
  letters[ii]
}

I hope that clarifies it.
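For completeness, here is a sketch of the for-loop from the start of this section written the lapply() way, with the value of each iteration returned and assigned from the foreach() call itself. A sequential backend is registered only to keep the sketch self-contained; that choice is an assumption, and with doFuture registered the foreach() call should be the same.

```r
library(foreach)

## Assumption: a sequential backend, so no parallel workers are needed here
registerDoSEQ()

## Like lapply(): the value of each iteration is returned and collected
res <- foreach(ii = 1:3) %dopar% {
  letters[ii]
}
str(res)
```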

8.9 Appendix: Debugging

For troubleshooting, call backtrace() (sic!), if there is an error when running in parallel. You can also retry with plan(sequential). If you still get an error, then use debug() with plan(sequential, split = TRUE) to interactively step through the problematic function.

Since future relays all output, you can also add print(), str(), and message() calls to your functions, which is a common poor man’s debugging technique that actually works.

8.9.0.1 For package developers

  • Will my future code work anywhere regardless of where it runs?

    • If the answer is yes, then you’ve embraced the philosophy of futures to 100%.

    • If the answer is no, try to identify exactly what part of the future code won’t work everywhere, and see if it is necessary to have that constraint.

It’s always good practice to never override the user’s settings, including which foreach adapter they might already have registered. For instance, if you do:

llply_slow <- function(x) {
  doFuture::registerDoFuture()
  llply(x, slow, .parallel = TRUE)
}

you will break the user’s intentions if they use it as:

library(foreach)
doParallel::registerDoParallel(2)

y1 <- foreach(ii = 1:3) %dopar% { some_other_fcn(ii) }
y2 <- llply_slow(1:3)  ## here you change the adapter
y3 <- foreach(ii = 3:1) %dopar% { some_other_fcn(ii) }

To avoid this, restore the previously registered adapter when your function exits:

llply_slow <- function(x) {
  oldDoPar <- doFuture::registerDoFuture()
  on.exit(with(oldDoPar, foreach::setDoPar(fun=fun, data=data, info=info)), add = TRUE)
  
  llply(x, slow, .parallel = TRUE)
}