HBase
を使ったMapReduce
を実装してみました。題材はいつもと同じですが、Scala版のみ実装しました。
- Hadoop: 0.20.2
- HBase: 0.20.5
- Scala: 2.8.0.RC6
を 利用しています。
$ git clone https://github.com/ueshin/hbase-aggregate.git
$ cd hbase-aggregate
$ git checkout hbase-aggregate-0.20.1
で 利用できます。
また https://github.com/ueshin/hbase-aggregate/tree/hbase-aggregate-0.20.1 でブラウズできます。
概要
2つのMapReduce
プログラムを作成します。CombinedLogLoader
Aggregator
プログラムで利用する
access
テーブルの定義は次の通りです。- 行キー : リモートアドレス
- 列ファミリー : "log"
- 修飾子 : リクエストパス
- バージョン : リクエスト日時
- 値 :
CombinedLog
インスタンス
hbase> create 'access', { NAME => 'log', VERSIONS => java.lang.Integer::MAX_VALUE }
実装
準備
HBaseのライブラリは、Maven2レポジトリに登録されていないようですので、ローカルでインストールしてしまいます。必要なファイルは
hbase-0.20.5.jar
とzookeeper-3.2.2.jar
の2つです。$ mvn install:install-file -Dfile=${HBASE_HOME}/hbase-0.20.5.jar -DgroupId=org.apache.hadoop -DartifactId=hbase -Dversion=0.20.5 -Dpackaging=jar
$ mvn install:install-file -Dfile=${HBASE_HOME}/lib/zookeeper-3.2.2.jar -DgroupId=org.apache.hadoop -DartifactId=zookeeper -Dversion=3.2.2 -Dpackaging=jar
CombinedLogLoader
HDFS上にあるアクセスログをパースしつつHBaseのaccess
テーブルへロードします。CombinedLogLoader.scala
CombinedLogLoadMapper.scala
入力がファイルシステム上のテキストファイルなので、
Mapper
への入力はTextInputFormat
を使います。また、今回は
Mapper
の出力をそのままHBaseへと流せばよいので、Reducer
にはorg.apache.hadoop.hbase.mapreduce.IdentityTableReducer
を利用します。この場合、
Mapper
の出力キーをImmutableBytesWritable
クラス、出力値をPut
クラスもしくはDelete
クラスとします。設定には
TableMapReduceUtil#initTableReduceJob
メソッドを利用し、出力先テーブル名とReducer
クラスを指定します。CombinedLogParser.scala
ログのパースには、以前作成した
CombinedLogParser
を利用しました。Aggregator
HBaseにロードされたログを集計します。Aggregator.scala
AggregateMapper.scala
AggregateCombiner.scala
AggregateReducer.scala
こちらは
Mapper
の入力がHBaseのテーブルとなるので、TableMapReduceUtil#initTableMapperJob
メソッドで入力テーブル名、scan
、Mapper
クラスなどを設定します。Scan
で、Mapper
への入力対象となる行や列、バージョンを指定できます。Mapper
へは、1回の処理で1つの行がResult
インスタンスとして入ってきますので、必要なデータを取り出して処理します。Combiner
、Reducer
はよくある集計処理です。実行
HadoopとHBaseを起動しておきます。また、
access
テーブルを作成しておきます。コンパイル
Maven2でコンパイルします。$ mvn clean package
データロード
HDFS上にアクセスログを設置し、データロードを実行します。$ /usr/local/hadoop/bin/hadoop fs -ls input
Found 3 items
-rw-r--r-- 1 ueshin staff 8216533 2010-05-17 23:56 /user/ueshin/input/201001.log
-rw-r--r-- 1 ueshin staff 7994287 2010-05-17 23:56 /user/ueshin/input/201002.log
-rw-r--r-- 1 ueshin staff 8948672 2010-05-17 23:56 /user/ueshin/input/201003.log
$ sh target/appassembler/bin/loader input
ロードできました。
HBase Shell で確認してみます。
hbase> count 'access'
Current count: 1000, row: 188.92.74.80
Current count: 2000, row: 219.42.56.38
Current count: 3000, row: 65.55.215.84
3451 row(s) in 0.9290 seconds
データが入っています。
集計
引き続き集計を行ないます。$ sh target/appassembler/bin/aggregator output
$ ${HADOOP_HOME}/bin/hadoop -ls output
$ ${HADOOP_HOME}/bin/hadoop -cat output/part-r-00000
集計結果が表示されます。