**Background:** An online requirement called for cleaning business log data and importing it into Kibana, while the raw logs also had to be imported. Having the service emit two copies of the logs directly is clearly unreasonable, so the processing was moved to the fluentd collection layer: the raw stream is first copied into two branches, one imported into Kibana directly and the other passed through a fluentd filter before being imported into Kibana. Along the way the filter could not match nested fields, which was solved by upgrading fluentd and changing the filter syntax; the upgrade in turn broke the original way of naming the ES index, which was solved by switching ES plugins. The details are recorded below.

Test environment: OS: Amazon Linux AMI release 2015.03; Fluentd: td-agent 0.12.20 (later upgraded to v0.14, see Problem 2)

Problem 1: the filter's match conditions do not support nested keys

Problem description

  • The raw data is shown below; two filter conditions are needed, on type_name and on info.Type (the latter is a nested key).
{
    "Level": "Error",
    "logtime": 1519444062811430400,
    "@timestamp": "2018-02-24T03:47:42.811Z",
    "utc8": "2018-02-24 11:47:42",
    "type_name": "CostCurrency",
    "gid": 202,
    "sid": 3258,
    "avatar": "4",
    "corplvl": 44,
    "channel": "130134001232",
    "info": {
        "Reason": "AbstractCancelCost",
        "Type": "VI_HC",
        "Value": 20,
        "BefValue": 3139,
        "AftValue": 3119,
        "VIP": 5
    }
}
  • A search of the documentation first turned up the filter_grep plugin. In testing, regexp1 worked but regexp2 did not; I filed an issue and the reply was that this form does not support nested keys and has been deprecated. The test configuration is as follows.
<match logics.**>
    @type copy
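    # copy: the first store sends the raw stream straight to ES, the second re-labels it for filtering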
    <store>
        @type elasticsearch
        ...
    </store>
    <store>
        @type relabel
        @label @CostCurrency
    </store>
</match>
<label @CostCurrency>
    <filter logics.**>
        @type grep
        regexp1 type_name CostCurrency
        regexp2 info.Type VI_HC
    </filter>
    <match logics.**>
        @type elasticsearch
        ...
    </match>
</label>
#td-agent --dry-run -c /etc/td-agent/td-agent.conf  # command to check the configuration file
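Once the configuration passes the dry-run check, td-agent has to be restarted for the change to take effect. A minimal sketch, assuming the default td-agent paths and the sysvinit service shipped with this Amazon Linux AMI:

sudo /etc/init.d/td-agent restart       # pick up the new routing (systemctl restart td-agent on systemd hosts)
tail -f /var/log/td-agent/td-agent.log  # watch for plugin or routing errors after the restart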

Solution

<filter logics.**>
    @type grep
    #regexp1 type_name CostCurrency # this form was usable in v0.12
    <regexp>
      key type_name
      pattern CostCurrency
    </regexp>
    <regexp>
      key $.info.Type
      pattern ^VI_HC$
    </regexp>
  </filter>
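For reference, multiple <regexp> directives inside a grep filter are AND-ed, so a record is kept only when every pattern matches. Worked against the sample record from the problem description (VI_GOLD is a made-up value used only for contrast):

# type_name = CostCurrency, info.Type = VI_HC    => kept   (both patterns match)
# type_name = CostCurrency, info.Type = VI_GOLD  => dropped (nested pattern ^VI_HC$ fails)
# type_name = Login,        info.Type = VI_HC    => dropped (type_name pattern fails)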

Problem 2: after upgrading to v0.14, the index can no longer be named with ${tag_parts[-1]}

Problem description

  • In the current environment, the data sent by the client-side td-agents carries tags of the form part1.xxx.part2, and I want records that share the same part1 and part2 to go into the same index, hence this requirement. After the upgrade, the previous setting logstash_dateformat logics-${tag_parts[-1]}.%Y.%m.%d no longer takes effect; testing showed that only ${tag} still works, which does not achieve what I need. After checking the official documentation, I replaced the elasticsearch plugin with elasticsearch_dynamic and the tag placeholders work again (see the worked example after the configuration below). However, the official documentation carries the warning quoted below, so I am considering whether the log-collection approach should be reworked; for now I will run with this setup and improve it later if problems appear.

Please note, this uses Ruby’s eval for every message, so there are performance and security implications.

  • The configuration after the upgrade is as follows
  <store>
#    @type elasticsearch
    @type elasticsearch_dynamic
    host data1.elasticsearch.qa.net
    port 9200
    request_timeout 15s #defaults to 5s
    reload_connections false
    reload_on_failure true # defaults to false
    logstash_format true
    logstash_prefix bilogs-${tag_parts[0]}-${tag_parts[2]}
    logstash_dateformat %Y.%m.%d
    time_key time
  </store>
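As a worked example of how elasticsearch_dynamic expands this setting (the tag below is hypothetical, of the part1.xxx.part2 shape described above):

# tag = logics.gamex.cost              (hypothetical example tag)
# tag_parts[0] = logics, tag_parts[2] = cost
# logstash_prefix therefore expands to bilogs-logics-cost
# with logstash_format and %Y.%m.%d the final index is bilogs-logics-cost-2018.02.24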

Complete configuration

  • The complete relay configuration (a quick end-to-end check follows it)
<source>
  @type forward
  #port 24224
</source>
<match debug.**>
  @type stdout
</match>
<source>
  @type debug_agent
  bind 127.0.0.1
  port 24230
</source>
##################################################################################
<match logics.**>
  @type copy
  #@type forest
  #subtype copy
  <store>
#    @type elasticsearch
    @type elasticsearch_dynamic
    host data1.elasticsearch.taiqa.net
    port 9200
    request_timeout 15s #defaults to 5s
    reload_connections false
    reload_on_failure true # defaults to false
    logstash_format true
    logstash_prefix bilogs-${tag_parts[0]}-${tag_parts[2]}
    logstash_dateformat %Y.%m.%d
    time_key time
    <buffer>
      @type file
      path /var/log/td-agent/buffer/td-gamex-buffer
      chunk_limit_size 512MB #Default: 8MB (memory) / 256MB (file)
      total_limit_size 32GB #Default: 512MB (memory) / 64GB (file)
      chunk_full_threshold 0.9 #output plugin will flush the chunk when actual size reaches chunk_limit_size * chunk_full_threshold
      compress text #The option to specify compression of each chunks, during events are buffered
      flush_mode default
      flush_interval 15s #Default: 60s
      flush_thread_count 1 #Default: 1 The number of threads of output plugins, which is used to write chunks in parallel
      delayed_commit_timeout 60 #The timeout seconds until output plugin decides that async write operation fails
      overflow_action throw_exception
      retry_timeout 10m
    </buffer>
  </store>
  <store>
    @type relabel
    @label @CostCurrency
  </store>
</match>
<label @CostCurrency>
  <filter logics.**>
    @type grep
    #regexp1 type_name CostCurrency
    <regexp>
      key type_name
      pattern CostCurrency
    </regexp>
    <regexp>
      key $.info.Type
      pattern ^VI_HC$
    </regexp>
  </filter>
  <match logics.**>
    @type elasticsearch
    host data1.elasticsearch.taiqa.net
    port 9200
    request_timeout 15s #defaults to 5s
    reload_connections false
    reload_on_failure true # defaults to false
    logstash_format true
    logstash_prefix cost
    logstash_dateformat currency-hc.%Y.%m.%d
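    # note: the prefix and dateformat above combine into an index name like cost-currency-hc.2018.02.24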
    time_key time
    <buffer>
      @type file
      path /var/log/td-agent/buffer/td-cost-buffer
      chunk_limit_size 512MB #Default: 8MB (memory) / 256MB (file)
      total_limit_size 32GB #Default: 512MB (memory) / 64GB (file)
      chunk_full_threshold 0.9 #output plugin will flush the chunk when actual size reaches chunk_limit_size * chunk_full_threshold
      compress text #The option to specify compression of each chunks, during events are buffered
      flush_mode default
      flush_interval 15s #Default: 60s
      flush_thread_count 1 #Default: 1 The number of threads of output plugins, which is used to write chunks in parallel
      delayed_commit_timeout 60 #The timeout seconds until output plugin decides that async write operation fails
      overflow_action throw_exception
      retry_timeout 10m
    </buffer>
  </match>
</label>
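Once the agent is running with this configuration, the whole path can be checked end to end by injecting a test event through the forward input with the bundled fluent-cat tool and then listing the resulting indices. A sketch, assuming td-agent's default install path, the default forward port 24224, and a made-up tag logics.gamex.cost:

# send a record that should land in both the raw bilogs-* index and the filtered cost-* index
echo '{"type_name":"CostCurrency","info":{"Type":"VI_HC","Value":20,"BefValue":3139,"AftValue":3119}}' \
  | /opt/td-agent/embedded/bin/fluent-cat logics.gamex.cost

# after the file buffer flushes (flush_interval 15s), list the indices on the ES node
curl -s 'http://data1.elasticsearch.taiqa.net:9200/_cat/indices/bilogs-*?v'
curl -s 'http://data1.elasticsearch.taiqa.net:9200/_cat/indices/cost-*?v'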

References

  • On rewriting tags or fields
  • ES plugin documentation