SparkR本來是由UC BERKELEY 的AMPLab(https://amplab.cs.berkeley.edu/) 所開發的一個R的套件(package)。
首先在2014年的1月發佈,後來在Open Source的社群內茁壯成長。
直到最近,SparkR套件正式被Apache Saprk納入到1.4版本裡。
因此,現在只要下載Spark,便可直接使用R語言來操作Spark。(過去的作法,要另外從Github下載SparkR套件)
要使用SparkR之前,需要先進行兩個步驟:
(1) 安裝Spark
(2) 安裝R (Rstudio)
Spark的安裝網址如下:https://spark.apache.org/downloads.html
並且依照下圖進行設定,下載Spark壓縮檔,並將檔案解壓縮。
Rstudio 的安裝網址如下:https://www.rstudio.com/
R/Rstudio 安裝教學:https://www.dotblogs.com.tw/michael80321/2014/12/15/147656
當Spark與R安裝好以後,基本工作已經完成80%了!
至於剩下的20%也不用想得太難:「要從Spark的檔案中,找到SparkR的套件來使用。」
要達到以上目標,需要先在R裡面進行一些設定:使用的Sys.setenv(),並在裡面進行路徑設定(也就是解壓縮後的Spark檔案位置)。
以下圖為例,因為Spark的檔案放在C槽,因此路徑(SPARK_HOME)就設定為 “C:\spark-1.5.2-bin-hadoop2.6”:
# Set the system environment variables
Sys.setenv(SPARK_HOME = "C:\\spark-1.5.2-bin-hadoop2.6")
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths())) #為了使用Spark所提供的SparkR的package
在使用Spark前,得先利用sparkR.init() 建立Spark的連結。
而sparkRSQL.init()會建立一個Spark的DataFrame,可以想成在Spark裡,資料是以R的data.frame方式呈現。
#load the Sparkr library
require(SparkR)
# Create a spark context and a SQL context
sc <- sparkR.init(master = "local")
sqlContext <- sparkRSQL.init(sc)
SparkR可以讓我們把R的data.frame,丟到Spark裡進行運算。
例如,我在下面的程式碼中,先建立一個exercise.df(data frame):
#### Create a sparkR DataFrame from a R DataFrame ####
exercise.df <- data.frame(name=c("John", "Smith", "Sarah"), age=c(19, 23, 18)) # a R_dataFrame
exercise.df
## name age
## 1 John 19
## 2 Smith 23
## 3 Sarah 18
然後利用createDataFrame() ,把R的DataFrame,轉變成spark的DataFrame形式。
spark.df <- createDataFrame(sqlContext, exercise.df) # convert R_dataFrame to spark_sql_dataFrame
head(spark.df)
## name age
## 1 John 19
## 2 Smith 23
## 3 Sarah 18
printSchema(spark.df) # print out the spark_sql_dataFrame's schema
## root
## |-- name: string (nullable = true)
## |-- age: double (nullable = true)
class(spark.df)
## [1] "DataFrame"
## attr(,"package")
## [1] "SparkR"
當然,我們也可以把spark的DataFrame,變回R的DataFrame
r.df <- collect(spark.df)
class(r.df)
## [1] "data.frame"
此外,我們可以利用sql的語法,對Spark內的資料進行查詢。
這裡要注意的是,使用sql語法查詢之前,必須先用registerTempTable(),在spark裡建立一個可查詢的table:
# Running SQL Queries from SparkR
registerTempTable(spark.df, "test")
sql_result <- sql(sqlContext, "SELECT name FROM test WHERE age > 19 AND age < 24")
head(sql_result)
## name
## 1 Smith
在資料處理上面,SparkR 支援許多R的function。
這裡以黃石公園的老忠實間歇泉資料為例子:
head(faithful)
## eruptions waiting
## 1 3.600 79
## 2 1.800 54
## 3 3.333 74
## 4 2.283 62
## 5 4.533 85
## 6 2.883 55
spark.df <- createDataFrame(sqlContext, faithful)
(1)挑選特定的欄位(Column):
# Select only the "eruptions" column
head(select(spark.df, "eruptions"))
## eruptions
## 1 3.600
## 2 1.800
## 3 3.333
## 4 2.283
## 5 4.533
## 6 2.883
(2)根據條件式,篩選特定的資料集:
# Filter the DataFrame to only retain rows with wait times shorter than 50 mins
head(filter(spark.df, spark.df$waiting < 50))
## eruptions waiting
## 1 1.750 47
## 2 1.750 47
## 3 1.867 48
## 4 1.750 48
## 5 2.167 48
## 6 2.100 49
(3)新增欄位/對欄位進行運算:
# Convert waiting time from hours to seconds.
spark.df$waiting_secs <- spark.df$waiting * 60
head(spark.df)
## eruptions waiting waiting_secs
## 1 3.600 79 4740
## 2 1.800 54 3240
## 3 3.333 74 4440
## 4 2.283 62 3720
## 5 4.533 85 5100
## 6 2.883 55 3300
(4)分組計算(Grouping/Aggregate):
waiting_freq <- summarize(groupBy(spark.df, spark.df$waiting), count = n(spark.df$waiting))
head(waiting_freq)
## waiting count
## 1 81 13
## 2 60 6
## 3 93 2
## 4 68 1
## 5 47 4
## 6 80 8
(5)排序(sort):
sort_waiting_freq <- arrange(waiting_freq, desc(waiting_freq$count))
head(sort_waiting_freq)
## waiting count
## 1 78 15
## 2 83 14
## 3 81 13
## 4 77 12
## 5 82 12
## 6 84 10
SparkR也支援許多Machine Learning相關的模型,主要來自於Spark內建的MLlib(Machine Learning Library)。
這裡拿R內建的鳶尾花資料(iris),進行回歸分析:
# Create the DataFrame
df <- createDataFrame(sqlContext, iris)
# Fit a linear model over the dataset.
model <- glm(Sepal_Length ~ Sepal_Width + Species, data = df, family = "gaussian")
# Model coefficients are returned in a similar format to R's native glm().
summary(model)
## $coefficients
## Estimate
## (Intercept) 2.2513930
## Sepal_Width 0.8035609
## Species__versicolor 1.4587432
## Species__virginica 1.9468169
# Make predictions based on the model.
predictions <- predict(model, newData = df)
head(select(predictions, "Sepal_Length", "prediction"))
## Sepal_Length prediction
## 1 5.1 5.063856
## 2 4.9 4.662076
## 3 4.7 4.822788
## 4 4.6 4.742432
## 5 5.0 5.144212
## 6 5.4 5.385281
#### Stop the SparkContext ####
sparkR.stop()
本文是參考Spark官網上的範例進行練習,Reference from https://spark.apache.org/docs/latest/sparkr.html
由於以上練習是在單機(windows 7)上進行,因此並沒有納入平行處理的例子。
日後若有機會在多機台上架設Spark,會再補齊相關的程式碼。(2015/12/09)