Reproducibility of parallel tasks in R

Introduction: Reproducibility in R

Reproducibility is important. More important than ever.

However, making a project reproducible is not as trivial as it sounds. It involves quite some effort which goes beyond “doing it programmatically”:

  • Making the raw data available

  • providing the scripts to process

  • ensuring that stochastic processes return the same results when being run in a fresh environment.

For the first two points, efforts like the Zenodo project, containerization (containerit), and makefile-based approaches (drake, workflowr) provide helpful resources.

Despite all of these efforts, there is another point that needs to be added to the list: Ensuring that parallel stochastic computations are reproducible.

The idea of this post originated from this Stackoverflow question. In particular by the answer of Ralf Stubner which got me started thinking about all the differences between parallel modes in R and how complicated it is for users to get an overview of all the available choices.

Today parallelization is used in many studies. Especially when it comes to modeling (machine learning / statistical), parallelization is practically indispensable. There is a variety of choices in R to go parallel. Speaking from my practical experience over the last years, the most widely known (and used?) approach is foreach with the help of the %dopar% operator. Probably because it is based on the good old for()-loop which is usually taught in intro lessons of any programming language. Next comes the “multicore” aka “apply” mode in R, with functions lapply() (and friends) and mclappy().

When it comes to reproducibility of stochastic processes, most people are aware that one needs to use set.seed(). While this holds true for sequential processes, it does not for parallel ones. The default RNG kind used in R (“Mersenne-Twister”) does not work with parallel backends. When reading ?set.seed(), I see people often closing the page quickly after having taken a first look. Why? There is a lot of information in this help page - however, most of it is also mildly complicated and very theoretical. Also, there is only one sentence which relates to parallelization:

This is not particularly interesting of itself, but provides the basis for the multiple streams used in package parallel.

This sentence belongs to the description of the "L'Ecuyer-CMRG" RNG kind. When only reading this sentence, there is usually still confusion about what action needs to be taken (if at all).

As you might already suspect at this point, action needs to be taken to make parallel streams reproducible. And unfortunately, this actions differs based on the backend / package used. To clear some of this confusion, I’ve put together a few minimal reproducible examples to show how to make stochastic processes reproducible for various parallel backends:

  • “multicore” backend
  • “socket” backend
  • “callr” backend

