Pig Latin / UDF in Scala

| # Comments | 1 Trackback
Pigという、Hadoopをお手軽に使いましょうツールを使ってみました。
また、 Scala によるユーザー定義関数(UDF)の記述、組み込みをしてみました。

題材はいつもと同じです。

  • Pig: 0.6.0
  • Scala: 2.8.0.Beta1

を利用しています。

$ git clone https://github.com/ueshin/pig-scala-aggregate.git
$ cd pig-scala-aggregate
$ git checkout pig-scala-aggregate-0.6.1

で利用できます。

また https://github.com/ueshin/pig-scala-aggregate/tree/pig-scala-aggregate-0.6.1 でブラウズできます。

インストール

Pig Releasesページの Download a release now! リンクより辿って、 pig-0.6.0.tar.gz ファイルをダウンロード、適当なフォルダに展開してください。
以降、Pigをインストールしたフォルダを ${PIG_HOME} と記述します。

${PIG_HOME}/conf/pig-env.sh というファイルを作成、もしくは編集し、PIG_CLASSPATH環境変数にScalaのライブラリを追加します。

例)
${SCALA_HOME}はScalaのインストールフォルダを設定してある想定です。

PIG_CLASSPATH=${SCALA_HOME}/lib/scala-library.jar

その他、環境に合わせてPATHの設定などを行ってください。

Pig Latin

いわゆるPigスクリプトのことを、Pig Latinと呼ぶそうです。

まずはcontribとして提供されているライブラリを使った集計を行ってみます。

Aggregator.pig

をダウンロードし、適当なフォルダに設置したら、

$ ${PIG_HOME}/bin/pig -x local -param input=<in-dir> -param output=<out-file> Aggregator.pig

で実行できます。
1行目の "REGISTER" するjarファイルのパスは、環境に応じて修正してください。

pigファイル内に $input / $output の様に記述すると、-paramパラメータによって文字列置換をすることができます。
これにより、実行時に引数として値を渡すことができるようになっています。

にしても、HadoopのMapReduceを直接記述するより、遥かに遥かに短い行数で済んでいます。。。

UDF in Scala

ユーザー定義関数(UDF)を、Scalaで記述してみたものが、 ApacheLogLoader.scala(実際にはCombinedLogLoader.scalaおよびCommonLogLoader.scala) と LongToDateFormat.scala です。

ポイント

LoadFuncは、determineSchemaによってSchemaをUDF側で指定することができます。
複雑なデータをロードするような場合には、こちらで指定した方が、Pig Latinがスッキリするし、めんどくさくなくていいです。
ただし、Pig LatinAS節で指定があった場合には、そちらが優先されます。

CombinedLogLoader.scala 64行目〜

  override def determineSchema(fileName: String, execType: ExecType, storage: DataStorage): Schema = {
    val schema = new Schema
    schema.add(new Schema.FieldSchema("remote_host",    DataType.CHARARRAY))
    schema.add(new Schema.FieldSchema("remote_user",    DataType.CHARARRAY))
    schema.add(new Schema.FieldSchema("requested_time", DataType.LONG))
    schema.add(new Schema.FieldSchema("method",         DataType.CHARARRAY))
    schema.add(new Schema.FieldSchema("request_path",   DataType.CHARARRAY))
    schema.add(new Schema.FieldSchema("protocol",       DataType.CHARARRAY))
    schema.add(new Schema.FieldSchema("status_code",    DataType.INTEGER))
    schema.add(new Schema.FieldSchema("content_length", DataType.LONG))
    schema.add(new Schema.FieldSchema("referer",        DataType.CHARARRAY))
    schema.add(new Schema.FieldSchema("user_agent",     DataType.CHARARRAY))
    schema
  }

EvalFuncは、getArgToFuncMappingによって引数としてとるべき値の型を指定することができます。
execメソッドに渡されるTupleオブジェクトから値を取得するときに、ClassCastExceptionを抑制することができる他、別クラスに処理を委譲することもできます。

LongToDateFormat.scala 41行目〜

  override def getArgToFuncMapping() = {
    val schema = new Schema(
      Arrays.asList(
        new Schema.FieldSchema(null, DataType.LONG) ::
        new Schema.FieldSchema(null, DataType.CHARARRAY) ::
        Nil: _*
      )
    )
    Arrays.asList(new FuncSpec(this.getClass.getName, schema) :: Nil: _*)
  }

処理の委譲については、例えば org.apache.pig.builtin.MAX クラスのソースコードを参照してください。

実行

早速、実装したUDFを使ってPigを実行してみます。

$ mvn clean package
$ ${PIG_HOME}/bin/pig -x local -param jar=target/pig-scala-aggregate-0.6.1.jar -param input=<in-dir> -param output=<out-file> src/main/pig/Aggregator.pig

Embedded Pig

最後に、プログラム側からPigを実行してみます。

Aggregator.scala

PigServerオブジェクトに対して、Pig Latinをほぼそのまま文字列でregisterしていくだけです。

この辺をScalaっぽく記述できるような薄いラッパーを作ってしまえば面白いかも?

実行

実行してみます。

$ mvn clean package
$ target/appassembler/bin/aggregator <in-dir> <out-file>

トラックバック(1)

先日セットアップした擬似分散モードの動作確認を行います。テストに使うサンプルはい... 続きを読む

comments powered by Disqus

Twitter Icon

AdSense

Creative Commons License
このブログはクリエイティブ・コモンズでライセンスされています。
Powered by Movable Type 5.14-ja

Google検索

カスタム検索

2013年10月

    1 2 3 4 5
6 7 8 9 10 11 12
13 14 15 16 17 18 19
20 21 22 23 24 25 26
27 28 29 30 31