Pigという、Hadoopをお手軽に使いましょうツールを使ってみました。
また、 Scala によるユーザー定義関数(UDF)の記述、組み込みをしてみました。
題材はいつもと同じです。
を利用しています。
で利用できます。
また https://github.com/ueshin/pig-scala-aggregate/tree/pig-scala-aggregate-0.6.1 でブラウズできます。
また、 Scala によるユーザー定義関数(UDF)の記述、組み込みをしてみました。
題材はいつもと同じです。
- Pig: 0.6.0
- Scala: 2.8.0.Beta1
を利用しています。
$ git clone https://github.com/ueshin/pig-scala-aggregate.git
$ cd pig-scala-aggregate
$ git checkout pig-scala-aggregate-0.6.1
で利用できます。
また https://github.com/ueshin/pig-scala-aggregate/tree/pig-scala-aggregate-0.6.1 でブラウズできます。
インストール
Pig Releasesページの Download a release now! リンクより辿って、 pig-0.6.0.tar.gz ファイルをダウンロード、適当なフォルダに展開してください。以降、Pigをインストールしたフォルダを
${PIG_HOME}
と記述します。${PIG_HOME}/conf/pig-env.sh
というファイルを作成、もしくは編集し、PIG_CLASSPATH
環境変数にScalaのライブラリを追加します。例)
${SCALA_HOME}
はScalaのインストールフォルダを設定してある想定です。PIG_CLASSPATH=${SCALA_HOME}/lib/scala-library.jar
その他、環境に合わせて
PATH
の設定などを行ってください。Pig Latin
いわゆるPigスクリプトのことを、Pig Latin
と呼ぶそうです。まずは
contrib
として提供されているライブラリを使った集計を行ってみます。Aggregator.pig
をダウンロードし、適当なフォルダに設置したら、
$ ${PIG_HOME}/bin/pig -x local -param input=<in-dir> -param output=<out-file> Aggregator.pig
で実行できます。
1行目の "
REGISTER
" するjarファイルのパスは、環境に応じて修正してください。pigファイル内に
$input
/ $output
の様に記述すると、-param
パラメータによって文字列置換をすることができます。これにより、実行時に引数として値を渡すことができるようになっています。
にしても、Hadoopの
MapReduce
を直接記述するより、遥かに遥かに短い行数で済んでいます。。。UDF in Scala
ユーザー定義関数(UDF)を、Scalaで記述してみたものが、 ApacheLogLoader.scala(実際にはCombinedLogLoader.scalaおよびCommonLogLoader.scala) と LongToDateFormat.scala です。ポイント
LoadFunc
は、determineSchema
によってSchema
をUDF側で指定することができます。複雑なデータをロードするような場合には、こちらで指定した方が、
Pig Latin
がスッキリするし、めんどくさくなくていいです。ただし、
Pig Latin
のAS
節で指定があった場合には、そちらが優先されます。CombinedLogLoader.scala 64行目〜
override def determineSchema(fileName: String, execType: ExecType, storage: DataStorage): Schema = {
val schema = new Schema
schema.add(new Schema.FieldSchema("remote_host", DataType.CHARARRAY))
schema.add(new Schema.FieldSchema("remote_user", DataType.CHARARRAY))
schema.add(new Schema.FieldSchema("requested_time", DataType.LONG))
schema.add(new Schema.FieldSchema("method", DataType.CHARARRAY))
schema.add(new Schema.FieldSchema("request_path", DataType.CHARARRAY))
schema.add(new Schema.FieldSchema("protocol", DataType.CHARARRAY))
schema.add(new Schema.FieldSchema("status_code", DataType.INTEGER))
schema.add(new Schema.FieldSchema("content_length", DataType.LONG))
schema.add(new Schema.FieldSchema("referer", DataType.CHARARRAY))
schema.add(new Schema.FieldSchema("user_agent", DataType.CHARARRAY))
schema
}
EvalFunc
は、getArgToFuncMapping
によって引数としてとるべき値の型を指定することができます。exec
メソッドに渡されるTuple
オブジェクトから値を取得するときに、ClassCastException
を抑制することができる他、別クラスに処理を委譲することもできます。LongToDateFormat.scala 41行目〜
override def getArgToFuncMapping() = {
val schema = new Schema(
Arrays.asList(
new Schema.FieldSchema(null, DataType.LONG) ::
new Schema.FieldSchema(null, DataType.CHARARRAY) ::
Nil: _*
)
)
Arrays.asList(new FuncSpec(this.getClass.getName, schema) :: Nil: _*)
}
処理の委譲については、例えば
org.apache.pig.builtin.MAX
クラスのソースコードを参照してください。実行
早速、実装したUDFを使ってPigを実行してみます。$ mvn clean package
$ ${PIG_HOME}/bin/pig -x local -param jar=target/pig-scala-aggregate-0.6.1.jar -param input=<in-dir> -param output=<out-file> src/main/pig/Aggregator.pig
Embedded Pig
最後に、プログラム側からPigを実行してみます。Aggregator.scala
PigServer
オブジェクトに対して、Pig Latin
をほぼそのまま文字列でregister
していくだけです。この辺をScalaっぽく記述できるような薄いラッパーを作ってしまえば面白いかも?
実行
実行してみます。$ mvn clean package
$ target/appassembler/bin/aggregator <in-dir> <out-file>