2014年5月29日木曜日

いよいよ最終モジュール!

いよいよ5月中旬から最終モジュールが始まりました。私の留学生活も残すところ1ヶ月です。
あっという間だったような、長かったような、不思議な感じです。しかし、多くのことを勉強しましたし、何より旅行ではなく、実際に住んで、サンフランシスコのライフスタイルに触れて、アメリカのことや、日本のことについて考えるようになりました。そのことが大きな収穫だと思っています。

さて、最終モジュールのラインナップについて書きたいと思います。

Application of Analytics
シミュレーションのクラスです。モンテカルロシミュレーションに始まり、MCMCなどをやります。わりと数学重視のクラスです。最終モジュールで疲れてきている身体には応えますが、トピックは実用的かつ重要なので、頑張ってついていきます。


Marketing Analytics
マーケティングに纏わる分析手法を勉強します。コンジョイント分析、LTVの求め方など。先生は元ニールセンの方で、統計学的にどうこうというよりは、統計分析の結果をどうビジネスクライアントに説明するかに力点が置かれています。そして、このクラスの特徴は、SASを使用すること。クラス内では、賛否両論ありますが、私個人としては、SASはRと違って有償ですし、まだまだ大手企業で使っているメジャーなソフトウェアで覚えておいて損はないと思っています。

Web Analytics
これが本モジュール一番期待している授業です。自分でウェブサイトを作って、それにGoogle Analytics等の分析ソフトを導入して、実際に分析していくという流れです。先生は、元Googleのエンジニアの方でこれらのツールを熟知しているのはもちろん、とてもナイスガイなので、多くを学びたいと思います。

Practicum
インターンですね。いよいよ最後!!


最終モジュールで、気が抜けそうですが、なかなかのラインナップだと思います。最後まで楽しんで勉強できそうです。



2014年5月1日木曜日

備忘) Hadoop Streaming PythonでJOIN

もう少し調べておりましたら、Hadoop Streaming + Pythonで2つのファイルをJOINする方法を見つけました。参考にしたのは、こちらのブログ。忘れないようにメモしておきます。

  • http://allthingshadoop.com/2011/12/16/simple-hadoop-streaming-tutorial-using-joins-and-keys-with-python/

1. まずはお試し用データを用意

今回は、あるカスタマーデータがあるとして(customerData.csv)、その中にある県のID(Pref_id) を、県の名前テーブル(pref.csv)を参照して、実際の県の名前(pref_name)をカスタマーデータにくっつける、ということを考えています。


customerData.csv
#ID, Name, Pref_id, good or bad id
1, Taro, 1, 0
2, Jiro, 2, 1
3, Saburo, 3, 0
4, Hanako, 1, 0
5, Ken, 3, 1
6, Nick, 2, 1
pref.csv
#Pref_id, Pref_name
1, Tokyo
2, Osaka
3, Hokaido

2. Hadoop hdfsにアップ

そんでもって、この2つのファイルをcustomerというフォルダにアップします。
$hadoop fs -put pref.csv /user/<your username>/customer/customerData.csv
$hadoop fs -put pref.csv /user/<your username>/customer/pref.csv



3. MapperとReducerを用意

参考にしたブログにもあるように、1つのフォルダに格納されている2つのファイルを出力します。

問題は、ある出力があったときに、それが、どちらのファイルから来たものなのかを判別することが必要になります。ここでは、カンマで区切って配列にしたときの長さで判別しています。

また、Mapperから出力される文字列ですが、すべての要素(この例では、共通項目のPref_id、customerData.csvにあるID, Name, good or bad idの3つ + pref.csvにあるPref_nameの合計5つ)、をくっつけて出力します。

その際、該当する要素がない場合は、NAを意味するフラグを立てて出力させるのがポイントのようです。

Mapper.py
#!/usr/bin/python

import sys

