2013年3月13日水曜日

FluentdでJSON形式のログを読み込んで整形する

スズキです。

下記のようなJSON形式のログをFluentd(と、そのプラグイン)で処理したくて、
いろいろと調べてみました。
{ "a": "test", "b": "1", "c": 2, "d": true, "e": "true", "f": [ { "g": "test1" }, { "h": "test2" } ] }

そのそのJSON形式のログもTailプラグインで下記のように"format json"で簡単に
読み込むことができます。
<source>
    type     tail
    format   json
    path     /tmp/test.log
    tag      tail.json
    pos_file /tmp/tail.syslog.pos
</source>
<match tail.json>
    type     file
    path     /tmp/tail
</match>
読み込んだデータは下記のように出力されます。
2013-03-13T16:31:23+09:00       tail.json       {"a":"test","b":"1","c":2,"d":true,"e":"true","f":[{"g":"test1"},{"h":"test2"}]}

しかしFluentdのプラグインは、ネスト構造を考慮してないものが多いような気がします。
なので、下記のようなプラグインを作成してネスト構造のJSONをフラットなJSONに
変更して、他のプラグインで処理できるようにしてみました。
module Fluent
    require "jsonpath"
    class ConvertOutput < Output
        Fluent::Plugin.register_output("convert", self)
        config_param :format, :string, :default => nil
        config_param :tag   , :string, :default => nil

        def configure(conf)
            super
            @formatter = create_formatter(JSON.parse(@format))
        end

        def create_formatter(formatter)
            case formatter
            when Array
                formatter.map{|e| create_formatter(e)}
            when Hash
                formatter.inject({}) do |hash, (k, v)|
                    hash[k] = create_formatter(v)
                    hash
                end
            when String
                if formatter.start_with?("$")
                    JsonPath.new(formatter)
                else
                    formatter
                end
            else
                formatter
            end
        end

        def emit(tag, es, chain)
            es.each do |time, record|
                converted = convert_record(@formatter, record)
                Engine.emit(@tag, time, converted)
            end
            chain.next
        end

        def convert_record(formatter, record)
            case formatter
            when Array
                formatter.map{|e| convert_record(e, record)}
            when Hash
                formatter.inject({}) do |hash, (k, v)|
                    hash[k] = convert_record(v, record)
                    hash
                end
            when JsonPath
                formatter.first(record)
            else
                formatter
            end
        end

    end
end
※上記は"/etc/td-agent/plugin/out_convert.rb"として配置しています。
※下記のように"jsonpath"のGemパッケージをインストールする必要があります。
# /usr/lib64/fluent/ruby/bin/gem install jsonpath
Fetching: jsonpath-0.5.0.gem (100%)
Successfully installed jsonpath-0.5.0
1 gem installed
Installing ri documentation for jsonpath-0.5.0...
Installing RDoc documentation for jsonpath-0.5.0...

Fluentdの設定ファイル(/etc/td-agent/td-agent.conf)は下記のような感じになります。
<source>
    type     tail
    format   json
    path     /tmp/test.log
    tag      tail.json
    pos_file /tmp/tail.syslog.pos
</source>
<match tail.json>
    type     convert
    format   { "a": "$['b']", "b": 10, "c": { "d": "$['a']" }, "e": [ {"f": "$['f'][0]" }, {"g": "$['f'][1]['h']" } ] }
    tag      convert.json
</match>
<match convert.json>
    type     file
    path     /tmp/convert
</match>
※"format"の部分に整形後のログのテンプレート(データの指定はJSONPathを利用)を
記載します。

すると、最初のJSON形式のログが、下記のように整形されて出力されるようになります。
2013-03-13T15:27:10+09:00       convert.json    {"a":"1","b":10,"c":{"d":"test"},"e":[{"f":{"g":"test1"}},{"g":"test2"}]}

これで、いつも使ってるプラグインにつなげれるはず...
--------
http://www.suz-lab.com

0 コメント: