浅析Kafka

Scroll Down

image

文档

中文官方文档【半兽人】翻译

kafka数据可靠性深度解读

Kafka功能

image

发布 & 订阅

数据流,如消息传递系统

处理

高效并实时

存储

数据流安全地在分布式集群中复制存储


kafka是用于构建实时数据管道和流应用程序。具有横向扩展,容错,wicked fast(超级快)等优点,并已在成千上万家公司运行。

Kafka概念解释

Broker

Controller:主要负责Partition管理和副本状态管理,也会执行类似于重分配Partition之类的管理任务。如果当前的Controller失败,会从其他正常的Broker中重新选举Controller。

consumer -- consumerGroup

分区

是Kafka中横向扩展和一切并行化的基础,
对某一个Partition的所有读写请求都是只由Leader来处理,所以Kafka会尽量把Leader均匀的分散到集群的各个节点上,以免造成网络流量过于集中;
每个分区的消息是有序的,多个分区之间的消息不是有序的
$KAFKA_HOME/config/server.properties
num.partitions=3

分区状态

Kafka中Partition的四种状态

1.NonExistentPartition —— 这个状态表示该分区要么没有被创建过或曾经被创建过但后面被删除了。
2. NewPartition —— 分区创建之后就处于NewPartition状态。在这个状态中,分区应该已经分配了副本,但是还没有选举出leader和ISR。
3. OnlinePartition —— 一旦分区的leader被推选出来,它就处于OnlinePartition状态。
4. OfflinePartition —— 如果leader选举出来后,leader broker宕机了,那么该分区就处于OfflinePartition状态。

状态的转换关系

NonExistentPartition -> NewPartition
首先将第一个可用的副本broker作为leader broker并把所有可用的副本对象都装入ISR,然后写leader和ISR信息到zookeeper中保存
然后对这个分区,发送LeaderAndIsr请求到每个可用的副本broker,以及UpdateMetadata请求到每个可用的broker上
OnlinePartition, OfflinePartition -> OnlinePartition
为该分区选取新的leader和ISR以及接收LeaderAndIsr请求的一组副本,然后写入leader和ISR信息到zookeeper中保存。

NewPartition, OnlinePartition -> OfflinePartition
标记分区状态为离线(offline)。
OfflinePartition -> NonExistentPartition
离线状态标记为不存在分区,表示该分区失败或者被删除。

Controller选择器

KafkaController中共定义了五种selector选举器:

  1. ReassignedPartitionLeaderSelector
    从可用的ISR中选取第一个作为leader,把当前的ISR作为新的ISR,将重分配的副本集合作为接收LeaderAndIsr请求的副本集合。
  2. PreferredReplicaPartitionLeaderSelector
    如果从assignedReplicas取出的第一个副本就是分区leader的话,则抛出异常,否则将第一个副本设置为分区leader。
  3. ControlledShutdownLeaderSelector
    将ISR中处于关闭状态的副本从集合中去除掉,返回一个新新的ISR集合,然后选取第一个副本作为leader,然后令当前AR作为接收LeaderAndIsr请求的副本。
  4. NoOpLeaderSelector
    原则上不做任何事情,返回当前的leader和isr。
  5. OfflinePartitionLeaderSelector
    从活着的ISR中选择一个broker作为leader,如果ISR中没有活着的副本,则从assignedReplicas中选择一个副本作为leader,leader选举成功后注册到Zookeeper中,并更新所有的缓存。

所有的leader选择完成后,都要通过请求把具体的request路由到对应的handler处理。

