Kafka 设计分析

Scroll Down

生产者

负载均衡

生产者发布到分区Leader上,分区的Follower去同步Leader的消息即可。生产者发送时可以随机发送大欧多个分区的leader上,也可以指定某一个分区去发送,通过实现partitionsFor()来实现发送指定。

异步发送

Kafka提高效率的方式是批处理,在生产者发送数据的时候是尝试在内存里的缓存中积累数据,并在单个请求中发送更大的内容。通过可配置的缓存属性去实现低延迟高吞吐量。

配置信息

  • retries

用来表示生产者发送失败之后的重试次数,

  • buffer.memory

发送缓存区的占用内存,这部分内存包括数据缓存内存和数据压缩的预留内存。当生产速度超过了缓存区的数据发送到Broker的速度的时候就会产生阻塞,当超过max.block.ms的设定的时候就会抛出异常,设置单位是字节,默认是33554432字节。

  • acks

默认值是1 备选有0,1,-1 all,0代表,send方法执行完毕发送到缓存区内就认为已经发送成功。就直接返回了。1代表需要分区leader接收到缓存区的数据之后,send方法才算返回。-1和all是一个意思,不仅仅需要leader接受,而且还需要所有的Follower都同步到,这样才会返回发送完毕。

  • batch.size:

发送缓存区的最大值,该值设置大则需要更大的内存,批处理效果更佳,但是延迟会增大,如果设置小则是频繁的发送,导致吞吐量下降。

  • max.block.ms

这个参数用于控制KafkaProducer.send()KafkaProducer.partitionsFor()执行的最大阻塞时间。

  • linger.ms

延迟发送数据的最大延迟时间。

  • delivery.timeout.ms

调用send()返回后报告成功或失败的最大时间。这个配置等于请求时间*重试次数和延迟发送数据时间之和。如果遇到不可恢复的错误,重试已经用尽,那么生产者会报告发送记录失败。

  • request.timeout.ms

每次请求的最大超时时间,这个时间配置要小于服务器的配置replica.lag.time.max.ms,这样可以避免消息的重复发送。

  • delivery.timeout.ms

调用send()返回后报告成功或失败的最大时间。这个配置等于请求时间和延迟发送数据时间之和。如果遇到不可恢复的错误,重试已经用尽,那么生产者会报告发送记录失败。

消费者

消费向broker发送fetch数据的请求,broker区选择分区进行获取数据,每次请求都是在请求指定的偏移量区消费没然后从broker上拿回来数据,可以根据偏移量进行重复消费。

Push VS Poll

推送模式和拉模式。在现在消息引擎中是很常见的数据获取模型。由Server主动推送到消费者,这样做的方式可以让消费者全功率工作,应为这个消费的速率由Server去控制。但是弊端呢就是如果消费过程的时间比较慢的话,极容易引起消费者苦不堪言,积压着大量的任务。导致消费服务拒绝对外访问。而拉模式呢消费的速度由消费者自己控制,可以落后于生产者,再之后消费者服务器资源充足的时候去把偏移追上来。还有就是使用服务器推送模式的话,会增加消息服务器的额外负担,如果要实现低延迟推送,那势必需要实施推送,这样会导致连接的持有和消费者的量级线性相关。而拉模式则不会有这个情况,拉的情况每次基于当前偏移量区拉去,消费成功之后再更新当前的偏移量。拉模式呢比较劣势的情况就是要有一个长轮询,一直去判断poll回来数据没有,无数据时候不消费进入下一次循环,有数据则处理数据提交偏移量。

消费偏移量

消费偏移量的处理其实是消息引擎的一个性能提升的点,一个消费到达broker之后,由消费者消费,这个偏移量无非就是两种情况,一个有broker维护,另一个就是有消费者提交。但是针对偏移量的提交让broker和Consumer达成一致却不是一个小问题。如果broker维护的话,消费者拉取之后就记作已消费,导致丢消息,由consumer提交偏移量的话的话,消息的状态改为已发送->已发送未确认消费->已确认消费。这样broker会导致多个状态,如果消费者提交确认的时候网络抖动,导致提交失败,势必会导致消息被重复消费。同事,broker还维护了大量的状态,只有达到已确认消费的message才能落盘或者删除。

