A Compelling Case for SparkR
Spark is the ubiquitous Big Data framework that makes it easy to process data at scale. It is mostly used with Scala and Python, but the R-based API is also gaining popularity.
In this article I plan on touching a few key points about using Spark with R, focusing on the Machine Learning part of it.
Target audience
When it comes to doing Machine Learning with Spark, this article targets two types of people:
- Data scientists and statisticians who prefer using R or do not have Python experience. Many are now facing the challenges of Big Data.
- Software engineers used to working with Scala and Spark who need to implement custom machine learning projects at scale. They do not feel like implementing ML algorithms from scratch and would rather use libraries that someone else has battle-tested first.
API comparison
Spark features four APIs:
- Scala a.k.a. vanilla Spark
- Python a.k.a. PySpark
- R a.k.a. SparkR
- Java
Using Scala with Spark is consistent, reliable and easy. Moreover, the latest features are first available in the Scala API. However, I have discovered that when it comes to doing data science specific operations, Scala is not always the easiest language to work with.
The R binding for Spark is a more recent addition. Some of the niceties of Spark are not readily available in R. Also, SparkR is usually the last API to receive updates.
Python can also be used to work with Spark and is a good mix between the advantages of R and Scala. If you are using PySpark right now, there aren’t that many reasons to make you switch to either R or Scala.
There’s a Java API as well, but there is not much to say about it. It is not that widespread and it is very verbose.
Spark MLlib limitations
Spark exposes a robust framework for doing Machine Learning, called MLlib, that is available for all four APIs (to various degrees). The algorithms available are some of the most popular and established ones and cover a broad spectrum of needs. They have been carefully crafted so that they can run efficiently at scale.
However, if you need to run a more exotic algorithm, or just one that is not available in Spark MLlib, you are pretty much out of luck. There might be a package available for you (like KNN, which is not part of the main API), but that’s a slim chance.
At this point, if you are using Scala, your only option is to implement the algorithm yourself. Try searching for a JVM implementation of Divide-and-Conquer Support Vector Machines, for example. There isn’t anything out there as far as I could find.
On the other hand, if you are interested exclusively in data pipelining operations (filter, select, group, etc.) you’re probably better off using Scala rather than PySpark or SparkR.
SparkR vs sparklyr
The R community is vibrant, and the rise of Spark has not gone unnoticed. The kind folks at RStudio have contributed the other popular Spark package, called sparklyr. I got into Spark with R because of this package.
The sparklyr package is excellent for filtering, grouping and pipelining operations. If you are familiar with dplyr, you will feel right at home with sparklyr. However, doing more advanced Machine Learning operations is difficult outside the functionality of MLlib, on which sparklyr depends.
I see no obvious advantage to using sparklyr as opposed to SparkR, other than the cases where it just feels more comfortable to do ETL that way. Databricks just published an article that refers to sparklyr as being complementary to SparkR, but as far as I can see, in terms of actual features, SparkR overlaps sparklyr completely. I hope this will change in the future.
Update on 2017-08-01
With its 0.6 release, sparklyr has introduced distributed arbitrary code execution, as referenced here. I have not had a chance to try it in practice, but I am confident that it works similarly to SparkR’s dapply, referenced over here.
The case for SparkR
When it comes to performance and features, SparkR has absolutely nothing going for it against Spark…except an incredible wealth of data science packages available via various repositories (CRAN being the most well known one).
SparkR users can immediately benefit from existing libraries that implement cutting-edge algorithms, all while being able to run at scale.
There is no waiting involved when trying to use that bleeding edge library which implements a novel approach to regression, described in a paper battle-tested on a poster four days ago at a conference in Canada and which seems to be precisely what your project needs.
SparkR provides a very appealing scenario to be in:
- Access to tons of ready-made libraries that work fine on small data.
- A framework that can efficiently leverage arbitrary algorithms against Big Data.
The architecture is based on splitting Big Data into many small data sets, which can actually be processed in many R runtime instances.
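The idea can be sketched locally in plain R, without Spark. This is only an analogy under stated assumptions: the score_chunk function and the toy data are hypothetical, and lapply stands in for the R processes that Spark would launch on the workers.

```r
# Toy stand-in for a large data set (hypothetical values).
big_data <- data.frame(id = 1:10, x = seq(0.5, 5, by = 0.5))

# Hypothetical per-chunk function: any ordinary R code that works on a
# small data frame. Here it only doubles a column.
score_chunk <- function(df) {
  data.frame(id = df$id, prediction = df$x * 2)
}

# Split the rows into 3 chunks, mimicking Spark partitions.
n_chunks <- 3
chunk_id <- rep_len(seq_len(n_chunks), nrow(big_data))
chunks <- split(big_data, chunk_id)

# Process each chunk independently; on a cluster each call would run in
# its own R process on a worker.
results <- lapply(chunks, score_chunk)

# Combine the partial results into one data frame, restoring row order.
combined <- do.call(rbind, results)
combined <- combined[order(combined$id), ]
```

Nothing inside score_chunk needs to know about the distribution, which is why existing R packages can be reused unchanged.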
Applying regression with SparkR
To exemplify how SparkR can be used, I will look at a regression problem.
Let’s assume that Stack Overflow had a bug and stopped registering the view count for new questions. The task right now is to find a way to predict the missing values.
I have at my disposal 673,639 questions containing the view count. I also have 12,799,157 questions without the view count.
Let’s first look at the training data:
'data.frame': 673639 obs. of 6 variables:
$ has_accepted_answer: int 1 1 1 0 1 1 0 1 0 1 ...
$ answer_count : int 2 1 2 0 1 6 1 1 0 1 ...
$ comment_count : int 5 4 1 2 0 5 6 2 9 0 ...
$ favorite_count : int 6 0 0 0 0 0 0 0 0 1 ...
$ score : int 11 1 0 0 0 2 0 0 1 0 ...
$ view_count : int 716 113 72 13 27 1163 61 139 368 213 ...
For the regression model, I have chosen to use the Cubist package, which features a method not available in MLlib. If I were using Scala and insisted on using Cubist, I would have had to code the implementation myself.
Fitting a model
The code for fitting a model is short and sweet. I am using the excellent mlr package, which helps with creating consistent ML pipelines. In the end, we are left with a model that can be used for making predictions. The model will be heavy, in the sense that one prediction will take a long time to compute. This, of course, makes the challenge even more interesting.
library(readr) # For reading the .csv files
library(dplyr) # For working with data frames
library(mlr) # For ML pipelines
# Read the data
training_data <- read_csv("training_data.csv") %>% data.frame()
# Define the target task
task <- makeRegrTask(data = training_data, target = "view_count")
# Define the Cubist learner. You'll have to install the package
lrn <- makeLearner("regr.cubist", committees = 3, neighbors = 9)
# Test the learner with cross validation
rdesc <- makeResampleDesc("CV")
resample(lrn, task, rdesc, keep.pred = FALSE)
# Fit a model when cross-validation gives good results
fit <- train(lrn, task)
# Save the mlr model to disk
saveRDS(fit, "cubist.model")
Making predictions with R alone
Having a model ready we can now make predictions on the incomplete dataset.
library(readr)
library(dplyr)
library(mlr)
incomplete_data <- read_csv("incomplete_data.csv") %>%
  data.frame()
fit <- readRDS("cubist.model")
pred <- predict(fit, newdata = incomplete_data)
The execution takes 7.66 hours. That’s a lot of time to wait when predicting on a fairly small dataset.
Making predictions with SparkR
Here is the code for making the predictions in a parallel distributed fashion with SparkR.
library(magrittr) # Provides the pipe operator
# Load the library from where Spark is installed.
# There is no CRAN package
library(SparkR, lib.loc = c(file.path("/usr/lib/spark/R/lib")))
# Start a Spark session on the cluster
sparkR.session(master = "yarn",
               sparkConfig = list(spark.executor.memory = "3500m",
                                  spark.executor.cores = "1",
                                  spark.executor.instances = "20"))
# This is a Spark data frame
sdf <- read.df("s3://my-bucket/incomplete_data.csv", "csv",
               header = "true", inferSchema = "true") %>%
  repartition(20)
sdf %>% cache() # Cache the data
sdf %>% count() # Materialize the cache
# Input to the function is an R data frame
getPredictions <- function(df) {
  # These libraries need to be installed on all nodes in the cluster
  library(dplyr) # For data frames
  library(mlr) # For machine learning pipelines
  # Load the mlr model distributed to the cluster
  fit <- readRDS("/tmp/cubist.model")
  # Make the predictions and return an R data frame
  predict(fit, newdata = df)$data
}
# The schema of the data frame returned from the lambda function
outputSchema <- structType(structField("prediction", "double"))
# Make predictions on the dataset in a distributed manner
predictions <- sdf %>%
  dapply(getPredictions, outputSchema) %>%
  collect()
head(predictions)
# Stop the Spark session
sparkR.session.stop()
This takes 0.85 hours. That’s a whole 9 times faster.
There are some key points here:
- It is essential that getPredictions outputs a data frame matching the outputSchema exactly.
- The model cubist.model must be placed in the same location, /tmp/cubist.model, on all the worker instances in the cluster.
- The dplyr, mlr and Cubist packages need to be installed on all of the worker instances in the cluster.
- The financial cost of the SparkR implementation was lower than for the basic implementation.
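Because a schema mismatch only surfaces once the job is running on the cluster, it can help to test the function on a small local data frame first. The sketch below is a plain R check and not part of SparkR. The names check_output and fake_predictions are hypothetical, and the stub stands in for getPredictions so that the example runs without the model file. It compares only the number and types of the columns, in order.

```r
# Expected column types, mirroring
# structType(structField("prediction", "double")).
expected_types <- c("double")

# Hypothetical helper: TRUE when df is a data frame whose columns have
# exactly the expected storage types, in the expected order.
check_output <- function(df, expected_types) {
  is.data.frame(df) &&
    identical(unname(vapply(df, typeof, character(1))), expected_types)
}

# Stub standing in for getPredictions (no model file needed).
fake_predictions <- function(df) {
  data.frame(prediction = as.double(df$score) * 10)
}

sample_input <- data.frame(score = c(11L, 1L, 0L))
ok <- check_output(fake_predictions(sample_input), expected_types)

# An integer column does not match a "double" field.
bad <- check_output(data.frame(prediction = 1:3), expected_types)
```

In practice the stub would be replaced by the real function, called on a few rows of the incomplete data set.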
Why was the SparkR implementation so much faster?
There is no magic in why the SparkR implementation is faster. The distributed architecture in which the workload is spread onto many workers drastically reduces the time it takes to make all the predictions. It’s mainly the map-reduce paradigm.
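As a rough back-of-the-envelope check, using only the two timings reported above and the 20 single-core executors from the session configuration, the speedup and the parallel efficiency work out as follows. The efficiency figure is an estimate, since the two runs did not necessarily use identical hardware.

```r
time_plain_r <- 7.66   # hours, plain R run
time_sparkr  <- 0.85   # hours, SparkR run
executors    <- 20     # spark.executor.instances, one core each

speedup    <- time_plain_r / time_sparkr   # about 9
efficiency <- speedup / executors          # about 0.45
```

The gap to a perfect 20x speedup is plausibly explained by overhead such as starting R processes on the workers, moving data between the JVM and R, and loading the model in every partition. That attribution is an assumption, not a measurement.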
The code for making the same predictions with the help of SparkR is, in essence, the same. I just wrap it in Spark functionality. There is no need to modify anything to make it work with SparkR, just add the SparkR functionality around it.
What are the cases where SparkR is the better choice?
SparkR is an excellent choice when the ML algorithm is only available in R. After building the model, there are two ways it can be applied:
- Running the model per row, using the SparkR::dapply function. Scaling is done by splitting the whole data set into smaller data sets (partitions). Those smaller data sets are then worked on just like in a normal R script. This article illustrates this method.
- Running the model per group, using the SparkR::gapply function. A clustering model is an excellent example of where this type of approach is useful. Again, the workload is distributed.
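To make the per-group case concrete, here is a minimal sketch of the kind of function that gapply expects: it receives the grouping key and an R data frame holding that group's rows, and returns an R data frame. The column names, the toy data and the summarise_group function are hypothetical. The function is exercised locally with split so that the example runs without a cluster, and the SparkR call is shown only as a comment.

```r
# Hypothetical per-group function with the (key, data frame) signature
# that SparkR::gapply passes to its function argument.
summarise_group <- function(key, df) {
  data.frame(answer_count = key[[1]],
             mean_views = mean(df$view_count))
}

# Toy data (hypothetical values).
questions <- data.frame(
  answer_count = c(0L, 1L, 1L, 2L, 2L, 2L),
  view_count   = c(13, 113, 27, 716, 72, 139)
)

# Local stand-in for the distributed run: one call per group.
groups <- split(questions, questions$answer_count)
local_result <- do.call(rbind, lapply(groups, function(g) {
  summarise_group(list(g$answer_count[1]), g)
}))

# On a cluster, assuming sdf is a SparkDataFrame with the same columns:
# schema <- structType(structField("answer_count", "integer"),
#                      structField("mean_views", "double"))
# result <- gapply(sdf, "answer_count", summarise_group, schema)
```

As with dapply, the data frame returned for each group has to match the declared schema.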
Why should R users care about this?
The time it takes to iterate over models is dramatically decreased by using SparkR with large-scale data sets.
Why should Scala users care about this?
Without too much hassle I was able to apply a ready-made regression algorithm for which I would otherwise have had to hand-code a solution. If you use Scala to work on Spark and you reach the point where you need to do a little bit of ML, R might be a nice way forward.
Final remarks
I hope the introduction, along with the practical example will get some people excited about using SparkR.
If you don’t have experience with both R and Spark, there might be some things that are harder to digest. Use the comments section for more information if needed. I have tried to focus on the code part, and have deliberately ignored setting up clusters and other operations required for the experiments.
Before you get started with Spark, know that distributed systems are not easy to handle. Spark makes a good effort to let you focus on the business side of things, but you will probably need to get your hands dirty with tuning memory, disk space and other things sooner or later.
For anyone who has made it this far, here is a link to the GitHub repository with the experiment along with the instructions on how to run it.