def handle(request: RequestChannel.Request) {  
  try{  
    trace("Handling request: " + request.requestObj + " from client: " + request.remoteAddress)  
    request.requestId match {  
      case RequestKeys.ProduceKey => handleProducerRequest(request)  // producer  
      case RequestKeys.FetchKey => handleFetchRequest(request)       // consumer  
      case RequestKeys.OffsetsKey => handleOffsetRequest(request)  
      case RequestKeys.MetadataKey => handleTopicMetadataRequest(request)  
      case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request) //成为leader或follower设置同步副本组信息  
      case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)  
      case RequestKeys.UpdateMetadataKey => handleUpdateMetadataRequest(request)  
      case RequestKeys.ControlledShutdownKey => handleControlledShutdownRequest(request)  //shutdown broker  
      case RequestKeys.OffsetCommitKey => handleOffsetCommitRequest(request)  
      case RequestKeys.OffsetFetchKey => handleOffsetFetchRequest(request)  
      case requestId => throw new KafkaException("Unknown api code " + requestId)  
    }  
  } catch {  
    case e: Throwable =>  
      request.requestObj.handleError(e, requestChannel, request)  
      error("error when handling request %s".format(request.requestObj), e)  
  } finally  
    request.apiLocalCompleteTimeMs = SystemTime.milliseconds  
}

offset

一个连续的序列号叫做offset,用于partition唯一标识一条消息,代表一个分区中的第几个消息,每个分区的offset都是从0开始,每添加一个消息,该值就+1;
在0.8.2之前,comsumer定期提交已经消费的kafka消息的offset位置到zookeeper中保存。对zookeeper而言,每次写操作代价是很昂贵的,而且zookeeper集群是不能扩展写能力的。在0.8.2开始,可以把comsumer提交的offset记录在compacted topic(__comsumer_offsets)中,该topic设置最高级别的持久化保证,即ack=-1。__consumer_offsets由一个三元组< comsumer group, topic, partiotion> 组成的key和offset值组成,在内存也维持一个最新的视图view,所以读取很快

segment

partition还可以细分为segment,一个partition物理上由多个segment组成

消息存储目录:

$KAFKA_HOME/config/server.properties
log.dirs=/tmp/kafka-logs
文件存储到/tmp/kafka-logs

Partition的物理结构

Topic 名称 mobileReportUsers ,分区数为4;
则/tmp/kafka-logs/目录下会有4个mobileReportUsers-n的文件夹,n{0,1,2,3}
每个文件下存储这segment的.log数据文件与.index索引文件
每个文件夹就是一个分区,文件夹下的log与index文件就是分区的segment;

Segment文件命名规则

partition全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值,数值大小为64位,20位数字字符长度,没有数字用0填充;
所以每个segment的offset起始偏移为“文件名+1”

Segment的insex.log文件

.index里面存储的是offset 与该消息在当前segment中的物理偏移量存储格式为
[offset, 物理偏移量]

如何根据offset查找消息

先用二分法,根据segment的文件名,定位到.index文件,然后从index文件中查找到该offset所在当前.log文件中的偏移量,然后从.log文件偏移量的位置开始读取;每条消息都有固定的物理结构,包括:offset(8 Bytes)、消息体的大小(4 Bytes)、crc32(4 Bytes)、magic(1 Byte)、attributes(1 Byte)、key length(4 Bytes)、key(K Bytes)、payload(NBytes)等等字段,可以确定一条消息的大小,即读取到哪里截止。

可靠性与副本

副本

为了提高消息的可靠性,Kafka每个topic的partition有N个副本(replicas),其中N(大于等于1)是topic的复制因子(replica fator)的个数,配置副本方式:
$KAFKA_HOME/config/server.properties
default.replication.refactor=2

在Kafka中发生复制时确保partition的日志能有序地写到其他节点上,N个replicas中,其中一个replica为leader,其他都为follower, leader处理partition的所有读写请求,与此同时,follower会被动定期地去复制leader上的数据

副本中的三种节点

  1. 分区副本的leader (ISR)
  2. 分区处于同步状态的in-sync flower (ISR) (满足同步阈值的队列)
  3. 分区处于非同步状态out-of-sync flower (OSR) (不满足同步阈值的队列)
  4. 分区处于非同步状态的stuck flower 被阻塞的队列
    ISR包括leader和in-sync flower;
    image

LEO

LogEndOffset的缩写,表示每个partition的log最后一条Message的位置

HW

HW是HighWatermark的缩写,取一个partition对应的ISR队列中,最小的LEO作为HW, consumer能够看到的此partition的位置;
HW = min(Leader.HW, Follower.offset)
对于leader新写入的消息,consumer不能立刻消费,leader会等待该消息被所有ISR中的replicas同步后更新HW,此时消息才能被consumer消费。这样就保证了如果leader所在的broker失效,该消息仍然可以从新选举的leader中获取。对于来自内部broKer的读取请求,没有HW的限制。
image

