一、必须知道的专业名词
kafka中,每个partition可以有多个副本(Replica),分为leader、follower,正常情况下,客户端只向leader发送数据、leader消费数据,follower的出现是为了保证kafka数据的高可用和一致性,也是作为灾备的存在。再保证高可用的过程中,leader与follower进行数据同步时,产生的如下一些专业术语,都是基于partition之内的概念。为了方便理解,假设以下所有的描述,都是基于某个只有一个partition,partition里面有3个副本的topic。
-
HW(High Watermark)俗称高水位,consumer能够消费kafka的最大offset
-
LEO (Log End Offset)已经写入kafka log中日志的最大offset值加1
-
AR (Assigned Replicas) kafka 分区的所有副本
-
ISR (In Sync Replicas) 在kafka中,所有与leader保持数据同步的副本(Replica)
如上图所示是HW与LEO的关系,每一个小块表示一条消息,LEO的值永远是大于HW的。我们消费kafka时,并不是producer往kafka写多少,consumer就消费多少。HW是相对于Replica之间而言的,三个Replica,一个leader,两个follower,当producer往kafka发数据时,在消息被写入leader,leader收到数据后会与其它两个follower保存数据同步。
如上图所示,每个 follower同步的速度可能都不太一样,某一时刻,leader已经写入了7条消息,follow1可能同步了第6条消息,而follow2只同步了4条消息,这时的HW值也就是4,也就是经常说的“木桶效应”,以最低水位的那个follower为准。
二、producer发送acks机制
acks机制是producer往kafka broker发数据时的一种回调机制,由于kafka多副本的特性,简言之,这种机制实际是决定各个副本(leader和follower)与producer的回调策略,目的是确认消息是否发送成功,就像是log4j的日志级别一样,每个级别的要求不一样。有如下三种级别:
-
acks=0:producer不会等待任何来自服务器的响应。
如果当中出现问题,导致服务器没有收到消息,那么producer无从得知,会造成消息丢失
由于producer不需要等待服务器的响应所以可以以网络能够支持的最大速度发送消息,从而达到很高的吞吐量
-
acks=1(默认值):只要集群的Leader节点收到消息,生产者就会收到一个来自服务器的成功响应
如果消息无法到达Leader节点(例如Leader节点崩溃,新的Leader节点还没有被选举出来)生产者就会收到一个错误响应,为了避免数据丢失,生产者会重发消息
如果一个没有收到消息的节点成为新Leader,消息还是会丢失
此时的吞吐量主要取决于使用的是同步发送还是异步发送,吞吐量还受到发送中消息数量的限制,例如生产者在收到服务器响应之前可以发送多少个消息
-
acks=-1:只有当所有参与复制的节点全部都收到消息时,生产者才会收到一个来自服务器的成功响应
这种模式是最安全的,可以保证不止一个服务器收到消息,就算有服务器发生崩溃,整个集群依然可以运行
三、leader选举
leader的选举先看回头看上面ISR的定义。kafka每个topic的每个partition都会维护一个ISR列表存储在zookeeper里面,我们进入zk里面,查看一个topic名为test1,partition编号为1的元数据信息,里面的记录了leader是编号为1004的broker,ISR列表是【1004,1003,1005】,其中1003,1005也就是follower。
[zk: localhost:2181(CONNECTED) 7] get /brokers/topics/test1/partitions/1/state
{"controller_epoch":4,"leader":1004,"version":1,"leader_epoch":0,"isr":[1004,1003,1005]}
- 怎样才能定义为ISR呢?
举个栗子,上述的1004,1003,1005中,三人赛跑,1004作为leader跑在最前面,1003,1005作为follower跑在后面紧追leader,假如规定一个距离10m,leader 一旦甩开任意一个follower超过10m的距离,则被甩掉的这个follower会被提出ISR列表,在实际生产中,通过设置broker的参数replica.lag.time.max.ms
来控制这个距离。
-
leader挂了怎么办?
根据优胜劣汰的原则,leader挂了,离leader最近的那个follower,会上位选举成新的leader,因为这个新的leader收到的消息最多
四、Consumer 消费一致性
世上没有100%绝对可靠的系统。万一kafka分区的某个leader节点挂了,还能保证消费者不重复消费,也不多消费吗?
先回头看上面的HW(High Watermark)定义,也就是consumer能够消费的最,大offset。
为验证高可用性,我们来举个反例。下图中,虚线淡黄色的块表示还未从leader同步的数据,按照我们上面说的,consumer只能消费HW线之前的数据。consumer正要消费record1-record3的时候,假设这时leader所在的broker突然挂了,“优胜劣汰”的原则,跑在前面的follower1会被选举成新的leader。这时你可能会问?为什么不可以把follower1的最大值作为HW,这样可以消费到record4呢?再极端一点,假如follower1也同时挂了呢?如果以follower1的record4为HW的话,当leader切换成follower2时,follower2并未收到最初始leader同步过来的record4,所以会造成丢数据!
综上所述,HW必须是以跑的最慢的follower来定,HW机制也很好的保证了消费端的一致性,不会造成consumer少消费数据。但是,这样换区的强一致性必将带来性能上的损耗,很明显的,性能一定的损耗在leader与follower直接的数据同步,这点损耗在大多数场景中还是可以接受的,换来的是kafka的高可用、一致性。
五、总结
kafka作为一个高吞吐的分布式发布订阅消息系统,在大数据应用中非常广泛,可以说是实时计算的标配。它拥有一系列的优势:高吞吐量、低延迟、可扩展性、持久性、可靠性、容错性、高并发等等,满足我们大多数非业务型的消息队列系统,但是也有不少使用kafka用来处理业务型的数据,说明kafka在分布式发布订阅消息系统的地位越来越高,越来越成熟。