簡介

SparkR本來是由UC BERKELEY 的AMPLab(https://amplab.cs.berkeley.edu/) 所開發的一個R的套件(package)。
首先在2014年的1月發佈,後來在Open Source的社群內茁壯成長。
直到最近,SparkR套件正式被Apache Saprk納入到1.4版本裡。
因此,現在只要下載Spark,便可直接使用R語言來操作Spark。(過去的作法,要另外從Github下載SparkR套件)

要使用SparkR之前,需要先進行兩個步驟:
(1) 安裝Spark
(2) 安裝R (Rstudio)


(1)

Spark的安裝網址如下:https://spark.apache.org/downloads.html
並且依照下圖進行設定,下載Spark壓縮檔,並將檔案解壓縮。

(2)

Rstudio 的安裝網址如下:https://www.rstudio.com/
R/Rstudio 安裝教學:https://www.dotblogs.com.tw/michael80321/2014/12/15/147656


‧SparkR基本設定

當Spark與R安裝好以後,基本工作已經完成80%了!
至於剩下的20%也不用想得太難:「要從Spark的檔案中,找到SparkR的套件來使用。」
要達到以上目標,需要先在R裡面進行一些設定:使用的Sys.setenv(),並在裡面進行路徑設定(也就是解壓縮後的Spark檔案位置)。
以下圖為例,因為Spark的檔案放在C槽,因此路徑(SPARK_HOME)就設定為 “C:\spark-1.5.2-bin-hadoop2.6”:

R程式碼(環境設定):

# Set the system environment variables
Sys.setenv(SPARK_HOME = "C:\\spark-1.5.2-bin-hadoop2.6")
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths())) #為了使用Spark所提供的SparkR的package

‧建立Spark連結與spark_sql的資料

在使用Spark前,得先利用sparkR.init() 建立Spark的連結。
sparkRSQL.init()會建立一個Spark的DataFrame,可以想成在Spark裡,資料是以R的data.frame方式呈現。

#load the Sparkr library
require(SparkR)
# Create a spark context and a SQL context
sc <- sparkR.init(master = "local")
sqlContext <- sparkRSQL.init(sc)

‧SparkR的基本操作

SparkR可以讓我們把R的data.frame,丟到Spark裡進行運算。
例如,我在下面的程式碼中,先建立一個exercise.df(data frame):

#### Create a sparkR DataFrame from a R DataFrame ####
exercise.df <- data.frame(name=c("John", "Smith", "Sarah"), age=c(19, 23, 18)) # a R_dataFrame
exercise.df
##    name age
## 1  John  19
## 2 Smith  23
## 3 Sarah  18

然後利用createDataFrame() ,把R的DataFrame,轉變成spark的DataFrame形式。

spark.df <- createDataFrame(sqlContext, exercise.df) # convert R_dataFrame to spark_sql_dataFrame
head(spark.df)
##    name age
## 1  John  19
## 2 Smith  23
## 3 Sarah  18
printSchema(spark.df) # print out the spark_sql_dataFrame's schema
## root
##  |-- name: string (nullable = true)
##  |-- age: double (nullable = true)
class(spark.df)
## [1] "DataFrame"
## attr(,"package")
## [1] "SparkR"

當然,我們也可以把spark的DataFrame,變回R的DataFrame

r.df <- collect(spark.df)
class(r.df)
## [1] "data.frame"

此外,我們可以利用sql的語法,對Spark內的資料進行查詢。
這裡要注意的是,使用sql語法查詢之前,必須先用registerTempTable(),在spark裡建立一個可查詢的table:

# Running SQL Queries from SparkR
registerTempTable(spark.df, "test")
sql_result <- sql(sqlContext, "SELECT name FROM test WHERE age > 19 AND age < 24")
head(sql_result)
##    name
## 1 Smith

‧資料處理手法

在資料處理上面,SparkR 支援許多R的function。
這裡以黃石公園的老忠實間歇泉資料為例子:

head(faithful)
##   eruptions waiting
## 1     3.600      79
## 2     1.800      54
## 3     3.333      74
## 4     2.283      62
## 5     4.533      85
## 6     2.883      55
spark.df <- createDataFrame(sqlContext, faithful)

(1)挑選特定的欄位(Column):

# Select only the "eruptions" column
head(select(spark.df, "eruptions"))
##   eruptions
## 1     3.600
## 2     1.800
## 3     3.333
## 4     2.283
## 5     4.533
## 6     2.883

(2)根據條件式,篩選特定的資料集:

# Filter the DataFrame to only retain rows with wait times shorter than 50 mins
head(filter(spark.df, spark.df$waiting < 50))
##   eruptions waiting
## 1     1.750      47
## 2     1.750      47
## 3     1.867      48
## 4     1.750      48
## 5     2.167      48
## 6     2.100      49

(3)新增欄位/對欄位進行運算:

# Convert waiting time from hours to seconds.
spark.df$waiting_secs <- spark.df$waiting * 60
head(spark.df)
##   eruptions waiting waiting_secs
## 1     3.600      79         4740
## 2     1.800      54         3240
## 3     3.333      74         4440
## 4     2.283      62         3720
## 5     4.533      85         5100
## 6     2.883      55         3300

(4)分組計算(Grouping/Aggregate):

waiting_freq <- summarize(groupBy(spark.df, spark.df$waiting), count = n(spark.df$waiting))
head(waiting_freq) 
##   waiting count
## 1      81    13
## 2      60     6
## 3      93     2
## 4      68     1
## 5      47     4
## 6      80     8

(5)排序(sort):

sort_waiting_freq <- arrange(waiting_freq, desc(waiting_freq$count))
head(sort_waiting_freq) 
##   waiting count
## 1      78    15
## 2      83    14
## 3      81    13
## 4      77    12
## 5      82    12
## 6      84    10

‧建立模型

SparkR也支援許多Machine Learning相關的模型,主要來自於Spark內建的MLlib(Machine Learning Library)
這裡拿R內建的鳶尾花資料(iris),進行回歸分析:

# Create the DataFrame
df <- createDataFrame(sqlContext, iris)
# Fit a linear model over the dataset.
model <- glm(Sepal_Length ~ Sepal_Width + Species, data = df, family = "gaussian")
# Model coefficients are returned in a similar format to R's native glm().
summary(model)
## $coefficients
##                      Estimate
## (Intercept)         2.2513930
## Sepal_Width         0.8035609
## Species__versicolor 1.4587432
## Species__virginica  1.9468169
# Make predictions based on the model.
predictions <- predict(model, newData = df)
head(select(predictions, "Sepal_Length", "prediction"))
##   Sepal_Length prediction
## 1          5.1   5.063856
## 2          4.9   4.662076
## 3          4.7   4.822788
## 4          4.6   4.742432
## 5          5.0   5.144212
## 6          5.4   5.385281

‧最後,別忘了關掉與Spark之間的連結噢!

#### Stop the SparkContext ####
sparkR.stop()

‧附註

本文是參考Spark官網上的範例進行練習,Reference from https://spark.apache.org/docs/latest/sparkr.html

由於以上練習是在單機(windows 7)上進行,因此並沒有納入平行處理的例子。
日後若有機會在多機台上架設Spark,會再補齊相關的程式碼。(2015/12/09)