Coming on Stream その2

| # Comments | 3 Trackbacks
昨日の続き。

NoSQL会@博多でお見せしたかったMapReduceによる集計が3つほどありました。

  • 言語(user:lang)で集計
  • ソース(status#source)で集計
  • ハッシュタグ(status#textから抽出)で集計

昨日からデータが溜まってきていることでしょうから、ぜひ集計を実行してみてください。

今回利用するプログラムも、

$ git clone https://github.com/ueshin/hbase-twitter.git
$ cd hbase-twitter
$ git checkout hbase-twitter-0.0.1

で利用できます。

また https://github.com/ueshin/hbase-twitter/tree/hbase-twitter-0.0.1でブラウズできます。

言語(user:lang)で集計

まずはシンプルな方から。

格納されているユーザーデータの内、言語(user:lang)で集計します。
Twitterでは、6つの言語を設定できますが、充分なデータが収集できていればそれぞれの言語のユーザーがどのように分布しているのかを見ることができます。

ソースコードは LangCounter.scala です。

Scanで絞り込み

TableMapReduceUtil#initTableMapperJob (41行目) で、MapRecuceの対象となるテーブル(第1引数)と、データ範囲(第2引数のScanオブジェクト)を指定します。

これにより、MapReduceで処理対象でないカラムファミリーをスキャンしなくなり、また、同じカラムファミリーでも必要でないカラムがMapperに渡されなくなります。

LangCountMapper

Mapperには、行キー -> 行データ の形で指定したカラムデータが渡されますので、これを処理します。

今回は、user:langカラムのデータをキー、1Lを値として出力します (30行目)。
この辺のMapper出力〜Reducerの流れは、よく見るワードカウントのサンプルと同じです。

CountReducer

Reducerでは、渡ってきたキーに対する値を集計していきます。

ソースコードは CountReducer.scala です。
このReducerはあとの2つの集計でも利用します。

なお、同様の処理をする org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer がHadoopに標準で入っていますので、普通はこちらを使うとよいと思います。

実行

それでは実行してみます。

$ mvn clean package
$ sh target/appassembler/bin/count-lang <output-dir>
$ cat <output-dir>/part-r-00000
de      25129
en      2835610
es      303250
fr      21047
it      11486
ja      664244

無事に集計できました!
日本語設定のユーザー数は、英語に続いて2位のようです!

ソース(status#source)で集計

次に、ソース(status#source)で集計します。
ここでいうソースというのは、ツイッタークライアントのことです。
よく利用されているツイッタークライアントは何か? を調べるための集計ですね。

今度は、各ツイートに対する属性になるので、Statusオブジェクトのリストを取り出す必要があります。

ソースコードは SourceCounter.scala です。

Scan設定

今回は、statusカラムファミリーに含まれるカラムを全て取得します (44行目)。
statusカラムファミリーに対するqualifierが、statusId(の16進16桁表記)を表していましたので、statusカラムファミリー全体がそのユーザーのツイート群になります。

SourceCountMapper

値として渡されたResultオブジェクトからStatusオブジェクトを復元して (31行目)、status.sourceをキーとして出力しています (32行目)。

Statusオブジェクトの復元はScalaのパターンマッチを利用しています (StatusWritable#unapply)。

Reducerは先程と同じものを利用します。

実行

それでは実行してみます。

$ sh target/appassembler/bin/count-source <output-dir>

ちゃんと集計できましたか?

ハッシュタグ(status#textから抽出)で集計

最後にハッシュタグの集計を行ないます。

ソースでの集計と同じようにStatusオブジェクトを利用しますが、さらにツイート本文からハッシュタグを抽出する処理が入ってきます。

ソースコードは TagCounter.scala です。

TagCountMapper

Scan設定は先程と同様です (48行目)。

Statusオブジェクトの復元も先程と同様ですが、今度は status.user.lang == "ja" という条件を付けて、日本語設定したユーザーのみを集計しています (33行目)。

ハッシュタグの抽出は、正規表現を使いました (28行目)。
抽出されたハッシュタグをキーとして出力しています (35行目)。

実行

それでは実行してみます。

$ sh target/appassembler/bin/count-tag <output-dir>

やってみていただけると分かりますが、見知らぬタグや奇妙なタグが出てきて楽しいです。

おまけ

Scan設定について

MapReduceの集計対象は、Scanの設定によって変わります。
Scanでは、行キーの範囲を指定したり、バージョン(タイムスタンプ)の範囲を指定したりすることができます。
なので、例えば直近1時間のツイートのみを集計対象とする、といったことがMapper/Reducerを修正することなく、設定パラメータを送り込む方法を考えることなく、容易に実現できてしまいます。

設定ファイルについて

Hadoopを擬似分散モード、完全分散モードで実行出来るように、設定ファイルの雛形を準備してあります。

擬似分散モードは HDFS NameNode が 9000番ポート、 JobTracker が 9001番ポートで動作している想定です。
必要に応じで src/pseudo/resources 以下の設定ファイルを修正してください。

コンパイルは以下のとおり。

$ mvn -P pseudo clean package


完全分散モードは HDFS NameNodenode0サーバーの 9000番ポート、 JobTrackernode0サーバーの 9001番ポート、 ZooKeeper Quorumnode0サーバーで動作している想定です。
こちらも必要に応じて src/production/resources 以下の設定ファイルを修正してください。

コンパイルは以下のとおり。

$ mvn -P production clean package

トラックバック(3)

Coming on Streamシリーズやってます。その1その2前2回で、NoS... 続きを読む

Coming on Streamシリーズやってます。その1その2その3作ったもの... 続きを読む

Coming on Streamシリーズ ファーストシーズン最終回。その1その2... 続きを読む

comments powered by Disqus

Twitter Icon

AdSense

Creative Commons License
このブログはクリエイティブ・コモンズでライセンスされています。
Powered by Movable Type 5.14-ja

Google検索

カスタム検索

2013年10月

    1 2 3 4 5
6 7 8 9 10 11 12
13 14 15 16 17 18 19
20 21 22 23 24 25 26
27 28 29 30 31