简介: 生产环境并没有使用传统ELK,而是使用tdagent来代替Logstash作日志收集。 关于td-agent和Fluentd的关系可以引用官网的描述"In one word, td-agent is a stable distribution package of Fluentd."

update:2017-05-20 初次修改

一般架构

fluentd-1

特性记录

  • 版本 2017 年 12 月的时候,fluentd 发布了 v1.0 版本,也就是 td-agent v3 版。
  • 性能 “a regular PC box can handle 18,000 messages/second with a single process.”即一般来说,fluentd 单节点的吞吐量大概是 18w/sec 左右。 要想提高性能的话,可以在输出端(match)指定 num_threads 来提高并发,在输入端安装 fluent-plugin-multiprocess 插件来提高 CPU 的利用率(Ruby 也有 GIL 问题)。

指令说明

指令 功能
source 决定从哪里读取日志,关键字type指定启用插件后配置相关参数
match 设定当满足指定条件时如何处理日志的方法,在source指令追加的标签(tag)满足match指令的条件时,该日志将被指定插件处理。定义输出的目标,如写入文件,或者发送到指定
filter 过滤,也即事件处理流水线,可在输入和输出之间运行
system 系统级别的设置
label 定义一组操作,实现复用及内部路由
@include 引入其他文件,和python的import类似

source指令

  • source **Tips:**每个 source 指令必须包括 “type” 参数,指定使用那种插件

    Routing(路由):source 把事件提交到 fluentd 的路由引擎中。一个事件由三个实体组成:tag、time 和 record。 tag:是一个通过 “.” 来分离的字符串(e.g. myapp.access),用作 Fluentd 内部路由引擎的方向 time:时间字段由输入插件指定,并且必须为 Unix 时间格式。 record:一个 JSON 对象。

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    
    <source>
      type forward #使 fluentd 转变为一个 TCP 端点,以接受 TCP 报文,监听24224端口
      port 24224
    </source>
    
    <source>
      @type http #使fluentd转变为一个httpd端点以接受进入的 http 报文,监听7777端口
      port 7777
    </source>
    #可以使用curl -X POST -d 'json={"json":"message"}' http://localhost:7777/debug.test测试,可以在/var/log/td-agent/td-agent.log看到输入内容,这个例子中tag就是debug.test,时间就是current time,record就是{"json":"message"},这个url还可以写成http://localhost:7777/debug.test?json={"json":"message"}
    
    <source>
      type tail #tail方式是 Fluentd 内置的输入方式,其原理是不停地从源文件中获取增量日志,与linx命令tail相似,也可以使用其他输入方式如http、forward等输入,也可以使用输入插件,将 tail 改为相应的插件名称 如: type tail_ex
      format json  #指定json格式解析,也可使用apache格式(apache为fluentd内置的日志解析器)
      time_key time
      time_format %N
      pos_file /var/log/td-agent/logics_5001.log.pos #优化参数(将access_log上次的读取长度写入到该文件,主要保证在fluentd服务宕机重启后能够继续收集,避免日志数据收集丢失,保证数据收集的完整性),注意此文件的权限
      path /opt/supervisor/log/logics_shard5001.%d.%m.%Y.log   #指定收集日志文件的位置
      tag logics.5001.205  #指定标签,用来对不同的日志进行分类,与match操作相匹配
    </source>
    

match指令

  • match Tips: match指令查询匹配tags事件并处理他们。match 命令的最常见用法是将事件输出到其他系统(因此,与 match 命令对应的插件称为 “输出插件”)。 Fluentd 的标准输出插件包括 file 和 forward。每个 match 指令必须包括一个匹配模式和 type 参数。只有与模式匹配的 “tags” 事件才会发送到输出目标(在上面的示例中,只有标记 “myapp.access” 的事件匹配),Fluentd 尝试按照它们在配置文件中出现的顺序,从上到下来进行 “tags” 匹配,如上一条已经匹配那么下面的将不会被匹配 。type 参数指定使用哪种输出插件

    :匹配单个 tag 部分。例:a.,匹配 a.b,但不匹配 a 或者 a.b.c :匹配 0 或 多个 tag 部分。例:a.**,匹配 a、a.b 和 a.b.c {X,Y,Z}:匹配 X、Y 或 Z,其中 X、Y 和 Z 是匹配模式。可以和 * 和 ** 模式组合使用 当多个模式列在一个 标签(由一个或多个空格分隔)内时,它匹配任何列出的模式

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    
    <match logics.**> #配置输出数据流的匹配规则及匹配成功后所需要执行的动作,匹配logics标签成功的数据执行转发操作
       type forward # forward模式,转发给其他服务器处理(file类型 会将数据写入到路径文件中)
       send_timeout 60s  #发送事件日志的超时时间,默认60s
       recover_wait 10s  #接受服务器故障恢复之前等待时间,默认10s
       heartbeat_interval 1s  #心跳时间刷新频率
       phi_threshold 16  #用于检测服务器故障的阈值参数。 默认值为16。
       hard_timeout 60s  #用于检测服务器故障的硬超时。 默认值等于send_timeout参数。
       heartbeat_type tcp  #用于心跳的传输协议默认UDP连接,这里为tcp连接方式
       slow_flush_log_threshold 300.0  #用于检查块冲洗性能的阈值。默认值为20.0秒。注意,参数类型是float,而不是时间。
    如果chunk flush需要比这个阈值更长的时间,fluentd日志警告消息如下:2016-12-19 12:00:00 +0000 [warn]:缓冲区刷新花费的时间比slow_flush_log_threshold更长:elapsed_time = 15.0031226690043695 slow_flush_log_threshold = 10.0
       num_threads 2      #default 1
       buffer_chunk_limit  16M  #default 8M
       buffer_queue_limit  256  #default 256
       flush_interval  5s   #default 60s
       <server>
          name logics.shard
          host tdagent.test.net
          port 24224
          weight 1
       </server>
       <secondary>  #所有服务器不可用时使用的备份策略,这里是直接生成文件到本地目录
          type file
          path /var/log/td-agent/logics-forward-failed
       </secondary>
    </match>  
    

