背景: 直接使用EFK进行日志收集,在大规模高压力的情况下Elasticsearch会存在丢数据的情况,现在考虑使用MQ(Message Queue)进行缓冲,达到不丢数据的目的。由于对于日志收集响应速度并不是十分高,并且对日志的可靠性要求较高,最终选择Kafka来充当消息队列而非官方推荐的redis。这里着重进行kafka介绍,之后会整合EFK+kafka的应用落地记录。

关于Kafka的基本原理

基本介绍

Kafka是由LinkedIn使用Scala开发的一个分布式的消息系统。最初用作LinkedIn的活动流(Activity Stream)和运营数据处理管道(Pipeline),Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。kafka是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统(也可以当做MQ系统),常见可以用于web/nginx日志、访问日志,消息服务等等。

基本组成

Tips: 主要由四部分组成,Topic(话题)、Producer(生产者)、Broker(服务节点)、Consumer(消费者)

  • Broker

    已发布的消息保存在一组服务器中,它们被称为代理(Broker)或Kafka集群。组成kafka集群的每个服务器,都称为是Broker。Broker可以容纳多个Topic

  • Topic

    是特定类型的消息流。消息是字节的有效负载(Payload),话题是消息的分类名或种子(Feed)名。每条发送到kafka的消息都有一个类别,这个类别就叫做Topic。可以理解为一个消息队列(Message Queue)的名称。(物理上不同 Topic 的消息分开存储,逻辑上一个 Topic 的消息虽然保存于一个或多个 broker 上,但用户只需指定消息的 Topic 即可生产或消费数据而不必关心数据存于何处)。 Partition

    • parition是物理上的概念,每个topic包含一个或多个partition,创建topic时可指定parition数量。每个partition对应于一个文件夹,该文件夹下存储该partition的数据和索引文件
    • partition分区数,控制topic将分片成多少个log。可以显示指定,如果不指定则会使用broker(server.properties)中的num.partitions配置的数量
    • 为了实现扩展性,一个非常大的topic可以分布到多个 broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。
    • partition中的每条消息都会被分配一个有序的id(offset)。kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体 (多个partition间)的顺序。
    • 也就是说,一个topic在集群中可以有多个partition,那么分区的策略是什么?(消息发送到哪个分区上,有两种基本的策略,一是采用Key Hash算法,一是采用Round Robin算法)

    Offset

    • kafka的存储文件都是按照offset.kafka来命名,用offset做名字的好处是方便查找。例如你想找位于2049的位置,只要找到2048.kafka的文件即可。当然the first offset就是00000000000.kafka
  • Producer

    消息生产者,就是向kafka broker发消息的客户端。负责发布消息到Kafka broker。

  • Consumer

    消息消费者,向 Kafka broker 读取消息的客户端。每个consumer属于一个特定的consuer group(可为每个consumer指定group name,若不指定group name则属于默认的group)。使用consumer high level API时,同一topic的一条消息只能被同一个consumer group内的一个consumer消费,但多个consumer group可同时消费这一消息。 Consumer Group

    • 每个 Consumer 属于一个特定的 Consumer Group(可为每个 Consumer 指定 group name,若不指定 group name 则属于默认的 group)
    • 消息系统有两类,一是广播,二是订阅发布。广播是把消息发送给所有的消费者;发布订阅是把消息只发送给订阅者。Kafka通过Consumer Group组合实现了这两种机制: 实现一个topic消息广播(发给所有的consumer)和单播(发给任意一个consumer)。一个topic可以有多个Consumer Group。
    • topic的消息会复制(不是真的复制,是概念上的)到所有的CG,但每个CG只会把消息发给该CG中的一个 consumer(这是实现一个Topic多Consumer的关键点:为一个Topic定义一个CG,CG下定义多个Consumer)。如果需要实现广播,只要每个consumer有一个独立的CG就可以了。要实现单播只要所有的consumer在同一个CG。用CG还可以将consumer进行自由的分组而不需要多次发送消息到不同的topic。
    • 典型的应用场景是,多个Consumer来读取一个Topic(理想情况下是一个Consumer读取Topic的一个Partition),那么可以让这些Consumer属于同一个Consumer Group即可实现消息的多Consumer并行处理,原理是Kafka将一个消息发布出去后,ConsumerGroup中的Consumers可以通过Round Robin的方式进行消费(Consumers之间的负载均衡使用Zookeeper来实现)

