Consume data from Kafka and ship it to both Elasticsearch and GCS. There are two ways to do this: one keeps a single topic and consumer group and uses Fluentd's copy output type to duplicate each event across multiple outputs; the other relies on Kafka's pub/sub model and reads the topic with a separate consumer group per destination. The second approach is used here.
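For reference, the first approach would look roughly like the following: a single consumer feeding Fluentd's copy output, which duplicates every event into each <store>. This is only a sketch; the consumer group name backup, the elasticsearch store, and the endpoint es.host:9200 are placeholders, and the elasticsearch store assumes fluent-plugin-elasticsearch is installed.

<source>
  @type kafka_group
  consumer_group backup
  brokers kafka.host:9092
  topics kafka_backup_to_gcs
  add_prefix kafka-backup
</source>

<match kafka-backup.*>
  @type copy
  <store>
    @type elasticsearch
    host es.host
    port 9200
  </store>
  <store>
    @type gcs
    # ... same GCS settings as in the full example below
  </store>
</match>

With this layout both destinations share a single consumer group and offset position; the consumer-group approach below keeps the two pipelines fully independent.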
Install the plugin
# Install the GCS output plugin (td-agent already bundles fluent-plugin-kafka)
/opt/td-agent/bin/fluent-gem install fluent-plugin-gcs -v "0.4.0" --no-document
# Place the GCP service-account key where td-agent can read it
vim /opt/td-agent/cre.json
chown -R td-agent:td-agent /opt/td-agent/cre.json
# Create the directory for the GCS output's file buffer
mkdir -p /var/log/fluent
chown -R td-agent:td-agent /var/log/fluent
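cre.json is a GCP service-account key created and downloaded from the Cloud Console; the account needs permission to write objects into the target bucket (for example the roles/storage.objectCreator role). Its shape is the standard service-account JSON, shown here with the values redacted:

{
  "type": "service_account",
  "project_id": "YOUR_PROJECT",
  "private_key_id": "...",
  "private_key": "-----BEGIN PRIVATE KEY-----\n...\n-----END PRIVATE KEY-----\n",
  "client_email": "fluentd@YOUR_PROJECT.iam.gserviceaccount.com",
  "client_id": "...",
  "token_uri": "https://oauth2.googleapis.com/token"
}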
References
https://github.com/daichirata/fluent-plugin-gcs
https://docs.fluentd.org/output/copy
Configuration example
<source>
  @type kafka_group
  # each destination consumes under its own consumer group,
  # so both receive the full stream independently
  consumer_group gcs
  brokers kafka.host:9092
  topics kafka_backup_to_gcs
  # offset_zookeeper
  # offset_zk_root_node /fluent-plugin-kafka
  # offset 2458884411
  start_from_beginning false
  add_prefix kafka-backup
</source>
<match kafka-backup.*>
  @type gcs
  project YOUR_PROJECT
  keyfile /opt/td-agent/cre.json
  bucket YOUR_GCS_BUCKET_NAME
  object_key_format %{path}%{time_slice}_%{index}.%{file_extension}
  path logs/${tag}/%Y/%m/%d/
  # To use ${tag} or %Y/%m/%d-style placeholders in path / object_key_format,
  # tag and time must be listed as <buffer> arguments, as below.
  <buffer tag,time>
    @type file
    path /var/log/fluent/gcs
    timekey 1h        # 1-hour partitions
    timekey_wait 10m
    timekey_use_utc true
  </buffer>
  <format>
    @type json
  </format>
</match>
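The Elasticsearch side runs as a second, independent pipeline: same brokers and topic, but its own consumer_group, so each pipeline receives the full stream. A minimal sketch, assuming fluent-plugin-elasticsearch (bundled with recent td-agent releases) and a placeholder endpoint es.host:9200:

<source>
  @type kafka_group
  consumer_group elasticsearch
  brokers kafka.host:9092
  topics kafka_backup_to_gcs
  start_from_beginning false
  add_prefix kafka-backup
</source>

<match kafka-backup.*>
  @type elasticsearch
  host es.host
  port 9200
  logstash_format true
  logstash_prefix kafka-backup
</match>

After changing either config, restart the agent with systemctl restart td-agent and check /var/log/td-agent/td-agent.log for consumer or upload errors.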