The future.redis package implements the Future API using the Redis key/value database to define partially fault-tolerant task queues for elastic distributed computing.

Examples

# \donttest{
if (redux::redis_available()) {
## The example assumes that a Redis server is running on the local host
## and standard port.
plan(redis)
workers <- startLocalWorkers(2, linger = 1.0)
demo("mandelbrot", package = "future", ask = FALSE)
stopLocalWorkers(workers)
}
#> 
#> 
#> 	demo(mandelbrot)
#> 	---- ~~~~~~~~~~
#> 
#> > library("future")
#> 
#> > library("graphics")
#> 
#> > plot_what_is_done <- function(counts) {
#> +   for (kk in seq_along(counts)) {
#> +     f <- counts[[kk]]
#> + 
#> +     ## Already plotted?
#> +     if (!inherits(f, "Future")) next
#> + 
#> +     ## Not resolved?
#> +     if (!resolved(f)) next
#> + 
#> +     message(sprintf("Plotting tile #%d of %d ...", kk, n))
#> +     counts[[kk]] <- value(f)
#> +     screen(kk)
#> +     plot(counts[[kk]])
#> +   }
#> + 
#> +   counts
#> + }
#> 
#> > ## Options
#> > region <- getOption("future.demo.mandelbrot.region", 1L)
#> 
#> > if (!is.list(region)) {
#> +   if (region == 1L) {
#> +     region <- list(xmid = -0.75, ymid = 0.0, side = 3.0)
#> +   } else if (region == 2L) {
#> +     region <- list(xmid = 0.283, ymid = -0.0095, side = 0.00026)
#> +   } else if (region == 3L) {
#> +     region <- list(xmid = 0.282989, ymid = -0.01, side = 3e-8)
#> +   }
#> + }
#> 
#> > nrow <- getOption("future.demo.mandelbrot.nrow", 3L)
#> 
#> > resolution <- getOption("future.demo.mandelbrot.resolution", 400L)
#> 
#> > delay <- getOption("future.demo.mandelbrot.delay", interactive())
#> 
#> > if (isTRUE(delay)) {
#> +   delay <- function(counts) Sys.sleep(1.0)
#> + } else if (!is.function(delay)) {
#> +   delay <- function(counts) {}
#> + }
#> 
#> > ## Generate Mandelbrot tiles to be computed
#> > Cs <- mandelbrot_tiles(xmid = region$xmid, ymid = region$ymid,
#> +                        side = region$side, nrow = nrow,
#> +                        resolution = resolution)
#> 
#> > if (interactive()) {
#> +   dev.new()
#> +   plot.new()
#> +   split.screen(dim(Cs))
#> +   for (ii in seq_along(Cs)) {
#> +     screen(ii)
#> +     par(mar = c(0, 0, 0, 0))
#> +     text(x = 1 / 2, y = 1 / 2, sprintf("Future #%d\nunresolved", ii), cex = 2)
#> +   }
#> + } else {
#> +   split.screen(dim(Cs))
#> + }
#> [1] 1 2 3 4 5 6 7 8 9
#> 
#> > ## Create all Mandelbrot tiles via lazy futures
#> > n <- length(Cs)
#> 
#> > message(sprintf("Creating %d Mandelbrot tiles:", n), appendLF = FALSE)
#> Creating 9 Mandelbrot tiles:
#> 
#> > counts <- lapply(seq_along(Cs), FUN=function(ii) {
#> +   message(" ", ii, appendLF = FALSE)
#> +   C <- Cs[[ii]]
#> +   future({
#> +     message(sprintf("Calculating tile #%d of %d ...", ii, n), appendLF = FALSE)
#> +     fit <- mandelbrot(C)
#> + 
#> +     ## Emulate slowness
#> +     delay(fit)
#> + 
#> +     message(" done")
#> +     fit
#> +   }, lazy = TRUE)
#> + })
#>  1
#>  2
#>  3
#>  4
#>  5
#>  6
#>  7
#>  8
#>  9
#> 
#> > message(".")
#> .
#> 
#> > ## Calculate and plot tiles
#> > repeat {
#> +   counts <- plot_what_is_done(counts)
#> +   if (!any(sapply(counts, FUN = inherits, "Future"))) break
#> + }
#> Plotting tile #1 of 9 ...
#> Calculating tile #1 of 9 ...
#>  done
#> Plotting tile #2 of 9 ...
#> Calculating tile #2 of 9 ...
#>  done
#> Plotting tile #3 of 9 ...
#> Calculating tile #3 of 9 ...
#>  done
#> Plotting tile #4 of 9 ...
#> Calculating tile #4 of 9 ...
#>  done
#> Plotting tile #5 of 9 ...
#> Calculating tile #5 of 9 ...
#>  done
#> Plotting tile #6 of 9 ...
#> Calculating tile #6 of 9 ...
#>  done
#> Plotting tile #7 of 9 ...
#> Calculating tile #7 of 9 ...
#>  done
#> Plotting tile #8 of 9 ...
#> Calculating tile #8 of 9 ...
#>  done
#> Plotting tile #9 of 9 ...
#> Calculating tile #9 of 9 ...
#>  done
#> 
#> > close.screen()
#> [1] 1 2 3 4 5 6 7 8 9
#> 
#> > message("SUGGESTION: Try to rerun this demo after changing strategy for how futures are resolved, e.g. plan(multisession).\n")
#> SUGGESTION: Try to rerun this demo after changing strategy for how futures are resolved, e.g. plan(multisession).

# }