The examples of this post relate to the parallelMap and future packages. The first one is being used in the (retired) mlr package while future provides a generic approach to parallelization in R used by many packages (among other also by the mlr3 package (shameless self-promotion :o).

You will see that the solutions to ensure reproducibility differ for each backend / package. This is really unfortunate and requires users to change code for every backend. Unfortunately there is no other way to accomplish this right now (and possibly in the future). Maybe this post can help serving as a reference for parallel reproducibility in the R community.

The parallelMap package

Good to know: In all following examples set.seed(123456, "Mersenne-Twister") is explicitly used instead of just set.seed(123456). This is only for showcases purposes to recall that "Mersenne-Twister" is the default RNG kind used in R. Hence, set.seed(123456, "Mersenne-Twister") and set.seed(123456) are effectively the same when being called in a fresh R session.

Multicore backend

Let`s take a look at the first example. When using the “multicore” mode and the standard RNG kind, results are not reproducible.

Not reproducible

library("parallelMap")

suppressMessages({
  set.seed(123456, "Mersenne-Twister") # R default
  parallelStartMulticore(cpus = 2)
  r1 <- parallelMap(runif, rep(3, 2))
  parallelStop()
  
  set.seed(123456, kind = "Mersenne-Twister") # R default
  parallelStartMulticore(cpus = 2)
  r2 <- parallelMap(runif, rep(3, 2))
  parallelStop()
})
all.equal(r1, r2)
## [1] "Component 1: Mean relative difference: 1.816508" 
## [2] "Component 2: Mean relative difference: 0.8693197"

Reproducible

To ensure reproducibility for this mode, one needs to use the "L'Ecuyer-CMRG" RNG kind. This holds true for using mclapply() directly as well. The parallelMap packages essentially just wraps mclapply() in its parallelStartMulticore() function.

library("parallelMap")

suppressMessages({
  set.seed(123456, "L'Ecuyer-CMRG")
  parallelStartMulticore(cpus = 2)
  r3 <- parallelMap(runif, rep(3, 2))
  parallelStop()
  
  set.seed(123456, "L'Ecuyer-CMRG")
  parallelStartMulticore(cpus = 2)
  r4 <- parallelMap(runif, rep(3, 2))
  parallelStop()
})
all.equal(r3, r4)
## [1] TRUE

Socket backend

For a cluster-based approach, the approach differs. First, the non-reproducible way again.

Not reproducible

library("parallelMap")

suppressMessages({
  parallelStartSocket(cpus = 2)
  set.seed(123456, "Mersenne-Twister") # R default
  r5 <- parallelMap(runif, rep(3, 2))
  parallelStop()
  
  parallelStartSocket(cpus = 2)
  set.seed(123456, "Mersenne-Twister") # R default
  r6 <- parallelMap(runif, rep(3, 2))
  parallelStop()
})
all.equal(r5, r6)
## [1] "Component 1: Mean relative difference: 0.7527587"
## [2] "Component 2: Mean relative difference: 2.343758"

Reproducible

Instead of simply using the "L'Ecuyer-CMRG" RNG kind directly, one needs to use the function clusterSetRNGStream() as shown below. Behind the scenes, this function also uses the "L'Ecuyer-CMRG" RNG kind.

I cannot tell why the R-core guys decided to go this route instead of using the same approach as for the “multicore” mode and handling things behind the scenes. This would have prevented a lot of confusion and (possibly) non-reproducible results in the past (and future).

Important

The clusterSetRNGStream() call needs to come after the “PSOCK” cluster was initialized (here done within parallelStartSocket(), otherwise by calling makeCluster()).

library("parallelMap")

suppressMessages({
  parallelStartSocket(cpus = 2)
  parallel::clusterSetRNGStream(iseed = 123456)
  r7 <- parallelMap(runif, rep(3, 2))
  parallelStop()
  
  parallelStartSocket(cpus = 2)
  parallel::clusterSetRNGStream(iseed = 123456)
  r8 <- parallelMap(runif, rep(3, 2))
  parallelStop()
})
all.equal(r7, r8)
## [1] TRUE

If the order is reversed, clusterSetRNGStream() will error:

library("parallelMap")

suppressMessages({
  parallel::clusterSetRNGStream(iseed = 123456)
  parallelStartSocket(cpus = 2)
  r7b <- parallelMap(runif, rep(3, 2))
  parallelStop()
  
  parallel::clusterSetRNGStream(iseed = 123456)
  parallelStartSocket(cpus = 2)
  r8b <- parallelMap(runif, rep(3, 2))
  parallelStop()
})
## Error in defaultCluster(cl): no cluster 'cl' supplied and none is registered
all.equal(r7b, r8b)
## Error in all.equal(r7b, r8b): object 'r7b' not found

The future package

As of the time writing this post, the future package comes with support for several asynchronous backends:

  • multicore
  • multisession (“socket” mode)
  • multiprocess (multicore (if avail), fallback multisession)
  • callr

The following modes are derivatives from the multisession/socket mode tailored to be used on HPC and other remote machines:

  • cluster (“socket” mode)
  • batchtools (execution on HPC)

The callr backend is entirely different as it does not make use of forking (multicore) or cluster-based approaches. See the package’s README for a detailed comparison.

The following examples show how you can make parallelization relying on the future package and its friends reproducible.

Multicore backend

Not reproducible

library("future")
library("future.apply")

set.seed(123456, "Mersenne-Twister") # R default
plan(multicore, workers = 2)
r9 <- future_lapply(rep(3, 2), runif)

set.seed(123456, kind = "Mersenne-Twister") # R default
plan(multicore, workers = 2)
r10 <- future_lapply(rep(3, 2), runif)

all.equal(r9, r10)
## [1] "Component 1: Mean relative difference: 0.2236196"
## [2] "Component 2: Mean relative difference: 0.5047078"

Reproducible

In contrast to the parallelMap approach, the multicore backend uses the "L'Ecuyer-CMRG" RNG kind already internally. See ?future_lapply(). There are two ways to activate it:

    1. pass an integer value to the future.seed argument of the foreground function being used (here we use future_lapply() from the future.apply package)
    1. when setting the a "L'Ecuyer-CMRG" seed upfront, pass TRUE to the future.seed argument

Note that one could also use the furrr package with its function future_map() as equivalent substitute.

library("future")
library("future.apply")

plan(multicore, workers = 2)
r11a <- future_lapply(rep(3, 2), runif, future.seed = 123456)

plan(multicore, workers = 2)
r12a <- future_lapply(rep(3, 2), runif, future.seed = 123456)

all.equal(r11a, r12a)
## [1] TRUE
library("future")
library("future.apply")

set.seed(123456, "L'Ecuyer-CMRG")
plan(multicore, workers = 2)
r11b <- future_lapply(rep(3, 2), runif, future.seed = TRUE)

set.seed(123456, "L'Ecuyer-CMRG")
plan(multicore, workers = 2)
r12b <- future_lapply(rep(3, 2), runif, future.seed = TRUE)

all.equal(r11b, r12b)
## [1] TRUE

Socket backend

The following example uses the doFuture backend to parallelize a foreach loop.

Not reproducible

library("future")
library("doFuture")

set.seed(123456, "Mersenne-Twister") # R default
registerDoFuture()
plan(multisession, workers = 2)
r13 = foreach(i = rep(3, 2)) %dopar% {
  runif(i)
}

set.seed(123456, "Mersenne-Twister") # R default
registerDoFuture()
plan(multisession, workers = 2)
r14 = foreach(i = rep(3, 2)) %dopar% {
  runif(i)
}

all.equal(r13, r14)
## [1] "Component 1: Mean relative difference: 0.5416887"
## [2] "Component 2: Mean relative difference: 0.3570786"

Reproducible

When reading the RNG section of ?doFuture::doFuture, one discovers that the doFuture does not support reproducible parallel streams. Instead, the doRNG package is suggested for reproducible parallel foreach calls.

Subsequently, one needs to initialize a “PSOCK” cluster and call registerDoRNG() with the desired seed value.

library("future")
library("doRNG")

cl = makeCluster(2, type = "PSOCK")
setDefaultCluster(cl)
registerDoRNG(123456)
r15 = foreach(i = rep(3, 2)) %dopar% {
  runif(i)
}

cl = makeCluster(2, type = "PSOCK")
setDefaultCluster(cl)
registerDoRNG(123456)
r16 = foreach(i = rep(3, 2)) %dopar% {
  runif(i)
}
all.equal(r15, r16)
## [1] TRUE

callr backend

The “callr” backend luckily functions in the same way as the “multicore” one from the future package.

Not reproducible

library("future")
library("future.callr")
library("future.apply")

set.seed(123456, kind = "Mersenne-Twister") # R default
plan(callr, workers = 2)
r17 <- future_lapply(rep(3, 2), runif)

set.seed(123456, kind = "Mersenne-Twister") # R default
plan(callr, workers = 2)
r18 <- future_lapply(rep(3, 2), runif)

all.equal(r17, r18)
## [1] "Component 1: Mean relative difference: 0.3052933"
## [2] "Component 2: Mean relative difference: 0.3617484"

Reproducible

library("future")
library("future.callr")
library("future.apply")

set.seed(123456, "L'Ecuyer-CMRG")
plan(callr, workers = 2)
r19 <- future_lapply(rep(3, 2), runif, future.seed = TRUE)

set.seed(123456, "L'Ecuyer-CMRG")
plan(callr, workers = 2)
r20 <- future_lapply(rep(3, 2), runif, future.seed = TRUE)

all.equal(r19, r20)
## [1] TRUE

Summary

With all the examples shown above, it should be clear now that care needs to be taken to achieve reproducibility in R in parallel scenarios. Approaches differ not only by mode but also by packages interfacing the modes.

Sometimes package authors might take care of this internally but more often not. In the end it is your own responsibility to check your code for its reproducibility.

This post does not compare the strong and weak parts of all parallel modes shown. If you are curious about this and wonder which backend you should go with in the first place, I can recommend the overview vignette of the future package. However, do not expect that this document will tell you “the” best mode. There is no such thing (currently). ;)

comments powered by Disqus

Related