昨日の続き。
NoSQL会@博多でお見せしたかったMapReduceによる集計が3つほどありました。
昨日からデータが溜まってきていることでしょうから、ぜひ集計を実行してみてください。
今回利用するプログラムも、
で利用できます。
また https://github.com/ueshin/hbase-twitter/tree/hbase-twitter-0.0.1でブラウズできます。
NoSQL会@博多でお見せしたかったMapReduceによる集計が3つほどありました。
- 言語(
user:lang)で集計 - ソース(
status#source)で集計 - ハッシュタグ(
status#textから抽出)で集計
昨日からデータが溜まってきていることでしょうから、ぜひ集計を実行してみてください。
今回利用するプログラムも、
$ git clone https://github.com/ueshin/hbase-twitter.git
$ cd hbase-twitter
$ git checkout hbase-twitter-0.0.1で利用できます。
また https://github.com/ueshin/hbase-twitter/tree/hbase-twitter-0.0.1でブラウズできます。
言語(user:lang)で集計
まずはシンプルな方から。格納されているユーザーデータの内、言語(
user:lang)で集計します。Twitterでは、6つの言語を設定できますが、充分なデータが収集できていればそれぞれの言語のユーザーがどのように分布しているのかを見ることができます。
ソースコードは LangCounter.scala です。
Scanで絞り込み
TableMapReduceUtil#initTableMapperJob (41行目) で、MapRecuceの対象となるテーブル(第1引数)と、データ範囲(第2引数のScanオブジェクト)を指定します。これにより、MapReduceで処理対象でないカラムファミリーをスキャンしなくなり、また、同じカラムファミリーでも必要でないカラムがMapperに渡されなくなります。
LangCountMapper
Mapperには、行キー -> 行データ の形で指定したカラムデータが渡されますので、これを処理します。今回は、
user:langカラムのデータをキー、1Lを値として出力します (30行目)。この辺のMapper出力〜Reducerの流れは、よく見るワードカウントのサンプルと同じです。
CountReducer
Reducerでは、渡ってきたキーに対する値を集計していきます。ソースコードは CountReducer.scala です。
このReducerはあとの2つの集計でも利用します。
なお、同様の処理をする
org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer がHadoopに標準で入っていますので、普通はこちらを使うとよいと思います。実行
それでは実行してみます。$ mvn clean package
$ sh target/appassembler/bin/count-lang <output-dir>
$ cat <output-dir>/part-r-00000
de 25129
en 2835610
es 303250
fr 21047
it 11486
ja 664244無事に集計できました!
日本語設定のユーザー数は、英語に続いて2位のようです!
ソース(status#source)で集計
次に、ソース(status#source)で集計します。ここでいうソースというのは、ツイッタークライアントのことです。
よく利用されているツイッタークライアントは何か? を調べるための集計ですね。
今度は、各ツイートに対する属性になるので、
Statusオブジェクトのリストを取り出す必要があります。ソースコードは SourceCounter.scala です。
Scan設定
今回は、statusカラムファミリーに含まれるカラムを全て取得します (44行目)。statusカラムファミリーに対するqualifierが、statusId(の16進16桁表記)を表していましたので、statusカラムファミリー全体がそのユーザーのツイート群になります。SourceCountMapper
値として渡されたResultオブジェクトからStatusオブジェクトを復元して (31行目)、status.sourceをキーとして出力しています (32行目)。Statusオブジェクトの復元はScalaのパターンマッチを利用しています (StatusWritable#unapply)。Reducerは先程と同じものを利用します。
実行
それでは実行してみます。$ sh target/appassembler/bin/count-source <output-dir>ちゃんと集計できましたか?
ハッシュタグ(status#textから抽出)で集計
最後にハッシュタグの集計を行ないます。ソースでの集計と同じようにStatusオブジェクトを利用しますが、さらにツイート本文からハッシュタグを抽出する処理が入ってきます。
ソースコードは TagCounter.scala です。
TagCountMapper
Scan設定は先程と同様です (48行目)。Statusオブジェクトの復元も先程と同様ですが、今度は status.user.lang == "ja" という条件を付けて、日本語設定したユーザーのみを集計しています (33行目)。ハッシュタグの抽出は、正規表現を使いました (28行目)。
抽出されたハッシュタグをキーとして出力しています (35行目)。
実行
それでは実行してみます。$ sh target/appassembler/bin/count-tag <output-dir>やってみていただけると分かりますが、見知らぬタグや奇妙なタグが出てきて楽しいです。
おまけ
Scan設定について
MapReduceの集計対象は、Scanの設定によって変わります。Scanでは、行キーの範囲を指定したり、バージョン(タイムスタンプ)の範囲を指定したりすることができます。なので、例えば直近1時間のツイートのみを集計対象とする、といったことがMapper/Reducerを修正することなく、設定パラメータを送り込む方法を考えることなく、容易に実現できてしまいます。
設定ファイルについて
Hadoopを擬似分散モード、完全分散モードで実行出来るように、設定ファイルの雛形を準備してあります。擬似分散モードは
HDFS NameNode が 9000番ポート、 JobTracker が 9001番ポートで動作している想定です。必要に応じで
src/pseudo/resources 以下の設定ファイルを修正してください。コンパイルは以下のとおり。
$ mvn -P pseudo clean package完全分散モードは
HDFS NameNode が node0サーバーの 9000番ポート、 JobTracker が node0サーバーの 9001番ポート、 ZooKeeper Quorum が node0サーバーで動作している想定です。こちらも必要に応じて
src/production/resources 以下の設定ファイルを修正してください。コンパイルは以下のとおり。
$ mvn -P production clean package
