昨日の続き。
NoSQL会@博多でお見せしたかったMapReduceによる集計が3つほどありました。
昨日からデータが溜まってきていることでしょうから、ぜひ集計を実行してみてください。
今回利用するプログラムも、
で利用できます。
また https://github.com/ueshin/hbase-twitter/tree/hbase-twitter-0.0.1でブラウズできます。
NoSQL会@博多でお見せしたかったMapReduceによる集計が3つほどありました。
- 言語(
user:lang
)で集計 - ソース(
status#source
)で集計 - ハッシュタグ(
status#text
から抽出)で集計
昨日からデータが溜まってきていることでしょうから、ぜひ集計を実行してみてください。
今回利用するプログラムも、
$ git clone https://github.com/ueshin/hbase-twitter.git
$ cd hbase-twitter
$ git checkout hbase-twitter-0.0.1
で利用できます。
また https://github.com/ueshin/hbase-twitter/tree/hbase-twitter-0.0.1でブラウズできます。
言語(user:lang
)で集計
まずはシンプルな方から。格納されているユーザーデータの内、言語(
user:lang
)で集計します。Twitterでは、6つの言語を設定できますが、充分なデータが収集できていればそれぞれの言語のユーザーがどのように分布しているのかを見ることができます。
ソースコードは LangCounter.scala です。
Scan
で絞り込み
TableMapReduceUtil#initTableMapperJob
(41行目) で、MapRecuceの対象となるテーブル(第1引数)と、データ範囲(第2引数のScan
オブジェクト)を指定します。これにより、MapReduceで処理対象でないカラムファミリーをスキャンしなくなり、また、同じカラムファミリーでも必要でないカラムがMapperに渡されなくなります。
LangCountMapper
Mapperには、行キー -> 行データ の形で指定したカラムデータが渡されますので、これを処理します。今回は、
user:lang
カラムのデータをキー、1L
を値として出力します (30行目)。この辺のMapper出力〜Reducerの流れは、よく見るワードカウントのサンプルと同じです。
CountReducer
Reducerでは、渡ってきたキーに対する値を集計していきます。ソースコードは CountReducer.scala です。
このReducerはあとの2つの集計でも利用します。
なお、同様の処理をする
org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer
がHadoopに標準で入っていますので、普通はこちらを使うとよいと思います。実行
それでは実行してみます。$ mvn clean package
$ sh target/appassembler/bin/count-lang <output-dir>
$ cat <output-dir>/part-r-00000
de 25129
en 2835610
es 303250
fr 21047
it 11486
ja 664244
無事に集計できました!
日本語設定のユーザー数は、英語に続いて2位のようです!
ソース(status#source
)で集計
次に、ソース(status#source
)で集計します。ここでいうソースというのは、ツイッタークライアントのことです。
よく利用されているツイッタークライアントは何か? を調べるための集計ですね。
今度は、各ツイートに対する属性になるので、
Status
オブジェクトのリストを取り出す必要があります。ソースコードは SourceCounter.scala です。
Scan
設定
今回は、status
カラムファミリーに含まれるカラムを全て取得します (44行目)。status
カラムファミリーに対するqualifier
が、statusId
(の16進16桁表記)を表していましたので、status
カラムファミリー全体がそのユーザーのツイート群になります。SourceCountMapper
値として渡されたResult
オブジェクトからStatus
オブジェクトを復元して (31行目)、status.source
をキーとして出力しています (32行目)。Status
オブジェクトの復元はScalaのパターンマッチを利用しています (StatusWritable#unapply)。Reducerは先程と同じものを利用します。
実行
それでは実行してみます。$ sh target/appassembler/bin/count-source <output-dir>
ちゃんと集計できましたか?
ハッシュタグ(status#text
から抽出)で集計
最後にハッシュタグの集計を行ないます。ソースでの集計と同じようにStatusオブジェクトを利用しますが、さらにツイート本文からハッシュタグを抽出する処理が入ってきます。
ソースコードは TagCounter.scala です。
TagCountMapper
Scan
設定は先程と同様です (48行目)。Status
オブジェクトの復元も先程と同様ですが、今度は status.user.lang == "ja"
という条件を付けて、日本語設定したユーザーのみを集計しています (33行目)。ハッシュタグの抽出は、正規表現を使いました (28行目)。
抽出されたハッシュタグをキーとして出力しています (35行目)。
実行
それでは実行してみます。$ sh target/appassembler/bin/count-tag <output-dir>
やってみていただけると分かりますが、見知らぬタグや奇妙なタグが出てきて楽しいです。
おまけ
Scan設定について
MapReduceの集計対象は、Scan
の設定によって変わります。Scan
では、行キーの範囲を指定したり、バージョン(タイムスタンプ)の範囲を指定したりすることができます。なので、例えば直近1時間のツイートのみを集計対象とする、といったことがMapper/Reducerを修正することなく、設定パラメータを送り込む方法を考えることなく、容易に実現できてしまいます。
設定ファイルについて
Hadoopを擬似分散モード、完全分散モードで実行出来るように、設定ファイルの雛形を準備してあります。擬似分散モードは
HDFS NameNode
が 9000番ポート、 JobTracker
が 9001番ポートで動作している想定です。必要に応じで
src/pseudo/resources
以下の設定ファイルを修正してください。コンパイルは以下のとおり。
$ mvn -P pseudo clean package
完全分散モードは
HDFS NameNode
が node0
サーバーの 9000番ポート、 JobTracker
が node0
サーバーの 9001番ポート、 ZooKeeper Quorum
が node0
サーバーで動作している想定です。こちらも必要に応じて
src/production/resources
以下の設定ファイルを修正してください。コンパイルは以下のとおり。
$ mvn -P production clean package