Kafka的消费偏移量的设计则也是选取了Consumer来提交偏移量,但是不同的实际是减少了Broker维护大量消息的状态。Kafka的每个Topic的每个分区都是有序的并且该分区只能被同一个消费组的其中的一个消费者去消费。由于单分区内是有序的而且broker维护的就是一个消息的偏移量,这样等待消息确认消费的成本比较低廉,而且kafka同样还支持消息回溯,就是指定过去的偏移量区进行重复消费,这样的好处是如果消费者的代码有问题,可以重复消费。其实还是存在重复消费的问题在其中,如果consumer的代码已消费完毕,但是没有提交偏移量,依旧是存在重新消费的。重复消费的问题需要consumer端的业务代码区保证。

MQTT协议=消息传递的语义

  • At most once

最多一次,信息可能丢失,但永远不会被重新传递。

  • At least once

最少一次,信息绝对不会丢失,但可能会被重新传递。

  • Exactly once

不会丢 只传递一次

这是三个语义带来的就是实质的问题其实是:发布消息的持久性保证和使用消息时的保证。在Kafka设计中,如果说消息被发布到broker上之后就该记录不会被丢失,除非服务器爆炸。我们站在生产者和消费者两个角度来看看待这些语义,Exactly once大多数的系统都其实实现的是覆盖面比较少,或者说是某些场景中的Exactly once,Kafka也是如此,在使用Kafka Stream场景中,基于事务生成器来控制,读取偏移量和写入偏移量在一个场景中操作,要么成功要么都失败,对于外部系统的使用要实现Exactly once,可以参照Kafka Connect来补偿实现。此外使用了事务之后性能则会非常低,除了在流处理场景中,基本并未大面积使用。

  • 生产者

生产者发送失败的重试实现了最少一次的语义,禁用失败重试,则是保证了语义最多一次。

  • 消费者

由消费者去提交偏移量,如果说消费了之前去提交偏移量然后对消息的处理异步去做,但是消费逻辑失败了,这样就导致了重新启动的消费者读取的位置是之前保存的,这个就是最多一次,如果是消息处理完毕之后去提交偏移量,这样在提交偏移量的时候该消费者发生了奔溃,那么新连接的消费者就还是读取之前的消费偏移量,这样就是最少一次的语义

至于Exactly once语义,在上述的Kafka Stream场景我们叙述了一下,实际上kafka提供了 幂等生产者 喝事务生产消费来实现,创建的时候生产者需要启动enable.idempotent=true,还需要指定一个TranscationID,然后在消费者也需要去配置一下isolation.level >[read_uncommitted,read_committed]同时,生产者发送代码也是需要修改,就是在发送记录之前需要开启是事务,发送完毕之后需要提交事务。kafka的事务实现依托于二提交。

副本

Kafka的副本是默认启用的。如果你没有对分区指定副本数量,默认是一个。Kafka的副本不像是其他消息引擎的副本需要配置,而且Kafka的副本是一直活跃的参与,用于提升吞吐量和加强容错实现故障快速转移。
每个分区上会有多个副本,包括一个leader和多个follower,并且leader均匀分布在集群内的broker上,通常只有leader对外提供读写,如果leader下线了,会出发选主。理想环境下,leader和follower的消息数据应该是一模一样,实际上则是Follower会落后一些Leader的消息记录,但是Follower会在未来的时间里去把消息同步过来。
当leader节点下线之后,需要从Follower中去再次选举出一个对外提供服务的leader。选举的Follower必须保证和ZK由心跳连接,同时这些Follower的数据不能落后Leader太多,这样丢失的消息不会太多。满足这俩点的副本组成的集合就是传说中的ISR(In Sync Replication),集合内部的副本也不是永久不变的,当某个副本和Leader的消息数量之差相差太大,就会被剔除ISR集合之内,被剔除的副本在未来的某个时间当它的消息记录和leader相差很少的时候又会再同步加入。判断剔除副本和配置项replica.lag.time.max.ms相关,ISR集合又个最少同步leader消息的配置,生产者(ack=-1)发送到Broker上会校验这个配置,如果生产者是(默认或者ack=0)则不会去校验这个配置。

选举

