前回の準備編からの続きです。
基本的には、準備編で用意したコードに、Rubyでパース処理を書き足すだけで、簡単に独自のparserプラグインを作れました。ただ、その際にいくつか調べた点や、自分で工夫した点があったのでご紹介します。何かの参考になれば幸いです。
今回のコード
今回書いたサンプルコードは、GitHubにアップロードしました。
muziyoshiz/embulk-parser-multiline-log-sample
中身はembulk new ruby-parserコマンドで生成したひな形の、ほとんどそのままで、編集したのはREADME.mdと、lib/embulk/parser/multiline-log-sample.rbのみです。今回は、手元にある特殊なログを解析する、というユースケースのため、guessの方は書きませんでした。
lib/embulk/parser/multiline-log-sample.rbを直接見たい方はこちら(↓)。
embulk-parser-multiline-log-sample/multiline-log-sample.rb
今回書いたパーサの解説
前回のおさらいになりますが、今回は以下のようなログが手元にあることを想定します。
- 基本的には生成時刻とログレベルとエラーメッセージが出力される
- ログレベルはINFO、WARN、またはERROR
- ERRORレベルのときにはスタックトレースが表示される場合もある
- スタックトレースについては、その直前のERRORログの一部としてパースしたい、というニーズがある
今回書いたパーサは、エラーレベルが出力された行から、次のエラーレベルが出力された行の1行前までを、ひとかたまりのログとして解釈します。そして、そのログの一部を、以下のカラムに出力します。
- "time": 時刻
- "log_level": ログレベル
- "message": ログレベルの後ろに表示されるメッセージ
- "second_line": 2行目のログ(スタックトレースの1行目を含む場合がある)
- "third_line": 3行目のログ(スタックトレースの2行目を含む場合がある)
- "caused_by": Caused by節に続くクラス名
- "caused_by_message": Caused byのクラス名に続くメッセージ
カラムの定義は、self.transactionメソッド内で行います。今回は以下のようにしました。
columns = [ Column.new(0, "time", :timestamp), Column.new(1, "log_level", :string), Column.new(2, "message", :string), Column.new(3, "second_line", :string), Column.new(4, "third_line", :string), Column.new(5, "caused_by", :string), Column.new(6, "caused_by_message", :string), ]
カラムに使える型として、ひな形に含まれていた:stringと:integer以外に何があるのか分からなかったのですが、embulk/lib/embulk/column.rbに以下の記載があったので、この5種類かと思います。ドキュメントにも記載があるのかもしれませんが、ちょっと見つけられませんでした。
module Type if Embulk.java? def self.from_java(java_type) java_type.getName.to_sym end def self.new_java_type(ruby_type) case ruby_type when :boolean Java::Types::BOOLEAN when :long Java::Types::LONG when :double Java::Types::DOUBLE when :string Java::Types::STRING when :timestamp Java::Types::TIMESTAMP else raise ArgumentError, "Unknown type #{ruby_type.inspect}: supported types are :boolean, :long, :double, :string and :timestamp" end end end end
設定ファイル
独自のパーサなので特に設定も要らないのですが、以下のように、配列で指定したログレベルだけ出力するようにしてみました。log_levelsを書かなかった場合は、すべてのログレベルを表示します。なお、charsetとnewlineは、前回の準備編で紹介した、LineDecoderユーティリティが使う設定です。
in: type: any file input plugin type parser: type: multiline-log-sample charset: UTF-8 newline: LF log_levels: [ "WARN", "ERROR" ]
設定は、self.transactionメソッドの中で、
task = { "decoder_task" => DataSource.from_java(parser_task.dump), "log_levels" => config.param("log_levels", :array, default: nil) }
のようにハッシュに入れてから、initメソッドの中で
@log_levels = task["log_levels"]
のようにインスタンス変数に格納して使うようです。
config.paramメソッドで指定できる型についても、ドキュメントは見当たらなかったのですが、:arrayが普通に使えました。恐らく、embulk/lib/embulk/data_source.rbにある以下の型が使えるのかな、と思います。もしこれが合ってるとしたら、カラムに使える型とは、微妙に違いますね。
- :integer
- :float
- :string
- :bool
- :hash
- :array
その他の工夫
コードを見てもらえればわかると思いますが、今回は複数行のログを1つのかたまりとして解釈するために、特殊な状態遷移をしています(もっと綺麗な書き方もあると思いますけど……)。
このコードを書くにあたって、以下のように書くとインデントが深くなってしまうので、
while decoder.nextFile while line = decoder.poll # do something end end
以下のように、ファイルの切れ目を無視して、とにかく次の行を読み続けるメソッドを作成し、decoder.pollの代わりにこちらを使いました。
# Return new line from any file or nil def read_new_line(decoder) begin line = decoder.poll rescue # At first time, java.lang.IllegalStateException is thrown end if line == nil return nil if decoder.nextFile == false read_new_line(decoder) else line end end
こういうニーズが多ければ、LineDecoderクラスの方に足しても良いと思いますけど、まあこれは特殊なニーズかも……。
embulk previewの実行
こういう感じでパースされます。バッチリですね。なお、この例では問題ありませんが、ログ出力場所と分析に使うマシンのタイムゾーンが違っている場合は、どういう設定が要るのか少し気になりました。
happyturn% embulk preview -I embulk-parser-multiline-log-sample/lib config.yml 2015-03-22 21:34:57.502 +0900: Embulk v0.5.3 2015-03-22 21:34:58.197 +0900 [INFO] (preview): Listing local files at directory '/Users/myoshiz/devel/embulk-plugins/testdata' filtering filename by prefix 'myproject.log.' 2015-03-22 21:34:58.201 +0900 [INFO] (preview): Loading files [/Users/myoshiz/devel/embulk-plugins/testdata/myproject.log.20150314] +-----------------------------+------------------+----------------------------------------------------------------+----------------------------------------------------------------------------------------+------------------------------------------------------------+--------------------------------+--------------------------+ | time:timestamp | log_level:string | message:string | second_line:string | third_line:string | caused_by:string | caused_by_message:string | +-----------------------------+------------------+----------------------------------------------------------------+----------------------------------------------------------------------------------------+------------------------------------------------------------+--------------------------------+--------------------------+ | 2015-03-14 11:12:22.123 UTC | ERROR | Book reader error | Exception in thread "main" java.lang.IllegalStateException: A book has a null property | at com.example.myproject.Author.getBookIds(Author.java:38) | java.lang.NullPointerException | | | 2015-03-14 11:13:34.456 UTC | WARN | Suspected SQL injection attack (input="; DROP TABLE USERS --") | | | | | | 2015-03-14 11:16:00 UTC | ERROR | Indescribable error | | | | | | 2015-03-14 11:16:45.183 UTC | ERROR | Book reader error | Exception in thread "main" java.lang.IllegalStateException: A book has a null property | at com.example.myproject.Author.getBookIds(Author.java:38) | java.sql.SQLException | Unknown reason | +-----------------------------+------------------+----------------------------------------------------------------+----------------------------------------------------------------------------------------+------------------------------------------------------------+--------------------------------+--------------------------+
embulk runの実行
今回は、ログに対してSQL関数を使うことを想定して、PostgreSQLに出力してみます。まず、embulk-output-postgresqlプラグインをインストールします。
happyturn% embulk gem install embulk-output-postgresql 2015-03-21 17:49:18.942 +0900: Embulk v0.5.2 Fetching: embulk-output-postgresql-0.2.1.gem (100%) Successfully installed embulk-output-postgresql-0.2.1 1 gem installed
適当にPostgreSQLサーバを用意してから、config.ymlにoutputプラグインの設定を書きます。
in: type: file path_prefix: /Users/myoshiz/devel/embulk-plugins/testdata/myproject.log. parser: charset: UTF-8 newline: LF type: multiline-log-sample log_levels: [ "WARN", "ERROR" ] exec: {} out: type: postgresql host: localhost user: embulk password: "" database: embulk_test table: multiline_logs mode: insert
そして、embulk runを実行すると、PostgreSQLにデータが書き込まれます。CREATE TABLE IF NOT EXISTSのあたりでテーブルが自動生成されていることもわかります。
happyturn% embulk run -I embulk-parser-multiline-log-sample/lib config-psql.yml 2015-03-22 21:40:50.830 +0900: Embulk v0.5.3 2015-03-22 21:40:51.939 +0900 [INFO] (transaction): Listing local files at directory '/Users/myoshiz/devel/embulk-plugins/testdata' filtering filename by prefix 'myproject.log.' 2015-03-22 21:40:51.943 +0900 [INFO] (transaction): Loading files [/Users/myoshiz/devel/embulk-plugins/testdata/myproject.log.20150314] 2015-03-22 21:40:52.068 +0900 [INFO] (transaction): SQL: SET search_path TO "public" 2015-03-22 21:40:52.070 +0900 [INFO] (transaction): > 0.00 seconds 2015-03-22 21:40:52.072 +0900 [INFO] (transaction): SQL: CREATE TABLE IF NOT EXISTS "multiline_logs" ("time" TIMESTAMP, "log_level" TEXT, "message" TEXT, "second_line" TEXT, "third_line" TEXT, "caused_by" TEXT, "caused_by_message" TEXT) 2015-03-22 21:40:52.077 +0900 [INFO] (transaction): > 0.01 seconds 2015-03-22 21:40:52.123 +0900 [INFO] (transaction): {done: 0 / 1, running: 0} 2015-03-22 21:40:52.144 +0900 [INFO] (task-0000): SQL: SET search_path TO "public" 2015-03-22 21:40:52.145 +0900 [INFO] (task-0000): > 0.00 seconds 2015-03-22 21:40:52.146 +0900 [INFO] (task-0000): Copy SQL: COPY "multiline_logs" ("time", "log_level", "message", "second_line", "third_line", "caused_by", "caused_by_message") FROM STDIN 2015-03-22 21:40:52.232 +0900 [INFO] (task-0000): Loading 4 rows (623 bytes) 2015-03-22 21:40:52.234 +0900 [INFO] (task-0000): > 0.00 seconds (loaded 4 rows in total) 2015-03-22 21:40:52.234 +0900 [INFO] (transaction): {done: 1 / 1, running: 0} 2015-03-22 21:40:52.250 +0900 [INFO] (main): Committed. 2015-03-22 21:40:52.250 +0900 [INFO] (main): Next config diff: {"in":{"last_path":"/Users/myoshiz/devel/embulk-plugins/testdata/myproject.log.20150314"},"out":{}}
PostgreSQLに入ったので、こんな感じで集計できます。
embulk_test=# SELECT embulk_test-# to_char(time,'YYYY-MM-DD') AS day, log_level, message, count(*) AS count embulk_test-# FROM multiline_logs embulk_test-# GROUP BY day, log_level, message embulk_test-# ORDER BY day; day | log_level | message | count ------------+-----------+----------------------------------------------------------------+------- 2015-03-14 | ERROR | Book reader error | 2 2015-03-14 | ERROR | Indescribable error | 1 2015-03-14 | WARN | Suspected SQL injection attack (input="; DROP TABLE USERS --") | 1 (3 rows)
あとは今回の例で言うと、パースした結果を眺めてみて、WARNの"input="以下の内容は別のカラムに分けて分析したい、と思うかもしれません。その場合は、恐らく一度multiline_logsテーブルを消すなり、自分でカラムを足してからembulk runを実行するなりしないとエラーになると思います。そこまではまだ試してないので、そのうち試してみます。
まとめ
こんな感じでパース処理の部分だけ自分で書き足せば、
などは、Embulkのプラグインを入れて設定ファイルを編集するだけで簡単にできました。主要アプリケーションのログのparserプラグインは今後色々出てくると思いますが、この機会に、手元にある独自ログのためのパーサを書いてみても面白いんじゃないでしょうか。
あと、今回は独自ログを対象にしたのでguessの方はサボってしまいましたが、いずれ機会があればそちらの方も書いてみたいと思います。
参考文献
- embulk-output-jdbc/embulk-output-postgresql at master · embulk/embulk-output-jdbc
- 今回使ったembulk-output-postgresqlのページ。
- List of Embulk Plugins by Category | Embulk
- 公式のプラグイン一覧です。パーサはまだ少ないので狙い目?な感じ(2015-03-22時点で4件)。
- embulk-input-filesplit | RubyGems.org | your community gem host
- 過去のログをEmbulk(エンバルク)を使ってバルクロードしよう - Qiita