2014年5月1日木曜日

備忘) Hadoop Streaming PythonでJOIN

もう少し調べておりましたら、Hadoop Streaming + Pythonで2つのファイルをJOINする方法を見つけました。参考にしたのは、こちらのブログ。忘れないようにメモしておきます。

  • http://allthingshadoop.com/2011/12/16/simple-hadoop-streaming-tutorial-using-joins-and-keys-with-python/

1. まずはお試し用データを用意

今回は、あるカスタマーデータがあるとして(customerData.csv)、その中にある県のID(Pref_id) を、県の名前テーブル(pref.csv)を参照して、実際の県の名前(pref_name)をカスタマーデータにくっつける、ということを考えています。


customerData.csv
#ID, Name, Pref_id, good or bad id
1, Taro, 1, 0
2, Jiro, 2, 1
3, Saburo, 3, 0
4, Hanako, 1, 0
5, Ken, 3, 1
6, Nick, 2, 1
pref.csv
#Pref_id, Pref_name
1, Tokyo
2, Osaka
3, Hokaido

2. Hadoop hdfsにアップ

そんでもって、この2つのファイルをcustomerというフォルダにアップします。
$hadoop fs -put pref.csv /user/<your username>/customer/customerData.csv
$hadoop fs -put pref.csv /user/<your username>/customer/pref.csv



3. MapperとReducerを用意

参考にしたブログにもあるように、1つのフォルダに格納されている2つのファイルを出力します。

問題は、ある出力があったときに、それが、どちらのファイルから来たものなのかを判別することが必要になります。ここでは、カンマで区切って配列にしたときの長さで判別しています。

また、Mapperから出力される文字列ですが、すべての要素(この例では、共通項目のPref_id、customerData.csvにあるID, Name, good or bad idの3つ + pref.csvにあるPref_nameの合計5つ)、をくっつけて出力します。

その際、該当する要素がない場合は、NAを意味するフラグを立てて出力させるのがポイントのようです。

Mapper.py
#!/usr/bin/python

import sys

# input comes from STDIN (standard input)
for line in sys.stdin:
    line = line.strip()
    line = line.split(",")

    id = "-1"
    name = "-1"
    pref = "-1"
    pref_name = "-1"
    good = "-1"


    #If length of the line is four, the data comes from customerData
    if len(line) ==4:
        id = line[0]
        name = line[1]
        pref = line[2]
        good = line[3]

    else:
        pref = line[0]
        pref_name = line[1]


    print '%s\t%s\t%s\t%s\t%s' % (id, name, pref, pref_name, good)

Reducerですが、ここでは、県名を入れておく辞書と、カスタマー情報を入れておく辞書を用意して、mapperから出てくる文字列を見て、各行を格納しておきます。

reducer.py
#!/usr/bin/python

import sys

pref_dict ={}
customer_dict={}
for line in sys.stdin:
    line = line.strip()
    id, name, pref, pref_name, good = line.split('\t')

    id = int(id)
    pref = int(pref)
    good = int(good)

    #if id ==-1, it means the data comes from Pref data
    if id ==-1:
        pref_dict[pref] = pref_name
    
    #Oterwise, the data comes from customer data
    else:
        customer_dict[id] = [name,pref,good]

for id in customer_dict.keys():
    pref_name = pref_dict[customer_dict[id][1]]
    name = customer_dict[id][0]
    good = customer_dict[id][2]

    print '%s\t%s\t%s\t%s'% (id, name, pref_name, good)


4. テスト

ターミナルでテストしておきます。
$cat customerData.csv pref.csv | python mapper.py | python reducer.py
1  Taro  Tokyo 0
2  Jiro  Osaka 1
3  Saburo  Hokaido 0
4  Hanako  Tokyo 0
5  Ken  Hokaido 1
6  Nick  Osaka 1


5. 権限を変更して、hadoop jarを実行

chmod でmapper.pyとreducer.pyに実行権限を与えておきます(コマンドは、chmod +x mapper.pyもしくはreducer.py)。最後に、hadoop jar streamingです。
$hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar -mapper mapper.py -reducer reducer.py -input /user/<your username>/customer/*.csv -output custoer_pref -file mapper.py -file reducer.py

6. 実行結果の確認
$ hadoop fs -cat /user/ykatada/customer_pref/part-00000
1  Taro  Tokyo 0
2  Jiro  Osaka 1
3  Saburo  Hokaido 0
4  Hanako  Tokyo 0
5  Ken  Hokaido 1
6  Nick  Osaka 1



0 件のコメント:

コメントを投稿