Kafka 是由 Linkedin 开发并开源的分布式消息系统,因其分布式及高吞吐率而被广泛使用,现已与 Cloudera Hadoop、Apache Storm、Apache Spark、Flink 集成。
Kafka 使用场景
- 页面访问量 PV、页面曝光 Expose、页面点击 Click 等行为事件;
- 实时计算中的 Kafka Source、Dataflow Pipeline;
- 业务的消息系统,通过发布订阅消息解耦多组微服务,消除峰值(流入的速度和持久化落盘的速度的差速,流入多,消费慢,用于做消息堆积,将流量平滑到下游的消费系统)
Kafka 是一种分布式的,基于发布/订阅的消息系统。其主要设计目标如下:
- 以时间复杂度为 O(1) 的方式提供消息持久化能力,即使对 TB 级以上的数据也能保证常数时间复杂度的访问性能;
- 高吞吐率,即使在非常廉价的商用机器上也能做到单机支持每秒 100K 条以上消息的传输;
- 支持 Kafka Server 间的消息分区,以及分布式消费,同时保证每个 Partition 内的消息顺序传输;
- 同时支持离线数据处理和实时数据处理;
- Scale out,支持在线水平扩展
为何使用消息系统
解耦
消息系统在处理过程中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口。这允许你独立地扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
而基于消息发布/订阅机制,可以联动多个业务下游子系统,能够在不侵入的情况下分步编排和开发,来保证数据的一致性。
冗余
有些情况下,处理数据的过程可能会失败,除非数据被持久化,否则将造成丢失。消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失的风险。
许多消息队列所采用的「插入-获取-删除」范式中,在把一个消息从队列中删除之前,需要处理系统明确地指出该消息已经被处理完毕,从而确保你的数据被安全地保存直到你使用完毕。
扩展性
因为消息队列解耦了处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可。不需要改变代码、不需要调节参数,扩展就像调大电力按钮一样简单。
灵活性 & 峰值处理能力
在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果为以能处理这类峰值访问为标准,来投入资源随时待命无疑是巨大的浪费。
使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷请求而完全崩溃。
可恢复性
系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。
顺序保证
在大多数使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能够保证数据会按照特定的顺序来处理。
Kafka 保证一个 Partition 内的消息的有序性。
缓冲
在任何重要的系统中,都会有需要不同处理时间的元素。消息队列通过一个缓冲层来帮助任务最高效率地执行,写入队列的处理会尽可能地快速。该缓冲有助于控制和优化数据流经过系统的速度。
异步通讯
很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。
Topic & Partition
Topic
Topic 在逻辑上可以被认为是一个 queue,每条消费都必须指定它的 Topic,可以简单地理解为必须指明把这条消息放进哪个 queue 里。
我们把一类消息按照主题来分类,有点类似于数据库中的表。
为了使得 Kafka 的吞吐率可以线性提高,物理上把 Topic 分成一个或多个 Partition。对应到系统上就是一个或若干个目录。
Broker
Kafka 集群包含一个或多个服务器,每个服务器节点称为一个 Broker。
Broker 存储 Topic 的数据。如果某个 Topic 有 N 个 Partition,集群有 N 个 Broker,那么每个 Broker 存储该 Topic 的一个 Partition。
- 从 Scale out 的性能角度思考,通过 Broker Kafka Server 的更多节点,带来更多的存储,建立更多的 Partition,把 I/O 负载到更多的物理节点上,从而提高总吞吐 IOPS;
- 从 Scale up 的角度思考,一个 Node 拥有越多的 Physical Disk,也可以负载更多的 Partition,从而提升总吞吐 IOPS
- 如果某个 Topic 有 N 个 Partition,集群有(N+M)个 Broker,那么其中有 N 个 Broker 存储该 Topic 的一个 Partition,剩下的 M 个 Broker 不存储该 Topic 的 Partition 数据;
- 如果某个 Topic 有 N 个 Partition,集群中 Broker 的数目少于 N 个,那么一个 Broker 存储该 Topic 的一个或多个 Partition
Topic 只是一个逻辑概念,真正在 Broker 间分布的是 Partition。
每一条消息被发送到 Broker 中,会根据 Partition 规则,选择被存储到哪一个 Partition。如果 Partition 规则设置的合理,那么所有的消息都可以均匀分布到不同的 Partition 中。
实验条件:3 个 Broker,1 个 Topic,无 Replication,异步模式,3 个 Producer,消息 Payload 为 100 字节。
- 当 Partition 数量小于 Broker 个数时,Partition 数量越大,吞吐率越高,且呈线性提升;
- Kafka 会将所有 Partition 均匀分布到所有 Broker 上,所以当只有 2 个 Partition 时,会有 2 个 Broker 为该 Topic 服务;3 个 Partition 时,同理会有 3 个 Broker 为该 Topic 服务;
- 当 Partition 数量多于 Broker 个数时,总吞吐量并未有所提升,甚至还有所下降。可能的原因是,当 Partition 数量为 4 和 5 时,不同的 Broker 上的 Partition 数量不同,而 Producer 会将数据均匀发送到各 Partition 上,这就造成各 Broker 的负载不同,不能最大化集群吞吐量
存储原理
- Kafka 的消息是存在于文件系统之上的,Kafka 高度依赖于文件系统来存储和缓存消息;
- 利用了操作系统的机制,当磁盘操作发生时,操作系统会将主内存中未使用的空间作为缓存,以加速磁盘读写操作。因此,尽管磁盘的读写速度较慢,但通过操作系统的磁盘缓存机制,可以大大提高 Kafka 的性能;
- Kafka 利用顺序 I/O,以及 Page Cache 达成超高吞吐;
- 任何发布到 Partition 的消息都会被追加到 Partition 数据文件的尾部,这样的顺序写磁盘操作让 Kafka 的效率非常高
Kafka 集群保留所有发布的 message,不管这个 message 有没有被消费过,Kafka 提供可配置的保留策略去删除旧数据(还有一种策略是根据分区大小来删除数据)。
例如,如果将保留策略设置为两天,在 message 写入后的两天内,它可用于消费,之后它将被丢弃以腾出空间。Kafka 的性能与存储的数据量大小无关, 所以即使将数据存储很长一段时间也是没有问题的。
Offset 偏移量,每条消息都有一个当前 Partition 下唯一的 64 字节的 Offset,它相当于当前分区第一条消息的偏移量,即第几条消息。消费者可以指定消费的位置信息,当消费者挂掉再重新恢复的时候,可以从消费位置继续消费。
假设现在 Kafka 集群只有一个 Broker,创建了 2 个 Topic:Topic1 和 Topic2,Partition 的数量分别为 1 和 2,那么根目录下就会创建如下三个文件夹:
在 Kafka 文件存储中,同一个 Topic 下可以有多个不同的 Partition,每个 Partition 都是一个目录。而每一个目录又被平均分配成多个大小相等的 Segment File,Segment File 又是由 index file 和 data file 组成,它们总是成对出现,后缀.index
和.log
分表表示 Segment 的索引文件和数据文件。
Segment 是 Kafka 文件存储的最小单位。Segment 文件命名规则:Partition 全局的第一个 Segment 从 0 开始,后续每个 Segment 文件名为上一个 Segment 文件的最后一条消息的 Offset 值。
其中以索引文件中元数据(3, 497)为例,依次在数据文件中表示第 3 个 message(在全局 Partition 中表示第 368769 + 3 = 368772 个 message)以及该消息的物理偏移地址为 497(即该消息的起始位置离数据文件开头偏移了 497 个字节,我们可以通过该偏移量查询得到该消息的具体内容)。
注意:该 Index 文件并不是从 0 开始,也不是每次递增 1 的(即表示索引文件中包含的索引记录并不是连续的,并且索引条目中存储的 message 在数据文件中的物理偏移量也不是连续递增的)。这是因为在 Kafka 中一个 Partition 中可能包含大量的 message,使用传统的顺序递增方式存储索引条目可能会导致索引文件过大,占用过多的存储空间,所以 Kafka 采取稀疏索引存储的方式,每隔一定字节的数据建立一条索引。稀疏索引存储的思路是每隔一定的字节数(例如 1000 字节)在数据文件和索引文件中建立一条索引记录,而不是在每个 message 之间都建立一条索引记录。这样做的好处是减小索引文件的大小,进而减少索引文件的读取和写入开销,提高了 Kafka 的读写性能。
- 减少索引占用的存储空间,稀疏索引只在一定字节范围内,每个分区只建立了一定数量的索引,相比于 Dense Index,可以大大减少索引占用的存储空间;
- 提高索引查询效率,稀疏索引建立了一定数量的索引,查询时只需查询这些索引,可以大大减少索引查询所需的时间,提高查询效率;
- 另外,Kafka 的数据写入是追加写入,且数据一旦写入,就不能被修改和删除。这种写入方式也保证了索引的基本有序性,从而提高了查询效率;
- 因为其文件名为上一个 Segment 的最后一条消息的 Offset ,所以当需要查找一个指定 Offset 的 message 时,只需通过在所有的 Segment 的文件名中进行二分查找,就能找到它所归属的 Segment,然后在其 Index 文件中找到对应到文件上的物理位置,就能拿出该 message
Kafka 是如何准确知道 message 的偏移的呢?
这是因为在 Kafka 中定义了标准的数据存储结构,在 Partition 中的每一条 message 都包含了以下三个属性:
- Offset:表示 message 在当前 Partition 中的偏移量,是一个逻辑上的值,唯一确定了 Partition 中的一条 message,可以简单的认为是一个 ID;
- MessageSize:表示 message 内容 Data 的大小;
- Data:message 的具体内容
例如读取 Offset = 368776 的 message,需要通过下面 2 个步骤查找:
- 查找 Segment File,其中 00000000000000000000.index 表示最开始的文件,起始偏移量 Offset 为 0。第二个文件00000000000000368769.index 的消息起始偏移量为 368769 + 1 = 368770。其他后续文件依次类推,以起始偏移量命名并排序这些文件,只要根据 Offset 二分查找文件列表,就可以快速定位到具体文件。 当 Offset = 368776 时定位到 00000000000000368769.index | log;
- 第二步通过 Segment File 查找 message,通过第一步定位到 Segment File,当 Offset = 368776 时,依次定位到00000000000000368769.index 的元数据物理位置和 00000000000000368769.log 的物理偏移地址,然后再通过00000000000000368769.log 顺序查找直到 Offset = 368776 为止
Segment Index File 采取稀疏索引存储方式,它减少了索引文件的大小,通过 mmap 可以直接内存操作。
mmap(Memory-mapped file)是一种内存映射文件机制,可以将文件数据直接映射到进程的地址空间中,从而允许程序使用内存来直接访问文件数据,而无需将数据先读入内存再进行访问。通过 mmap,一块连续的虚拟内存地址被映射到文件的一个区域或整个文件上,将文件数据当作内存映射到了进程地址空间。
- 数据能够被更高效的读取,因为它可以直接从磁盘读取到内存中,可以避免频繁的系统调用,提高文件 I/O 的效率;
- 内存映射文件允许多个进程同时操作一个文件,因为所有进程都访问相同的内存区域;
- 一旦内存映射建立,对于多数情况(如文件比较大时),不能直接将内存释放,限制了系统的可用内存;
- 由于内存映射文件操作基于页表,所以频繁修改内存区域的文件将产生很多的页表项更新,增加了内存管理的开销
Kafka 高效文件存储设计的特点:
- Kafka 把 Topic 中一个 Parition 大文件分成多个小文件段,通过多个小文件段,就容易定期清除或删除已经消费完的文件,减少磁盘占用;
- 通过索引信息可以快速定位 message 和确定 response 的最大大小;
- 通过 Index 元数据全部映射到 memory,可以避免 Segment File 的 I/O 磁盘操作;
- 通过索引文件稀疏存储,可以大幅降低 Index 文件元数据占用空间的大小
Kafka 从 0.10.0.0 版本起,为分片日志文件中新增了一个 .timeindex 的索引文件,可以根据时间戳定位消息。同样可以通过脚本 kafka-dump-log.sh 查看时间索引的文件内容。
- 首先定位分片,将 1570793423501 与每个分片的最大时间戳进行对比(最大时间戳取时间索引文件的最后一条记录时间,如果时间为 0 则取该日志分段的最近修改时间),直到找到大于或等于 1570793423501 的日志分段,因此会定位到时间索引文件00000000000003257573.timeindex,其最大时间戳为 1570793423505;
- 重复使用 Offset 找到 log 文件的步骤
Producer
Producer 发送消息到 Broker 时,会根据 Paritition 机制选择将其存储到哪一个 Partition。如果 Partition 机制设置合理,所有消息都可以均匀分布到不同的 Partition 里,这样就实现了负载均衡。
- 指明 Partition 的情况下,直接将给定的 Value 作为 Partition 的值;
- 没有指明 Partition 但有 Key 的情况下,将 Key 的 Hash 值与分区数取余得到 Partition 值;
- 既没有指明 Partition 也没有 Key 的情况下,第一次调用时随机生成一个整数(后面每次调用都在这个整数上自增),将这个值与可用的分区数取余,得到 Partition 值,也就是常说的 Round-Robin 轮询算法
生产者在发送消息之前需要选择消息要发送到哪个主题的哪个分区中,选择完毕后将消息添加到批量消息中。批量消息中的所有消息都属于同一主题和分区。一旦批量消息中的消息数量达到了一定阈值或等待时间到达了一定值,就会触发批量消息的发送。发送过程将由生产者的一个单独线程进行处理。在这个过程中,生产者将批量消息发送到对应的 Kafka Broker,Broker 将存储这些消息并返回记录元数据(RecordMetadata)对象作为响应。
RecordMetadata 对象包含消息的主题、分区及位移等元数据信息。如果发送成功,则 RecordMetadata 中包含的位移表示消息在对应分区中的位置。如果发送失败,则生产者通常会重试该消息,直到达到一定的发送次数或超时时间。
Kafka 集群中的每个 Partition 都会向生产者返回 ACK,以保证消息已经被正确写入。如果生产者收到 ACK,则将继续发送下一轮的消息,如果没有收到 ACK,则会进行重发。通过这种机制,Kafka 生产者可以确保消息被可靠地发送到指定的主题和分区中。
生产者发送批量消息的目的是为了提高消息发送的效率。在 Kafka 中,生产者将消息添加到批量消息中,是为了减少单个消息的网络传输开销和提高网络带宽的利用率。批量发送能够将多条消息合并为一条发送,降低频繁的网络通信带来的开销。
具体地,当生产者发送消息到 Kafka 集群中的某个 Partition 后,这些消息将被缓存在生产者的发送缓冲区中。为了提高消息发送的效率,生产者不会立刻将发送缓冲区中的单个消息发送出去,而是等待缓冲区中积累足够多的消息之后,合并成一批消息进行发送。批量消息中的每条消息都属于同一个主题和分区,通过这种方式可以减少单条消息的网络传输时间和传输开销,提高消息发送的吞吐量和效率。
需要注意的是,批量消息的发送过程是由生产者的一个独立线程进行的,这也意味着批量消息的发送和单个消息的发送可能会存在一定的延迟差异。
Producer Exactly Once
0.11 版本的 Kafka,引入了幂等性:Producer 不论向 Server 发送多少重复数据,Server 端都只会持久化一条。
要启用幂等性,只需要将 Producer 的参数中 enable.idompotence 设置为 true 即可。
开启幂等性的 Producer 在初始化时会被分配一个 PID,发往同一 Partition 的消息会附带 Sequence Number。而 Borker 端会对(PID, Partition, SeqNumber)做缓存,当具有相同主键的消息提交时,Broker 只会持久化一条。
但是 PID 重启后就会变化,同时不同的 Partition 也具有不同主键,所以幂等性无法保证跨分区会话的 Exactly Once。
Consumer
我们可以创建一个消费者实例去从 Kafka 中读取消息,并且进行检查,最后产生结果数据,但如果生产者写入消息的速度比消费者读取的速度快怎么办呢?这样随着时间增长,消息堆积越来越严重。
对于这种场景,我们需要增加多个消费者来进行水平扩展。
Kafka 消费者是消费组的一部分,当多个消费者形成一个消费组来消费主题时,每个消费者会收到不同分区的消息。
假设有一个 T1 主题,该主题有 4 个分区;同时有一个消费组 G1,这个消费组只有一个消费者 C1。那么消费者 C1 将会收到这 4 个分区的消息。
如果增加新的消费者 C2 到消费组 G1,那么每个消费者将会分别收到两个分区的消息。
如果增加到 4 个消费者,那么每个消费者将会分别收到一个分区的消息。这时候每个消费者都处理其中一个分区,即满负载运行。
但如果继续增加消费者到这个消费组,剩余的消费者将会空闲,不会收到任何消息。总而言之,我们可以通过增加消费组的消费者来进行水平扩展,提升消费能力。这也是为什么建议创建主题时使用比较多的分区数,这样可以在消费负载高的情况下增加消费者来提升性能。
另外,消费者的数量不应该比分区数还多,因为多出来的消费者是空闲的,没有任何帮助。
如果 C1 处理消息仍然还有瓶颈,我们如何优化和处理?
把 C1 内部的消息进行二次 sharding,开启多个 goroutine worker 进行消费,为了保障 Offset 提交的正确性,需要使用 watermark 机制,保障最小的 Offset 保存,才能往 Broker 提交。
Consumer Group
Kafka 一个很重要的特性就是,只需写入一次消息,就可以支持任意多的应用读取这个消息。换句话说,每个应用都可以读到全量的消息。而为了使得每个应用都能读到全量的消息,则应用需要有不同的消费组。
假如我们新增了一个新的消费组 G2,而这个消费组有两个消费者。在这个场景中,消费组 G1 和消费组 G2 都能收到 T1 主题的全量消息,在逻辑意义上来说它们属于不同的应用。
总而言之,如果应用需要读取全量消息,那么请为该应用设置一个消费组;如果该应用消费能力不足,那么可以考虑在这个消费组里增加消费者。
当新的消费者加入消费组时,它会消费一个或多个分区,而这些分区之前是由其他消费者负责的。另外,当消费者离开消费组(比如重启、宕机等)时,它所消费的分区会分配给其他消费者。这种现象称为重平衡(Rebalance)。
重平衡是 Kafka 一个很重要的性质,这个性质保证了高可用和水平扩展。不过在重平衡期间,所有消费者都不能消费消息,因此会造成整个消费组短暂的不可用。而且,将分区进行重平衡也会导致原来的消费者状态过期,从而导致消费者需要重新更新状态,这段期间也会降低消费性能。
消费者通过定期发送心跳(Hearbeat)到一个作为组协调者(Group Coordinator)的 Broker 来保持在消费组内存活。这个 Broker 不是固定的,每个消费组都可能不同。当消费者拉取消息或者提交时,便会发送心跳。如果消费者超过一定时间没有发送心跳,那么它的会话(Session)就会过期,组协调者就会认为该消费者已经宕机,然后触发重平衡。
从消费者宕机到会话过期是有一定时间的,这段时间内该消费者的分区都不能进行消息消费。
通常情况下,我们可以进行优雅关闭,这样消费者会发送离开的消息到组协调者,这样组协调者可以立即进行重平衡而不需要等待会话过期。
在 0.10.1 版本,Kafka 对心跳机制进行了修改,将发送心跳与拉取消息进行分离,这样使得发送心跳的频率不受拉取的频率影响。
另外更高版本的 Kafka 支持配置一个消费者多长时间不拉取消息但仍然保持存活,这个配置可以避免活锁(livelock),即指应用没有故障但是由于某些原因导致不能进一步消费。
但是活锁也很容易导致连锁故障,当消费端下游的组件性能退化时,消息消费会变的很慢,很容易触发 livelock 的重新均衡机制,反而影响了吞吐。
Partition 会为每个 Consumer Group 保存一个偏移量,用于记录 Group 消费到的位置。
Kafka 从 0.9 版本开始将消费端的位移信息保存在集群的内部主题(__consumer_offsets)中,该主题默认为 50 个分区,每条日志项的格式都是 「TopicPartition, OffsetAndMetadata」,key 为主题分区,主要存放主题、分区以及消费组的信息,value 为 OffsetAndMetadata 对象,主要包括位移、位移提交时间、自定义元数据等信息。
通过将消费者的偏移量信息保存在内部主题中,Kafka 可以实现偏移量的持久化和可靠性,确保消费者在分布式环境中能够正确追踪消费位置和进行故障恢复。这些偏移量信息对于消费者组的负载均衡、重新分配以及实现 Exactly-Once 语义等功能至关重要。
- 位移追踪:消费位移主题保存了每个消费者组在每个分区上的偏移量,记录了消费者组消费消息的位置,可以准确地追踪消费者的消费进度;
- 组内偏移量同步:消费位移主题充当了消费者组中各个消费者之间的协调者。消费者可以通过订阅消费位移主题来同步最新的偏移量信息,以便进行负载均衡、分组再平衡等操作;
- 恢复和恢复消费:通过存储偏移量信息,消费者能够在崩溃或重启后重新加入消费组,并继续从上一次消费的位置继续消费消息
分组协调者(Group Coordinator)是一个服务,Kafka 集群中的每个节点在启动时都会启动这样一个服务,该服务主要是用来存储消费分组相关的元数据信息,每个消费组均会选择一个协调者来负责组内各个分区的消费位移信息存储,选择的主要步骤如下:
- 确定消费组的位移信息存入哪个分区:前面提到默认的 __consumer_offsets 主题分区数为 50,通过以下算法可以计算出对应消费组的位移信息应该存入哪个分区,partition = Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount),其中 groupId 为消费组的 id,由消费端指定,groupMetadataTopicPartitionCount 为主题分区数;
- 根据 partition 寻找该分区的 leader 所对应的节点 Broker,该 Broker 的 Coordinator 即为该消费组的 Coordinator
Consumer Commit Offset
消费端可以通过设置参数 enable.auto.commit 来控制是自动提交还是手动提交,如果值为 true 则表示自动提交,在消费端的后台会定时的提交消费位移信息,时间间隔由 auto.commit.interval.ms(默认为 5 秒):
- 可能存在重复的位移数据提交到消费位移主题中,因为不管是否有新的消费记录,每隔 5 秒就会往主题中写入一条消息,这样就会产生大量的同 key 消息,而其实只需要一条,因此需要依赖日志压缩策略来清理数据;
- 重复消费,假设位移提交的时间间隔为 5 秒,那么在 5 秒内如果发生了 Rebalance,则所有的消费者都会从上一次提交的位移处开始消费,那么期间消费的数据则会再次被消费
集中 Delivery Guarantee:
- 读完消息先 commit 再处理消息。在这种模式下,如果 Consumer 在 commit 后还没来得及处理消息,就 crash 了,那么下次重新开始工作后就无法读到刚刚已提交而未处理的消息,即 At most once;
- 读完消息先处理再 commit。在这种模式下,如果在处理完消息之后 commit 之前,Consumer 发生 crash,那么下次重新开始工作时还会处理刚刚未 commit 的消息,而实际上该消息已经被处理过了,即 At least once
在一些使用场景中,由于消息具有主键并具备幂等性,因此在处理这些消息时可以达到 Exactly once 的语义。然而,该说法被认为有些牵强,原因如下:
- 幂等性与 Exactly once 的区别:幂等性是指对于同一条消息的重复处理,结果等效于只处理一次。而 Exactly once 是指确保每条消息仅被处理一次,并且不会产生重复或丢失;
- 主键并不能完全保证操作的幂等性:虽然在一些场景中,消息具有主键,可以通过主键去判断并保证幂等性。但是主键并不能完全解决所有幂等性的问题,因为处理操作的幂等性还取决于具体的业务逻辑和操作细节;
- Delivery guarantee 关注的是消息被处理的次数:Exactly once 语义通常是指对于每条消息,保证它被处理的次数正确。而不同的处理方式可以有不同的结果,因此我们不应该将处理过程的特性(如幂等性)看作 Kafka 的特性;
- 因此,尽管在某些情况下,通过消息的主键和幂等性可以部分满足 Exactly once 的要求,但这并不代表 Kafka 本身提供了 Exactly once 的机制。实际上,在保证消息处理的幂等性方面,仍然需要结合具体的业务逻辑和设计来确保消息处理的正确性和可靠性
Push vs Pull
作为一个消息系统,Kafka 遵循了传统的方式,选择由 Producer 向 Broker push 消息,并由 Consumer 从 Broker pull 消息。一些 logging-centric system(指将日志作为系统设计和运维的核心组成部分的系统),比如 Facebook 的 Scribe 和 Cloudera 的 Flume,采用 push 模式。
事实上,push 模式 和 pull 模式各有优劣:
- push 模式很难适应消费速率不同的消费者,因为消息发送速率是由 Broker 决定的。push 模式的目标是尽可能地以最快速度传递消息,但是这样很容易造成 Consumer 来不及处理消息,最典型的表现就是拒绝服务以及网络拥塞;
- 而 pull 模式则可以根据 Consumer 的消费能力以适当的速率来消费消息
对于 Kafka 而言,pull 模式更合适。pull 模式可简化 Broker 的设计,Consumer 可自主控制消费消息的速率,同时 Consumer 可以自己控制消费方式,即可批量消费也可逐条消费,同时还能选择不同的提交方式,从而实现不同的传输语义。
而 pull 模式则可以根据 Consumer 的消费能力以适当的速率消费消息。pull 模式不足之处是,如果 Kafka 中没有数据,消费者可能会陷入循环中,一直返回空数据。
因为消费者从 Broker 主动拉取数据,需要维护一个长轮询,针对这一点, Kafka 的消费者在消费数据时会传入一个时长参数 timeout。如果当前没有数据可供消费,Consumer 会等待一段时间之后再返回,这段时长即为 timeout。
总而言之,Kafka 是采用了 pull 模式来满足消费者主动拉取消息的需求,而生产者则是使用 push 模式将消息推送到 Kafka。
Replication
Kafka 在 0.8 以前的版本中,并不提供 HA 机制(指系统具备高可用性和容错能力的一种设计和实现方式,旨在保证系统在面对单点故障或异常情况时仍能持续提供服务),一旦一个或多个 Broker 宕机,则宕机期间其上所有的 Partition 都将无法继续提供服务。若该 Broker 永远不能再恢复,亦或磁盘故障,则其上的数据将丢失。
在 Kafka 0.8 以前的版本中,是没有 Replication 的,一旦某一个 Broker 宕机,则其上所有的 Partition 数据都将不可被消费,这与 Kafka 的数据持久性及 Delivery Guarantee 的设计目标相悖。同时 Producer 都不能再将数据存于这些 Partition 中:
- 如果 Producer 使用同步模式,则 Producer 会在尝试重新发送 message.send.max.retries(默认值为 3)次后抛出 Exception,用户可以选择停止发送后续数据,也可选择继续发送。前者会造成数据的阻塞,后者会造成本应发往该 Broker 的数据丢失;
- 如果 Producer 使用异步模式,则 Producer 会尝试重新发送 message.send.max.retries(默认值为 3)次后记录该异常,并继续发送后续数据,这会造成数据丢失,并且只能通过日志发现该问题
由此可见,在没有 Replication 的情况下,一旦某机器宕机或者某个 Broker 停止工作,则会造成整个系统的可用性降低。随着集群规模的增加,整个集群中出现该类异常的几率大大增加,因此对于生产系统而言,Replication 机制的引入非常重要。
Leader
引入 Replication 之后,同一个 Partition 可能会有多个 Replica,这时需要在这些 Replica 之间选出一个 Leader,Producer 和 Consumer 只与这个 Leader 进行交互,其它的 Replica 作为 Follower 从 Leader 中复制数据。
因为需要保证同一个 Partition 中的多个 Replica 之间的数据一致性(其中一个宕机后其它的 Replica 必须要能继续服务,并且既不能造成数据重复,也不能造成数据丢失)。
如果没有一个 Leader,所有的 Replica 都可同时读/写数据,那就需要保证多个 Replica 之间互相(N×N 条通路)同步数据,数据的一致性和有序性非常难保证,大大增加了 Replication 实现的复杂性,同时也增加了出现异常的几率。而引入 Leader 后,只有 Leader 负责数据的读写,Follower 只向 Leader 顺序 Fetch 数据(N 条通路),使得系统更加简单且高效。
由于 Kafka 集群目前依赖于 Zookeeper 集群,所以最简单最直观的方案是,所有的 Follower 都在 ZooKeeper 上设置一个 Watch,一旦 Leader 宕机,则其对应的 ephemeral znode 会自动删除,此时所有的 Follower 都尝试创建该节点,而创建成功者(ZooKeeper 保证了只有一个能创建成功)即是新的 Leader,其他的 Replica 即为 Follower。
但这种方案可能存在以下缺点:
- split-brain(脑裂):这是由 ZooKeeper 的特性所引起的,虽然 ZooKeeper 能保证所有的 Watch 按顺序触发,但并不能保证同一时刻所有的 Replica "看到"的状态是一样的,这就可能造成不同 Replica 的响应不一致 ;
- herd effect(羊群效应):如果宕机的那个 Broker 上的 Partition 比较多,则会造成多个 Watch 被触发,造成集群内大量的调整;
- ZooKeeper 负载过重:每个 Replica 都要为此在 ZooKeeper 上注册一个 Watch,而当集群规模增加到几千个 Partition 时,ZooKeeper 的负载会过重
Controller
Kafka 的 Leader Election 方案解决了上述问题,它在所有的 Broker 中选出一个 Controller,所有 Partition 的 Leader 选举都是由 Controller 来决定的。Controller 会将 Leader 的改变直接通过 RPC 的方式(比 ZooKeeper Queue 的方式更高效)通知需要为此作为响应的 Broker(指通知其他 Broker,告知它们有关 Partition Leader 的变化,并需要它们做出相应的响应)。其他 Broker 收到通知后,会根据新的 Leader 信息做出相应的调整。这可能包括更新本地的元数据(比如更新自身负责的 Partition 的 Leader 信息),重新分配相关资源等。通过这种方式,Kafka 能够快速地在集群中进行 Leader 选举,并将新的 Leader 信息迅速传播给其他 Broker,以保证系统的可靠性和高可用性。
Kafka 集群 Controller 的选举过程如下:
- 每个 Broker(Kafka 服务器节点)在 ZooKeeper 中的 /controller 路径上注册一个 Watch。此 Watch 用于监视 Controller 的变化;
- 当前的 Controller(即 Leader)出现故障或意外退出时,对应的 /controller 路径会失去数据,这是因为该路径是一个临时节点(ephemeral node),它与创建它的 Broker 的会话相关联。一旦会话终止,该临时节点将被自动删除;
- 当 ZooKeeper 上的 /controller 路径消失时,所有"活着"的 Broker 都会参与竞选新的 Controller。每个 Broker 都会创建一个新的 Controller Path,即在 /controller 路径上创建一个新的临时节点;
- ZooKeeper 保证了竞选过程的原子性和独占性,只会有一个 Broker 在竞选中成功,成为新的 Controller,而其他 Broker 则会竞选失败;
- 竞选成功的 Broker 成为新的 Controller,负责管理与集群相关的操作,如 Partition 的 Leader 选举、自动创建和删除 Topic 等。它还负责与其他 Broker 进行协调,以确保集群的正常运行;
- 竞选失败的 Broker 在以上过程完成后,会重新注册 Watch 监视新的 /controller 路径,等待下一次 Controller(Leader)故障和选举
Kafka Partition Leader 的选举过程如下(由 Controller 执行):
- 从 Zookeeper 中读取当前分区的所有 ISR(in-sync replicas)集合;
- 调用配置的分区选择算法选择分区的 Leader
Partition 分布
Kafka 集群中的 Partition(分区)复制(Replication)默认情况下是自动分配的。在 Kafka 集群中,每个 Broker 都有均等分配 Partition 的 Leader 机会。
创建 1 个 Topic,包含 4 个 Partition,2 个 Replication:
上图中,箭头指向为副本,以 Partition-0 为例,Broker1 中 Parition-0 为 Leader,Broker2 中 Partition-0 为副本。
当集群中新增 2 个节点,Partition 增加到 6 个:
上图中,每个 Broker(按照 BrokerId 有序)依次分配主 Partition,下一个 Broker 则为副本,如此循环迭代分配,多副本都遵循此规则。
副本分配算法如下:
- 将所有 n 个 Broker 和待分配的 i 个 Partition 进行排序;
- 将第 i 个 Partition 分配到第(i mod n)个 Broker 上;
- 将第 i 个 Partition 的第 j 个副本分配到第((i + j) mod n)个 Broker 上
Leader
和大部分的分布式系统一样,Kafka 处理失败需要明确定义一个 Broker 是否"活着"。对于 Kafka 而言,存活包含两个条件:
- 副本所在节点需要与 ZooKeeper 维持 Session(这个通过 ZK 的 Heartbeat 机制来实现);
- 从副本的最后一条消息的 Offset 需要和主副本的最后一条消息的 Offset 的差值不超过设定阈值(replica.lag.max.messages),或者从副本的 LEO 落后于主副本的 LEO 的时长不大于设定阈值(replica.lag.time.max.ms)。官方推荐使用后者判断,并在 Kafka 0.10.0 移除了replica.lag.max.messages 参数
Leader 会跟踪与其保持同步的 Replica 列表,该列表称为 ISR(即 In-Sync Replica)。如果一个 Follower 宕机,或者落后太多,则 Leader 将会把它从 ISR 中移除,当其再次满足以上条件之后,则又会被重新加入集合中。
ISR 的引入主要是为了解决同步复制和异步复制这两种方案各自的缺陷:
- 在同步复制中,当 Producer 向 Kafka 集群发送消息时,要求至少有一个 Replica(副本)确认已经成功写入消息,然后返回一个确认给 Producer。这种确认确保了数据的可靠性,因为只有在至少一个副本写入成功后才会返回确认。如果无法满足至少一个副本写入成功的条件,Producer 将会收到错误的响应。如果有从副本宕机或者超时,就会拖慢该副本组的整体性能;
- 在异步复制中,当 Producer 向 Kafka 集群发送消息时,不需要等待副本的确认,而是立即返回一个确认给 Producer。这样可以提高 Producer 的吞吐量,因为它不需要等待确认。但是,这种方式存在一定的风险,因为如果副本在尚未写入消息时出现故障,数据可能会丢失
在 Kafka 中,默认情况下使用异步复制的方式,用以提高性能和吞吐量。但是,如果数据的可靠性是更重要的因素,那么可以选择使用同步复制。可以通过配置 Producer 的属性来控制复制方式的选择。
如上所述,无论是同步复制还是异步复制,Kafka 都会为每个 Partition 维护一个 In-Sync Replica(ISR)的集合,这是一组已经追上了 Leader(领导者)的副本。只有 ISR 中的副本才会参与消息的读写操作。当副本无法追上 Leader 或者发生故障时,会被移出 ISR,待恢复后再次加入 ISR。这样可以保证数据的一致性和可用性。
Replicated log
分布式日志系统,主要保证:
- commit log 不会丢失;
- commit log 在不同机器上是一致的
基于主从复制的 Replicated log 实现:
- raft:基于多数节点的 ack,节点一般称为 leader/follower;
- pacificA:基于所有节点的 ack,节点一般称为 primary/secondary;
- bookkeeper:基于法定个数节点的 ack,节点一般称为 writer/bookie
Kafka 在 Zookeeper 中动态维护了一个 ISR,ISR 里的所有 Replica 都是已经跟上了 Leader,只有 ISR 里的成员才有被选为 Leader 的可能。在这种模式下,对于 f + 1 个 Replica 而言,一个 Partition 能在保证不丢失已经 commit 的消息的前提下,容忍 f 个 Replica 的失败。在大多数使用场景中,这种模式是非常有利的。需要注意的是,为了容忍 f 个 Replica 的失败,Majority Vote 和 ISR 在提交(commit)之前都需要等待至少 f + 1 个 Replica 的确认。这是为了确保数据的可靠性和一致性。但是,ISR 所需的总 Replica 的个数几乎是 Majority Vote 的一半,因为它只包含已经跟上 Leader 的 Replica,而其他的 Replica 则在追赶过程中或者有可能落后。
而对于 Producer 而言,它可以选择是否等待消息 commit,这可以通过 request.required.acks 来设置:
- 0:Producer 发送消息后,不需要等待任何确认,直接返回;
- 1:Producer 发送消息后,等待 Leader 确认接收成功,然后返回;
- all(或 -1):Producer 发送消息后,等待 ISR 中的所有 Replica 确认接收成功后,才返回
当设置为 all(或 -1)时,这种机制确保了只要 ISR 中有一个或以上的 Follower,一条被 commit 的消息就不会丢失。因为只有被 ISR 中的所有 Replica 确认接收成功后,消息才会被标记为已提交(committed),即使 Leader 在此期间发生故障,在新的 Leader 选举完成后,仍然可以保证消息的可靠性。
通过设置不同的 request.required.acks 参数值,Producer 可以根据不同的需求和对消息可靠性的要求来平衡吞吐量和数据的一致性。较大的等待确认级别可能会导致较高的延迟,但能提供更高的可靠性保证。而较小的等待确认级别则可以提供更低的延迟,但可能会增加消息丢失的风险。
High Watermark & Log End Offset
初始时 Leader 和 Follower 的 HW 和 LEO 都是 0。Leader 中的 remote LEO 指的就是 Leader 端保存的 Follower LEO,也被初始化成 0。此时,Producer 没有发送任何消息给 Leader,而 Follower 已经开始不断地给 Leader 发送 FETCH 请求了,但因为没有任何数据,因此什么都不会发生。值得一提的是,Follower 发送过来的 FETCH 请求因为没有数据,而会暂时被寄存到 Leader 端的 purgatory 中,等待 500ms(replica.fetch.wait.max.ms 参数)超时后会强制完成。倘若在寄存期间 Producer 端发送过来了数据,那么 Kafka 会自动唤醒该 FETCH 请求,让 Leader 继续处理。
High Watermark(高水位标记)和 Log End Offset(日志结束位置)是 Kafka 中重要的概念,用于跟踪消息的复制和消费状态。
- High Watermark(HW):High Watermark 是每个 Partition 中的一个重要标记,表示已经被认为是"已复制"和"可安全消费"的最高消息偏移量(Offset)。在 Kafka 中,Producer 发送消息到 Leader,Leader 将这些消息写入日志并进行复制。当所有 Replica 都将消息复制到其本地日志中,并且其偏移量等于或大于 High Watermark 时,此消息被认为是"已复制"的。High Watermark 表示了可以安全从该 Partition 进行消费的偏移量;
- Log End Offset(LEO):Log End Offset 是指 Partition 中当前日志的最高偏移量。它表示了当前日志中最新的消息的偏移量。Producer 生产消息时,消息被追加到 Partition 的日志中,并分配一个唯一的偏移量。每次写入新的消息,Log End Offset 都会增加。消费者可以通过跟踪 Log End Offset,了解最新可消费的消息
High Watermark 和 Log End Offset 的关系如下:文章来源:https://uudwc.com/A/jAPPY
- 对于 Leader 来说,High Watermark 是其所有 Follower 中最小的 Log End Offset,并且 Leader 只有在所有 Follower 都复制了该偏移量之后,才能将 High Watermark 推进到新的值;
- 对于 Follower 来说,High Watermark 不等于自身的 Log End Offset,而是表示 Leader 的 High Watermark 的位置。Follower 的 Log End Offset 可能会落后于 Leader 的 High Watermark,这是正常的复制机制
High Watermark 的存在保证了消息的可靠性和一致性。只有当消息被所有的 Replica 都复制并达到 High Watermark 之后,Kafka 才确保消息不会丢失或被暂时消费。
消费者可以以 High Watermark 作为消费的起点,确保消费的消息是可靠的和一致的。消费者可以根据 High Watermark 将已经被确认为"已复制"的消息进行消费,而不用担心未被复制的消息可能会丢失。文章来源地址https://uudwc.com/A/jAPPY