AR

所有副本(replicas)统称为Assigned Replicas,即AR

ISR(In-Sync Replicas)

副本同步队列,所有不落后的replica集合, 不落后有两层含义:距离上次FetchRequest的时间不大于某一个值或落后的消息数不大于某一个值

OSR(out-of-sync)

副本非同步队列

副本同步队列阈值

Leader负责跟踪副本状态,满足以下条件的flower进入ISR队列,不满足以下条件的flower进入OSR队列:

  • replica.lag.max.messages设置为4,表明只要follower落后leader不超过3,就不会从同步副本列表中移除(该参数在0.10.x版本中已经移除,如果短时间内生产者发送了大量的消息,那么就会导致flower直接掉线)
  • replica.lag.time.max设置为500 ms,表明只要follower向leader发送请求时间间隔不超过500 ms,就不会被标记为死亡,也不会从同步副本列中移除

Flower与leader副本不同步的原因

一个partition的follower落后于leader足够多时,被认为不在同步副本列表或处于滞后状态

  1. 慢副本:在一定周期时间内follower不能追赶上leader。最常见的原因之一是I / O瓶颈导致follower追加复制消息速度慢于从leader拉取速度。
  2. 卡住副本:在一定周期时间内follower停止从leader拉取请求。follower replica卡住了是由于GC暂停或follower失效或死亡。
  3. 新启动副本:当用户给主题增加副本因子时,新的follower不在同步副本列表中,直到他们完全赶上了leader日志。

同步信息的保存zookeeper

Kafka的ISR的管理最终都会反馈到Zookeeper节点上。具体位置为:
/brokers/topics/[topicname]/partitions/[partition]/state。目前有两个地方会对这个Zookeeper的节点进行维护:

  • Controller来维护:Kafka集群中的其中一个Broker会被选举为Controller,主要负责Partition管理和副本状态管理,也会执行类似于重分配partition之类的管理任务。在符合某些特定条件下,Controller下的LeaderSelector会选举新的leader,ISR和新的leader_epoch及controller_epoch写入Zookeeper的相关节点中。同时发起LeaderAndIsrRequest通知所有的replicas。
  • leader来维护:leader有单独的线程定期检测ISR中follower是否脱离ISR, 如果发现ISR变化,则会将新的ISR的信息返回到Zookeeper的相关节点中。

可靠级别:副本同步策略

一条消息只有被ISR中的所有follower都从leader复制过去才会被认为已提交。这样就避免了部分数据被写进了leader,还没来得及被任何follower复制就宕机了,而造成数据丢失。而对于producer而言,它可以选择是否等待消息commit,这可以通过request.required.acks来设置。这种机制确保了只要ISR中有一个或者以上的follower,一条被commit的消息就不会丢失。

Kafka在Zookeeper中为每一个partition动态的维护了一个ISR,这个ISR里的所有replica都跟上了leader(应该是在阈值允许的范围内,认为跟上了leader ),只有ISR里的成员才能有被选为leader的可能(unclean.leader.election.enable=false)。在这种模式下,对于f+1个副本,一个Kafka topic能在保证不丢失已经commit消息的前提下容忍f个副本的失败

当producer向leader发送数据时,可以通过request.required.acks参数来设置数据可靠性的级别:

  • 1(默认):这意味着producer在ISR中的leader已成功收到的数据并得到确认后发送下一条message。如果flowser还没有来的及fetch数据,leader宕机了,则会丢失数据。
  • 0:这意味着producer无需等待来自broker的确认而继续发送下一批消息。这种情况下数据传输效率最高,但是数据可靠性确是最低的。
  • -1:producer需要等待ISR中的所有follower都确认接收到数据后才算一次发送完成,可靠性最高。但是这样也不能保证数据不丢失,比如当ISR中只有leader时(前面ISR那一节讲到,ISR中的成员由于某些情况会增加也会减少,最少就只剩一个leader),这样就变成了acks=1的情况。

case问题处理