当某个leader出现问题,无法对完提供访问,然后会从这个分区的副本中选择一个leader对外提供访问。从小到达我们都参见过各种各样的选举,无非是多数表决通过(半数以上),三个人中可以接受一个反对票,五个人可以接受二个反对票。在一个系统角度来看呢,这样是允许了单一故障,但是同样也增加可交互,会去交互N,吞吐量是1/N,对于数据规模小的来看比较适用如ZK,但是针对Kafka这样的性能至上的消息引擎,这样的选主不是更高效。Kafka的设计则是缩小了被投票人选的规模,候选人范围就是我们之前说到的ISR,Kafka把ISR动态维护为zookeeper内,在所有的ISR都收到了消息并且写入,才等于了这个分区的消息被已提交,当ISR集合中的副本出现变更,基于Watcher机制会把这个变更持久化到Zookeeper,ISR+容许副本失败(半数以上)投票,就保证了已经提交的消息不回丢失,即使是Ledaer下线也可以,实现了故障转移FailOver。使用副本来保障分布式环境下系统的健壮。Kafak的实现更像是微软的PaxOS算法的实现PacificA,使用其它一致性组件来进行一致性管理。可以阅读如下文章学习PacificA:PacificA论文微软官方PacificA算法阐述腾讯云,还有一点就是,Kafka并不要求ISR内的节点下线后再上线恢复拥有之前的数据记录,而是强制进行一次再同步。同步完毕之后允许再加入ISR集合。值得注意的是Kafka的不丢失数据的保障是ISR中的副本至少有一个是存活的。如果所有的都死亡了,数据是没有保障的。那么针对这样的情况如果你化身为Kafak的开发人员你会怎么处理呢?

  • 我必须要保障数据完整性,我要等ISR的一个副本恢复,不然我就丢失数据了。
  • 如果都死了,那我就随便选一个副本恢复,让他做leader,这样就会丢失数据。

如果我们在ISR中等待副本,那么只要这些副本关闭,我们就将保持不可用状态。
如果这样的副本被破坏了,或者它们的数据丢失了,那么我们就永远完了。另一方面,如果一个不同步的副本复活了,并且我们允许它成为领导者,那么它的日志将成为真相的来源,即使它不能保证拥有所有提交的消息。
在0.11.0.0版本的默认情况下,Kafka选择第一种策略,并倾向于等待一致的副本。可以使用配置属性unclean.leader.election更改此行为。支持可用性和一致性由用户配置选择。默认false。这是可用性和一致性之间的思考。并不是只有Kafak有这种问题,所有的分布式系统都会有,这就需要考虑实际的场景,到底是选可用性还是选一致性。在数据无价这个角度来看,一致性要比可用性对于kafka而言更加重要。所有在生产环境要去禁用unclean.leader.election,并且需要把ISR去设置一个最小数量,如果达到了就不允许写入,再加上报警去发现问题。

副本管理

上面说的都是一个分区内部副本的事情,在一个集群中,分区会会存在很多,所以就需要有一个管理者来控制这么多分区的选主,这个角色在Kafka中就是Controller,检测Broker级别上的故障,并负责更改Broker中所有受影响的分区进行leader选主,当其中一个controller下线之后,幸存的Broker会选择一个成为Controller。Controller的选主依赖于Zk的独占锁,当Broker启动的时候回去zk创建临时节点/controller,如果第一个车创建成功之后其余的都不回去创建,而是watch这个节点,如果遇到原来的controller下线,则更具watch机制,其余的节点又回去会去创建那个临时节点,谁成功了额就是新的Controller。

再平衡

对于有状态的应用程序,最大的性能瓶颈之一是状态变换。在Kafka consumer中,有一个称为“再平衡”的概念,这意味着对于给定的M个分区和一个消费者组中的N个消费者,Kafka将尝试平衡消费者之间的负载,并在理想情况下让每个消费者处理M/N个分区。
Broker还将通过监视使用者的健康状况来动态调整工作负载,以便我们可以将死去的使用者踢出组,并处理新的使用者的加入组请求。
当服务状态很重时,将一个主题分区从实例a重新平衡到B意味着大量的数据传输。
如果触发多个重新平衡,由于数据传输的原因,整个服务可能需要很长时间才能恢复。

日志整理

image
Kafka Topic的日志整理过程如上图,偏移量不变,然后把key的新值覆盖就值,然后在被覆盖的值上打上标记,Kafka在后台线程中删除了旧的记录。压缩日志用于系统重启之后数据集的恢复,可以避免一些聚合数据的丢失。

实现

简单点说就是通过遍历2次Topic的数据。key的新数据去覆盖旧数据。Compaction Log可以理解为一种Kafka自身的宕机恢复的手段,Compaction Log分为 log Tail 和 Log Head。头部是topic中新写入的数据,Tail是尾部的数据,第一次遍历把数据加载在内存中,然后后台清理线程定时去更新。头部的数据可能有重复,但是尾部的一定是唯一的。

