TwitterのStreaming APIを使ってツイートを収集し、様々な解析をする(したい)ためのプログラムを開発しています。
元々は7月にNoSQL会@博多でHBaseについて発表した際のサンプルだったのですが、そのまま引き続き開発を続けています。
最近になってプロジェクト名を「Coming on Stream」に決めました。
下記で参照できるプログラムの動作環境は
を利用していますが、HBaseは0.20.6でも動作すると思います。
で利用できます。
また https://github.com/ueshin/hbase-twitter/tree/hbase-twitter-0.0.1でブラウズできます。
元々は7月にNoSQL会@博多でHBaseについて発表した際のサンプルだったのですが、そのまま引き続き開発を続けています。
最近になってプロジェクト名を「Coming on Stream」に決めました。
下記で参照できるプログラムの動作環境は
- Hadoop: 0.20.2
- HBase: 0.20.5
- Scala: 2.8.0
を利用していますが、HBaseは0.20.6でも動作すると思います。
$ git clone https://github.com/ueshin/hbase-twitter.git
$ cd hbase-twitter
$ git checkout hbase-twitter-0.0.1
で利用できます。
また https://github.com/ueshin/hbase-twitter/tree/hbase-twitter-0.0.1でブラウズできます。
ツイートを収集する
まずはStreaming APIを使ってHBaseにツイートを蓄えていきます。twitter
テーブル
格納用のHBaseテーブルイメージを 発表資料スライド p.14 に掲載しています。要するに、 縦方向行キーにユーザーID、カラムファミリー
user
にそれぞれユーザー情報を格納、カラムファミリー status
にステータスを格納していきます。格納時点のバージョン(タイムスタンプ)が、(ほぼ)ユーザーがツイートした時刻、格納する値は
Status
オブジェクト(のバイナリ表現)になります。あるユーザーのツイート(status)が横方向に伸びていくイメージです。
テーブルの
create
文は次のようになります。create.rb
create 'twitter', { NAME => 'user' }, { NAME => 'status' }
Streaming API
Twitterからの取り込みは、Streaming APIを使います。主な部分は簡易的に以前作ったことがあるので、それをベースに、
Handler
を実装します。HTableHandler.scala
val table = new HTable("twitter")
・・・
val put = new Put(user.key)
・・・
put.add("status", status.key, new StatusWritable(status))
table.put(put)
テーブルを取得して、
Put
オブジェクトにデータをadd
して、テーブルにput
すると。実行
Maven2でコンパイルします。$ mvn clean package
それでは実行してみます。
HBaseは起動してあるものとします。
$ sh target/appassembler/bin/streaming <user-id> <password>
user-id
: TwitterアカウントのユーザーIDpassword
: パスワード
正常に動作していれば、
twitter
テーブルにデータが格納されていくはずです。HBaseのShellから確認してみます。
hbase(main):001:0> list
twitter
1 row(s) in 0.1290 seconds
hbase(main):002:0> count 'twitter'
Current count: 1000, row: 0000000000005e31
Current count: 2000, row: 00000000000957f7
・・・
おぉ、入っています!(よね?)
サンプリングモードでは、1日でだいたい50万ツイートほどのデータが取得できるようです。
ちなみにストリーミングが切断されてもリコネクトなどの処理をまだ行っていませんのであしからず。