今回は、下記のようなJSON形式のログをFluentd(Fluentdのプラグイン)で処理を行うために、調査してみました。

{ "a": "test", "b": "1", "c": 2, "d": true, "e": "true", "f": [ { "g": "test1" }, { "h": "test2" } ] }

このJSON形式のログもTailプラグインによって下記のようにformat jsonで簡単に読み込むことが可能です。


type tail
format json
path /tmp/test.log
tag tail.json
pos_file /tmp/tail.syslog.pos


type file
path /tmp/tail

読み込んだデータは下記のように出力されます。

2013-03-13T16:31:23+09:00       tail.json       {"a":"test","b":"1","c":2,"d":true,"e":"true","f":[{"g":"test1"},{"h":"test2"}]}

しかしFluentdのプラグインは、ネスト構造を考慮してないものが多いような気がします。
そこで、下記のようなプラグインを作成してネスト構造のJSONをフラットなJSONに変更して、他のプラグインで
処理できるようにしてみました。

module Fluent
require "jsonpath"
class ConvertOutput Fluent::Plugin.register_output("convert", self)
config_param :format, :string, :default => nil
config_param :tag , :string, :default => nil

def configure(conf)
super
@formatter = create_formatter(JSON.parse(@format))
end

def create_formatter(formatter)
case formatter
when Array
formatter.map{|e| create_formatter(e)}
when Hash
formatter.inject({}) do |hash, (k, v)|
hash[k] = create_formatter(v)
hash
end
when String
if formatter.start_with?("$")
JsonPath.new(formatter)
else
formatter
end
else
formatter
end
end

def emit(tag, es, chain)
es.each do |time, record|
converted = convert_record(@formatter, record)
Engine.emit(@tag, time, converted)
end
chain.next
end

def convert_record(formatter, record)
case formatter
when Array
formatter.map{|e| convert_record(e, record)}
when Hash
formatter.inject({}) do |hash, (k, v)|
hash[k] = convert_record(v, record)
hash
end
when JsonPath
formatter.first(record)
else
formatter
end
end

end
end

※上記は、/etc/td-agent/plugin/out_convert.rbとして配置しています。
※下記のように、jsonpathのGemパッケージをインストールする必要があります。

# /usr/lib64/fluent/ruby/bin/gem install jsonpath
Fetching: jsonpath-0.5.0.gem (100%)
Successfully installed jsonpath-0.5.0
1 gem installed
Installing ri documentation for jsonpath-0.5.0...
Installing RDoc documentation for jsonpath-0.5.0...

Fluentdの設定ファイル(/etc/td-agent/td-agent.conf)は下記のようになります。


type tail
format json
path /tmp/test.log
tag tail.json
pos_file /tmp/tail.syslog.pos


type convert
format { "a": "$['b']", "b": 10, "c": { "d": "$['a']" }, "e": [ {"f": "$['f'][0]" }, {"g": "$['f'][1]['h']" } ] }
tag convert.json

type file
path /tmp/convert

※formatの部分に整形後のログのテンプレート(データの指定はJSONPathを利用)を記載します。

そうすると、最初のJSON形式のログが、下記のように整形されて出力されるようになります。

2013-03-13T15:27:10+09:00       convert.json    {"a":"1","b":10,"c":{"d":"test"},"e":[{"f":{"g":"test1"}},{"g":"test2"}]}

こちらの記事はなかの人(suz-lab)監修のもと掲載しています。
元記事は、こちら