Embulkにはfilterプラグインという仕組みがあり、これを自作することで、Embulkで入力およびパースした結果を色々と加工することができます。例えば、「すべてのログにホスト名を追加する」といった、ログの種類によらない共通処理を定義するのに向いた仕組みです。
ただ、いろいろ試してみた結果、以前の記事で取り上げたような特殊なログを処理する場合でも、
と使い分けた方が、コードの見通しが良くなりました。個人的には、parserプラグインと同じくらいfilterプラグインも自作することが多そうなので、作り方をメモしておきます。
特定カラムに含まれる文字列を置換するfilterプラグイン
一例として、以下のようなCSVファイルを読み込む際に、comment列に含まれる「: account=(数字)」や「 by alice」という文字列は集計の邪魔なので削除したい、という場合を考えます。
id,account,time,comment 1,,2015-01-27 19:23:49,"login failure: account=alice" 2,,2015-01-27 19:01:23,"login failure: account=bob" 3,alice,2015-01-28 02:20:02,"login by alice"
まず、embulk newコマンドでfilterプラグインのひな形を作成します。
happyturn% embulk new ruby-filter myapp 2015-05-30 16:38:29.963 +0900: Embulk v0.6.10 Creating embulk-filter-myapp/ Creating embulk-filter-myapp/README.md Creating embulk-filter-myapp/LICENSE.txt Creating embulk-filter-myapp/.gitignore Creating embulk-filter-myapp/Rakefile Creating embulk-filter-myapp/Gemfile Creating embulk-filter-myapp/embulk-filter-myapp.gemspec Creating embulk-filter-myapp/lib/embulk/filter/myapp.rb
コマンド実行直後のembulk-filter-myapp/lib/embulk/filter/myapp.rbのひな形は以下の通りです。
module Embulk module Filter class MyappFilterPlugin < FilterPlugin Plugin.register_filter("myapp", self) def self.transaction(config, in_schema, &control) # configuration code: task = { "property1" => config.param("property1", :string), "property2" => config.param("property2", :integer, default: 0), } yield(task, out_columns) end def init # initialization code: @property1 = task["property1"] @property2 = task["property2"] end def close end def add(page) # filtering code: page.each do |record| page_builder.add(record) end end def finish page_builder.finish end end end end
まず、transactionメソッドのなかで、フィルタを通したあとのカラムを定義します。カラムを増減させないなら、in_schemaをそのままout_columnに代入するだけでOKです。initメソッドは、今回のプラグインはパラメータを取らないので空にします。
def self.transaction(config, in_schema, &control) # 説明のために代入文を書いた。yieldメソッドにそのまま渡してもOK task = {} out_columns = in_schema yield(task, out_columns) end def init end
そして、フィルタのためのコードをaddメソッドの中に記載します。変数recordは配列なので、4番目にあるcomment列を参照したいなら、record[3]と指定する必要があります。
def add(page) # filtering code: idx = 3 page.each do |record| case record[idx] when /^login failure:/ record[idx] = "login failure" when /^login by/ record[idx] = "login" end page_builder.add(record) end end
あるいは、列の順序が変わる可能性があるのでどうしても列を名前で参照したい、という場合は、以下のようにpageからschemaを取り出して、comment列のインデックスを取得することもできます。
def add(page) # find index of "comment" column columns = page.schema.select{|c| c.name == "comment" } idx = columns[0].index # filtering code: page.each do |record| case record[idx] when /^login failure:/ record[idx] = "login failure" when /^login by/ record[idx] = "login" end page_builder.add(record) end end
まあ、今度は逆にカラム名の変更に弱くなってしまうので、どちらが良いかは一長一短かと。大抵の場合はrecord[3]で良い気がします。
あとは、設定ファイルへ以下のようにfilterの名前を記載し、
filters: - type: myapp
embulk previewコマンドを実行すれば、フィルタが有効になっていることを確認できます。
happyturn% embulk preview config.yml -L embulk-filter-myapp 2015-05-30 17:20:16.564 +0900: Embulk v0.6.10 2015-05-30 17:20:17.351 +0900: Loaded plugin embulk-filter-myapp (0.1.0) 2015-05-30 17:20:17.365 +0900 [INFO] (preview): Listing local files at directory '/Users/myoshiz/devel/try1/csv' filtering filename by prefix 'sample_' 2015-05-30 17:20:17.369 +0900 [INFO] (preview): Loading files [/Users/myoshiz/devel/try1/csv/sample_01.csv.gz] +---------+----------------+-------------------------+----------------+ | id:long | account:string | time:timestamp | comment:string | +---------+----------------+-------------------------+----------------+ | 1 | | 2015-01-27 19:23:49 UTC | login failure | | 2 | | 2015-01-27 19:01:23 UTC | login failure | | 3 | alice | 2015-01-28 02:20:02 UTC | login | +---------+----------------+-------------------------+----------------+
サンプルコード
今回のサンプルはGistにもアップロードしました。