Coming on Streamシリーズやってます。
その1
その2
前2回で、NoSQL会@博多編を終わりまして、今回はその後に何をしたのか、をまとめていきます。
今回からバージョンをすすめまして、
を利用します。
また https://github.com/ueshin/hbase-twitter/tree/hbase-twitter-0.0.2でブラウズできます。
その1
その2
前2回で、NoSQL会@博多編を終わりまして、今回はその後に何をしたのか、をまとめていきます。
今回からバージョンをすすめまして、
$ git clone https://github.com/ueshin/hbase-twitter.git
$ cd hbase-twitter
$ git checkout hbase-twitter-0.0.2
を利用します。
また https://github.com/ueshin/hbase-twitter/tree/hbase-twitter-0.0.2でブラウズできます。
ハッシュタグを行キーにした転置テーブル
まずはハッシュタグを行キーにした転置テーブルを作ってみました。カラムファミリーをユーザー設定の言語毎に分けることでその後の集計を言語別で行えるようにします。
あとでそれぞれのハッシュタグにスコアを付けますので、それ用のカラムファミリーも準備しました。
create.rb
create 'tagtrend',
{ NAME => 'timeline_en', VERSIONS => java.lang.Integer::MAX_VALUE }, { NAME => 'score_en' },
{ NAME => 'timeline_ja', VERSIONS => java.lang.Integer::MAX_VALUE }, { NAME => 'score_ja' },
{ NAME => 'timeline_es', VERSIONS => java.lang.Integer::MAX_VALUE }, { NAME => 'score_es' },
{ NAME => 'timeline_de', VERSIONS => java.lang.Integer::MAX_VALUE }, { NAME => 'score_de' },
{ NAME => 'timeline_fr', VERSIONS => java.lang.Integer::MAX_VALUE }, { NAME => 'score_fr' },
{ NAME => 'timeline_it', VERSIONS => java.lang.Integer::MAX_VALUE }, { NAME => 'score_it' }
TagTransposer
TagTransposer.scala前回の処理がどこまで行っているのかを表す
configuration
テーブルを作成してあります。これから取り出した時刻(27行目)から、現時刻までを処理対象とします(39行目)。
また、パーティショナーに
HRegionPartitioner
を設定しています(44行目)。TagTransposeMapper
TagTransposeMapper.scalaqualifier
は、ユーザーID(16進数16桁)、値はステータスID(16進数16桁)とします。これだと同じユーザーIDでどんどん上書きされてしまいますので、バージョン(タイムスタンプ)にツイート時刻を入れておくことで別データとして取り出せるようにしておきます(21行目)。
出力キーにハッシュタグを指定することで、
HRegionPartitioner
によってそれぞれの出力先リージョンを処理するReducerへとデータが渡されるようになっています。出力先テーブルがすでに複数のリージョンに分かれていた場合には、
HRegionPartitioner
を利用したほうがいいと思います。Reducerは
IdentityTableReducer
ですので、Mapperからの出力がそのままジョブの出力となります。実行
それでは実行してみます。$ sh target/appassembler/bin/transpose-tag
ジョブが完了したら、HBase Shellでデータを確認してみます。
hbase(main):001:0> scan 'tagtrend', { COLUMNS => [ 'timeline_ja' ] }
ROW COLUMN+CELL
#000 column=timeline_ja:00000000064c8bf3, timestamp=1283642556685, value=000000055ba92
88c
#000 column=timeline_ja:000000000772b7bf, timestamp=1280923632610, value=00000004b9f07
6d0
#000037 column=timeline_ja:00000000094a6d5a, timestamp=1283559227519, value=0000000556dca
9a0
・・・
無事に転置テーブルができたようです!(よね?)
このテーブルがあれば、あるハッシュタグをつけたユーザー、その時のツイートを特定することができます。
例えば、先の結果でいうと、ユーザーIDが
"00000000064c8bf3"
、ステータスIDが "000000055ba9288c"
ということになります。確かめてみます。
hbase(main):002:0> get 'twitter', '00000000064c8bf3'
COLUMN CELL
status:000000055ba9288c timestamp=1283642530795, value=\x8A\x01*\xDF\x0E\xFE`\x8B\x05[\xA9(\x8C2\xE3\x83\
x8F\xE3\x83\x83\xE3\x83\x8F\xE3\x83\x83\xE3\x83\x8F\xEE\x9B\xB6\xE2\x80\xA6\xE3\x
81\xAA\xE3\x82\x93\xE3\x81\xA0\xE3\x81\x93\xE3\x81\xAE\xE5\xA4\x89\xE8\xBA\xAB\xE
9\x9F\xB3 #000:<a href="http://yubitter.com/" rel="nofollow">yubitter</a>\x00\x00
・・・
HBase Shellからはバイナリに見えてしまうのでわかりにくいですが、ステータスID
"000000055ba9288c"
のところに確かに #000 というハッシュタグが見えていますね。というわけで、このテーブルを使うことで、「ハッシュタグでユーザー or ツイートを検索」機能などを実装することができるようになりました。
注意
このテーブルで、バージョンを行キー、カラムファミリー、qualifier
に続くもう一つの次元のような扱いをしていますが、このようなやり方には注意が必要です。詳しくはTogetter - 「HBaseで同一カラムに同一タイムスタンプのデータを登録した場合の挙動」を参照してください。