Parallel processing and sparklyr

· by Jo Hardin ·

Today’s blog entry is on parallel and grid computing. As a data science education blog, our focus is on ways to help students learn about high performance computing in the classroom rather than on parallel computing for particular research projects (for a recent example of the latter, see “Ambitious data science can be painless”). Early in data science education it’s important to develop a foundation and the precursors for future work.

In this blog, we’ll explore ways to perform tasks that are embarrassingly parallel, meaning there is no dependency or communication between the parallel tasks. More sophisticated computing certainly has its place (see, for example, Majd Sakr’s Cloud Computing class at CMU), but we believe that as a first pass at teaching parallel computing, we should teach the parallel structure before bringing in dependence across the parallel tasks. Examples of embarrassingly parallel algorithms include: Monte Carlo analysis, bootstrapping, growing trees for Random Forests, group_by analyses, and cross-validation. Additionally, data science methods increasingly use randomized algorithms, which can often be written in parallel.
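
To make the idea concrete, here is a minimal sketch in base R (using the built-in mtcars data purely as a stand-in): each bootstrap replicate is computed with no knowledge of the others, which is exactly what makes the task embarrassingly parallel and therefore easy to hand to separate workers later.

# each replicate resamples the data and computes its statistic independently;
# no replicate communicates with any other, so the 1000 tasks could be
# farmed out to separate workers without changing the logic
boot_means <- sapply(1:1000, function(i) {
  mean(sample(mtcars$mpg, replace = TRUE))
})
head(boot_means, 3)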

Indeed, it isn’t always easy to know when to use a parallel construction. Because of the overhead involved (e.g., copying data out to many workers, bringing the results back together, etc.), an algorithm run on 10 parallel strands will not reduce the original (non-parallel) run time 10-fold. Figuring out when a parallel implementation is appropriate is beyond the scope of this blog, but it should be considered carefully before embarking on large projects.
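
One way to see this overhead is to time a trivial task both ways, using the parallel package discussed in the next section. On most machines the parallel version below will not be faster (exact timings will vary), precisely because the computation itself is nearly free and the run time is dominated by shipping tasks to the workers and collecting the results.

library(parallel)

cl <- makeCluster(2)                       # two workers

# sqrt() on a vector of integers is essentially free, so the
# communication overhead dominates the parallel run time
system.time(lapply(1:10000, sqrt))
system.time(parLapply(cl, 1:10000, sqrt))

stopCluster(cl)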

Parallel and cluster computing

The R package parallel is designed to send tasks to each of multiple cores. Today’s computers (even small laptops!) typically have multiple cores, and any server or cloud computing infrastructure can easily handle dozens or hundreds of parallel tasks. The structure of the R parallel implementation sends tasks to workers that don’t talk to one another until their results are compiled at the end. In her UseR! 2017 tutorial, Hana Sevcikova describes how the workers each run code/functions/iterations separately, with the results combined only at the end.

[Image: diagram of a master process farming tasks out to workers, from Sevcikova’s UseR! 2017 tutorial on parallel computing]

As computing infrastructure becomes more sophisticated, it is important for our students to have the language to describe how connected components work. Parallel processing allows for a conversation about the differences between distributed computing, cluster computing, and grid computing, introducing students to the larger framework of high performance computing. The benefit of using parallel computing to introduce the larger infrastructure is that the task of each worker is clear, important, and easy to describe, so students can focus on the larger architecture and on describing how the pieces fit together.

Some parallel examples

Before running code in parallel, it is valuable to know how many cores your computer has to work with. The detectCores function reports on the specific device you are using (logical = FALSE counts only the physical cores, which is likely what you want). Note that after makeCluster, the separate workers exist as their own R processes and can be queried; after stopCluster, the code is no longer connected to the cluster structure.

library(parallel)
library(tidyverse)   # for the %>% pipe, dplyr verbs, and ggplot2 used below

P <- detectCores(logical = FALSE)   # physical cores only
P
## [1] 4
cl <- makeCluster(P)   # spawn P worker processes
cl[[1]]
## node of a socket cluster on host 'localhost' with pid 10288

stopCluster(cl)   # shut down the workers
cl[[1]]
## Error in summary.connection(connection): invalid connection
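
To reinforce the point that the workers are separate R processes with their own workspaces, here is a short sketch (reusing P from above; clusterCall, clusterExport, and clusterEvalQ are all part of the parallel package).

cl <- makeCluster(P)

clusterCall(cl, Sys.getpid)       # each worker reports a different process id

x <- 10                           # an object in the master session...
clusterExport(cl, "x")            # ...must be copied to the workers explicitly
clusterEvalQ(cl, x + 1)           # now every worker can use x

stopCluster(cl)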

Embarrassingly embarrassing example

In the example below, we generate samples from a Cauchy distribution and find the max of each sample. Note that the current device has P = 4 cores, so the W = 100 tasks will be spread across the cores, roughly 100/P = 25 per core. The second argument of clusterApply is a sequence of values, each of which gets passed to a task as the (first) argument of func1. Below, we specify that the value 50 (the number of Cauchy draws) should be passed to each of the 100 separate tasks.

W <- 100                             # number of tasks to farm out
P <- detectCores(logical = FALSE)
cl <- makeCluster(P)

# each task draws `reps` Cauchy values and returns the maximum
func1 <- function(reps){
  max(rcauchy(reps))
}

clusterApply(cl, rep(50, W), fun = func1) %>% head(3)
## [[1]]
## [1] 4.876291
## 
## [[2]]
## [1] 25.30384
## 
## [[3]]
## [1] 68.19195

stopCluster(cl)
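
One wrinkle worth mentioning when random numbers are drawn on the workers (as rcauchy is here) is reproducibility: by default, every run produces different draws. A short sketch using parallel’s clusterSetRNGStream (reusing W, P, and func1 from above) gives each worker its own reproducible random-number stream.

cl <- makeCluster(P)

clusterSetRNGStream(cl, iseed = 2019)   # seed the workers' parallel RNG streams
res1 <- clusterApply(cl, rep(50, W), fun = func1)

clusterSetRNGStream(cl, iseed = 2019)   # resetting the streams with the same seed...
res2 <- clusterApply(cl, rep(50, W), fun = func1)

identical(res1, res2)                   # ...reproduces exactly the same draws

stopCluster(cl)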

There are many R functions that implement parallel processing. For example, the same computation as above can be run using foreach.

library(doParallel)
## Loading required package: foreach
## Loading required package: iterators
cl <- parallel::makeCluster(P)

doParallel::registerDoParallel(cl)
foreach(reps = rep(50, 100), .combine = 'c') %dopar% {
  max(rcauchy(reps))
} %>% head(3)
## [1] 210.93402  12.23342 116.20735

stopCluster(cl)

Example: bootstrapping

A slightly less trivial (but still embarrassingly parallel) example is bootstrapping. Below we use a parallel implementation to bootstrap the mean petal length in the iris data (virginica species only).

data(iris)

# petal lengths for the virginica species only
iris_bs <- iris %>%
  filter(Species == "virginica") %>%
  select(Petal.Length)

cl <- parallel::makeCluster(P)
doParallel::registerDoParallel(cl)

# 100 bootstrap resamples, each returning the mean of the resampled values
bsmean_PL <- foreach(i = 1:100, .combine = 'c') %dopar% {
  mean(sample(iris_bs$Petal.Length, replace = TRUE))
}
bootstrap <- tibble(bsmean_PL)
stopCluster(cl)

ggplot(bootstrap, aes(x = bsmean_PL)) + 
  geom_histogram(bins = 25) + 
  ggtitle("Histogram of 100 Bootstrapped Means using foreach")

Spark and sparklyr

Some of you may be familiar with Apache Spark, which is an open-source product for distributed cluster computing. You may want to learn more about its capabilities, including scheduling workflows, dispatching tasks, and consolidating end results. While incredibly powerful, Spark has historically had a steep learning curve for getting R to work smoothly with a Spark connection. More recently, RStudio has come out with the sparklyr package, which integrates R and Spark seamlessly. Note that in the example below, we’ve set up a local connection just for the purposes of the example. For your own work, you may want to connect to a cluster or cloud space with many cores.

The RStudio sparklyr webpage provides a plethora of good examples demonstrating the sophistication and power of the technology. sparklyr has particularly strong connections to the suite of tidyverse functions. Indeed, the power of sparklyr is more about distributing the computing than about parallelizing it. For example, with sparklyr the computations are delayed until you need the results: Spark does the heavy lifting, and only at the very end (when your results are called) do you need to worry about the size of the table, the results, or the computational space (we demonstrate this lazy evaluation in a short sketch after the example). The example below repeats the bootstrapping work that was done previously.

Note that it is important to look at your data structures and variable names. For example, when copying the local data frame iris_samps to the remote data source called iris_samps_tbl, the variable Petal.Length was changed to Petal_Length.

library(sparklyr)
spark_install()                      # install a local copy of Spark (only needed once)

sc <- spark_connect(master = "local")

n_sim <- 100

# build a data frame holding n_sim bootstrap resamples of the 50 virginica rows,
# indexed by the `replicate` variable
iris_samps <- iris %>% dplyr::filter(Species == "virginica") %>%
  sapply(rep.int, times = n_sim) %>%
  cbind(replicate = rep(1:n_sim, each = 50)) %>% 
  data.frame() %>%
  dplyr::group_by(replicate) %>%
  dplyr::sample_n(50, replace = TRUE)

iris_samps_tbl <- copy_to(sc, iris_samps)   # copy the data to Spark

# compute the mean petal length within each replicate on the Spark side
iris_samps_tbl %>% 
  spark_apply(function(x) {mean(x$Petal_Length)}, 
    group_by = "replicate") %>%
  ggplot(aes(x = result)) + 
  geom_histogram(bins = 20) + 
  ggtitle("Histogram of 100 Bootstrapped Means using sparklyr")


spark_disconnect(sc)
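
To see the lazy evaluation mentioned above, here is a short sketch (reopening a local connection and re-copying iris_samps; note the renamed Petal_Length column). Building the dplyr pipeline only constructs a Spark SQL query; nothing is computed until collect() is called, at which point Spark does the work and returns the small table of 100 means to R.

sc <- spark_connect(master = "local")
iris_samps_tbl <- copy_to(sc, iris_samps, overwrite = TRUE)

# building the pipeline triggers no computation in Spark
lazy_means <- iris_samps_tbl %>%
  dplyr::group_by(replicate) %>%
  dplyr::summarize(result = mean(Petal_Length, na.rm = TRUE))

dplyr::show_query(lazy_means)               # inspect the SQL sparklyr generated
boot_means <- dplyr::collect(lazy_means)    # only now does Spark do the work

spark_disconnect(sc)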

For our particular application, the astute reader has probably noticed that computing the average of a variable within group_by groups is a very quick and easy task for dplyr. Indeed, the use of sparklyr above is overkill and is presented only to demonstrate the sparklyr workflow. We recommend working through RStudio’s help pages on sparklyr if you or your students are working with big datasets that require a large computing infrastructure. Additionally, there are many instances of working with Spark in the wild, and you might consider a class activity of working through someone else’s Spark analysis, like this fantastic example on splitting up large amounts of raw DNA sequencing data to get the data for a given genetic location.

iris_samps %>% dplyr::group_by(replicate) %>%
  dplyr::summarize(result = mean(Petal.Length)) %>%
  ggplot(aes(x = result)) + 
  geom_histogram(bins = 25) + 
  ggtitle("Histogram of 100 Bootstrapped Means using dplyr")
## `summarise()` ungrouping output (override with `.groups` argument)

While we continue to believe that introducing students to modern technology early and often will help them become more adept and less apprehensive about using the tools, we also recognize that students need sufficient background in computer science to be able to fully engage with principles of high performance computing. You are likely to know your student audience well enough to determine the benefits of introducing ideas of parallel processing and distributed computing.

About this blog

Each day during the summer of 2019 we intend to add a new entry to this blog on a given topic of interest to educators teaching data science and statistics courses. Each entry is intended to provide a short overview of why it is interesting and how it can be applied to teaching. We anticipate that these introductory pieces can be digested daily in 20 or 30 minute chunks that will leave you in a position to decide whether to explore more or integrate the material into your own classes. By following along for the summer, we hope that you will develop a clearer sense for the fast moving landscape of data science. Sign up for emails at https://groups.google.com/forum/#!forum/teach-data-science (you must be logged into Google to sign up).

We always welcome comments on entries and suggestions for new ones.