R parallel

From 太極
Revision as of 22:02, 4 July 2019 by Brb (talk | contribs) (Created page with "= Introduction = * [http://shop.oreilly.com/product/0636920021421.do Example code] for the book Parallel R by McCallum and Weston. * [http://www.win-vector.com/blog/2016/01/p...")
(diff) ← Older revision | Latest revision (diff) | Newer revision → (diff)
Jump to navigation Jump to search

Introduction

Security warning from Windows/Mac

It seems it is safe to choose 'Cancel' when Windows Firewall tried to block R program when we use makeCluster() to create a socket cluster.

library(parallel)
cl <- makeCluster(2)
clusterApply(cl, 1:2, get("+"), 3)
stopCluster(cl)

WindowsSecurityAlert.png RegisterDoParallel mac.png

If we like to see current firewall settings, just click Windows Start button, search 'Firewall' and choose 'Windows Firewall with Advanced Security'. In the 'Inbound Rules', we can see what programs (like, R for Windows GUI front-end, or Rserve) are among the rules. These rules are called 'private' in the 'Profile' column. Note that each of them may appear twice because one is 'TCP' protocol and the other one has a 'UDP' protocol.

Detect number of cores

parallel::detectCores()

Don't use the default option getOption("mc.cores", 2L) (PS it only returns 2.) in mclapply() unless you are a developer for a package.

However, it is a different story when we run the R code in HPC cluster. Read the discussion Whether to use the detectCores function in R to specify the number of cores for parallel processing?

On NIH's biowulf, even I specify an interactive session with 4 cores, the parallel::detectCores() function returns 56. This number is the same as the output from the bash command grep processor /proc/cpuinfo or (better) lscpu. The free -hm also returns a full 125GB size instead of my requested size (4GB by default). The future::availableCores() fixes the problem. See Biowulf's R webpage for a detailed instructure.

Socket or Fork

Parallel R: Socket or Fork

  • For the fork, each parallel thread is a complete duplication of the master process with the shared environment, including objects or variables defined prior to the kickoff of parallel threads. Therefore, it runs fast. However, the major limitation is that the fork doesn’t work on the Windows system.
  • The socket works on all operating systems. Each thread runs separately without sharing objects or variables, which can only be passed from the master process explicitly. As a result, it runs slower due to the communication overhead.
install.packages(c("rbenchmark", "nycflights13"))
data(flights, package = 'nycflights13')
dim(flights) # [1] 336776     19
ex <- expression(carrier == "UA" & origin == "EWR" & day == 1 & is.na(arr_time))

parFilter <- function(df, ex, type) {
  cn <- parallel::detectCores() - 1
  cl <- parallel::makeCluster(cn, type = type)
  ### DIVIDE THE DATAFRAME BASED ON # OF CORES
  sp <- parallel::parLapply(cl, parallel::clusterSplit(cl, seq(nrow(df))),
                            function(c_) df[c_,])
  ### PASS THE OBJECT FROM MASTER PROCESS TO EACH NODE
  parallel::clusterExport(cl, "ex")
  ### EXTRACT ROW INDEX ON EACH NODE
  id <- Reduce(c, parallel::parLapply(cl, sp,
                                      function(s_) with(s_, eval(ex))))
  parallel::stopCluster(cl)
  return(df[which(id),])
}

rbenchmark::benchmark(replications = 10, order = "elapsed", relative = "elapsed",
                      columns = c("test", "replications", "elapsed", "relative"),
                      "  FORK" = parFilter(flights, ex, "FORK"),
                      "SOCKET" = parFilter(flights, ex, "PSOCK")
)  # Phenom(tm) II X6 1055T
    test replications elapsed relative
1   FORK           10  42.154    1.000
2 SOCKET           10  68.901    1.635

How can I print when using %dopar%

https://stackoverflow.com/a/15078540

Create and register your cluster with something like:

