背景: 直接使用EFK进行日志收集,在大规模高压力的情况下Elasticsearch会存在丢数据的情况,现在考虑使用MQ(Message Queue)进行缓冲,达到不丢数据的目的。由于对于日志收集响应速度并不是十分高,并且对日志的可靠性要求较高,最终选择Kafka来充当消息队列而非官方推荐的redis。这里着重进行kafka介绍,之后会整合EFK+kafka的应用落地记录。
关于Kafka的基本原理
基本介绍
Kafka是由LinkedIn使用Scala开发的一个分布式的消息系统。最初用作LinkedIn的活动流(Activity Stream)和运营数据处理管道(Pipeline),Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。kafka是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统(也可以当做MQ系统),常见可以用于web/nginx日志、访问日志,消息服务等等。
基本组成
Tips: 主要由四部分组成,Topic(话题)、Producer(生产者)、Broker(服务节点)、Consumer(消费者)
- Broker
已发布的消息保存在一组服务器中,它们被称为代理(Broker)或Kafka集群。组成kafka集群的每个服务器,都称为是Broker。Broker可以容纳多个Topic
- Topic
是特定类型的消息流。消息是字节的有效负载(Payload),话题是消息的分类名或种子(Feed)名。每条发送到kafka的消息都有一个类别,这个类别就叫做Topic。可以理解为一个消息队列(Message Queue)的名称。(物理上不同 Topic 的消息分开存储,逻辑上一个 Topic 的消息虽然保存于一个或多个 broker 上,但用户只需指定消息的 Topic 即可生产或消费数据而不必关心数据存于何处)。 Partition
- parition是物理上的概念,每个topic包含一个或多个partition,创建topic时可指定parition数量。每个partition对应于一个文件夹,该文件夹下存储该partition的数据和索引文件
- partition分区数,控制topic将分片成多少个log。可以显示指定,如果不指定则会使用broker(server.properties)中的num.partitions配置的数量
- 为了实现扩展性,一个非常大的topic可以分布到多个 broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。
- partition中的每条消息都会被分配一个有序的id(offset)。kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体 (多个partition间)的顺序。
- 也就是说,一个topic在集群中可以有多个partition,那么分区的策略是什么?(消息发送到哪个分区上,有两种基本的策略,一是采用Key Hash算法,一是采用Round Robin算法)
Offset
- kafka的存储文件都是按照offset.kafka来命名,用offset做名字的好处是方便查找。例如你想找位于2049的位置,只要找到2048.kafka的文件即可。当然the first offset就是00000000000.kafka
- Producer
消息生产者,就是向kafka broker发消息的客户端。负责发布消息到Kafka broker。
- Consumer
消息消费者,向 Kafka broker 读取消息的客户端。每个consumer属于一个特定的consuer group(可为每个consumer指定group name,若不指定group name则属于默认的group)。使用consumer high level API时,同一topic的一条消息只能被同一个consumer group内的一个consumer消费,但多个consumer group可同时消费这一消息。 Consumer Group
- 每个 Consumer 属于一个特定的 Consumer Group(可为每个 Consumer 指定 group name,若不指定 group name 则属于默认的 group)
- 消息系统有两类,一是广播,二是订阅发布。广播是把消息发送给所有的消费者;发布订阅是把消息只发送给订阅者。Kafka通过Consumer Group组合实现了这两种机制: 实现一个topic消息广播(发给所有的consumer)和单播(发给任意一个consumer)。一个topic可以有多个Consumer Group。
- topic的消息会复制(不是真的复制,是概念上的)到所有的CG,但每个CG只会把消息发给该CG中的一个 consumer(这是实现一个Topic多Consumer的关键点:为一个Topic定义一个CG,CG下定义多个Consumer)。如果需要实现广播,只要每个consumer有一个独立的CG就可以了。要实现单播只要所有的consumer在同一个CG。用CG还可以将consumer进行自由的分组而不需要多次发送消息到不同的topic。
- 典型的应用场景是,多个Consumer来读取一个Topic(理想情况下是一个Consumer读取Topic的一个Partition),那么可以让这些Consumer属于同一个Consumer Group即可实现消息的多Consumer并行处理,原理是Kafka将一个消息发布出去后,ConsumerGroup中的Consumers可以通过Round Robin的方式进行消费(Consumers之间的负载均衡使用Zookeeper来实现)
搭建kafka集群
**Tips:**这里使用的zk及kafka版本如下 zookeeper:3.4.10 kafka:2.11-0.10.2.0
安装配置zookeeper
- Install
1 2 3 4 5 6 7 8
wget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.4.10/zookeeper-3.4.10.tar.gz tar xvf zookeeper-3.4.10.tar.gz cp zookeeper-3.4.10 /usr/local/zookeeper/ -r cd /usr/local/zookeeper/ cp conf/zoo_sample.cfg conf/zoo.cfg mkdir -p /var/zookeeper/data echo 1 >/var/zookeeper/data/myid #注意,这里要与下面config中server.1的配置一致,此server为10.17.0.112,所以echo 1,同理226这台server就echo 2
- Config
1 2 3 4 5 6 7 8 9
#cat /usr/local/zookeeper/conf/zoo.cfg tickTime=2000 initLimit=10 syncLimit=5 dataDir=/var/zookeeper/data clientPort=2181 server.1=10.17.0.112:2888:3888 server.2=10.17.0.226:2888:3888 server.3=10.17.0.211:2888:3888
- Start
1 2 3
cd /usr/local/zookeeper ./bin/zkServer.sh start ./bin/zkServer.sh status
1 2 3 4 5 6 7 8 9 10 11 12
#cat /etc/rc.d/init.d/zookeeper #!/bin/bash #chkconfig:2345 20 90 #description:zookeeper #processname:zookeeper case $1 in start) /usr/local/zookeeper/bin/zkServer.sh start;; stop) /usr/local/zookeeper/bin/zkServer.sh stop;; status) /usr/local/zookeeper/bin/zkServer.sh status;; restart) /usr/local/zookeeper/bin/zkServer.sh restart;; *) echo "require start|stop|status|restart";; esac
安装配置kafka
- Install
1 2 3 4
wget http://mirror.bit.edu.cn/apache/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz tar xvf kafka_2.11-0.10.2.0.tgz cp kafka_2.11-0.10.2.0 /usr/local/kafka/ -r cd /usr/local/kafka/
- Config(需要改server和consumer配置)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
#cat /usr/local/kafka/config/server.properties |egrep -v "^#|^$" broker.id=2 #唯一值,我这里按zk的serverID进行了配置 listeners = PLAINTEXT://10.17.0.226:9092 #这个地方在这个版本及以后需要打开注释填写本机地址,之前的版本需配置host.name num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/tmp/kafka-logs num.partitions=1 num.recovery.threads.per.data.dir=1 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 zookeeper.connect=10.17.0.112:2181,10.17.0.226:2181,10.17.0.221:2181 #这里填写zookeeper的地址,另外消费配置上也需要做对应修改 zookeeper.connection.timeout.ms=6000
1 2 3 4
#cat /usr/local/kafka/config/consumer.properties |egrep -v "^#|^$" zookeeper.connect=10.17.0.112:2181,10.17.0.226:2181,10.17.0.221:2181 zookeeper.connection.timeout.ms=6000 group.id=test-consumer-group
- Start
1 2 3 4 5
cd /usr/local/kafka/ bin/kafka-server-start.sh config/server.properties & #后台执行,退出终端后终止 bin/kafka-server-start.sh -daemon config/server.properties & #后台执行,退出终端后不终止
1 2 3 4 5 6 7 8 9 10 11 12 13 14
#cat /etc/rc.d/init.d/kafka #!/bin/bash #chkconfig:2345 30 80 #description:kafka #processname:kafka case $1 in start) /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties &;; stop) /usr/local/kafka/bin/kafka-server-stop.sh /usr/local/kafka/config/server.properties;; restart) /usr/local/kafka/bin/kafka-server-stop.sh /usr/local/kafka/config/server.properties /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties & ;; *) echo "require start|stop|restart" ;; esac
安装配置kafka监控程序
目前常用有三种: Kafka Web Console:监控功能较为全面,可以预览消息,监控Offset、Lag等信息,但存在bug,不建议在生产环境中使用。 Kafka Manager:偏向Kafka集群管理,若操作不当,容易导致集群出现故障。对Kafka实时生产和消费消息是通过JMX实现的。没有记录Offset、Lag等信息。 KafkaOffsetMonitor:程序一个jar包的形式运行,部署较为方便。只有监控功能,使用起来也较为安全。 这里使用第三种KafkaOffsetMonitor
- install
1 2
wget https://github.com/quantifind/KafkaOffsetMonitor/releases/download/v0.2.1/KafkaOffsetMonitor-assembly-0.2.1.jar #下载即可
- start
1 2 3 4 5 6
java -cp KafkaOffsetMonitor-assembly-0.2.1.jar \ com.quantifind.kafka.offsetapp.OffsetGetterWeb \ --zk 10.17.0.112:2181,10.17.0.226:2181,10.17.0.221:2181 \ --port 8089 \ --refresh 10.seconds \ --retain 2.days &
常用kafka命令
- 创建及查看Topic
1 2 3 4 5 6
bin/kafka-topics.sh --create --zookeeper 10.17.0.211:2181 --replication-factor 3 --partitions 2 --topic prod-test #创建一个名为prod-test的topic,有3个副本(即控制消息保存在3个broker上),2个分区 bin/kafka-topics.sh --list --zookeeper 10.17.0.211:2181 #list topic,列出所有topic bin/kafka-topics.sh --describe --zookeeper 10.17.0.211:2181 --topic prod-test #查看某个topic的具体信息
- 生产消费数据(可以测试集群搭建是否成功)
1 2 3 4
bin/kafka-console-producer.sh --broker-list 10.17.0.211:9092 --topic prod-test #从控制台向topic生产数据 bin/kafka-console-consumer.sh --zookeeper 10.17.0.226:2181 --topic prod-test --from-beginning #从控制台消费topic prod-test的数据,可以使用任意集群中的地址进行测试
- 查看topic某分区偏移量最大(小)值
1 2 3 4
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --topic prod-test --time -2 --broker-list 10.17.0.211:9092 --partitions 1 #查看prod-test这个topic分区1的分组最小偏移值(time -2 表示最小偏移量) bin/kafka-run-class.sh kafka.tools.GetOffsetShell --topic prod-test --time -1 --broker-list 10.17.0.211:9092 --partitions 1 #查看prod-test这个topic分区0的分组最大偏移值(time -1 表示最大偏移量
- 增加topic分区数
1 2
bin/kafka-topics.sh --zookeeper 10.17.0.211:2181 --alter --topic prod-test --partitions 5 #增加prod-test的这个topic的分区数到5个(比如之前是3个,就是再增加2个),这个数字只能比现在已有的分区数大。
- 查看topic消费进度
1 2
#consumer group可以从zk中查看 bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group console-consumer-50561 --zookeeper 10.17.0.226:2181