無印吉澤

Site Reliability Engineering(SRE)、ソフトウェア開発、クラウドコンピューティングなどについて、吉澤が調べたり試したことを書いていくブログです。

EmbulkのfilterプラグインをRubyで開発する話

Embulkにはfilterプラグインという仕組みがあり、これを自作することで、Embulkで入力およびパースした結果を色々と加工することができます。例えば、「すべてのログにホスト名を追加する」といった、ログの種類によらない共通処理を定義するのに向いた仕組みです。

ただ、いろいろ試してみた結果、以前の記事で取り上げたような特殊なログを処理する場合でも、

  • パース処理の中で、一度完成したらほとんど直す必要がない基本的な部分 → parserプラグイン
  • 試行錯誤しながら、何度か直す必要がある部分 → filterプラグイン

と使い分けた方が、コードの見通しが良くなりました。個人的には、parserプラグインと同じくらいfilterプラグインも自作することが多そうなので、作り方をメモしておきます。

特定カラムに含まれる文字列を置換するfilterプラグイン

一例として、以下のようなCSVファイルを読み込む際に、comment列に含まれる「: account=(数字)」や「 by alice」という文字列は集計の邪魔なので削除したい、という場合を考えます。

id,account,time,comment
1,,2015-01-27 19:23:49,"login failure: account=alice"
2,,2015-01-27 19:01:23,"login failure: account=bob"
3,alice,2015-01-28 02:20:02,"login by alice"

まず、embulk newコマンドでfilterプラグインのひな形を作成します。

happyturn% embulk new ruby-filter myapp
2015-05-30 16:38:29.963 +0900: Embulk v0.6.10
Creating embulk-filter-myapp/
  Creating embulk-filter-myapp/README.md
  Creating embulk-filter-myapp/LICENSE.txt
  Creating embulk-filter-myapp/.gitignore
  Creating embulk-filter-myapp/Rakefile
  Creating embulk-filter-myapp/Gemfile
  Creating embulk-filter-myapp/embulk-filter-myapp.gemspec
  Creating embulk-filter-myapp/lib/embulk/filter/myapp.rb

コマンド実行直後のembulk-filter-myapp/lib/embulk/filter/myapp.rbのひな形は以下の通りです。

module Embulk
  module Filter

    class MyappFilterPlugin < FilterPlugin
      Plugin.register_filter("myapp", self)

      def self.transaction(config, in_schema, &control)
        # configuration code:
        task = {
          "property1" => config.param("property1", :string),
          "property2" => config.param("property2", :integer, default: 0),
        }

        yield(task, out_columns)
      end

      def init
        # initialization code:
        @property1 = task["property1"]
        @property2 = task["property2"]
      end

      def close
      end

      def add(page)
        # filtering code:
        page.each do |record|
          page_builder.add(record)
        end
      end

      def finish
        page_builder.finish
      end
    end

  end
end

まず、transactionメソッドのなかで、フィルタを通したあとのカラムを定義します。カラムを増減させないなら、in_schemaをそのままout_columnに代入するだけでOKです。initメソッドは、今回のプラグインはパラメータを取らないので空にします。

      def self.transaction(config, in_schema, &control)
        # 説明のために代入文を書いた。yieldメソッドにそのまま渡してもOK
        task = {}
        out_columns = in_schema

        yield(task, out_columns)
      end

      def init
      end

そして、フィルタのためのコードをaddメソッドの中に記載します。変数recordは配列なので、4番目にあるcomment列を参照したいなら、record[3]と指定する必要があります。

      def add(page)
        # filtering code:
        idx = 3
        page.each do |record|
          case record[idx]
          when /^login failure:/
            record[idx] = "login failure"
          when /^login by/
            record[idx] = "login"
          end

          page_builder.add(record)
        end
      end

あるいは、列の順序が変わる可能性があるのでどうしても列を名前で参照したい、という場合は、以下のようにpageからschemaを取り出して、comment列のインデックスを取得することもできます。

      def add(page)
        # find index of "comment" column
        columns = page.schema.select{|c| c.name == "comment" }
        idx = columns[0].index

        # filtering code:
        page.each do |record|
          case record[idx]
          when /^login failure:/
            record[idx] = "login failure"
          when /^login by/
            record[idx] = "login"
          end

          page_builder.add(record)
        end
      end

まあ、今度は逆にカラム名の変更に弱くなってしまうので、どちらが良いかは一長一短かと。大抵の場合はrecord[3]で良い気がします。

あとは、設定ファイルへ以下のようにfilterの名前を記載し、

filters:
  - type: myapp

embulk previewコマンドを実行すれば、フィルタが有効になっていることを確認できます。

happyturn% embulk preview config.yml -L embulk-filter-myapp
2015-05-30 17:20:16.564 +0900: Embulk v0.6.10
2015-05-30 17:20:17.351 +0900: Loaded plugin embulk-filter-myapp (0.1.0)
2015-05-30 17:20:17.365 +0900 [INFO] (preview): Listing local files at directory '/Users/myoshiz/devel/try1/csv' filtering filename by prefix 'sample_'
2015-05-30 17:20:17.369 +0900 [INFO] (preview): Loading files [/Users/myoshiz/devel/try1/csv/sample_01.csv.gz]
+---------+----------------+-------------------------+----------------+
| id:long | account:string |          time:timestamp | comment:string |
+---------+----------------+-------------------------+----------------+
|       1 |                | 2015-01-27 19:23:49 UTC |  login failure |
|       2 |                | 2015-01-27 19:01:23 UTC |  login failure |
|       3 |          alice | 2015-01-28 02:20:02 UTC |          login |
+---------+----------------+-------------------------+----------------+

サンプルコード

今回のサンプルはGistにもアップロードしました。

参考文献