library(doParallel)
cl<-makeCluster(4, outfile = "debugParallel.txt")
registerDoParallel(cl)
foreach(x=list(1, 2, "a"))  %dopar%  
{
  print(x)
}
stopCluster(cl)

And the debugParallel.txt will look like

starting worker pid=17718 on localhost:11704 at 15:10:29.665
starting worker pid=17733 on localhost:11704 at 15:10:29.850
starting worker pid=17747 on localhost:11704 at 15:10:30.025
starting worker pid=17761 on localhost:11704 at 15:10:30.183
[1] 1
[1] 2
[1] "a"

Setting outfile to the empty string ("") will prevent doParallel (or snow whatever) from redirecting the output, often resulting in the output from your print messages showing up on the terminal of the master process. Your foreach loop doesn't need to change at all.

parallel package (including parLapply, parSapply)

Parallel package was included in R 2.14.0. It is derived from the snow and multicore packages and provides many of the same functions as those packages.

The parallel package provides several *apply functions for R users to quickly modify their code using parallel computing.

  • makeCluster(makePSOCKcluster, makeForkCluster), stopCluster. Other cluster types are passed to package snow.
  • clusterCall, clusterEvalQ: source R files and/or load libraries
  • clusterSplit
  • clusterApply, clusterApplyLB (vs the foreach package)
  • clusterExport: export variables
  • clusterMap
  • parallel::mclapply() and parallel::parLapply()
  • parLapply, parSapply, parApply, parRapply, parCapply. Note that
    • parSapply() can be used to as a parallel version of the replicate() function. See this example.
    • An iteration parameter needs to be added to the first parameter of the main function.
  • parLapplyLB, parSapplyLB (load balance version)
  • clusterSetRNGStream, nextRNGStream, nextRNGSubStream

Examples (See ?clusterApply)

library(parallel)
cl <- makeCluster(2, type = "SOCK")
clusterApply(cl, 1:2, function(x) x*3)    # OR clusterApply(cl, 1:2, get("*"), 3)
# [[1]]
# [1] 3
#
# [[2]]
# [1] 6
parSapply(cl, 1:20, get("+"), 3)
#  [1]  4  5  6  7  8  9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
stopCluster(cl)

An example of using clusterCall() or clusterEvalQ()

library(parallel)

cl <- makeCluster(4)
clusterCall(cl, function() { 
  source("test.R") 
})
# clusterEvalQ(cl, {
#    source("test.R")
# })

## do some parallel work
stopCluster(cl)

mclapply() from the 'parallel' package is a mult-core version of lapply()

  • Be providing the number of cores in mclapply() using mc.cores argument (2 is used by default)
  • Be careful on the need and the side-effect of using "L'Ecuyer-CMRG" seed.
  • R doesn't reset the seed when “L'Ecuyer-CMRG” RNG is used?
    library(parallel)
    system.time(mclapply(1:1e4L, function(x) rnorm(x)))
    system.time(mclapply(1:1e4L, function(x) rnorm(x), mc.cores = 4))
    
    set.seed(1234)
    mclapply(1:3, function(x) rnorm(x))
    set.seed(1234)
    mclapply(1:3, function(x) rnorm(x)) # cannot reproduce the result
    
    set.seed(123, "L'Ecuyer")
    mclapply(1:3, function(x) rnorm(x))
    mclapply(1:3, function(x) rnorm(x)) # results are not changed once we have run set.seed( , "L'Ecuyer")
    
    set.seed(1234)                      # use set.seed() in order to get a new reproducible result
    mclapply(1:3, function(x) rnorm(x)) 
    mclapply(1:3, function(x) rnorm(x)) # results are not changed
  • An example of using parallel::mcMap().

