The parallel package replaces two historical packages, multicore and snow, and many of its functions share names with functions in those older packages.
Useful Functions
mclapply()
: Fork style; not available on Windows, where specifying mc.cores > 1 makes the call fail.
The mclapply() function (and related mc* functions) works via the fork mechanism on Unix-style operating systems. Briefly, your R session is the main process, and when you call a function like mclapply(), you fork a series of sub-processes that operate independently of the main process (although they share a few low-level features). These sub-processes then execute your function on their subsets of the data, presumably on separate cores of your CPU. Once the computation is complete, each sub-process returns its results and is then killed. The parallel package manages the logistics of forking the sub-processes and handling them once they have finished.
mclapply() parallelizes calls to lapply(). The first two arguments to mclapply() are exactly the same as for lapply(). However, mclapply() has further arguments (which must be named), the most important being mc.cores, which specifies the number of processors/cores to split the computation across. For example, if your machine has 4 cores, you might specify mc.cores = 4 to spread the operation across 4 cores (although this may not be the best idea if you are running other operations in the background besides R).
An advantage of mclapply() is that the worker processes are all created as clones of the master right at the point that mclapply() is called, so you do not have to worry about reproducing your environment on each of the workers. Unfortunately, that is not possible on Windows.

detectCores()
: Returns the number of CPU cores on the current machine.
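A minimal sketch of the fork approach, with detectCores() choosing the worker count (illustrative data; assumes a Unix-alike, since forking is unavailable on Windows):

library(parallel)

n.cores <- max(1L, detectCores() - 1L)   # leave one core free for the OS
## Square each element in forked sub-processes; the result is a list,
## exactly as lapply() would return.
res <- mclapply(1:10, function(i) i^2, mc.cores = n.cores)
unlist(res)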
makeCluster()
: Creates a set of parallel worker processes. The makeCluster() function has a type argument that allows for different types of clusters beyond using sockets (although the default is a socket cluster).

stopCluster()
: Shuts down the workers created by makeCluster().
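A minimal sketch of the socket-cluster lifecycle (the worker count of 2 is arbitrary):

library(parallel)

cl <- makeCluster(2)       # default type is a socket ("PSOCK") cluster
parSapply(cl, 1:4, sqrt)   # run a small job on the workers
stopCluster(cl)            # always release the workers when finished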
clusterEvalQ()
: Evaluates a literal expression on each cluster node. It is a parallel version of evalq() and a convenience function that invokes clusterCall().
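A common use is loading a package on every worker before the real work starts; a sketch (the splines package is an arbitrary choice):

library(parallel)

cl <- makeCluster(2)
clusterEvalQ(cl, library(splines))   # attach splines in every worker session
stopCluster(cl)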
parLapply()
: To run an lapply() operation over a socket cluster, use the parLapply() function. Because the socket workers do not share the master's environment, you first need clusterExport() to export the data to the child processes; this need to export data is a key difference in behavior between the “multicore” approach and the “socket” approach. Internally, parLapply() is built on clusterApply(). A combined sketch with clusterExport() follows the next entry.
clusterExport()
: The second argument to clusterExport() is a character vector of object names, so you can export an arbitrary number of R objects to the child processes. Be judicious in choosing what you export, simply because each R object will be replicated in each of the child processes, and hence take up memory on your computer.
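A sketch of the two functions together: socket workers do not inherit the master's objects, so data must be exported before parLapply() can use it (the object dat is hypothetical):

library(parallel)

cl <- makeCluster(2)
dat <- rnorm(100)            # `dat` stands in for your real data
clusterExport(cl, "dat")     # replicate `dat` into each worker
res <- parLapply(cl, 1:4, function(i) mean(dat) + i)
stopCluster(cl)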
The doParallel package is a merger of doSNOW and doMC, much as parallel is a merger of snow and multicore. It tells foreach's %dopar% operator to use parallel as the backend; in other words, it is an interface between foreach and parallel.
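A minimal registration sketch (two workers, chosen arbitrarily):

library(doParallel)

cl <- makeCluster(2)
registerDoParallel(cl)                    # point %dopar% at this cluster
foreach(i = 1:4, .combine = c) %dopar% sqrt(i)
stopCluster(cl)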
The foreach package offers %do% and %dopar% for for-loop-like jobs. A foreach loop returns a list of results, whereas a for loop has no value and uses side effects to convey its result. Because of this, foreach loops have a few advantages over for loops when the purpose of the loop is to create a data structure such as a vector, list, or matrix: first, there is less code duplication, and hence less chance for error, because initialization of the vector or matrix is unnecessary; second, a foreach loop may be easily parallelized by changing only a single keyword.

foreach uses the iterators package to introduce Python-style data feeding, which may save memory during computation.
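For example, icount() (used in the bootstrap session below) yields one index at a time instead of materializing a whole vector, and iter() can feed a matrix one row at a time; a sketch:

library(iterators)

it <- icount(3)   # a counting iterator: 1, 2, 3, produced on demand
nextElem(it)      # 1
nextElem(it)      # 2

rows <- iter(matrix(1:6, nrow = 3), by = "row")
nextElem(rows)    # the first row of the matrix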
Much of parallel computing comes down to doing three things: splitting the problem into pieces, executing the pieces in parallel, and combining the results back together. With the foreach package, the iterators help you split the problem into pieces, the %dopar% operator executes the pieces in parallel, and the function given in .combine puts the results back together.
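All three stages are visible in a single foreach call; a sketch, assuming a backend has been registered as above:

library(foreach)
library(iterators)

res <- foreach(i = icount(5),     # split: the iterator feeds indices
               .combine = rbind   # combine: row-bind the partial results
) %dopar% {                       # execute: each piece may run in parallel
  c(i = i, sq = i^2)
}
res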
when() and %:% make writing nested loops easier. For nested loops, an important issue is deciding which loop to parallelize.
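A sketch of both operators, following the patterns in the foreach documentation (assumes a registered backend; %do% runs sequentially either way):

library(foreach)

## Nesting: %:% turns the two loops into a single stream of tasks,
## so the backend can balance all (a, b) combinations at once.
foreach(b = 1:3, .combine = cbind) %:%
  foreach(a = 1:2, .combine = c) %dopar% {
    a + b
  }

## Filtering: when() skips combinations that fail the condition.
foreach(y = 1:10, .combine = c) %:% when(y %% 2 == 0) %do% y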
The doMC package is another “parallel backend” for the foreach package, providing the mechanism needed to execute foreach loops in parallel. The foreach package must be used in conjunction with a backend package such as doMC or doParallel in order to execute code in parallel. The user must register a parallel backend to use; otherwise foreach will execute tasks sequentially, even when the %dopar% operator is used.
To get information about the parallel workers, you may use:

getDoParName()
getDoParWorkers()
getDoParVersion()
getDoParRegistered()
: Returns TRUE/FALSE according to whether a backend is registered.
R version 3.6.3 (2020-02-29) -- "Holding the Windsock"
Copyright (C) 2020 The R Foundation for Statistical Computing
Platform: x86_64-pc-linux-gnu (64-bit)
R is free software and comes with ABSOLUTELY NO WARRANTY.
You are welcome to redistribute it under certain conditions.
Type 'license()' or 'licence()' for distribution details.
Natural language support but running in an English locale
R is a collaborative project with many contributors.
Type 'contributors()' for more information and
'citation()' on how to cite R or R packages in publications.
Type 'demo()' for some demos, 'help()' for on-line help, or
'help.start()' for an HTML browser interface to help.
Type 'q()' to quit R.
[Previously saved workspace restored]
> ### Load Package
> library(foreach)
> library(parallel)
> library(doParallel)
Loading required package: iterators
> library(tidyverse)
── Attaching packages ─────────────────────────────────────── tidyverse 1.3.0 ──
✔ ggplot2 3.3.2 ✔ purrr 0.3.4
✔ tibble 3.0.4 ✔ dplyr 1.0.2
✔ tidyr 1.1.2 ✔ stringr 1.4.0
✔ readr 1.4.0 ✔ forcats 0.5.0
── Conflicts ────────────────────────────────────────── tidyverse_conflicts() ──
✖ purrr::accumulate() masks foreach::accumulate()
✖ dplyr::filter() masks stats::filter()
✖ dplyr::lag() masks stats::lag()
✖ purrr::when() masks foreach::when()
>
> ### Use 4 cores for parallel
> cl <- makeCluster(4)
> registerDoParallel(cl)
>
> ### Spawn information
> getDoParName()
[1] "doParallelSNOW"
> getDoParWorkers()
[1] 4
>
> ### How many cores does the node have
> detectCores()
[1] 64
>
> # Our model
> x <- iris[which(iris[,5] != "setosa"), c(1,5)]
>
> summary(glm(x[,2]~x[,1], family=binomial(logit)))
Call:
glm(formula = x[, 2] ~ x[, 1], family = binomial(logit))
Deviance Residuals:
Min 1Q Median 3Q Max
-1.85340 -0.90001 -0.04717 0.96861 2.35458
Coefficients:
Estimate Std. Error z value Pr(>|z|)
(Intercept) -12.5708 2.9068 -4.325 1.53e-05 ***
x[, 1] 2.0129 0.4654 4.325 1.53e-05 ***
---
Signif. codes: 0 ‘***’ 0.001 ‘**’ 0.01 ‘*’ 0.05 ‘.’ 0.1 ‘ ’ 1
(Dispersion parameter for binomial family taken to be 1)
Null deviance: 138.63 on 99 degrees of freedom
Residual deviance: 110.55 on 98 degrees of freedom
AIC: 114.55
Number of Fisher Scoring iterations: 4
>
> # The number of Bootstrap sample
> trials <- 100000
>
> ## foreach sequential (Not parallel result)
>
> boot.time.foreach.seq <- system.time({
+ boot.res.foreach.seq <-
+ foreach(i = icount(trials), .combine=rbind) %do% {
+ set.seed(i)
+ ind <- sample(nrow(iris), replace=TRUE)
+ result1 <- glm(x[ind,2]~x[ind,1], family=binomial(logit))
+ coefficients(result1)
+ }
+ })
>
>
> ## foreach parallel
>
>
> boot.time.foreach <- system.time({
+ boot.res.foreach <-
+ foreach(i = icount(trials), .combine=rbind) %dopar% {
+ set.seed(i)
+ ind <- sample(nrow(iris), replace=TRUE)
+ result1 <- glm(x[ind,2]~x[ind,1], family=binomial(logit))
+ coefficients(result1)
+ }
+ })
>
>
>
> ## parallel::mclapply
> boot.time.mclapply <- system.time({
+ boot.res.mclapply <-
+ mclapply(
+ 1:trials,
+ function(i){
+ set.seed(i)
+ ind <- sample(nrow(iris), replace=TRUE)
+ result1 <- glm(x[ind,2]~x[ind,1], family=binomial(logit))
+ coefficients(result1)
+ }, mc.cores = 4
+ )
+ })
>
>
>
> ## parallel::parLapply
> clusterExport(cl, "x")
>
> boot.time.parLapply <- system.time({
+ boot.res.parLapply <-
+ parLapply(cl, 1:trials,
+ function(i){
+ set.seed(i)
+ ind <- sample(nrow(iris), replace=TRUE)
+ result1 <- glm(x[ind,2]~x[ind,1], family=binomial(logit))
+ coefficients(result1)
+ }
+ )
+ })
>
>
>
> ## Time consumption
> boot.time.foreach.seq
user system elapsed
389.632 0.841 390.518
> boot.time.foreach
user system elapsed
68.835 5.455 165.378
> boot.time.mclapply
user system elapsed
209.926 2.507 106.313
> boot.time.parLapply
user system elapsed
0.150 0.012 101.723
>
> ## Intercept
> mean(boot.res.foreach.seq[, 1])
[1] -13.15341
> sd(boot.res.foreach.seq[, 1])
[1] 3.152096
>
> mean(boot.res.foreach[, 1])
[1] -13.15341
> sd(boot.res.foreach[, 1])
[1] 3.152096
>
> mean(map_dbl(boot.res.mclapply, ~.x[1]))
[1] -13.15341
> sd(map_dbl(boot.res.mclapply, ~.x[1]))
[1] 3.152096
>
> mean(map_dbl(boot.res.parLapply, ~.x[1]))
[1] -13.15341
> sd(map_dbl(boot.res.parLapply, ~.x[1]))
[1] 3.152096
>
> ## Slope
> mean(boot.res.foreach.seq[, 2])
[1] 2.106673
> sd(boot.res.foreach.seq[, 2])
[1] 0.5044038
>
> mean(boot.res.foreach[, 2])
[1] 2.106673
> sd(boot.res.foreach[, 2])
[1] 0.5044038
>
> mean(map_dbl(boot.res.mclapply, ~.x[2]))
[1] 2.106673
> sd(map_dbl(boot.res.mclapply, ~.x[2]))
[1] 0.5044038
>
> mean(map_dbl(boot.res.parLapply, ~.x[2]))
[1] 2.106673
> sd(map_dbl(boot.res.parLapply, ~.x[2]))
[1] 0.5044038
>
> proc.time()
user system elapsed
885.105 12.103 773.158
The “multicore” approach, which makes use of the mclapply() function, is perhaps the simplest and can be implemented on just about any multi-core system (which nowadays is almost any system). The “socket” approach, of which parLapply() is an example, is a bit more general and can be implemented on systems where the forking mechanism is not available. The approach used in the “socket” type cluster can also be extended to other parallel cluster management systems.