compaction1
如图所示,后台线程对静态的partation上的数据做整理之后 P3的value变为了11$,假如此时有一个生产者发送了部分数据到该partition。那么当整理后台线程执行的时候,也是会把旧值标识删除,新值覆盖旧值。
compaction1

那么还你来实现这个清理需求,你会怎么设计呢?我们要考虑Compaction Log内部什么样的日志是需要删除的,如何存储这个Compaction Log内部offset和key的对应关系,还有就是更新Key的value。顺着上面的图片来看,在Broker启动的时候根据主题配置是否启用Compaction Log来看,如果配置了在内存中维护了一个offSetMap,启动一个后台定时线程去吧Partition上的数据往offSetMap存放,当发现在Compaction Log新加入的日志之后,会去这个offSetMap判断是否有重复的key,如果有,就把key的旧offset覆盖替换为新日志的offset。
compaction1
删除的进度和topic的delete.retention.ms配置有关系,默认是24小时,如果使用者在比主题的delete.retention.ms时间更短,则会看到所有已删除记录的delete标记,换句话说:由于删除标记与读取同时发生,所以如果删除标记的滞后时间超过删除标记的滞后时间,使用者可能会错过删除标记。

配置

  • log.cleanup.policy=compact

这个配置是在Broker上配置的,配置在Server.properties文件内,它的优先级大于在Topic上设置的--config "cleanup.policy=compact",

  • log.cleaner.min.compaction.lag.ms

用于配置用于控制延迟清理,即刚进入head的日志等到这个配置的时间之后再去执行整理清理。如果没有配置,除了最后一个活动段,其余的段会去执行清理。

  • log.cleaner.max.compaction.lag.ms

用于配置日志最长的被整理的时间,用于防止某个日志长时间无法被整理。如果这个不配置就根据脏页率来清楚,如果当前log的脏页率小于配置的最小的脏率才回去作清理。下面说脏页的计算公式。

dirty ratio = the number of bytes in the head / total number of bytes in the log(tail + head)

在这里我提一下,kafka的目录结构来看下,分区日志是一种抽象,分区内有序消息,而不必担心Kafka的内部存储。
然而,实际上,分区日志被Kafka代理划分为多个部分。也就是Segments
Segments是存储在文件系统(数据目录和分区目录中)中的文件,它们的名称以.log结尾。
在下面的图中,一个分区日志被分为3个部分:
compaction1
一个分区日志,其中包含3个独立的段文件中的7条记录。
段的第一个偏移量称为段的基偏移量。段文件名总是等于它的基偏移值。分区中的最后一个段称为活动段。只有日志的活动段才能接收新生成的消息。段可以基于时间生成segment.ms也可以根据大小segment.bytes(default is 1GB)生成。段日志是消息存储的地方每个行代表一条消息的值、偏移量、时间戳、密钥、消息大小、压缩编解码器、校验和和消息格式的版本。磁盘上的数据格式与代理通过网络从生产者接收并发送给消费者的数据格式完全相同。Kafka可以高效地传输零拷贝的数据。在kafka的数据目录上我们可以看到有.index,.log文件 使用kafak的DumpLogSegments可以查询二者的对应关系

$ bin/kafka-run-class.sh kafka.tools.DumpLogSegments --deep-iteration --print-data-log --files /data/kafka/events-1/00000000003065011416.log | head -n 4
Dumping /data/kafka/appusers-1/00000000003065011416.log
Starting offset: 3065011416
offset: 3065011416 position: 0 isvalid: true payloadsize: 2820 magic: 1 compresscodec: NoCompressionCodec crc: 811055132 payload: {"name": "Travis", msg: "Hey, what's up?"}
offset: 3065011417 position: 1779 isvalid: true payloadsize: 2244 magic: 1 compresscodec: NoCompressionCodec crc: 151590202 payload: {"name": "Wale", msg: "Starving."}

compaction1
索引文件是内存映射的,偏移量查找使用二分查找查找小于或等于目标偏移量的最近偏移量。
索引文件由8个字节条目组成,4个字节存储相对于基本偏移量的偏移量,4个字节存储位置。
偏移量相对于基本偏移量,因此仅需要4个字节来存储偏移量。
例如:假设基偏移量是10000000000000000000,而不必存储后面的偏移量10000000000000000001和10000000000000000002,它们只是1和2。基于这些我们可以理解了Kafka的内部存储

  • 分区是Kafka的存储单元
  • 分区被分割成段,段是两个文件:它的日志和索引
  • 索引将每个偏移量映射到其消息在日志中的位置,它们用于查找消息
  • 索引存储相对于其段的基本偏移量的偏移量
  • 存储在磁盘上的数据与代理通过网络从生产者接收并发送给消费者的数据相同(零拷贝)