Note

  1. R doesn't reset the seed when “L'Ecuyer-CMRG” RNG is used?
  2. Windows OS can not use mclapply(). The mclapply() implementation relies on forking and Windows does not support forking. mclapply from the parallel package is implemented as a serial function on Windows systems. The parallelsugar package was created based on the above idea.
  3. Another choice for Windows OS is to use parLapply() function in parallel package.
  4. Understanding the differences between mclapply and parLapply in R You don't have to worry about reproducing your environment on each of the cluster workers if mclapply() is used.
    ncores <- as.integer( Sys.getenv('NUMBER_OF_PROCESSORS') )
    cl <- makeCluster(getOption("cl.cores", ncores))
    LLID2GOIDs2 <- parLapply(cl, rLLID, function(x) {
                                        library(org.Hs.eg.db); get("org.Hs.egGO")[[x]]} 
                            )
    stopCluster(cl)
    It does work. Cut the computing time from 100 sec to 29 sec on 4 cores.

mclapply() vs foreach()

https://stackoverflow.com/questions/44806048/r-mclapply-vs-foreach

parallel vs doParallel package

doParallel is a foreach parallel adaptor for the parallel package.

doParallel provides a parallel backend for the %dopar% function using the parallel package.

doParallel depends on foreach, iterators, parallel and utils packages.

parallelsugar package

If we load parallelsugar, the default implementation of parallel::mclapply, which used fork based clusters, will be overwritten by parallelsugar::mclapply, which is implemented with socket clusters.

library(parallel) 

system.time( mclapply(1:4, function(xx){ Sys.sleep(10) }) )
##    user  system elapsed 
##    0.00    0.00   40.06 

library(parallelsugar)
## 
## Attaching package: ‘parallelsugar’
## 
## The following object is masked from ‘package:parallel’:
## 
##     mclapply

system.time( mclapply(1:4, function(xx){ Sys.sleep(10) }) )
##    user  system elapsed 
##    0.04    0.08   12.98

snow package

Supported cluster types are "SOCK", "PVM", "MPI", and "NWS".

multicore package

This package is removed from CRAN.

Consider using package ‘parallel’ instead.

foreach package

This package depends on one of the following

  • doParallel - Foreach parallel adaptor for the parallel package
  • doSNOW - Foreach parallel adaptor for the snow package
  • doMC - Foreach parallel adaptor for the multicore package. Used in glmnet vignette.
  • doMPI - Foreach parallel adaptor for the Rmpi package
  • doRedis - Foreach parallel adapter for the rredis package

as a backend.

library(foreach)
library(doParallel)

m <- matrix(rnorm(9), 3, 3)

cl <- makeCluster(2, type = "SOCK")
registerDoParallel(cl) # register the parallel backend with the foreach package
foreach(i=1:nrow(m), .combine=rbind) %dopar%
  (m[i,] / mean(m[i,]))

stopCluster(cl)

See also this post Updates to the foreach package and its friends on Oct 2015.

combine list of lists

comb <- function(...) {
  mapply('cbind', ..., SIMPLIFY=FALSE)
}

library(foreach)
library(doParallel)

cl <- makeCluster(2)
registerDoParallel(cl) # register the parallel backend with the foreach package

m <- rbind(rep(1,3), rep(2,3))

# nrow(m) can represents number of permutations (2 in this toy example)
tmp <- foreach(i=1:nrow(m)) %dopar% {
  a <- m[i, ]
  b <- a * 10
  list(a, b)
}; tmp
# [[1]]
# [[1]][[1]]
# [1] 1 1 1
#
# [[1]][[2]]
# [1] 10 10 10
#
#
# [[2]]
# [[2]][[1]]
# [1] 2 2 2
#
# [[2]][[2]]
# [1] 20 20 20

foreach(i=1:nrow(m), .combine = "comb") %dopar% {
  a <- m[i,]
  b <- a * 10
  list(a, b)
}
# [[1]]
#      [,1] [,2]
# [1,]    1    2
# [2,]    1    2
# [3,]    1    2
#
# [[2]]
#      [,1] [,2]
# [1,]   10   20
# [2,]   10   20
# [3,]   10   20
stopCluster(cl)

Replacing double loops

library(foreach)
library(doParallel)

nc <- 4
nr <- 2

