文档
Kafka功能
发布 & 订阅
数据流,如消息传递系统
处理
高效并实时
存储
数据流安全地在分布式集群中复制存储
kafka是用于构建实时数据管道和流应用程序。具有横向扩展,容错,wicked fast(超级快)等优点,并已在成千上万家公司运行。
Kafka概念解释
Broker
Controller:主要负责Partition管理和副本状态管理,也会执行类似于重分配Partition之类的管理任务。如果当前的Controller失败,会从其他正常的Broker中重新选举Controller。
consumer -- consumerGroup
分区
是Kafka中横向扩展和一切并行化的基础,
对某一个Partition的所有读写请求都是只由Leader来处理,所以Kafka会尽量把Leader均匀的分散到集群的各个节点上,以免造成网络流量过于集中;
每个分区的消息是有序的,多个分区之间的消息不是有序的
$KAFKA_HOME/config/server.properties
num.partitions=3
分区状态
Kafka中Partition的四种状态
1.NonExistentPartition —— 这个状态表示该分区要么没有被创建过或曾经被创建过但后面被删除了。
2. NewPartition —— 分区创建之后就处于NewPartition状态。在这个状态中,分区应该已经分配了副本,但是还没有选举出leader和ISR。
3. OnlinePartition —— 一旦分区的leader被推选出来,它就处于OnlinePartition状态。
4. OfflinePartition —— 如果leader选举出来后,leader broker宕机了,那么该分区就处于OfflinePartition状态。
状态的转换关系
NonExistentPartition -> NewPartition
首先将第一个可用的副本broker作为leader broker并把所有可用的副本对象都装入ISR,然后写leader和ISR信息到zookeeper中保存
然后对这个分区,发送LeaderAndIsr请求到每个可用的副本broker,以及UpdateMetadata请求到每个可用的broker上
OnlinePartition, OfflinePartition -> OnlinePartition
为该分区选取新的leader和ISR以及接收LeaderAndIsr请求的一组副本,然后写入leader和ISR信息到zookeeper中保存。
NewPartition, OnlinePartition -> OfflinePartition
标记分区状态为离线(offline)。
OfflinePartition -> NonExistentPartition
离线状态标记为不存在分区,表示该分区失败或者被删除。
Controller选择器
KafkaController中共定义了五种selector选举器:
- ReassignedPartitionLeaderSelector
从可用的ISR中选取第一个作为leader,把当前的ISR作为新的ISR,将重分配的副本集合作为接收LeaderAndIsr请求的副本集合。 - PreferredReplicaPartitionLeaderSelector
如果从assignedReplicas取出的第一个副本就是分区leader的话,则抛出异常,否则将第一个副本设置为分区leader。 - ControlledShutdownLeaderSelector
将ISR中处于关闭状态的副本从集合中去除掉,返回一个新新的ISR集合,然后选取第一个副本作为leader,然后令当前AR作为接收LeaderAndIsr请求的副本。 - NoOpLeaderSelector
原则上不做任何事情,返回当前的leader和isr。 - OfflinePartitionLeaderSelector
从活着的ISR中选择一个broker作为leader,如果ISR中没有活着的副本,则从assignedReplicas中选择一个副本作为leader,leader选举成功后注册到Zookeeper中,并更新所有的缓存。
所有的leader选择完成后,都要通过请求把具体的request路由到对应的handler处理。
def handle(request: RequestChannel.Request) {
try{
trace("Handling request: " + request.requestObj + " from client: " + request.remoteAddress)
request.requestId match {
case RequestKeys.ProduceKey => handleProducerRequest(request) // producer
case RequestKeys.FetchKey => handleFetchRequest(request) // consumer
case RequestKeys.OffsetsKey => handleOffsetRequest(request)
case RequestKeys.MetadataKey => handleTopicMetadataRequest(request)
case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request) //成为leader或follower设置同步副本组信息
case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)
case RequestKeys.UpdateMetadataKey => handleUpdateMetadataRequest(request)
case RequestKeys.ControlledShutdownKey => handleControlledShutdownRequest(request) //shutdown broker
case RequestKeys.OffsetCommitKey => handleOffsetCommitRequest(request)
case RequestKeys.OffsetFetchKey => handleOffsetFetchRequest(request)
case requestId => throw new KafkaException("Unknown api code " + requestId)
}
} catch {
case e: Throwable =>
request.requestObj.handleError(e, requestChannel, request)
error("error when handling request %s".format(request.requestObj), e)
} finally
request.apiLocalCompleteTimeMs = SystemTime.milliseconds
}
offset
一个连续的序列号叫做offset,用于partition唯一标识一条消息,代表一个分区中的第几个消息,每个分区的offset都是从0开始,每添加一个消息,该值就+1;
在0.8.2之前,comsumer定期提交已经消费的kafka消息的offset位置到zookeeper中保存。对zookeeper而言,每次写操作代价是很昂贵的,而且zookeeper集群是不能扩展写能力的。在0.8.2开始,可以把comsumer提交的offset记录在compacted topic(__comsumer_offsets)中,该topic设置最高级别的持久化保证,即ack=-1。__consumer_offsets由一个三元组< comsumer group, topic, partiotion> 组成的key和offset值组成,在内存也维持一个最新的视图view,所以读取很快
segment
partition还可以细分为segment,一个partition物理上由多个segment组成
消息存储目录:
$KAFKA_HOME/config/server.properties
log.dirs=/tmp/kafka-logs
文件存储到/tmp/kafka-logs
Partition的物理结构
Topic 名称 mobileReportUsers ,分区数为4;
则/tmp/kafka-logs/目录下会有4个mobileReportUsers-n的文件夹,n{0,1,2,3}
每个文件下存储这segment的.log数据文件与.index索引文件
每个文件夹就是一个分区,文件夹下的log与index文件就是分区的segment;
Segment文件命名规则
partition全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值,数值大小为64位,20位数字字符长度,没有数字用0填充;
所以每个segment的offset起始偏移为“文件名+1”
Segment的insex.log文件
.index里面存储的是offset 与该消息在当前segment中的物理偏移量存储格式为
[offset, 物理偏移量]
如何根据offset查找消息
先用二分法,根据segment的文件名,定位到.index文件,然后从index文件中查找到该offset所在当前.log文件中的偏移量,然后从.log文件偏移量的位置开始读取;每条消息都有固定的物理结构,包括:offset(8 Bytes)、消息体的大小(4 Bytes)、crc32(4 Bytes)、magic(1 Byte)、attributes(1 Byte)、key length(4 Bytes)、key(K Bytes)、payload(NBytes)等等字段,可以确定一条消息的大小,即读取到哪里截止。
可靠性与副本
副本
为了提高消息的可靠性,Kafka每个topic的partition有N个副本(replicas),其中N(大于等于1)是topic的复制因子(replica fator)的个数,配置副本方式:
$KAFKA_HOME/config/server.properties
default.replication.refactor=2
在Kafka中发生复制时确保partition的日志能有序地写到其他节点上,N个replicas中,其中一个replica为leader,其他都为follower, leader处理partition的所有读写请求,与此同时,follower会被动定期地去复制leader上的数据
副本中的三种节点
- 分区副本的leader (ISR)
- 分区处于同步状态的in-sync flower (ISR) (满足同步阈值的队列)
- 分区处于非同步状态out-of-sync flower (OSR) (不满足同步阈值的队列)
- 分区处于非同步状态的stuck flower 被阻塞的队列
ISR包括leader和in-sync flower;
LEO
LogEndOffset的缩写,表示每个partition的log最后一条Message的位置
HW
HW是HighWatermark的缩写,取一个partition对应的ISR队列中,最小的LEO作为HW, consumer能够看到的此partition的位置;
HW = min(Leader.HW, Follower.offset)
对于leader新写入的消息,consumer不能立刻消费,leader会等待该消息被所有ISR中的replicas同步后更新HW,此时消息才能被consumer消费。这样就保证了如果leader所在的broker失效,该消息仍然可以从新选举的leader中获取。对于来自内部broKer的读取请求,没有HW的限制。
AR
所有副本(replicas)统称为Assigned Replicas,即AR
ISR(In-Sync Replicas)
副本同步队列,所有不落后的replica集合, 不落后有两层含义:距离上次FetchRequest的时间不大于某一个值或落后的消息数不大于某一个值
OSR(out-of-sync)
副本非同步队列
副本同步队列阈值
Leader负责跟踪副本状态,满足以下条件的flower进入ISR队列,不满足以下条件的flower进入OSR队列:
- replica.lag.max.messages设置为4,表明只要follower落后leader不超过3,就不会从同步副本列表中移除(该参数在0.10.x版本中已经移除,如果短时间内生产者发送了大量的消息,那么就会导致flower直接掉线)
- replica.lag.time.max设置为500 ms,表明只要follower向leader发送请求时间间隔不超过500 ms,就不会被标记为死亡,也不会从同步副本列中移除
Flower与leader副本不同步的原因
一个partition的follower落后于leader足够多时,被认为不在同步副本列表或处于滞后状态
- 慢副本:在一定周期时间内follower不能追赶上leader。最常见的原因之一是I / O瓶颈导致follower追加复制消息速度慢于从leader拉取速度。
- 卡住副本:在一定周期时间内follower停止从leader拉取请求。follower replica卡住了是由于GC暂停或follower失效或死亡。
- 新启动副本:当用户给主题增加副本因子时,新的follower不在同步副本列表中,直到他们完全赶上了leader日志。
同步信息的保存zookeeper
Kafka的ISR的管理最终都会反馈到Zookeeper节点上。具体位置为:
/brokers/topics/[topicname]/partitions/[partition]/state。目前有两个地方会对这个Zookeeper的节点进行维护:
- Controller来维护:Kafka集群中的其中一个Broker会被选举为Controller,主要负责Partition管理和副本状态管理,也会执行类似于重分配partition之类的管理任务。在符合某些特定条件下,Controller下的LeaderSelector会选举新的leader,ISR和新的leader_epoch及controller_epoch写入Zookeeper的相关节点中。同时发起LeaderAndIsrRequest通知所有的replicas。
- leader来维护:leader有单独的线程定期检测ISR中follower是否脱离ISR, 如果发现ISR变化,则会将新的ISR的信息返回到Zookeeper的相关节点中。
可靠级别:副本同步策略
一条消息只有被ISR中的所有follower都从leader复制过去才会被认为已提交。这样就避免了部分数据被写进了leader,还没来得及被任何follower复制就宕机了,而造成数据丢失。而对于producer而言,它可以选择是否等待消息commit,这可以通过request.required.acks来设置。这种机制确保了只要ISR中有一个或者以上的follower,一条被commit的消息就不会丢失。
Kafka在Zookeeper中为每一个partition动态的维护了一个ISR,这个ISR里的所有replica都跟上了leader(应该是在阈值允许的范围内,认为跟上了leader ),只有ISR里的成员才能有被选为leader的可能(unclean.leader.election.enable=false)。在这种模式下,对于f+1个副本,一个Kafka topic能在保证不丢失已经commit消息的前提下容忍f个副本的失败
当producer向leader发送数据时,可以通过request.required.acks参数来设置数据可靠性的级别:
- 1(默认):这意味着producer在ISR中的leader已成功收到的数据并得到确认后发送下一条message。如果flowser还没有来的及fetch数据,leader宕机了,则会丢失数据。
- 0:这意味着producer无需等待来自broker的确认而继续发送下一批消息。这种情况下数据传输效率最高,但是数据可靠性确是最低的。
- -1:producer需要等待ISR中的所有follower都确认接收到数据后才算一次发送完成,可靠性最高。但是这样也不能保证数据不丢失,比如当ISR中只有leader时(前面ISR那一节讲到,ISR中的成员由于某些情况会增加也会减少,最少就只剩一个leader),这样就变成了acks=1的情况。
case问题处理
注意:当取值为-1时,另外一个参数min.insync.replicas参数开始生效,默认值为1,代表ISR中的最小副本数是多少,如果ISR中的副本数少于min.insync.replicas配置的数量时,客户端会返回异常(这时候write不能服务,但是read能继续正常服务):org.apache.kafka.common.errors.NotEnoughReplicasExceptoin:Messages are rejected since there are fewer in-sync replicas than required。
此种情况恢复方案:
- 尝试恢复(重启)replica-0,如果能起来,系统正常;
- 如果replica-0不能恢复,需要将min.insync.replicas设置为1,恢复write功能。
在ack=-1的时候会有三种情况:
- 数据发送到leader后, ISR的follower全部完成数据同步后,leader此时挂掉,那么会选举出新的leader,数据不会丢失。
- 数据发送到leader后,部分ISR的副本同步,leader此时挂掉。比如follower1和follower2都有可能变成新的leader, producer端会得到返回异常,producer端会重新发送数据,数据可能会重复。
- 数据发送到leader后,follower还没同步到任何数据,没有同步到数据的follower被选举为新的leader的话,这样消息就不会重复
数据不一致怎么办
leader挂掉会重新选举,新的leader会发送“指令”让其余的follower截断至自身的HW的位置然后再拉取新的消息。如果失败的follower恢复过来,它首先将自己的log文件截断到上次checkpointed时刻的HW的位置,之后再从leader中同步消息;
这样就保证数据的一致性了
如上图,某个topic的某partition有三个副本,分别为A、B、C。A作为leader肯定是LEO最高,B紧随其后,C机器由于配置比较低,网络比较差,故而同步最慢。这个时候A机器宕机,这时候如果B成为leader,假如没有HW,在A重新恢复之后会做同步(makeFollower)操作,在宕机时log文件之后直接做追加操作,而假如B的LEO已经达到了A的LEO,会产生数据不一致的情况,所以使用HW来避免这种情况。
A在做同步操作的时候,先将log文件截断到之前自己的HW的位置,即3,之后再从B中拉取消息进行同步。
如果失败的follower恢复过来,它首先将自己的log文件截断到上次checkpointed时刻的HW的位置,之后再从leader中同步消息。leader挂掉会重新选举,新的leader会发送“指令”让其余的follower截断至自身的HW的位置然后再拉取新的消息。
当ISR中的个副本的LEO不一致时,如果此时leader挂掉,选举新的leader时并不是按照LEO的高低进行选举,而是按照ISR中的顺序选举。
Leader选举策略
两种方案:
- 等待ISR中任意一个replica“活”过来,并且选它作为leader(unclean.leader.election.enable=false),kafka数据一致性大于持久化可用性;
- 选择第一个“活”过来的replica(并不一定是在ISR中)作为leader (unclean.leader.election.enable=true 默认),kafka的持久化可用性大于数据一致性;
当unclean.leader.election.enable=false的时候,leader只能从ISR中选举,当ISR中所有副本都失效之后,需要ISR中最后失效的那个副本能恢复之后才能选举leader, 即leader 副本replica-0先失效,replica-1后失效,这时[ISR=(1),leader=-1],需要replica-1恢复后才能选举leader;
这种情况下:如果replicat-1不能恢复,保守的方案建议把unclean.leader.election.enable设置为true,但是这样会有丢失数据的情况发生;
如果request.required.acks<>-1,这样就可以恢复服务;
如果request.required.acks=-1这样可以恢复read服务。还需要将min.insync.replicas设置为1,恢复write功能;
当ISR中的replica-0, replica-1同时宕机,此时[ISR=(0,1)],不能对外提供服务,此种情况恢复方案:尝试恢复replica-0和replica-1,当其中任意一个副本恢复正常时,如果request.required.acks<>-1,即可恢复服务;如果request.required.acks=-1,则对外可以提供read服务,直到2个副本恢复正常,write功能才能恢复,或者将将min.insync.replicas设置为1。
Rebalance
controlled.shutdown.enable ,是否在在关闭broker时主动迁移leader partition。基本思想是每次kafka接收到关闭broker进程请求时,主动把leader partition迁移到其存活节点上,即follow replica提升为新的leader partition。如果没有开启这个参数,集群等到replica会话超时,controller节点才会重现选择新的leader partition,这些leader partition在这段时间内也不可读写。如果集群非常大或者partition 很多,partition不可用的时间将会比较长
zk结构
读写机制
使用直接内存和操作系统缓存
- 写message
- 消息从java堆转入page cache(即物理内存)。
- 由异步线程刷盘,消息从page cache刷入磁盘。
- 读message
- 消息直接从page cache转入socket发送出去。
- 当从page cache没有找到相应数据时,此时会产生磁盘IO,从磁盘Load消息到page cache,然后直接从socket发出去
Kafka重度依赖底层操作系统提供的PageCache功能。写盘是使用Asynchronous+Batch的方式,当上层有写操作时,操作系统只是将数据写入PageCache,同时标记Page属性为Dirty。当读操作发生时,先从PageCache中查找,如果发生缺页才进行磁盘调度,最终返回需要的数据。实际上PageCache是把尽可能多的空闲内存都当做了磁盘缓存来使用。同时如果有其他进程申请内存,回收PageCache的代价又很小,所以现代的OS都支持PageCache。
读取操作,有Read Request进来的时候分为两种情况,第一种是内存中完成数据交换,写数据的时候需要读的数据已经在page cache中存在了,一种是发生缺页中断,去硬盘上读取;
网络发送
Kafka采用了Sendfile技术,零拷贝;
Kafka的设计初衷是尽一切努力在内存中完成数据交换,无论是对外作为一整个消息系统,或是内部同底层操作系统的交互。如果Producer和Consumer之间生产和消费进度上配合得当,完全可以实现数据交换零I/O。这也就是我为什么说Kafka使用“硬盘”并没有带来过多性能损失的原因。下面是我在生产环境中采到的一些指标。
优化建议
消息大小配置
- 修改kafka的broker配置:message.max.bytes(默认:1000000B),这个参数表示单条消息的最大长度。在使用kafka的时候,应该预估单条消息的最大长度,不然导致发送失败。
- 修改kafka的broker配置:replica.fetch.max.bytes (默认: 1MB),broker可复制的消息的最大字节数。这个值应该比message.max.bytes大,否则broker会接收此消息,但无法将此消息复制出去,从而造成数据丢失。
- 修改消费者程序端配置:fetch.message.max.bytes (默认 1MB) – 消费者能读取的最大消息。这个值应该大于或等于message.max.bytes。如果不调节这个参数,就会导致消费者无法消费到消息,并且不会爆出异常或者警告,导致消息在broker中累积,此处要注意。
根据需要,调整上述三个参数的大小。但是否参数调节得越大越好,或者说单条消息越大越好呢?
参考http://www.mamicode.com/info-detail-453907.html的说法:
- 从性能上考虑:通过性能测试,kafka在消息为10K时吞吐量达到最大,更大的消息会降低吞吐量,在设计集群的容量时,尤其要考虑这点。
- 可用的内存和分区数:Brokers会为每个分区分配replica.fetch.max.bytes参数指定的内存空间,假设replica.fetch.max.bytes=1M,且有1000个分区,则需要差不多1G的内存,确保 分区数最大的消息不会超过服务器的内存,否则会报OOM错误。同样地,消费端的fetch.message.max.bytes指定了最大消息需要的内存空间,同样,分区数最大需要内存空间 不能超过服务器的内存。所以,如果你有大的消息要传送,则在内存一定的情况下,只能使用较少的分区数或者使用更大内存的服务器。
- 垃圾回收:更大的消息会让GC的时间更长(因为broker需要分配更大的块),随时关注GC的日志和服务器的日志信息。如果长时间的GC导致kafka丢失了zookeeper的会话,则需要配置zookeeper.session.timeout.ms参数为更大的超时时间。
kafka中处理超大消息的一些考虑
Kafka设计的初衷是迅速处理短小的消息,一般10K大小的消息吞吐性能最好(可参见LinkedIn的kafka性能测试)。但有时候,我们需要处理更大的消息,比如XML文档或JSON内容,一个消息差不多有10-100M,这种情况下,Kakfa应该如何处理?
针对这个问题,有以下几个建议:
- 最好的方法是不直接传送这些大的数据。如果有共享存储,如NAS, HDFS, S3等,可以把这些大的文件存放到共享存储,然后使用Kafka来传送文件的位置信息。
- 第二个方法是,将大的消息数据切片或切块,在生产端将数据切片为10K大小,使用分区主键确保一个大消息的所有部分会被发送到同一个kafka分区(这样每一部分的拆分顺序得以保留),如此以来,当消费端使用时会将这些部分重新还原为原始的消息。
- 第三,Kafka的生产端可以压缩消息,如果原始消息是XML,当通过压缩之后,消息可能会变得不那么大。在生产端的配置参数中使用compression.codec和commpressed.topics可以开启压缩功能,压缩算法可以使用GZip或Snappy。
不过如果上述方法都不是你需要的,而你最终还是希望传送大的消息,那么,则可以在kafka中设置下面一些参数:
broker 配置:
message.max.bytes (默认:1000000) – broker能接收消息的最大字节数,这个值应该比消费端的fetch.message.max.bytes更小才对,否则broker就会因为消费端无法使用这个消息而挂起。
log.segment.bytes (默认: 1GB) – kafka数据文件的大小,确保这个数值大于一个消息的长度。一般说来使用默认值即可(一般一个消息很难大于1G,因为这是一个消息系统,而不是文件系统)。
replica.fetch.max.bytes (默认: 1MB) – broker可复制的消息的最大字节数。这个值应该比message.max.bytes大,否则broker会接收此消息,但无法将此消息复制出去,从而造成数据丢失。
Consumer 配置:
fetch.message.max.bytes (默认 1MB) – 消费者能读取的最大消息。这个值应该大于或等于message.max.bytes。
所以,如果你一定要选择kafka来传送大的消息,还有些事项需要考虑。要传送大的消息,不是当出现问题之后再来考虑如何解决,而是在一开始设计的时候,就要考虑到大消息对集群和主题的影响。
性能: 根据前面提到的性能测试,kafka在消息为10K时吞吐量达到最大,更大的消息会降低吞吐量,在设计集群的容量时,尤其要考虑这点。
可用的内存和分区数:Brokers会为每个分区分配replica.fetch.max.bytes参数指定的内存空间,假设replica.fetch.max.bytes=1M,且有1000个分区,则需要差不多1G的内存,确保 分区数最大的消息不会超过服务器的内存,否则会报OOM错误。同样地,消费端的fetch.message.max.bytes指定了最大消息需要的内存空间,同样,分区数最大需要内存空间 不能超过服务器的内存。所以,如果你有大的消息要传送,则在内存一定的情况下,只能使用较少的分区数或者使用更大内存的服务器。
垃圾回收:到现在为止,我在kafka的使用中还没发现过此问题,但这应该是一个需要考虑的潜在问题。更大的消息会让GC的时间更长(因为broker需要分配更大的块),随时关注GC的日志和服务器的日志信息。如果长时间的GC导致kafka丢失了zookeeper的会话,则需要配置zookeeper.session.timeout.ms参数为更大的超时时间。
一切的一切,都需要在权衡利弊之后,再决定选用哪个最合适的方案。
Partition数量
Kafka的分区数量应该是Broker数量的整数倍,partition的数量大于等于broker的数量,并且所有partition的leader均匀分布在broker上;
- Partition的数量并不是越多越好,Partition的数量越多,平均到每一个Broker上的数量也就越多。考虑到Broker宕机(Network Failure, Full GC)的情况下,需要由Controller来为所有宕机的Broker上的所有Partition重新选举Leader,假设每个Partition的选举消耗10ms,如果Broker上有500个Partition,那么在进行选举的5s的时间里,对上述Partition的读写操作都会触发LeaderNotAvailableException
- 如果挂掉的Broker是整个集群的Controller,那么首先要进行的是重新任命一个Broker作为Controller。新任命的Controller要从Zookeeper上获取所有Partition的Meta信息,获取每个信息大概3-5ms,那么如果有10000个Partition这个时间就会达到30s-50s。而且不要忘记这只是重新启动一个Controller花费的时间,在这基础上还要再加上前面说的选举Leader的时间
- 在Broker端,对Producer和Consumer都使用了Buffer机制。其中Buffer的大小是统一配置的,数量则与Partition个数相同。如果Partition个数过多,会导致Producer和Consumer的Buffer内存占用过大
Pagecache优化
- Kafka官方并不建议通过Broker端的log.flush.interval.messages和log.flush.interval.ms来强制写盘,认为数据的可靠性应该通过Replica来保证,而强制Flush数据到磁盘会对整体性能产生影响。
- 可以通过调整/proc/sys/vm/dirty_background_ratio和/proc/sys/vm/dirty_ratio来调优性能。
- a. 脏页率超过第一个指标会启动pdflush开始Flush Dirty PageCache。
- b. 脏页率超过第二个指标会阻塞所有的写操作来进行Flush。
- c. 根据不同的业务需求可以适当的降低dirty_background_ratio和提高dirty_ratio。
正常关闭kafka
尽一切努力保证每次停Broker时都可以Clean Shutdown,否则问题就不仅仅是恢复服务所需时间长,还可能出现数据损坏或其他很诡异的问题。kafka和zk关闭的时候不要kill
Rebalance优化
低版本kafka,多个消费节点在zk注册临时节点产生冲突,另外zk同步数据延时性时间拉长远大于(rebalance.max.retries * rebalance.backoff.ms),rebalance就会失败
如下配置会导致rebalance失败
rebalance.max.retries * rebalance.backoff.ms < zookeeper.session.timeout.ms
遇到的异常
org.apache.kafka.common.errors.TimeoutException: Expiring 2 record(s) for test-0 due to 30083 ms has passed since batch creation plus linger time
advertised.host.name和host.name该字段的值是生产者和消费者使用的。如果没有设置,则会取host.name的值,默认情况下,该值为localhost。思考一下,如果生产者拿到localhost这个值,只往本地发消息,必然会报错(因为本地没有kafka服务器)出现上面错误,是因为kafka的server.properties没有配置host.name=10.8.122.26;默认是null;每个节点都需要配置下,否则本地收不到kafka消息,也无法给kafka写消息
2.
Configured brokerId 1 doesn't match stored brokerId 0 in meta.properties
kafka.common.InconsistentBrokerIdException: Configured brokerId 1 doesn't match stored brokerId 0 in meta.properties
at kafka.server.KafkaServer.getBrokerId(KafkaServer.scala:630)
at kafka.server.KafkaServer.startup(KafkaServer.scala:175)
at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:37)
at kafka.Kafka$.main(Kafka.scala:67)
at kafka.Kafka.main(Kafka.scala)
错误的原因是log.dirs目录下的meta.properties中配置的broker.id和配置目录下的server.properties中的broker.id不一致了,解决问题的方法是将两者修改一致后再重启。
分区数量的影响
How to choose the number oftopics/partitions in a Kafka cluster?
如何为一个kafka集群选择topics/partitions的数量?
This is a common question asked by many Kafka users.The goal of this post is to explain a few important determining factors andprovide a few simple formulas.
这是许多kafka使用者经常会问到的一个问题。本文的目的是介绍与本问题相关的一些重要决策因素,并提供一些简单的计算公式。
More Partitions Lead to HigherThroughput
越多的分区可以提供更高的吞吐量
The first thing to understand is that a topic partition is the unit ofparallelism in Kafka. On both the producer and the broker side, writes todifferent partitions can be done fully in parallel. So expensive operationssuch as compression can utilize more hardware resources. On the consumer side,Kafka always gives a single partition's data to one consumer thread. Thus, thedegree of parallelism in the consumer (within a consumer group) is bounded bythe number of partitions being consumed. Therefore, in general, the morepartitions there are in a Kafka cluster, the higher the throughput one canachieve.
首先我们需要明白以下事实:在kafka中,单个patition是kafka并行操作的最小单元。
在producer和broker端,向每一个分区写入数据是可以完全并行化的,此时,可以通过加大硬件资源的利用率来提升系统的吞吐量,例如对数据进行压缩。
在consumer段,kafka只允许单个partition的数据被一个consumer线程消费。因此,在consumer端,每一个Consumer Group内部的consumer并行度完全依赖于被消费的分区数量。
综上所述,通常情况下,在一个Kafka集群中,partition的数量越多,意味着可以到达的吞吐量越大。
A rough formula for picking the number of partitionsis based on throughput. You measure the throughout that you can achieve on asingle partition for production (call itp) and consumption (call it c).Let's say your target throughput ist. Then you need to have at least max(t/p,t/c) partitions. The per-partition throughput that one can achieve on theproducer depends on configurations such as the batching size, compressioncodec, type of acknowledgement, replication factor, etc. However, in general,one can produce at 10s of MB/sec on just a single partition as shown in thisbenchmark. The consumer throughput is oftenapplication dependent since it corresponds to how fast the consumer logic canprocess each message. So, you really need to measure it.
我们可以粗略地通过吞吐量来计算kafka集群的分区数量。假设对于单个partition,producer端的可达吞吐量为p,Consumer端的可达吞吐量为c,期望的目标吞吐量为t,那么集群所需要的partition数量至少为max(t/p,t/c)。在producer端,单个分区的吞吐量大小会受到批量大小、数据压缩方法、 确认类型(同步/异步)、复制因子等配置参数的影响。经过测试,在producer端,单个partition的吞吐量通常是在10MB/s左右。在consumer端,单个partition的吞吐量依赖于consumer端每个消息的应用逻辑处理速度。因此,我们需要对consumer端的吞吐量进行测量。
Although it's possible to increase the number ofpartitions over time, one has to be careful if messages are produced with keys.When publishing a keyed message, Kafka deterministically maps the message to apartition based on the hash of the key. This provides a guarantee that messages with the same key are alwaysrouted to the same partition. This guarantee can be important for certainapplications since messages within a partition are always delivered in order tothe consumer. If the number of partitions changes, such a guarantee may nolonger hold. To avoid this situation, a common practice is to over-partition abit. Basically, you determine the number of partitions based on a future targetthroughput, say for one or two years later. Initially, you can just have asmall Kafka cluster based on your current throughput. Over time, you can addmore brokers to the cluster and proportionally move a subset of the existingpartitions to the new brokers (which can be done online). This way, you cankeep up with the throughput growth without breaking the semantics in theapplication when keys are used.
虽然随着时间的推移,我们能够对分区的数量进行添加,但是对于基于Key来生成的这一类消息需要我们重点关注。当producer向kafka写入基于key的消息时,kafka通过key的hash值来确定消息需要写入哪个具体的分区。通过这样的方案,kafka能够确保相同key值的数据可以写入同一个partition。kafka的这一能力对于一部分应用是极为重要的,例如对于同一个key的所有消息,consumer需要按消息的顺序进行有序消费。如果partition的数量发生改变,那么上面的有序性保证将不复存在。为了避免上述情况发生,通常的解决办法是多分配一些分区,以满足未来的需求。通常情况下,我们需要根据未来1到2年的目标吞吐量来设计kafka的分区数量。
一开始,我们可以基于当前的业务吞吐量为kafka集群分配较小的broker数量,随着时间的推移,我们可以向集群中增加更多的broker,然后在线方式将适当比例的partition转移到新增加的broker中去。通过这样的方法,我们可以在满足各种应用场景(包括基于key消息的场景)的情况下,保持业务吞吐量的扩展性。
In addition to throughput, there are a few otherfactors that are worth considering when choosing the number of partitions. Asyou will see, in some cases, having too many partitions may also have negativeimpact.
在设计分区数时,除了吞吐量,还有一些其他因素值得考虑。正如我们后面即将看到的,对于一些应用场景,集群拥有过的分区将会带来负面的影响。
More Partitions Requires More OpenFile Handles
越多的分区需要打开更多地文件句柄
Each partition maps to a directory in the file systemin the broker. Within that log directory, there will be two files (one for theindex and another for the actual data) per log segment. Currently, in Kafka,each broker opens a file handle of both the index and the data file of everylog segment. So, the more partitions, the higher that one needs to configurethe open file handle limit in the underlying operating system. This is mostlyjust a configuration issue. We have seen production Kafka clusters running withmore than 30 thousand open file handles per broker.
在kafka的broker中,每个分区都会对照着文件系统的一个目录。在kafka的数据日志文件目录中,每个日志数据段都会分配两个文件,一个索引文件和一个数据文件。当前版本的kafka,每个broker会为每个日志段文件打开一个index文件句柄和一个数据文件句柄。因此,随着partition的增多,需要底层操作系统配置更高的文件句柄数量限制。这更多的是一个配置问题。我们曾经见到过,在生产环境Kafka集群中,每个broker打开的文件句柄数量超过30,000。
每个分区在底层文件系统都有属于自己的一个目录。该目录下通常会有两个文件: base_offset.log和base_offset.index。Kafak的controller和ReplicaManager会为每个broker都保存这两个文件句柄(file handler)。很明显,如果分区数越多,所需要保持打开状态的文件句柄数也就越多,最终可能会突破你的ulimit -n的限制。
More Partitions May IncreaseUnavailability
更多地分区会导致更高的不可用性
Kafka supports intra-cluster replication, which provides higher availabilityand durability. A partition can have multiple replicas, each stored on adifferent broker. One of the replicas is designated as the leader and the restof the replicas are followers. Internally, Kafka manages all those replicasautomatically and makes sure that they are kept in sync. Both the producer andthe consumer requests to a partition are served on the leader replica. When abroker fails, partitions with a leader on that broker become temporarilyunavailable. Kafka will automatically move the leader of those unavailablepartitions to some other replicas to continue serving the client requests. Thisprocess is done by one of the Kafka brokers designated as the controller. Itinvolves reading and writing some metadata for each affected partition inZooKeeper. Currently, operations to ZooKeeper are done serially in thecontroller.
Kafka通过多副本复制技术,实现kafka集群的高可用和稳定性。每个partition都会有多个数据副本,每个副本分别存在于不同的broker。所有的数据副本中,有一个数据副本为Leader,其他的数据副本为follower。在kafka集群内部,所有的数据副本皆采用自动化的方式进行管理,并且确保所有的数据副本的数据皆保持同步状态。不论是producer端还是consumer端发往partition的请求,皆通过leader数据副本所在的broker进行处理。当broker发生故障时,对于leader数据副本在该broker的所有partition将会变得暂时不可用。Kafka将会自动在其他数据副本中选择出一个leader,用于接收客户端的请求。这个过程由kafka controller节点broker自动完成,主要是从Zookeeper读取和修改受影响partition的一些元数据信息。在当前的kafka版本实现中,对于zookeeper的所有操作都是由kafka controller来完成的(serially的方式)。
In the common case when a broker is shut downcleanly, the controller will proactively move the leaders off the shutting downbroker one at a time. The moving of a single leader takes only a fewmilliseconds. So, from the clients perspective, there is only a small window ofunavailability during a clean broker shutdown.
在通常情况下,当一个broker有计划地停止服务时,那么controller会在服务停止之前,将该broker上的所有leader一个个地移走。由于单个leader的移动时间大约只需要花费几毫秒,因此从客户层面看,有计划的服务停机只会导致系统在很小时间窗口中不可用。(注:在有计划地停机时,系统每一个时间窗口只会转移一个leader,其他leader皆处于可用状态。)
However, when a broker is shut down uncleanly (e.g.,kill -9), the observed unavailability could be proportional to the number ofpartitions. Suppose that a broker has a total of 2000 partitions, each with 2replicas. Roughly, this broker will be the leader for about 1000 partitions.When this broker fails uncleanly, all those 1000 partitions become unavailableat exactly the same time. Suppose that it takes 5 ms to elect a new leader fora single partition. It will take up to 5 seconds to elect the new leader forall 1000 partitions. So, for some partitions, their observed unavailability canbe 5 seconds plus the time taken to detect the failure.
然而,当broker非计划地停止服务时(例如,kill -9方式),系统的不可用时间窗口将会与受影响的partition数量有关。假如,一个2节点的kafka集群中存在2000个partition,每个partition拥有2个数据副本。当其中一个broker非计划地宕机,所有1000个partition同时变得不可用。假设每一个partition恢复时间是5ms,那么1000个partition的恢复时间将会花费5秒钟。因此,在这种情况下,用户将会观察到系统存在5秒钟的不可用时间窗口。
If one is unlucky, the failed broker may be thecontroller. In this case, the process of electing the new leaders won't startuntil the controller fails over to a new broker. The controller failoverhappens automatically, but requires the new controller to read some metadatafor every partition from ZooKeeper during initialization. For example, if thereare 10,000 partitions in the Kafka cluster and initializing the metadata fromZooKeeper takes 2 ms per partition, this can add 20 more seconds to the unavailabilitywindow.
更不幸的情况发生在宕机的broker恰好是controller节点时。在这种情况下,新leader节点的选举过程在controller节点恢复到新的broker之前不会启动。Controller节点的错误恢复将会自动地进行,但是新的controller节点需要从zookeeper中读取每一个partition的元数据信息用于初始化数据。例如,假设一个kafka集群存在10,000个partition,从zookeeper中恢复元数据时每个partition大约花费2ms,则controller的恢复将会增加约20秒的不可用时间窗口。
In general, unclean failures are rare. However, ifone cares about availability in those rare cases, it's probably better to limitthe number of partitions per broker to two to four thousand and the totalnumber of partitions in the cluster to low tens of thousand.
通常情况下,非计划的宕机事件发生的情况是很少的。如果系统可用性无法容忍这些少数情况的场景,我们最好是将每个broker的partition数量限制在2,000到4,000,每个kafka集群中partition的数量限制在10,000以内。
More Partitions May Increase End-to-endLatency
越多的分区可能增加端对端的延迟
The end-to-end latency in Kafka is defined by thetime from when a message is published by the producer to when the message isread by the consumer. Kafka only exposes a message to a consumer after it hasbeen committed, i.e., when the message is replicated to all the in-syncreplicas. So, the time to commit a message can be a significant portion of theend-to-end latency. By default, a Kafka broker only uses a single thread toreplicate data from another broker, for all partitions that share replicas betweenthe two brokers. Our experiments show that replicating 1000 partitions from onebroker to another can add about 20 ms latency, which implies that theend-to-end latency is at least 20 ms. This can be too high for some real-timeapplications.
Kafka端对端延迟定义为producer端发布消息到consumer端接收消息所需要的时间。即consumer接收消息的时间减去producer发布消息的时间。Kafka只有在消息提交之后,才会将消息暴露给消费者。例如,消息在所有in-sync副本列表同步复制完成之后才暴露。因此,in-sync副本复制所花时间将是kafka端对端延迟的最主要部分。在默认情况下,每个broker从其他broker节点进行数据副本复制时,该broker节点只会为此工作分配一个线程,该线程需要完成该broker所有partition数据的复制。经验显示,将1000个partition从一个broker到另一个broker所带来的时间延迟约为20ms,这意味着端对端的延迟至少是20ms。这样的延迟对于一些实时应用需求来说显得过长。
Note that this issue is alleviated on a largercluster. For example, suppose that there are 1000 partition leaders on a brokerand there are 10 other brokers in the same Kafka cluster. Each of the remaining10 brokers only needs to fetch 100 partitions from the first broker on average.Therefore, the added latency due to committing a message will be just a few ms,instead of tens of ms.
注意,上述问题可以通过增大kafka集群来进行缓解。例如,将1000个分区leader放到一个broker节点和放到10个broker节点,他们之间的延迟是存在差异的。在10个broker节点的集群中,每个broker节点平均需要处理100个分区的数据复制。此时,端对端的延迟将会从原来的数十毫秒变为仅仅需要几毫秒。
As a rule of thumb, if you care about latency, it'sprobably a good idea to limit the number of partitions per broker to100 x bx r, where b is the number of brokers in a Kafka cluster andris the replication factor.
根据经验,如果你十分关心消息延迟问题,限制每个broker节点的partition数量是一个很好的主意:对于b各broker节点和复制因子为r的kafka集群,整个kafka集群的partition数量最好不超过100br个,即单个partition的leader数量不超过100.
More Partitions May Require MoreMemory In the Client
越多的partition意味着需要客户端需要更多的内存
In the most recent 0.8.2 release which we ship withthe Confluent Platform 1.0, we have developed a more efficientJava producer. One of the nice features of the new producer is that it allowsusers to set an upper bound on the amount of memory used for buffering incomingmessages. Internally, the producer buffers messages per partition. After enoughdata has been accumulated or enough time has passed, the accumulated messagesare removed from the buffer and sent to the broker.
在最新发布的0.8.2版本的kafka中,我们开发了一个更加高效的Java producer。新版producer拥有一个比较好的特征,他允许用户为待接入消息存储空间设置内存大小上限。在内部实现层面,producer按照每一个partition来缓存消息。在数据积累到一定大小或者足够的时间时,积累的消息将会从缓存中移除并发往broker节点。
If one increases the number of partitions, messagewill be accumulated in more partitions in the producer. The aggregate amount ofmemory used may now exceed the configured memory limit. When this happens, theproducer has to either block or drop any new message, neither of which isideal. To prevent this from happening, one will need to reconfigure theproducer with a larger memory size.
如果partition的数量增加,消息将会在producer端按更多的partition进行积累。众多的partition所消耗的内存汇集起来,有可能会超过设置的内容大小限制。当这种情况发生时,producer必须通过消息堵塞或者丢失一些新消息的方式解决上述问题,但是这两种做法都不理想。为了避免这种情况发生,我们必须重新将produder的内存设置得更大一些。
As a rule of thumb, to achieve good throughput, oneshould allocate at least a few tens of KB per partition being produced in theproducer and adjust the total amount of memory if the number of partitionsincreases significantly.
根据经验,为了达到较好的吞吐量,我们必须在producer端为每个分区分配至少几十KB的内存,并且在分区数量显著增加时调整可以使用的内存数量。
A similar issue exists in the consumer as well. Theconsumer fetches a batch of messages per partition. The more partitions that aconsumer consumes, the more memory it needs. However, this is typically only anissue for consumers that are not real time.
类似的事情对于consumer端依然有效。Consumer端每次从kafka按每个分区取出一批消息进行消费。消费的分区数越多,需要的内存数量越大。尽管如此,上述方式主要运用于非实时的应用场景。
batch.size 分区级别的缓存
Kafka 0.8.2之后推出了Java版的全新的producer,这个producer有个参数batch.size,默认是16KB。它会为每个分区缓存消息,一旦满了就打包将消息批量发出。看上去这是个能够提升性能的设计。不过很显然,因为这个参数是分区级别的,如果分区数越多,这部分缓存所需的内存占用也会更多。假设你有10000个分区,按照默认设置,这部分缓存需要占用约157MB的内存。而consumer端呢?我们抛开获取数据所需的内存不说,只说线程的开销。如果还是假设有10000个分区,同时consumer线程数要匹配分区数(大部分情况下是最佳的消费吞吐量配置)的话,那么在consumer client就要创建10000个线程,也需要创建大约10000个Socket去获取分区数据。这里面的线程切换的开销本身已经不容小觑了。
服务器端的开销也不小,如果阅读Kafka源码的话可以发现,服务器端的很多组件都在内存中维护了分区级别的缓存,比如controller,FetcherManager等,因此分区数越多,这种缓存的成本越久越大。
Summary
总结
In general, more partitions in a Kafka cluster leadsto higher throughput. However, one does have to be aware of the potentialimpact of having too many partitions in total or per broker on things likeavailability and latency. In the future, we do plan to improve some of thoselimitations to make Kafka more scalable in terms of the number of partitions.
通常情况下,kafka集群中越多的partition会带来越高的吞吐量。但是,我们必须意识到集群的partition总量过大或者单个broker节点partition过多,都会对系统的可用性和消息延迟带来潜在的影响。未来,我们计划对这些限制进行一些改进,让kafka在分区数量方面变得更加可扩展。