Consume data from Kafka and ship it to both Elasticsearch (ES) and GCS. There are two approaches: the first uses a single topic and consumer group with Fluentd's copy output type, duplicating each event to multiple outputs; the second relies on Kafka's pub/sub model, consuming the same topic with a separate consumer group per sink. The second approach is used here; a sketch of the first follows for contrast.
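A minimal sketch of the copy approach: one consumer group reads the topic once, and the copy output duplicates each event to every <store>. The group name backup and the es.host endpoint are placeholders, and the Elasticsearch store assumes fluent-plugin-elasticsearch is installed.

<source>
  @type kafka_group
  consumer_group backup            # one shared consumer group for both sinks
  brokers kafka.host:9092
  topics kafka_backup_to_gcs
  add_prefix kafka-backup
</source>
<match kafka-backup.*>
  @type copy
  <store>
    @type elasticsearch            # fluent-plugin-elasticsearch
    host es.host                   # placeholder ES endpoint
    port 9200
  </store>
  <store>
    @type gcs
    # ... same GCS settings as in the configuration example below ...
  </store>
</match>

The drawback is coupling: both sinks share a single read of the topic and a single offset position, so one sink cannot be replayed or re-offset independently of the other.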

Install the plugin

/opt/td-agent/bin/fluent-gem install fluent-plugin-gcs -v "0.4.0" --no-document
vim /opt/td-agent/cre.json          # paste the GCP service-account JSON key referenced by keyfile below
chown td-agent:td-agent /opt/td-agent/cre.json
mkdir -p /var/log/fluent            # buffer directory used by the GCS output below
chown -R td-agent:td-agent /var/log/fluent
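Assuming the stock td-agent 4 layout and its default config at /etc/td-agent/td-agent.conf (adjust the path if yours differs), the install and the finished config can be sanity-checked before restarting the daemon:

/opt/td-agent/bin/fluent-gem list fluent-plugin-gcs    # should print fluent-plugin-gcs (0.4.0)
/opt/td-agent/bin/fluentd --dry-run -c /etc/td-agent/td-agent.conf
systemctl restart td-agent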

References

https://github.com/daichirata/fluent-plugin-gcs

https://docs.fluentd.org/output/copy

Configuration example

<source>
  @type kafka_group
  consumer_group gcs               # dedicated group for the GCS sink, separate from the ES pipeline's
  brokers kafka.host:9092
  topics kafka_backup_to_gcs
  # offset_zookeeper
  # offset_zk_root_node /fluent-plugin-kafka
  # offset 2458884411
  start_from_beginning false
  add_prefix kafka-backup          # tag becomes kafka-backup.<topic>
</source>
<match kafka-backup.*>
  @type gcs

  project YOUR_PROJECT
  keyfile /opt/td-agent/cre.json
  bucket YOUR_GCS_BUCKET_NAME
  object_key_format %{path}%{time_slice}_%{index}.%{file_extension}
  path logs/${tag}/%Y/%m/%d/

  # To use ${tag} or %Y/%m/%d-style placeholders in path / object_key_format,
  # tag and time must be listed as <buffer> chunk keys, as below.
  <buffer tag,time>
    @type file
    path /var/log/fluent/gcs
    timekey 1h # 1 hour partition
    timekey_wait 10m
    timekey_use_utc true # use utc
  </buffer>

  <format>
    @type json
  </format>
</match>
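To complete the second approach, the Elasticsearch pipeline runs as a separate <source> with its own consumer group, so the two sinks read the same topic independently and commit their own offsets. A rough sketch, assuming a placeholder es.host endpoint and fluent-plugin-elasticsearch:

<source>
  @type kafka_group
  consumer_group elasticsearch     # independent group: its offsets never affect the GCS pipeline
  brokers kafka.host:9092
  topics kafka_backup_to_gcs
  start_from_beginning false
  add_prefix kafka-es
</source>
<match kafka-es.*>
  @type elasticsearch              # fluent-plugin-elasticsearch
  host es.host                     # placeholder ES endpoint
  port 9200
  logstash_format true             # write daily logstash-YYYY.MM.DD indices
</match>

Either pipeline can now be stopped, replayed, or re-offset without disturbing the other.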