# input comes from STDIN (standard input)
for line in sys.stdin:
    line = line.strip()
    line = line.split(",")

    id = "-1"
    name = "-1"
    pref = "-1"
    pref_name = "-1"
    good = "-1"


    #If length of the line is four, the data comes from customerData
    if len(line) ==4:
        id = line[0]
        name = line[1]
        pref = line[2]
        good = line[3]

    else:
        pref = line[0]
        pref_name = line[1]


    print '%s\t%s\t%s\t%s\t%s' % (id, name, pref, pref_name, good)

Reducerですが、ここでは、県名を入れておく辞書と、カスタマー情報を入れておく辞書を用意して、mapperから出てくる文字列を見て、各行を格納しておきます。

reducer.py
#!/usr/bin/python

import sys

pref_dict ={}
customer_dict={}
for line in sys.stdin:
    line = line.strip()
    id, name, pref, pref_name, good = line.split('\t')

    id = int(id)
    pref = int(pref)
    good = int(good)

    #if id ==-1, it means the data comes from Pref data
    if id ==-1:
        pref_dict[pref] = pref_name
    
    #Oterwise, the data comes from customer data
    else:
        customer_dict[id] = [name,pref,good]

for id in customer_dict.keys():
    pref_name = pref_dict[customer_dict[id][1]]
    name = customer_dict[id][0]
    good = customer_dict[id][2]

    print '%s\t%s\t%s\t%s'% (id, name, pref_name, good)


4. テスト

ターミナルでテストしておきます。
$cat customerData.csv pref.csv | python mapper.py | python reducer.py
1  Taro  Tokyo 0
2  Jiro  Osaka 1
3  Saburo  Hokaido 0
4  Hanako  Tokyo 0
5  Ken  Hokaido 1
6  Nick  Osaka 1


5. 権限を変更して、hadoop jarを実行

