ここ最近の授業でもPythonのMulti processingのモジュールを使って、Mapper, Partitioner, Reducerを組み合わせて、いろいろ集計したり、回帰分析のプログラムを書いたりしています。 徐々にですが、Map Reduceがどういうもので、どこをパラレルに計算させて、集計とるかみたいな勘がわかってきました。
で、今日は、1ヶ月くらい前にHadoopのセットアップをやりましたが、その後ほったらかしていたので、授業で勉強したことを活かすべく(?)、Hadoopの擬似分散モード上で動くPythonのM/Rのプログラムを書いてみようと思います。
参考にしたブログはこちら。
- http://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/
- http://blog.matthewrathbone.com/2013/11/17/python-map-reduce-on-hadoop---a-beginners-tutorial.html
まず下準備です。
1. 使用するデータを用意する
こちらのデータは、1行目から順に、ID, 性別, 年齢, 身長(インチ), 体重(ポンド), 自分の体重をどう思うか?(1=太っている、2=痩せている、3=普通), 体重を変えようとしているか?(1=太ろうとしている, 2=痩せようとしている, 3 = 維持しようとしている) というデータです(元データはこちら
:ftp://ftp.cdc.gov/pub/Health_Statistics/NCHS/nhanes/nhanes3/1A/adult.dat)。
このデータを、adult_data.csvと呼びます。
'00003', 1, 21, 72, 180, 3, 1
'00004', 2, 32, 63, 135, 1, 2
'00009', 2, 48, 61, 147, 1, 2
'00010', 1, 35, 70, 205, 1, 2
'00011', 1, 48, 67, 170, 3, 3
'00019', 1, 44, 70, 187, 3, 3
'00034', 2, 42, 63, 128, 1, 2
'00040', 2, 17, 60, 100, 3, 3
'00044', 2, 24, 66, 125, 3, 2
'00045', 2, 67, 64, 147, 3, 3
'00048', 2, 56, 68, 231, 1, 2
'00049', 2, 82, 73, 97, 2, 1
'00051', 1, 44, 71, 300, 1, 2
2. 使用するデータをローカルからHDFS上にアップする
#まずは、testフォルダを作ります
$ hadoop fs -mkdir /user/<your username>/test
#そんでもって、putでデータをローカルからHDFS上にコピーします
$
hadoop fs -put adult_data.csv /user/<your username>/test/adult_data.csv
3. PythonでMapperとReducerを書きます
今回は、性別ごとの平均年齢を出してみます。
まずは、Mapperから。
どうやら、Hadoopのstreaming-jarというもので、データを一行ずつPythonのプログラムに標準入力できるようです。なので、テキストとして読み込んだ一行ずつのデータをstripとsplit(",")でパースしています。例えば、最初の行は、以下のような配列になります。
"'00003', 1, 21, 72, 180, 3, 1" ->['00003', "1", "21", "72", "180", "3", "1"]
そして、1つめの要素(性別)と2つめの要素をピックして、標準出力していきます。 Key Valueのペアは、ただのタブ区切りのテキストとして出力されるというわけです。
Mapper.py
#!/usr/bin/python
import sys
# input comes from STDIN (standard input)
for line in sys.stdin:
line = line.strip()
line = line.split(",")
if len(line) >=2:
sex = line[1]
age = line[2]
print '%s\t%s' % (sex, age)
Reducer.py
#!/usr/bin/python
#Reducer.py
import sys
sex_age = {}
#Partitoner
for line in sys.stdin:
line = line.strip()
sex, age = line.split('\t')
if sex in sex_age:
sex_age[sex].append(int(age))
else:
sex_age[sex] = []
sex_age[sex].append(int(age))
#Reducer
for sex in sex_age.keys():
ave_age = sum(sex_age[sex])*1.0 / len(sex_age[sex])
print '%s\t%s'% (sex, ave_age)
3. Pythonのプログラムの権限変更
作ったプログラムに実行権限を追加します。
$ chmod +x mapper.py
$ chmod +x reducer.py
4. テストしてみる
これって、ただのテキストを吐き出して、Mapperで読み込んで、また吐き出して、Reducerでも読み込んで吐き出しているだけなので、ターミナル上でテストできます。例えば、こんな感じで。
$ cat adult_data.csv | python mapper.py | python reducer.py 1 47.597301855 2 47.2906009245
5. 最後にMap Reduce on Hadoop!
ここで、hadoop-streaming.jarというJavaのプログラムをhadoop jarで実行します(参考にしたブログに書いてあった、hadoop-streaming.jarがあるディレクトリと私のディレクトリが違ったので、cdhのバージョ等ンに寄って違うのかもしれませんので、検索して確認したほうが良いかもしれません)。
mapper.pyとreducer.pyのあるディレクトリで以下を実行すると、Map Reduceが始まります(見やすくするために、複数行で書いていますが、実際にやるときは一行で書いてください)。
mapper.pyとreducer.pyのあるディレクトリで以下を実行すると、Map Reduceが始まります(見やすくするために、複数行で書いていますが、実際にやるときは一行で書いてください)。
$hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar
-mapper mapper.py
-reducer reducer.py
-input /user/<your username>/test/adult_data.csv
-output ave_age
-file mapper.py
-file reducer.py
6. めでたく結果をゲット
$hadoop fs -cat /user/<your username>/ave_age/part-00000
1 47.597301855
2 47.2906009245