cores=detectCores()
cl <- makeCluster(cores[1]-1)
registerDoParallel(cl)
# set.seed(1234) # not work
# set.seed(1234, "L'Ecuyer-CMRG") # not work either
# library("doRNG")
# registerDoRNG(seed = 1985)    # not work with nested foreach
# Error in list(e1 = list(args = (1:nr)(), argnames = "i", evalenv = <environment>,  :
#  nested/conditional foreach loops are not supported yet.
m <- foreach (i = 1:nr, .combine='rbind') %:% # nesting operator
  foreach (j = 1:nc) %dopar% {
    rnorm(1, i*5, j) # code to parallelise
}
m
stopCluster(cl)

Note that since the random seed (see the next session) does not work on nested loop, it is better to convert nested loop (two indices) to a single loop (one index).

set seed and doRNG package

Export libraries, variables, functions

clusterEvalQ(cl, {
  library(biospear)
  library(glmnet)
  library(survival)
})
clusterExport(cl, list("var1", "foo2"))

Summary the result

foreach returns the result in a list. For example, if each component is a matrix we can use

  • Reduce("+", res)/length(res) # Reduce("+", res, na.rm = TRUE) not working
  • apply(simplify2array(res), 1:2, mean, na.rm = TRUE)

to get the average of matrices over the list.

Read a list files

Reading List Faster With parallel, doParallel, and pbapply

Don't use too many cores: Error in unserialize(socklist[[n]])

error reading from connection. See https://stackoverflow.com/a/24352032

Why is foreach() sometimes slower than for?

See https://stackoverflow.com/a/10414280 or https://stackoverflow.com/q/16963808

  • foreach is only advisable if you have relatively few rounds through very time consuming functions.
  • foreach() is best when the number of jobs does not hugely exceed the number of processors you will be using.
library(foreach)
library(parallel)
nCores <- future::availableCores()
cl <- makeCluster(nCores[[1]])
registerDoParallel(cl)

B<-10000

myFunc<-function() for(i in 1:B) sqrt(i)

myFunc2<-function() foreach(i = 1:B)  %do% sqrt(i)

myParFunc<-function() foreach(i = 1:B) %dopar% sqrt(i)

system.time(myFunc())
#   user  system elapsed 
#  0.001   0.002   0.004 
system.time(myFunc2())
#   user  system elapsed 
#  2.473   0.005   2.477 
system.time(myParFunc())
#   user  system elapsed 
#  3.560   0.242   3.829 

stopCluster(cl)

Another example: reduce the probe sets corresponding to the same gene symbol by taking the probe set with the largest gene expression. It is not just the speed problem; it runs out of memory on a 44928x393 matrix with a PC has 8GB RAM. On a smaller data with 5000 genes and 100 arrays it took 0.071 seconds using split() + sapply() but it took 39 seconds using the foreach method.

require(magrittr)
x <- matrix(rnorm(5000*100), nr=5000) # gene expression
y <- sample(3000, 5000, replace = TRUE) # gene symbol
rownames(x) <- names(y) <- 1:5000 # probe sets name

spt <- apply(x,1,mean,na.rm=TRUE) %>% split(., y)
ind <- sapply(spt, function(a) names(which.max(a)))
x.reduced <- x[ind, ]; rownames(x.reduced) <- names(spt)

Progress bar (doSNOW + foreach)

snowfall package

http://www.imbi.uni-freiburg.de/parallel/docs/Reisensburg2009_TutParallelComputing_Knaus_Porzelius.pdf

Rmpi package

Some examples/tutorials

OpenMP

BiocParallel

RcppParallel

future & future.apply & doFuture packages

Apache Spark

Microsoft R Server

GPU

Threads

Benchmark

Are parallel simulations in the cloud worth it? Benchmarking my MBP vs my Workstation vs Amazon EC2

R functions to run timing

# Method 1
system.time( invisible(rnorm(10000)))

# Method 2
btime <- Sys.time()
invisible(rnorm(10000))
Sys.time() - btime