filter指令

  • filter Tips:“filter” 指令具有与 “match” 相同的语法,但是 filter 可以串联成 pipeline,对数据进行串行处理,最终再交给 match 输出。 使用 fliters,事件流如下:

    下面例子里,filter 获取数据后,调用原生的 @type record_transformer 插件,在事件的 record 里插入了新的字段 host_param,然后再交给 match 输出。 filter 匹配顺序与 match 相同,我们应该在 之前放置

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    
    #Input -> filter 1 -> ... -> filter N -> Output(Match tag)
    ### http://this.host:9880/myapp.access?json={"event":"data"}
    <source>
      @type http
      port 9880
    </source>
    <filter myapp.access>
      @type record_transformer
      <record>
        host_param "#{Socket.gethostname}"
      </record>
    </filter>
    <match myapp.access>
      @type file
      path /var/log/fluent/access
    </match>
    

system指令

  • system **Tips:**fluentd的相关设置,也可以在配置文件里设置。包含

    • log_level
    • suppress_repeated_stacktrace
    • emit_error_log_interval
    • suppress_config_dump
    • without_source
    1
    2
    3
    4
    5
    6
    7
    8
    
    <system>
      # equal to -qq option
      log_level error  #启动配置
      # equal to --without-source option
      without_source   #启动配置
      # ...
      process_name fluentd1  #服务进程名,可通过ps查看到
    </system>
    

label指令

  • label **Tips:**label用于将任务进行分组,方便复杂任务的管理。可以在 source 里指定 @label @<LABEL_NAME>,这个 source 所触发的事件就会被发送给指定的 label 所包含的任务,而不会被后续的其他任务获取到。用来接收插件通过调用 emit_error_event API 抛出的异常,使用方法和 label 一样,通过设定 就可以接收到相关的异常。
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    
    <source>
      @type forward
    </source>
    <source>
      ### 这个任务指定了 label 为 @SYSTEM
      ### 会被发送给 <label @SYSTEM>
      ### 而不会被发送给下面紧跟的 filter 和 match
      @type tail
      @label @SYSTEM
    </source>
    <filter access.**>
      @type record_transformer
      <record>
        # ...
      </record>
    </filter>
    <match **>
      @type elasticsearch
      # ...
    </match>
    <label @SYSTEM>
      ### 将会接收到上面 @type tail 的 source event
      <filter var.log.middleware.**>
        @type grep
        # ...
      </filter>
      <match **>
        @type s3
        # ...
      </match>
    </label>
    

include指令

  • include **Tips:**使用include指令可以导入其他独立的配置文件中的指令,这些文件可以使用相对路径、绝对路径及HTTP的URL

Fluentd插件

**Tips:**插件有6种类型

  • input:输入
  • output:输出
  • Buffer:缓冲区
  • filter:过滤器
  • Parset:解析器
  • Formatter:格式化器

Fluentd安装

  • 安装步骤 **Tips:**这里安装的td-agent是fluentd的易安装版本,也是业界流行的的安装版本,点击查看版本下载页
    1
    2
    3
    
    cat /etc/issue #这里实验机型为Amazon Linux AMI release 2016.09
    curl -L https://toolbelt.treasuredata.com/sh/install-redhat-td-agent2.sh | sh
    
    

参考说明

Plugins

参照:https://blog.mallux.me/2017/02/04/fluentd/