What kind of sparks fly when R meets Hadoop? Below I actually run through the book's examples to introduce three different integration approaches; pick whichever suits you.
Contents
Hadoop & R
R and Streaming
Rhipe
RHadoop
Installing R
To find out which Linux distribution and version you are on (Fedora, Debian, Ubuntu, and so on):
$ cat /proc/version
Linux version 2.6.32-358.0.1.el6.x86_64 (mockbuild@c6b10.bsys.dev.centos.org) (gcc version 4.4.7 20120313 (Red Hat 4.4.7-3) (GCC))
So Red Hat el6 it is => oddly, there was no build on the official site, so I had to google one myself and install R-2.15.2-1.el6.x86_64.rpm.
Installing Hadoop
See my earlier post "Hadoop 第一次安裝就上手 CDH3" (a first-time CDH3 install guide).
Sample data source
Download the Hadoop book examples from https://github.com/alexholmes/hadoop-book:
$ git clone git@github.com:alexholmes/hadoop-book.git
Integration 1: R and Streaming
With Hadoop Streaming you can implement map/reduce in any programming language, as long as you honor the standard input/output contract. The upside: you can test everything on your local machine without touching MapReduce at all, then launch the Hadoop job from the command line. Two examples (a map-only job and a full MapReduce job) demonstrate this below.
1. Map-only job. Problem: compute each stock's daily average.
No reducer is needed, since nothing has to be joined or grouped.
The input CSV columns are Symbol,Date,Open,High,Low,Close,Volume,Adj Close.
Daily average = the mean of the Open and Close prices.
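To make the arithmetic concrete, a quick check in R (the Open/Close values here are hypothetical, reverse-engineered from the first output line in step 1 below):
open  <- 85.88          # hypothetical Open for AAPL on 2009-01-02
close <- 90.75          # hypothetical Close
mean(c(open, close))    # 88.315, the figure step 1 prints for that row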
Step 1: first try it on the local machine (a Mac):
$ cat test-data/stocks.txt | src/main/R/ch8/stock_day_avg.R
AAPL 2009-01-02 88.315
AAPL 2008-01-02 197.055
AAPL 2007-01-03 85.045
...
Step 2: now send it to Hadoop:
$ export HADOOP_HOME=/usr/java/hadoop-1.0.4/
$ ${HADOOP_HOME}/bin/hadoop fs -rmr output
$ ${HADOOP_HOME}/bin/hadoop fs -put test-data/stocks.txt stocks.txt
$ ${HADOOP_HOME}/bin/hadoop fs -ls
$ ${HADOOP_HOME}/bin/hadoop \
jar ${HADOOP_HOME}/contrib/streaming/*.jar \
-D mapreduce.job.reduces=0 \
-inputformat org.apache.hadoop.mapred.TextInputFormat \
-input stocks.txt \
-output output \
-mapper <set-your-path>/src/main/R/ch8/stock_day_avg.R \
-file <set-your-path>/src/main/R/ch8/stock_day_avg.R
$ /usr/java/hadoop-1.0.4/bin/hadoop jar /usr/java/hadoop-1.0.4/contrib/streaming/hadoop-streaming-1.0.4.jar -D mapreduce.job.reduces=0 -inputformat org.apache.hadoop.mapred.TextInputFormat -input stocks.txt -output output -mapper src/main/R/ch8/stock_day_avg.R -file src/main/R/ch8/stock_day_avg.R
$ ${HADOOP_HOME}/bin/hadoop fs -cat output/part*
AAPL 2008-01-02 197.055
AAPL 2009-01-02 88.315
AAPL 2007-01-03 85.045
AAPL 2006-01-03 73.565
AAPL 2005-01-03 64.035
...
The results match the local run.
Step 3: a look at the R code on github:
# suppress warnings and send stray output to /dev/null:
# Streaming treats anything on stdout as map output
options(warn=-1)
sink("/dev/null")
input <- file("stdin", "r")
while (length(currentLine <- readLines(input, n=1, warn=FALSE)) > 0) {
  fields <- unlist(strsplit(currentLine, ","))
  # average of Open (field 3) and Close (field 6)
  lowHigh <- c(as.double(fields[3]), as.double(fields[6]))
  stock_mean <- mean(lowHigh)
  sink()  # restore stdout just long enough to emit one record
  cat(fields[1], fields[2], stock_mean, "\n", sep="\t")
  sink("/dev/null")
}
close(input)
2. Full MapReduce job. Problem: compute each stock's cumulative moving average (CMA).
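Before diving in: the CMA after n observations is just the mean of the first n values, and what this job finally reports per stock is the CMA over all of its daily averages, i.e. their plain mean (the reducer code below confirms this). A short illustration in R with made-up values:
x <- c(197.055, 88.315, 85.045)  # one stock's daily averages
cumsum(x) / seq_along(x)         # the CMA after each new day
mean(x)                          # the final value the reducer emits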
Step 1: as before, try it on the local machine (Mac) first:
# each | pipes the previous stage's output into the next stage:
# map | the shuffle's sort (simulated here with the Linux sort command) | reduce
$ cat test-data/stocks.txt | src/main/R/ch8/stock_day_avg.R | sort --key 1,1 | src/main/R/ch8/stock_cma.R
# =>
AAPL 68.997
CSCO 30.8985
GOOG 419.943
MSFT 44.6725
YHOO 70.971
Step 2: now send it to Hadoop:
$ export HADOOP_HOME=/usr/java/hadoop-1.0.4/
$ ${HADOOP_HOME}/bin/hadoop fs -rmr output
$ ${HADOOP_HOME}/bin/hadoop fs -put test-data/stocks.txt stocks.txt
$ ${HADOOP_HOME}/bin/hadoop \
jar ${HADOOP_HOME}/contrib/streaming/*.jar \
-inputformat org.apache.hadoop.mapred.TextInputFormat \
-input stocks.txt \
-output output \
-mapper src/main/R/ch8/stock_day_avg.R \
-reducer src/main/R/ch8/stock_cma.R \
-file src/main/R/ch8/stock_day_avg.R \
-file src/main/R/ch8/stock_cma.R
$ ${HADOOP_HOME}/bin/hadoop fs -cat output/part*
# =>
AAPL 68.997
CSCO 49.94775
GOOG 123.9468
MSFT 101.297
YHOO 94.55789
But the answers are not what we expected: except for AAPL, every figure differs from the local run. Something is miscomputed!
Step 3: the mapper is the same as in problem 1, so let's look at the R reducer code on github:
options(warn=-1)
sink("/dev/null")
# emit one (stock, mean of its daily means) record on stdout
outputMean <- function(stock, means) {
  stock_mean <- mean(means)
  sink()
  cat(stock, stock_mean, "\n", sep="\t")
  sink("/dev/null")
}
input <- file("stdin", "r")
prevKey <- ""
means <- numeric(0)
# input arrives sorted by key, so a key change means the previous
# stock's values are complete
while (length(currentLine <- readLines(input, n=1, warn=FALSE)) > 0) {
  fields <- unlist(strsplit(currentLine, "\t"))
  key <- fields[1]
  mean <- as.double(fields[3])
  if (identical(prevKey, "") || identical(prevKey, key)) {
    prevKey <- key
    means <- c(means, mean)
  } else {
    outputMean(prevKey, means)
    prevKey <- key
    means <- numeric(0)  # reset the means vector for the new key
    means <- c(means, mean)
  }
}
if (!identical(prevKey, "")) {
  outputMean(prevKey, means)
}
close(input)
Note the `means <- numeric(0)` reset in the else branch: the sample code as published lacked it, so one stock's daily means leaked into the next stock's average. Only after adding the reset do you get the correct answers.
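In fact you can see the leak in the wrong numbers themselves. Assuming AAPL and CSCO have the same number of rows in stocks.txt, appending CSCO's daily means onto AAPL's never-cleared vector makes the buggy CSCO figure the midpoint of the two correct per-symbol averages:
# correct local-run averages: AAPL 68.997, CSCO 30.8985
mean(c(68.997, 30.8985))  # 49.94775, exactly the buggy CSCO output above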
Integration 2: Rhipe
Client-side R and Hadoop working together. Rhipe is short for "R and Hadoop Integrated Processing Environment"; it lets you run Hadoop jobs from inside R.
First install Protocol Buffers: download protobuf-2.4.1.tar.gz; it has to be exactly this version. See the README for the installation steps.
Protocol Buffers are a way of encoding structured data in an efficient yet extensible format. Google uses Protocol Buffers for almost all of its internal RPC protocols and file formats. Think XML, but smaller, faster, and simpler.
$ cd protobuf-2.4.1
$ ./configure
$ make
$ make check
$ make install
Then install Rhipe itself; I picked the most-downloaded release, Rhipe_0.69.tar.gz.
$ export PKG_CONFIG_PATH=/usr/local/lib/pkgconfig
$ export HADOOP=/usr/java/hadoop-1.0.4
$ export HADOOP_LIB=$HADOOP/lib
$ export HADOOP_CONF_DIR=$HADOOP/conf
$ /sbin/ldconfig
$ R CMD INSTALL /root/Downloads/Rhipe_0.69.tar.gz
$ R
> library(Rhipe)
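A quick smoke test (my own suggestion, not from the book; rhinit and rhls are standard Rhipe calls, but double-check your version's docs):
> rhinit()      # bring up the Rhipe runtime on top of Hadoop
> rhls("/tmp")  # list an HDFS directory to confirm connectivity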
2. As in problem 2, compute each stock's cumulative moving average (CMA)
$ export HADOOP_HOME=/usr/java/hadoop-1.0.4/
$ ${HADOOP_HOME}/bin/hadoop fs -rmr output
$ ${HADOOP_HOME}/bin/hadoop fs -put test-data/stocks.txt /tmp/stocks.txt
$ export HADOOP_BIN=/usr/java/hadoop-1.0.4/bin
$ hadoop-book-master/src/main/R/ch8/stock_cma_rhipe.R
Now take a look at the Rhipe-flavored R code on github.
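To give a feel for its shape without reproducing the repo's file, here is a minimal sketch in the classic Rhipe 0.69 expression-based style; rhinit, rhmr, rhex, and rhcollect are real Rhipe calls, but the exact argument names are from memory and the real stock_cma_rhipe.R may differ:
#!/usr/bin/env Rscript
library(Rhipe)
rhinit()

# map: parse each CSV line, emit (symbol, mean of Open and Close)
map <- expression({
  lapply(map.values, function(line) {
    fields <- unlist(strsplit(line, ","))
    rhcollect(fields[1], mean(as.double(c(fields[3], fields[6]))))
  })
})

# reduce: gather a symbol's daily means, emit their overall mean
reduce <- expression(
  pre    = { means <- numeric(0) },
  reduce = { means <- c(means, unlist(reduce.values)) },
  post   = { rhcollect(reduce.key, mean(means)) }
)

job <- rhmr(map = map, reduce = reduce,
            inout = c("text", "text"),
            ifolder = "/tmp/stocks.txt", ofolder = "/tmp/output")
rhex(job)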
3. Discussion
Garbled characters in the output?
The numbers differ from integration 1's: resolved — the culprit was the bug in integration 1's sample code, described above.
Integration 3: RHadoop
A comparatively simple client-side integration of R and Hadoop; like Rhipe, it runs Hadoop jobs from inside R.
RHadoop consists of three components:
rmr (integrates R and Hadoop); the example here focuses on rmr only.
rhdfs (an HDFS interface for R)
rhbase (an HBase interface for R)
1. Installation
Install the prerequisite packages:
$ R
> install.packages("RJSONIO" )
> install.packages("itertools" )
> install.packages("digest" )
> q()
$ R CMD javareconf
$ R
> install.packages("rJava" )
> install.packages("Rcpp" )
> install.packages("functional" )
> install.packages("stringr" )
> install.packages("reshape2" )
Then install rmr2: download rmr2-2.2.0; the install may prompt you for additional R packages along the way.
$ R CMD INSTALL rmr2_2.2.0.tar.gz
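To verify the install, the usual rmr2 sanity check is a round trip through HDFS with to.dfs/from.dfs:
$ R
> library(rmr2)
> from.dfs(to.dfs(1:10))  # write 1:10 to HDFS and read it back as key/value pairs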
2. Problem 2 once more: compute each stock's cumulative moving average (CMA)
$ export HADOOP_HOME=/usr/java/hadoop-1.0.4/
$ ${HADOOP_HOME}/bin/hadoop fs -rmr output
$ $HADOOP_HOME/bin/hadoop fs -put test-data/stocks.txt stocks.txt
$ hadoop-book-master/src/main/R/ch8/stock_cma_rmr.R
This hit an argument error:
Error in mapreduce(input = "stocks.txt", output = "output", textinputformat = rawtextinputformat, :
unused argument(s) (textinputformat = rawtextinputformat, textoutputformat = kvtextoutputformat)
Execution halted
The newer rmr2 renamed the format parameters, so edit src/main/R/ch8/stock_cma_rmr.R (vi it; the fixed version is on github) to use input.format/output.format:
library(rmr2)
...
mapreduce(
  input = "stocks.txt",
  output = "output",
  input.format = "text",
  output.format = "text",
  map = map,
  reduce = reduce
)
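As a side note (my addition, not from the book), rmr2 can also pull job output straight back into an R session instead of going through hadoop fs -cat; something along these lines should work, assuming from.dfs understands the same format names as mapreduce:
out <- from.dfs("output", format = "text")  # read the job's output back into R
out                                         # inspect the result directly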
Finally,
$ ${HADOOP_HOME}/bin/hadoop fs -cat output/part*
AAPL 88.315
GOOG 428.875
The map's input format turns out to be wrong; update the sample code:
#!/usr/bin/env Rscript
library(rmr2)
# map: split a CSV line, emit (symbol, mean of Open and Close)
map <- function(k, v) {
  fields <- unlist(strsplit(v, ","))
  keyval(fields[1], mean(as.double(c(fields[3], fields[6]))))
}
# reduce: average all of a symbol's daily means
reduce <- function(k, vv) {
  keyval(k, mean(as.numeric(unlist(vv))))
}
mapreduce(
  input = "stocks.txt",
  output = "output",
  input.format = "text",
  output.format = "text",
  map = map,
  reduce = reduce
)
Discussion
Why are there only two records? Is the input format wrong??
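One hypothesis (mine, not the book's): rmr2 hands the map function whole chunks of records at a time, so v is a character vector of many lines, while the sample map indexes it as if it were a single line and therefore emits only one key/value pair per chunk. A vectorized map along these lines might behave better:
# hypothetical vectorized map: treat v as a vector of CSV lines
map <- function(k, v) {
  fields <- strsplit(v, ",")              # one list element per line
  syms <- sapply(fields, `[`, 1)          # the Symbol column
  avgs <- sapply(fields, function(f) mean(as.double(c(f[3], f[6]))))
  keyval(syms, avgs)                      # emit one pair per input line
}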