搭建kafka集群

**Tips:**这里使用的zk及kafka版本如下 zookeeper:3.4.10 kafka:2.11-0.10.2.0

安装配置zookeeper

  • Install
    1
    2
    3
    4
    5
    6
    7
    8
    
    wget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.4.10/zookeeper-3.4.10.tar.gz
    tar xvf zookeeper-3.4.10.tar.gz
    cp zookeeper-3.4.10 /usr/local/zookeeper/ -r
    cd /usr/local/zookeeper/
    cp conf/zoo_sample.cfg conf/zoo.cfg
    mkdir -p /var/zookeeper/data
    echo 1 >/var/zookeeper/data/myid
    #注意,这里要与下面config中server.1的配置一致,此server为10.17.0.112,所以echo 1,同理226这台server就echo 2
    
  • Config
    1
    2
    3
    4
    5
    6
    7
    8
    9
    
    #cat /usr/local/zookeeper/conf/zoo.cfg
    tickTime=2000
    initLimit=10
    syncLimit=5
    dataDir=/var/zookeeper/data
    clientPort=2181
    server.1=10.17.0.112:2888:3888
    server.2=10.17.0.226:2888:3888
    server.3=10.17.0.211:2888:3888
    
  • Start
    1
    2
    3
    
    cd /usr/local/zookeeper
    ./bin/zkServer.sh start
    ./bin/zkServer.sh status
    
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    
    #cat /etc/rc.d/init.d/zookeeper
    #!/bin/bash
    #chkconfig:2345 20 90
    #description:zookeeper
    #processname:zookeeper
    case $1 in
          start) /usr/local/zookeeper/bin/zkServer.sh start;;
          stop) /usr/local/zookeeper/bin/zkServer.sh stop;;
          status) /usr/local/zookeeper/bin/zkServer.sh status;;
          restart) /usr/local/zookeeper/bin/zkServer.sh restart;;
          *)  echo "require start|stop|status|restart";;
    esac
    

安装配置kafka

  • Install
    1
    2
    3
    4
    
    wget http://mirror.bit.edu.cn/apache/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz
    tar xvf kafka_2.11-0.10.2.0.tgz
    cp kafka_2.11-0.10.2.0 /usr/local/kafka/ -r
    cd /usr/local/kafka/
    
  • Config(需要改server和consumer配置)
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    
    #cat /usr/local/kafka/config/server.properties |egrep -v "^#|^$"
    broker.id=2
    #唯一值,我这里按zk的serverID进行了配置
    listeners = PLAINTEXT://10.17.0.226:9092
    #这个地方在这个版本及以后需要打开注释填写本机地址,之前的版本需配置host.name
    num.network.threads=3
    num.io.threads=8
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600
    log.dirs=/tmp/kafka-logs
    num.partitions=1
    num.recovery.threads.per.data.dir=1
    log.retention.hours=168
    log.segment.bytes=1073741824
    log.retention.check.interval.ms=300000
    zookeeper.connect=10.17.0.112:2181,10.17.0.226:2181,10.17.0.221:2181
    #这里填写zookeeper的地址,另外消费配置上也需要做对应修改
    zookeeper.connection.timeout.ms=6000
    
    1
    2
    3
    4
    
    #cat /usr/local/kafka/config/consumer.properties |egrep -v "^#|^$"
    zookeeper.connect=10.17.0.112:2181,10.17.0.226:2181,10.17.0.221:2181
    zookeeper.connection.timeout.ms=6000
    group.id=test-consumer-group
    
  • Start
    1
    2
    3
    4
    5
    
    cd /usr/local/kafka/
    bin/kafka-server-start.sh config/server.properties &
    #后台执行,退出终端后终止
    bin/kafka-server-start.sh -daemon config/server.properties &
    #后台执行,退出终端后不终止
    
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    
    #cat /etc/rc.d/init.d/kafka
    #!/bin/bash
    #chkconfig:2345 30 80
    #description:kafka
    #processname:kafka
    case $1 in
     start) /usr/local/kafka/bin/kafka-server-start.sh  -daemon /usr/local/kafka/config/server.properties &;;
     stop) /usr/local/kafka/bin/kafka-server-stop.sh  /usr/local/kafka/config/server.properties;;
     restart) /usr/local/kafka/bin/kafka-server-stop.sh  /usr/local/kafka/config/server.properties
             /usr/local/kafka/bin/kafka-server-start.sh  -daemon /usr/local/kafka/config/server.properties &
             ;;
    
     *)  echo "require start|stop|restart"  ;;
    esac
    