Kafak多租户

在kafka服务化场景内,集群需要对客户端请求进行配额,控制集群资源的使用,具体一般从网络带宽和有效请求接收俩方面来限制。

  • 定义字节率的阈值来限定网络带宽的配额。 (从 0.9 版本开始)
  • quest 请求率的配额,网络和 I/O线程 cpu利用率的百分比。 (从 0.11 版本开始)

生产者和消费者有可能生产或者消费非常高的数据量或以非常高的速度生成(有可能被攻击),从而垄断Broker的资源,导致网络饱和,导致Broker拒绝其他客户和本身的工作。采用限额可以防止这些问题。

Client Group

Kafka client 是一个用户的概念, 是在一个安全的集群中经过身份验证的用户。在一个支持非授权客户端的集群中,用户是一组非授权的 users,broker使用一个可配置的PrincipalBuilder类来配置 group 规则。 Client-id 是客户端的逻辑分组,客户端应用使用一个有意义的名称进行标识。(user, client-id)元组定义了一个安全的客户端逻辑分组,使用相同的user 和 client-id 标识。
资源配额可以针对 (user,client-id),users 或者client-id groups 三种规则进行配置。对于一个请求连接,连接会匹配最细化的配额规则的限制。同一个 group 的所有连接共享这个 group 的资源配额。 举个例子,如果 (user="test-user", client-id="test-client") 客户端producer 有10MB/sec 的生产资源配置,这10MB/sec 的资源在所有 "test-user" 用户,client-id是 "test-client" 的producer实例中是共享的。

Quota Configuration

资源配额的配置可以根据 (user, client-id),user 和 client-id 三种规则进行定义。在配额级别需要更高(或者更低)的配额的时候,是可以覆盖默认的配额配置。 这种机制和每个 topic 可以自定义日志配置属性类似。 覆盖 User 和 (user, client-id) 规则的配额配置会写到zookeeper的 /config/users路径下,client-id 配额的配置会写到 /config/clients路径下。 这些配置的覆盖会被所有的 brokers 实时的监听到并生效。所以这使得我们修改配额配置不需要重启整个集群。更多细节参考 here。 每个 group 的默认配额可以使用相同的机制进行动态更新。
优先级:

/config/users/<user>/clients/<client-id>
/config/users/<user>/clients/<default>
/config/users/<user>
/config/users/<default>/clients/<client-id>
/config/users/<default>/clients/<default>
/config/users/<default>
/config/clients/<client-id>
/config/clients/<default>

Network Bandwidth Quotas

网络带宽配额使用字节速率阈值来定义每个 group 的客户端的共享配额。 默认情况下,每个不同的客户端 group 是集群配置的固定配额,单位是 bytes/sec。 这个配额在broker上进行定义。在 clients 被限制之前,每个 group 的clients可以发布和拉取单个broker 的最大速率,单位是 bytes/sec。

Request Rate Quotas

请求速率的配额定义了一个客户端可以使用 broker request handler I/O 线程和网络线程在一个配额窗口时间内使用的百分比。 n% 的配置代表一个线程的 n%的使用率,所以这种配额是建立在总容量 ((num.io.threads + num.network.threads) * 100)%之上的. 每个 group 的client 的资源在被限制之前可以使用单位配额时间窗口内I/O线程和网络线程利用率的 n%。 由于分配给 I/O和网络线程的数量是基于 broker 的核数,所以请求量的配额代表每个group 的client 使用cpu的百分比。

Enforcement

限制这一块由Broker来控制,并且由Broker实时计算通过滑动窗口设计来实现,如果达到了配额,首先计算将CLient在限额之下延迟量,然后立即返回一个延迟响应。对于fetch请求,响应将不包含任何数据。然后Broker也会关闭和Client的Socket通道,当Client收到了延迟响应之后,它在延迟期间也不回去再向Broker发送数据。多租户这一块,针对滑动窗口着个设计,采取小窗口滑动计算,比如1秒一个这样流量洪峰不高,延迟也不会太大。和大多数的熔断机制设计一致,设计多租户一般用于Kafak上云这些场景,针对开发使用涉及偏运维方向,这一块针对Kafka的设计不再深追。有兴趣的可以自行研究。

Kafka官方文档

Kafka 压缩日志详解

Kafka 存储