**Background:** A production requirement called for cleaning business log data and importing it into Kibana, with the raw logs imported alongside. Having the service itself emit two copies of every log would clearly be unreasonable, so the processing is done at the fluentd collection stage instead: the raw data is first copied into two streams, one imported into Kibana directly, the other run through a fluentd filter before being imported. Along the way the filter could not match nested fields, which was solved by upgrading fluentd and changing the filter syntax; the upgrade in turn broke the old way of naming ES indices, which was solved by switching ES plugins. Details in the notes below.
Test environment:

- OS: Amazon Linux AMI release 2015.03
- Fluentd: td-agent 0.12.20
**Problem 1: the grep filter's match conditions do not support nested keys**
Problem description

- The raw data is shown below. Two filter conditions are needed: `type_name`, and `info.Type` (a nested key).
```json
{
  "Level": "Error",
  "logtime": 1519444062811430400,
  "@timestamp": "2018-02-24T03:47:42.811Z",
  "utc8": "2018-02-24 11:47:42",
  "type_name": "CostCurrency",
  "gid": 202,
  "sid": 3258,
  "avatar": "4",
  "corplvl": 44,
  "channel": "130134001232",
  "info": {
    "Reason": "AbstractCancelCost",
    "Type": "VI_HC",
    "Value": 20,
    "BefValue": 3139,
    "AftValue": 3119,
    "VIP": 5
  }
}
```
- Searching the documentation first turned up the filter_grep plugin. Testing showed that regexp1 works but regexp2 does not; I filed an issue, and the answer was that this syntax does not support nested keys and has since been deprecated. The test configuration was as follows:
```
<match logics.**>
  @type copy
  <store>
    @type elasticsearch
    ...
  </store>
  <store>
    @type relabel
    @label @CostCurrency
  </store>
</match>

<label @CostCurrency>
  <filter logics.**>
    @type grep
    regexp1 type_name CostCurrency
    regexp2 info.Type VI_HC
  </filter>
  <match logics.**>
    @type elasticsearch
    ...
  </match>
</label>

# td-agent --dry-run -c /etc/td-agent/td-agent.conf   # validate the config file
```
Solution
```
<filter logics.**>
  @type grep
  # regexp1 type_name CostCurrency   # this form still worked in v0.12
  <regexp>
    key type_name
    pattern CostCurrency
  </regexp>
  <regexp>
    key $.info.Type
    pattern ^VI_HC$
  </regexp>
</filter>
```
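Here `$.info.Type` is fluentd's record_accessor syntax for addressing nested fields. As a minimal smoke test, events can be injected through the forward input with `fluent-cat` (bundled with fluentd) and you can then check what reaches the downstream store. This sketch assumes the forward source is listening on its default port 24224; the tag `logics.test` and the value `VI_FREE` are made up for the test:

```sh
# Should pass both <regexp> conditions and reach Elasticsearch:
echo '{"type_name":"CostCurrency","info":{"Type":"VI_HC","Value":20}}' \
  | fluent-cat logics.test

# Should be dropped by the $.info.Type condition (VI_FREE is an arbitrary non-matching value):
echo '{"type_name":"CostCurrency","info":{"Type":"VI_FREE","Value":20}}' \
  | fluent-cat logics.test
```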
**Problem 2: after upgrading to v0.14, indices can no longer be named with `${tag_parts[-1]}`**
Problem description

- The tags sent by the client-side td-agents look like `part1.xxx.part2`, and I want all records sharing the same part1 and part2 to land in the same index, hence this requirement. After upgrading, the previous setting

  `logstash_dateformat logics-${tag_parts[-1]}.%Y.%m.%d`

  no longer takes effect. Testing confirmed that only `${tag}` still works, which does not give me what I need. According to the official documentation, replacing the `elasticsearch` plugin with `elasticsearch_dynamic` makes it work again. The official docs do carry the warning below, so I may need to rethink the log collection approach; for now I am running this setup for testing and will improve it later if problems come up.

> Please note, this uses Ruby's eval for every message, so there are performance and security implications.
```
<store>
  # @type elasticsearch
  @type elasticsearch_dynamic
  host data1.elasticsearch.qa.net
  port 9200
  request_timeout 15s      # defaults to 5s
  reload_connections false
  reload_on_failure true   # defaults to false
  logstash_format true
  logstash_prefix bilogs-${tag_parts[0]}-${tag_parts[2]}
  logstash_dateformat %Y.%m.%d
  time_key time
</store>
```
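With this store, a record tagged e.g. `logics.xxx.gamex` (a hypothetical tag following the `part1.xxx.part2` pattern) is written to a daily index named like `bilogs-logics-gamex-2018.02.24`. The resulting indices can be listed with the standard `_cat` API:

```sh
# List all indices produced by the dynamic prefix (host/port as configured above).
curl 'http://data1.elasticsearch.qa.net:9200/_cat/indices/bilogs-*?v'
```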
Full configuration
```
<source>
  @type forward
  #port 24224
</source>

<match debug.**>
  @type stdout
</match>

<source>
  @type debug_agent
  bind 127.0.0.1
  port 24230
</source>

##################################################################################

<match logics.**>
  @type copy
  #@type forest
  #subtype copy
  <store>
    # @type elasticsearch
    @type elasticsearch_dynamic
    host data1.elasticsearch.taiqa.net
    port 9200
    request_timeout 15s      # defaults to 5s
    reload_connections false
    reload_on_failure true   # defaults to false
    logstash_format true
    logstash_prefix bilogs-${tag_parts[0]}-${tag_parts[2]}
    logstash_dateformat %Y.%m.%d
    time_key time
    <buffer>
      @type file
      path /var/log/td-agent/buffer/td-gamex-buffer
      chunk_limit_size 512MB       # default: 8MB (memory) / 256MB (file)
      total_limit_size 32GB        # default: 512MB (memory) / 64GB (file)
      chunk_full_threshold 0.9     # flush a chunk once it reaches chunk_limit_size * chunk_full_threshold
      compress text                # chunk compression while events are buffered; text means no compression
      flush_mode default
      flush_interval 15s           # default: 60s
      flush_thread_count 1         # default: 1; number of threads used to write chunks in parallel
      delayed_commit_timeout 60    # seconds until the plugin decides an async write has failed
      overflow_action throw_exception
      retry_timeout 10m
    </buffer>
  </store>
  <store>
    @type relabel
    @label @CostCurrency
  </store>
</match>

<label @CostCurrency>
  <filter logics.**>
    @type grep
    #regexp1 type_name CostCurrency
    <regexp>
      key type_name
      pattern CostCurrency
    </regexp>
    <regexp>
      key $.info.Type
      pattern ^VI_HC$
    </regexp>
  </filter>
  <match logics.**>
    @type elasticsearch
    host data1.elasticsearch.taiqa.net
    port 9200
    request_timeout 15s      # defaults to 5s
    reload_connections false
    reload_on_failure true   # defaults to false
    logstash_format true
    logstash_prefix cost
    logstash_dateformat currency-hc.%Y.%m.%d
    time_key time
    <buffer>
      @type file
      path /var/log/td-agent/buffer/td-cost-buffer
      chunk_limit_size 512MB       # default: 8MB (memory) / 256MB (file)
      total_limit_size 32GB        # default: 512MB (memory) / 64GB (file)
      chunk_full_threshold 0.9     # flush a chunk once it reaches chunk_limit_size * chunk_full_threshold
      compress text                # chunk compression while events are buffered; text means no compression
      flush_mode default
      flush_interval 15s           # default: 60s
      flush_thread_count 1         # default: 1; number of threads used to write chunks in parallel
      delayed_commit_timeout 60    # seconds until the plugin decides an async write has failed
      overflow_action throw_exception
      retry_timeout 10m
    </buffer>
  </match>
</label>
```
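Note how the filtered branch names its index: `logstash_prefix cost` combined with `logstash_dateformat currency-hc.%Y.%m.%d` yields daily indices named like `cost-currency-hc.2018.02.24`, since the final index name is the prefix, a separator, and the expanded date format. A quick end-to-end check is to confirm both index families exist:

```sh
# Raw copies, one index per tag combination:
curl 'http://data1.elasticsearch.taiqa.net:9200/_cat/indices/bilogs-*?v'

# Filtered CostCurrency / VI_HC records:
curl 'http://data1.elasticsearch.taiqa.net:9200/_cat/indices/cost-*?v'
```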
References

- On rewriting tags and fields
- Elasticsearch plugin documentation