安装配置kafka监控程序

目前常用有三种: Kafka Web Console:监控功能较为全面,可以预览消息,监控Offset、Lag等信息,但存在bug,不建议在生产环境中使用。 Kafka Manager:偏向Kafka集群管理,若操作不当,容易导致集群出现故障。对Kafka实时生产和消费消息是通过JMX实现的。没有记录Offset、Lag等信息。 KafkaOffsetMonitor:程序一个jar包的形式运行,部署较为方便。只有监控功能,使用起来也较为安全。 这里使用第三种KafkaOffsetMonitor

  • install
    1
    2
    
    wget https://github.com/quantifind/KafkaOffsetMonitor/releases/download/v0.2.1/KafkaOffsetMonitor-assembly-0.2.1.jar
    #下载即可
    
  • start
    1
    2
    3
    4
    5
    6
    
    java -cp KafkaOffsetMonitor-assembly-0.2.1.jar \
         com.quantifind.kafka.offsetapp.OffsetGetterWeb \
         --zk 10.17.0.112:2181,10.17.0.226:2181,10.17.0.221:2181 \
         --port 8089 \
         --refresh 10.seconds \
         --retain 2.days &
    

常用kafka命令

  • 创建及查看Topic
    1
    2
    3
    4
    5
    6
    
    bin/kafka-topics.sh --create --zookeeper 10.17.0.211:2181 --replication-factor 3 --partitions 2 --topic prod-test
    #创建一个名为prod-test的topic,有3个副本(即控制消息保存在3个broker上),2个分区
    bin/kafka-topics.sh --list --zookeeper 10.17.0.211:2181
    #list topic,列出所有topic
    bin/kafka-topics.sh --describe --zookeeper 10.17.0.211:2181 --topic prod-test
    #查看某个topic的具体信息
    
  • 生产消费数据(可以测试集群搭建是否成功)
    1
    2
    3
    4
    
    bin/kafka-console-producer.sh --broker-list 10.17.0.211:9092 --topic prod-test
    #从控制台向topic生产数据
    bin/kafka-console-consumer.sh  --zookeeper 10.17.0.226:2181  --topic prod-test --from-beginning
    #从控制台消费topic prod-test的数据,可以使用任意集群中的地址进行测试
    
  • 查看topic某分区偏移量最大(小)值
    1
    2
    3
    4
    
    bin/kafka-run-class.sh kafka.tools.GetOffsetShell --topic prod-test  --time -2 --broker-list 10.17.0.211:9092 --partitions 1
    #查看prod-test这个topic分区1的分组最小偏移值(time -2 表示最小偏移量)
    bin/kafka-run-class.sh kafka.tools.GetOffsetShell --topic prod-test  --time -1 --broker-list 10.17.0.211:9092 --partitions 1
    #查看prod-test这个topic分区0的分组最大偏移值(time -1 表示最大偏移量
    
  • 增加topic分区数
    1
    2
    
    bin/kafka-topics.sh --zookeeper 10.17.0.211:2181  --alter --topic prod-test --partitions 5
    #增加prod-test的这个topic的分区数到5个(比如之前是3个,就是再增加2个),这个数字只能比现在已有的分区数大。
    
  • 查看topic消费进度
    1
    2
    
    #consumer group可以从zk中查看
    bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group console-consumer-50561  --zookeeper 10.17.0.226:2181