fluentd + logstash_formatの難点をrecord_reformerで解決した話


「アプリケーションで出力したログをelasticsearchにインデックスするときにログ時間のフィールド名を@timestampにしたい。」
こんなときにfluentdのlogstash_formatを使うと少々はまります。
今回はそのハマりポイントと解決についてのお話です。

なぜログ時間のフィールド名を@timestampにしたいか

先々にkibanaでもログを取り込むことを考えていました。(kibanaではログ時間のTime Fieldの扱いが@timestampとなっています。)
アプリケーションから出力されるフィールドはtimeのままで@timestampにしたくありません。
logstash_formatを使えばtimeフィールドを@timestampに変換してくれますが難点があります。

logstash_formatを使わない理由

logstash_formatはfluentdでsource化したログをlogstashのログ形式にフォーマットしてくれるプラグインです。例えば以下のログをfluentdでsource化してelasticsearchにインデックスするとインデックスは以下のようになります。

ログ

1
{"time":"2016-12-06T10:43:10Z","post_id":1000,"category_id":200,"user_id":123}

elasticsearchでindex確認

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
$ curl -XGET "http://localhost:9200/localhost.api-2016.12.06/logstash_action_log/AVjnaIabuvtAHVSVViec?pretty"
{
  "_index" : "localhost.api-2016.12.06",
  "_type" : "logstash_action_log",
  "_id" : "AVjnaIabuvtAHVSVViec",
  "_version" : 1,
  "found" : true,
  "_source" : {
    "post_id" : 1000,
    "category_id" : 200,
    "user_id" : 123,
    "@timestamp" : "2016-12-06T10:43:10+00:00"
  }
}

ここに1つの難点があります。logstash_formatを使うとアプリケーションログのtimeは@timestampに変換してくれます。(便利)
だけど、インデックス名がlocalhost.api-yyyyMMddとなってしまいます。
fluentdの設定からはインデックス名のprefixは指定できますが、時刻部分はコントロールできません。(type名は指定できます)fluentdの設定でいうと以下の部分です。

1
2
3
4
5
6
7
8
api.logstash_action_log>
  type copy
  <store></store>
〜〜省略〜〜
    type_name logstash_action_log ← indexのtype名

    logstash_format true ←ココでフォーマットを有効化
    logstash_prefix "localhost.api" ←ココでindex名のprefixを指定

インデックス名はコントロールしたいけど、timeフィールドは@timestampに変換してほしい。 logstash_formatだとインデックス名はコントロールできない。そこでrecord_reformerの出番です。

record_reformer

record_reformerはfluentdでsource化したログのフィールド名を新しいフィールド名に書き換えることができます。
ログ値も同様に書き換えられます。以下、利用例です。

fluentdの設定

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
foo.**>
  type record_reformer
  remove_keys remove_me
  renew_record false
  enable_ruby false

  tag reformed.${tag_prefix[-2]}
  <record></record>
    hostname ${hostname}
    input_tag ${tag}
    last_tag ${tag_parts[-1]}
    message ${message}, yay!
  

書き換え前のログ

1
2
3
4
5
foo.bar {
  "remove_me":"bar",
  "not_remove_me":"bar",
  "message":"Hello world!"
}

record_reformerが書き換え後のログ

1
2
3
4
5
6
7
reformed.foo {
  "not_remove_me":"bar",
  "hostname":"YOUR_HOSTNAME",
  "input_tag":"foo.bar",
  "last_tag":"bar",
  "message":"Hello world!, yay!",
}

record_reformerが解決してくれる

見事に今回の課題をrecord_reformerが解決してくれました。
record_reformerで書き換えたmatch節からelaseticsearchにcopyするmatch節に繋げるのがコツです。
最終的な設定は以下のようになりました。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
<source/>
# 省略


api.reformer_action_log>
  type record_reformer
  enable_ruby true
  tag api.action_log.reformer
  <record></record>
  @timestamp ${time.strftime('%Y-%m-%dT%H:%M:%S%z')}
  


api.action_log.reformer>
  type copy
  <store></store>
    @type elasticsearch
    host "#{ENV['ELASTICSEARCH_HOST']}"
    port "#{ENV['ELASTICSEARCH_PORT']}"

    index_name api
    type_name reformer_action_log

    include_time_key true
    flush_interval 1s

elasticsearchでindex確認

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
$ curl -XGET "http://localhost:9200/api/reformer_action_log/AVjnaJoIuvtAHVSVVied?pretty"
{
  "_index" : "api",
  "_type" : "reformer_action_log",
  "_id" : "AVjnaJoIuvtAHVSVVied",
  "_version" : 1,
  "found" : true,
  "_source" : {
    "post_id" : 1000,
    "category_id" : 200,
    "user_id" : 123,
    "@timestamp" : "2016-12-06T10:43:10+0000"
  }
}

インデックスが定義で指定したreformer_action_logに、timeフィールドは@timestampのフィールド名になりました!

ソースを公開しています

今回のソースはgithubに公開しています。logstashの設定も入れているので合わせて確認できます。

https://github.com/soushin/fluentd-record-reformer


同じ課題にぶち当たっている人の手助けになれば嬉しいです。