注意:当取值为-1时,另外一个参数min.insync.replicas参数开始生效,默认值为1,代表ISR中的最小副本数是多少,如果ISR中的副本数少于min.insync.replicas配置的数量时,客户端会返回异常(这时候write不能服务,但是read能继续正常服务):org.apache.kafka.common.errors.NotEnoughReplicasExceptoin:Messages are rejected since there are fewer in-sync replicas than required。

此种情况恢复方案:
  • 尝试恢复(重启)replica-0,如果能起来,系统正常;
  • 如果replica-0不能恢复,需要将min.insync.replicas设置为1,恢复write功能。
在ack=-1的时候会有三种情况:
  1. 数据发送到leader后, ISR的follower全部完成数据同步后,leader此时挂掉,那么会选举出新的leader,数据不会丢失。
  2. 数据发送到leader后,部分ISR的副本同步,leader此时挂掉。比如follower1和follower2都有可能变成新的leader, producer端会得到返回异常,producer端会重新发送数据,数据可能会重复。
  3. 数据发送到leader后,follower还没同步到任何数据,没有同步到数据的follower被选举为新的leader的话,这样消息就不会重复

数据不一致怎么办

leader挂掉会重新选举,新的leader会发送“指令”让其余的follower截断至自身的HW的位置然后再拉取新的消息。如果失败的follower恢复过来,它首先将自己的log文件截断到上次checkpointed时刻的HW的位置,之后再从leader中同步消息;
这样就保证数据的一致性了
image
如上图,某个topic的某partition有三个副本,分别为A、B、C。A作为leader肯定是LEO最高,B紧随其后,C机器由于配置比较低,网络比较差,故而同步最慢。这个时候A机器宕机,这时候如果B成为leader,假如没有HW,在A重新恢复之后会做同步(makeFollower)操作,在宕机时log文件之后直接做追加操作,而假如B的LEO已经达到了A的LEO,会产生数据不一致的情况,所以使用HW来避免这种情况。

A在做同步操作的时候,先将log文件截断到之前自己的HW的位置,即3,之后再从B中拉取消息进行同步。

如果失败的follower恢复过来,它首先将自己的log文件截断到上次checkpointed时刻的HW的位置,之后再从leader中同步消息。leader挂掉会重新选举,新的leader会发送“指令”让其余的follower截断至自身的HW的位置然后再拉取新的消息。

当ISR中的个副本的LEO不一致时,如果此时leader挂掉,选举新的leader时并不是按照LEO的高低进行选举,而是按照ISR中的顺序选举。

Leader选举策略

两种方案:

  1. 等待ISR中任意一个replica“活”过来,并且选它作为leader(unclean.leader.election.enable=false),kafka数据一致性大于持久化可用性;
  2. 选择第一个“活”过来的replica(并不一定是在ISR中)作为leader (unclean.leader.election.enable=true 默认),kafka的持久化可用性大于数据一致性;

当unclean.leader.election.enable=false的时候,leader只能从ISR中选举,当ISR中所有副本都失效之后,需要ISR中最后失效的那个副本能恢复之后才能选举leader, 即leader 副本replica-0先失效,replica-1后失效,这时[ISR=(1),leader=-1],需要replica-1恢复后才能选举leader;
这种情况下:如果replicat-1不能恢复,保守的方案建议把unclean.leader.election.enable设置为true,但是这样会有丢失数据的情况发生;
如果request.required.acks<>-1,这样就可以恢复服务;
如果request.required.acks=-1这样可以恢复read服务。还需要将min.insync.replicas设置为1,恢复write功能;

当ISR中的replica-0, replica-1同时宕机,此时[ISR=(0,1)],不能对外提供服务,此种情况恢复方案:尝试恢复replica-0和replica-1,当其中任意一个副本恢复正常时,如果request.required.acks<>-1,即可恢复服务;如果request.required.acks=-1,则对外可以提供read服务,直到2个副本恢复正常,write功能才能恢复,或者将将min.insync.replicas设置为1。

Rebalance

controlled.shutdown.enable ,是否在在关闭broker时主动迁移leader partition。基本思想是每次kafka接收到关闭broker进程请求时,主动把leader partition迁移到其存活节