chmod でmapper.pyとreducer.pyに実行権限を与えておきます(コマンドは、chmod +x mapper.pyもしくはreducer.py)。最後に、hadoop jar streamingです。
$hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar -mapper mapper.py -reducer reducer.py -input /user/<your username>/customer/*.csv -output custoer_pref -file mapper.py -file reducer.py

6. 実行結果の確認
$ hadoop fs -cat /user/ykatada/customer_pref/part-00000
1  Taro  Tokyo 0
2  Jiro  Osaka 1
3  Saburo  Hokaido 0
4  Hanako  Tokyo 0
5  Ken  Hokaido 1
6  Nick  Osaka 1



2014年4月30日水曜日

備忘) Pythonで書くMap Reduce on Hadoop


ここ最近の授業でもPythonのMulti processingのモジュールを使って、Mapper, Partitioner, Reducerを組み合わせて、いろいろ集計したり、回帰分析のプログラムを書いたりしています。 徐々にですが、Map Reduceがどういうもので、どこをパラレルに計算させて、集計とるかみたいな勘がわかってきました。


で、今日は、1ヶ月くらい前にHadoopのセットアップをやりましたが、その後ほったらかしていたので、授業で勉強したことを活かすべく(?)、Hadoopの擬似分散モード上で動くPythonのM/Rのプログラムを書いてみようと思います。

参考にしたブログはこちら。


  • http://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/
  • http://blog.matthewrathbone.com/2013/11/17/python-map-reduce-on-hadoop---a-beginners-tutorial.html


まず下準備です。


1. 使用するデータを用意する


こちらのデータは、1行目から順に、ID, 性別, 年齢, 身長(インチ), 体重(ポンド), 自分の体重をどう思うか?(1=太っている、2=痩せている、3=普通), 体重を変えようとしているか?(1=太ろうとしている, 2=痩せようとしている, 3 = 維持しようとしている) というデータです(元データはこちら
:ftp://ftp.cdc.gov/pub/Health_Statistics/NCHS/nhanes/nhanes3/1A/adult.dat)。

このデータを、adult_data.csvと呼びます。
'00003', 1, 21, 72, 180, 3, 1
'00004', 2, 32, 63, 135, 1, 2
'00009', 2, 48, 61, 147, 1, 2
'00010', 1, 35, 70, 205, 1, 2
'00011', 1, 48, 67, 170, 3, 3
'00019', 1, 44, 70, 187, 3, 3
'00034', 2, 42, 63, 128, 1, 2
'00040', 2, 17, 60, 100, 3, 3
'00044', 2, 24, 66, 125, 3, 2
'00045', 2, 67, 64, 147, 3, 3
'00048', 2, 56, 68, 231, 1, 2
'00049', 2, 82, 73, 97, 2, 1
'00051', 1, 44, 71, 300, 1, 2


2. 使用するデータをローカルからHDFS上にアップする

#まずは、testフォルダを作ります
$ hadoop fs -mkdir /user/<your username>/test
#そんでもって、putでデータをローカルからHDFS上にコピーします
$ hadoop fs -put adult_data.csv /user/<your username>/test/adult_data.csv

3. PythonでMapperとReducerを書きます

今回は、性別ごとの平均年齢を出してみます。
まずは、Mapperから。

どうやら、Hadoopのstreaming-jarというもので、データを一行ずつPythonのプログラムに標準入力できるようです。なので、テキストとして読み込んだ一行ずつのデータをstripとsplit(",")でパースしています。例えば、最初の行は、以下のような配列になります。

"'00003', 1, 21, 72, 180, 3, 1" ->['00003', "1", "21", "72", "180", "3", "1"]

そして、1つめの要素(性別)と2つめの要素をピックして、標準出力していきます。 Key Valueのペアは、ただのタブ区切りのテキストとして出力されるというわけです。

Mapper.py
#!/usr/bin/python

import sys

# input comes from STDIN (standard input)
for line in sys.stdin:
    line = line.strip()
    line = line.split(",")

    if len(line) >=2:
        sex = line[1]
        age = line[2]

        print '%s\t%s' % (sex, age)

続いて、Reducerです。 Reducer.pyには、PartitionerとReducerがセットになっています。 Mapperから吐き出されるKey Valueペアのテキストをパースして、辞書にValueを格納、最後に集計というのが大雑把な流れです。

Reducer.py
#!/usr/bin/python
#Reducer.py
import sys

sex_age = {}

#Partitoner
for line in sys.stdin:
    line = line.strip()
    sex, age = line.split('\t')

    if sex in sex_age:
        sex_age[sex].append(int(age))
    else:
        sex_age[sex] = []
        sex_age[sex].append(int(age))

#Reducer
for sex in sex_age.keys():
    ave_age = sum(sex_age[sex])*1.0 / len(sex_age[sex])
    print '%s\t%s'% (sex, ave_age)

3. Pythonのプログラムの権限変更

作ったプログラムに実行権限を追加します。
$ chmod +x mapper.py
$ chmod +x reducer.py

4. テストしてみる

これって、ただのテキストを吐き出して、Mapperで読み込んで、また吐き出して、Reducerでも読み込んで吐き出しているだけなので、ターミナル上でテストできます。例えば、こんな感じで。
$ cat adult_data.csv | python mapper.py | python reducer.py
1 47.597301855
2 47.2906009245


5. 最後にMap Reduce on Hadoop!

ここで、hadoop-streaming.jarというJavaのプログラムをhadoop jarで実行します(参考にしたブログに書いてあった、hadoop-streaming.jarがあるディレクトリと私のディレクトリが違ったので、cdhのバージョ等ンに寄って違うのかもしれませんので、検索して確認したほうが良いかもしれません)。

mapper.pyとreducer.pyのあるディレクトリで以下を実行すると、Map Reduceが始まります(見やすくするために、複数行で書いていますが、実際にやるときは一行で書いてください)。
$hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar 
 -mapper mapper.py 
 -reducer reducer.py 
 -input /user/<your username>/test/adult_data.csv 
 -output ave_age 
 -file mapper.py 
 -file reducer.py


6. めでたく結果をゲット

(よくわからないんですが)outputで指定したフォルダに、part-00000というテキストファイルが作られていて、そこに結果が入っています。
$hadoop fs -cat /user/<your username>/ave_age/part-00000 
1 47.597301855
2 47.2906009245