HBase MapReduce in Scala

| # Comments
HBaseを使ったMapReduceを実装してみました。

題材はいつもと同じですが、Scala版のみ実装しました。

  • Hadoop: 0.20.2
  • HBase: 0.20.5
  • Scala: 2.8.0.RC6

を 利用しています。

$ git clone https://github.com/ueshin/hbase-aggregate.git
$ cd hbase-aggregate
$ git checkout hbase-aggregate-0.20.1

で 利用できます。

また https://github.com/ueshin/hbase-aggregate/tree/hbase-aggregate-0.20.1 でブラウズできます。

概要

2つのMapReduceプログラムを作成します。

  1. CombinedLogLoader
  2. Aggregator

プログラムで利用するaccessテーブルの定義は次の通りです。

  • 行キー : リモートアドレス
  • 列ファミリー : "log"
  • 修飾子 : リクエストパス
  • バージョン : リクエスト日時
  • 値 : CombinedLogインスタンス

hbase> create 'access', { NAME => 'log', VERSIONS => java.lang.Integer::MAX_VALUE }

実装

準備

HBaseのライブラリは、Maven2レポジトリに登録されていないようですので、ローカルでインストールしてしまいます。
必要なファイルはhbase-0.20.5.jarzookeeper-3.2.2.jarの2つです。

$ mvn install:install-file -Dfile=${HBASE_HOME}/hbase-0.20.5.jar -DgroupId=org.apache.hadoop -DartifactId=hbase -Dversion=0.20.5 -Dpackaging=jar
$ mvn install:install-file -Dfile=${HBASE_HOME}/lib/zookeeper-3.2.2.jar -DgroupId=org.apache.hadoop -DartifactId=zookeeper -Dversion=3.2.2 -Dpackaging=jar

CombinedLogLoader

HDFS上にあるアクセスログをパースしつつHBaseのaccessテーブルへロードします。

CombinedLogLoader.scala
CombinedLogLoadMapper.scala

入力がファイルシステム上のテキストファイルなので、Mapperへの入力はTextInputFormatを使います。

また、今回はMapperの出力をそのままHBaseへと流せばよいので、Reducerにはorg.apache.hadoop.hbase.mapreduce.IdentityTableReducerを利用します。
この場合、Mapperの出力キーをImmutableBytesWritableクラス、出力値をPutクラスもしくはDeleteクラスとします。

設定には TableMapReduceUtil#initTableReduceJobメソッドを利用し、出力先テーブル名とReducerクラスを指定します。

CombinedLogParser.scala

ログのパースには、以前作成したCombinedLogParserを利用しました。

Aggregator

HBaseにロードされたログを集計します。

Aggregator.scala
AggregateMapper.scala
AggregateCombiner.scala
AggregateReducer.scala

こちらはMapperの入力がHBaseのテーブルとなるので、TableMapReduceUtil#initTableMapperJobメソッドで入力テーブル名、scanMapperクラスなどを設定します。

Scanで、Mapperへの入力対象となる行や列、バージョンを指定できます。

Mapperへは、1回の処理で1つの行がResultインスタンスとして入ってきますので、必要なデータを取り出して処理します。

CombinerReducerはよくある集計処理です。

実行

HadoopとHBaseを起動しておきます。
また、accessテーブルを作成しておきます。

コンパイル

Maven2でコンパイルします。

$ mvn clean package

データロード

HDFS上にアクセスログを設置し、データロードを実行します。

$ /usr/local/hadoop/bin/hadoop fs -ls input
Found 3 items
-rw-r--r--   1 ueshin staff    8216533 2010-05-17 23:56 /user/ueshin/input/201001.log
-rw-r--r--   1 ueshin staff    7994287 2010-05-17 23:56 /user/ueshin/input/201002.log
-rw-r--r--   1 ueshin staff    8948672 2010-05-17 23:56 /user/ueshin/input/201003.log

$ sh target/appassembler/bin/loader input

ロードできました。
HBase Shell で確認してみます。

hbase> count 'access'
Current count: 1000, row: 188.92.74.80 Current count: 2000, row: 219.42.56.38 Current count: 3000, row: 65.55.215.84 3451 row(s) in 0.9290 seconds

データが入っています。

集計

引き続き集計を行ないます。

$ sh target/appassembler/bin/aggregator output
$ ${HADOOP_HOME}/bin/hadoop -ls output
$ ${HADOOP_HOME}/bin/hadoop -cat output/part-r-00000

集計結果が表示されます。

comments powered by Disqus

Twitter Icon

AdSense

Creative Commons License
このブログはクリエイティブ・コモンズでライセンスされています。
Powered by Movable Type 5.14-ja

Google検索

カスタム検索

2013年10月

    1 2 3 4 5
6 7 8 9 10 11 12
13 14 15 16 17 18 19
20 21 22 23 24 25 26
27 28 29 30 31