1、Kafka 的设计

Kafka 将消息以 topic 为单位进行归纳,发布消息的程序称为 Producer,消费消息的程序称为 Consumer。它是以集群的方式运行,可以由一个或多个服务组成,每个服务叫做一个 Broker,Producer 通过网络将消息发送到 kafka 集群,集群向消费者提供消息,broker 在中间起到一个代理保存消息的中转站。

Kafka 中重要的组件

1Producer:消息生产者,发布消息到Kafka集群的终端或服务

2Broker:一个 Kafka 节点就是一个 Broker,多个Broker可组成一个Kafka 集群。

如果某个 Topic 下有 n 个Partition 且集群有 n 个Broker,那么每个 Broker会存储该 Topic 下的一个 Partition

如果某个 Topic 下有 n 个Partition 且集群中有 m+n 个Broker,那么只有 n 个Broker会存储该Topic下的一个 Partition

如果某个 Topic 下有 n 个Partition 且集群中的Broker数量小于 n,那么一个 Broker 会存储该 Topic 下的一个或多个 Partition,这种情况尽量避免,会导致集群数据不均衡

3Topic:消息主题,每条发布到Kafka集群的消息都会归集于此,Kafka是面向Topic 的

4Partition:Partition 是Topic在物理上的分区,一个Topic可以分为多个Partition,每个Partition是一个有序的不可变的记录序列。单一主题中的分区有序,但无法保证主题中所有分区的消息有序。

5Consumer:从Kafka集群中消费消息的终端或服务

6Consumer Group:每个Consumer都属于一个Consumer Group,每条消息只能被Consumer Group中的一个Consumer消费,但可以被多个Consumer Group消费。

7Replica:Partition 的副本,用来保障Partition的高可用性。

8Controller**: Kafka 集群中的其中一个服务器,用来进行Leader election以及各种 Failover 操作。

9Zookeeper:Kafka 通过Zookeeper来存储集群中的 meta 消息

2、Kafka 性能高原因

  1. 利用了 PageCache 缓存
  2. 磁盘顺序写
  3. 零拷贝技术
  4. pull 拉模式

3、Kafka 文件高效存储设计原理

  1. Kafka把Topic中一个Partition大文件分成多个小文件段,通过多个小文件段,就容易定期清除或删除已经消费完成的文件,减少磁盘占用
  2. 通过索引信息可以快速定位Message和确定response的最大大小
  3. 通过将索引元数据全部映射到 memory,可以避免 Segment 文件的磁盘I/O操作
  4. 通过索引文件稀疏存储,可以大幅降低索引文件元数据占用空间大小

4、Kafka 的优缺点

优点

  • 高性能、高吞吐量、低延迟:Kafka 生产和消费消息的速度都达到每秒10万级
  • 高可用:所有消息持久化存储到磁盘,并支持数据备份防止数据丢失
  • 高并发:支持数千个客户端同时读写
  • 容错性:允许集群中节点失败(若副本数量为n,则允许 n-1 个节点失败)
  • 高扩展性:Kafka 集群支持热伸缩,无须停机

缺点

  • 没有完整的监控工具集
  • 不支持通配符主题选择

5、Kafka 的应用场景

  1. 日志聚合:可收集各种服务的日志写入kafka的消息队列进行存储
  2. 消息系统:广泛用于消息中间件
  3. 系统解耦:在重要操作完成后,发送消息,由别的服务系统来完成其他操作
  4. 流量削峰:一般用于秒杀或抢购活动中,来缓冲网站短时间内高流量带来的压力
  5. 异步处理:通过异步处理机制,可以把一个消息放入队列中,但不立即处理它,在需要的时候再进行处理

6、Kafka 中分区的概念

主题是一个逻辑上的概念,还可以细分为多个分区,一个分区只属于单个主题,很多时候也会把分区称为主题分区(Topic-Partition)。同一主题下的不同分区包含的消息是不同的,分区在存储层面可以看做一个可追加的日志文件 ,消息在被追加到分区日志文件的时候都会分配一个特定的偏移量(offset)。offset 是消息在分区中的唯一标识,kafka 通过它来保证消息在分区内的顺序性,不过 offset 并不跨越分区,也就是说,kafka保证的是分区有序而不是主题有序。

在分区中又引入了多副本(replica)的概念,通过增加副本数量可以提高容灾能力。同一分区的不同副本中保存的是相同的消息。副本之间是一主多从的关系,其中主副本负责读写,从副本只负责消息同步。副本处于不同的 broker 中,当主副本出现异常,便会在从副本中提升一个为主副本。

7、Kafka 中分区的原则

  1. 指明Partition的情况下,直接将指明的值作为Partition值
  2. 没有指明Partition值但有 key 的情况下,将 key 的 Hash 值与 topic 的Partition值进行取余得到Partition值
  3. 既没有Partition值又没有 key 值的情况下,第一次调用时随机生成一个整数(后面每次调用在这个整数上自增),将这个值与Topic可用的Partition总数取余得到Parittion值,也就是常说的 round-robin 算法

8、Kafka 为什么要把消息分区

  1. 方便在集群中扩展,每个 Partition 可用通过调整以适应它所在的机器,而一个Topic又可以有多个Partition组成,因此整个集群就可以适应任意大小的数据了
  2. 可以提高并发,因为可以以Partition为单位进行读写

9、Kafka 中生产者运行流程

  1. 一条消息发过来首先会被封装成一个 ProducerRecord 对象
  2. 对该对象进行序列化处理(可以使用默认,也可以自定义序列化)
  3. 对消息进行分区处理,分区的时候需要获取集群的元数据,决定这个消息会被发送到哪个主题的哪个分区
  4. 分好区的消息不会直接发送到服务端,而是放入生产者的缓存区,多条消息会被封装成一个批次(Batch),默认一个批次的大小是 16KB
  5. Sender 线程启动以后会从缓存里面去获取可以发送的批次
  6. Sender 线程把一个一个批次发送到服务端

10、Kafka 中的消息封装

在Kafka 中 Producer 可以 Batch的方式推送数据达到提高效率的作用。Kafka Producer 可以将消息在内存中累积到一定数量后作为一个 Batch 发送请求。Batch 的数量大小可以通过 Producer 的参数进行控制,可以从三个维度进行控制

  • 累计的消息的数量(如500条)
  • 累计的时间间隔(如100ms)
  • 累计的数据大小(如64KB)

通过增加 Batch 的大小,可以减少网络请求和磁盘I/O的频次,具体参数配置需要在效率和时效性做一个权衡。

11、Kafka 消息的消费模式

Kafka采用大部分消息系统遵循的传统模式:Producer将消息推送到Broker,Consumer从Broker获取消息。

如果采用 Push 模式,则Consumer难以处理不同速率的上游推送消息。

采用 Pull 模式的好处是Consumer可以自主决定是否批量的从Broker拉取数据。Pull模式有个缺点是,如果Broker没有可供消费的消息,将导致Consumer不断在循环中轮询,直到新消息到达。为了避免这点,Kafka有个参数可以让Consumer阻塞直到新消息到达。

12、Kafka 如何实现负载均衡与故障转移

负载均衡是指让系统的负载根据一定的规则均衡地分配在所有参与工作的服务器上,从而最大限度保证系统整体运行效率与稳定性

负载均衡

Kakfa 的负载均衡就是每个 Broker 都有均等的机会为 Kafka 的客户端(生产者与消费者)提供服务,可以负载分散到所有集群中的机器上。Kafka 通过智能化的分区领导者选举来实现负载均衡,提供智能化的 Leader 选举算法,可在集群的所有机器上均匀分散各个Partition的Leader,从而整体上实现负载均衡。

故障转移

Kafka 的故障转移是通过使用会话机制实现的,每台 Kafka 服务器启动后会以会话的形式把自己注册到 Zookeeper 服务器上。一旦服务器运转出现问题,就会导致与Zookeeper 的会话不能维持从而超时断连,此时Kafka集群会选举出另一台服务器来完全替代这台服务器继续提供服务。

13、Kafka 中 Zookeeper 的作用

Kafka 是一个使用 Zookeeper 构建的分布式系统。Kafka 的各 Broker 在启动时都要在Zookeeper上注册,由Zookeeper统一协调管理。如果任何节点失败,可通过Zookeeper从先前提交的偏移量中恢复,因为它会做周期性提交偏移量工作。同一个Topic的消息会被分成多个分区并将其分布在多个Broker上,这些分区信息及与Broker的对应关系也是Zookeeper在维护。

14、Kafka 提供了哪些系统工具

  • Kafka 迁移工具:它有助于将代理从一个版本迁移到另一个版本
  • Mirror Maker:Mirror Maker 工具有助于将一个 Kafka 集群的镜像提供给另一个
  • 消费者检查:对于指定的主题集和消费者组,可显示主题、分区、所有者

15、Kafka 中消费者与消费者组的关系与负载均衡实现

Consumer Group 是Kafka独有的可扩展且具有容错性的消费者机制。一个组内可以有多个Consumer,它们共享一个全局唯一的Group ID。组内的所有Consumer协调在一起来消费订阅主题(Topic)内的所有分区(Partition)。当然,每个Partition只能由同一个Consumer Group内的一个Consumer 来消费。消费组内的消费者可以使用多线程的方式实现,消费者的数量通常不超过分区的数量,且二者最好保持整数倍的关系,这样不会造成有空闲的消费者。

Consumer 订阅的是Topic的Partition,而不是Message。所以在同一时间点上,订阅到同一个分区的Consumer必然属于不同的Consumer Group

Consumer Group与Consumer的关系是动态维护的,当一个Consumer进程挂掉或者是卡住时,该Consumer所订阅的Partition会被重新分配到改组内的其他Consumer上,当一个Consumer加入到一个Consumer Group中时,同样会从其他的Consumer中分配出一个或者多个Partition到这个新加入的Consumer。

负载均衡

当启动一个Consumer时,会指定它要加入的Group,使用的配置项是:Group.idopen in new window

为了维持Consumer与Consumer Group之间的关系,Consumer 会周期性地发送 hearbeat 到 coodinator(协调者),如果有 hearbeat 超时或未收到 hearbeat,coordinator 会认为该Consumer已经退出,那么它所订阅的Partition会分配到同一组内的其他Consumer上,这个过程称为 rebalance(再平衡)

16、Kafka 中消息偏移的作用

生产过程中给分区中的消息提供一个顺序ID号,称之为偏移量,偏移量的主要作用为了唯一地区别分区中的每条消息。Kafka的存储文件都是按照offset.kafka来命名

17、 生产过程中何时会发生QueueFullExpection以及如何处理

何时发生

当生产者试图发送消息的速度快于Broker可以处理的速度时,通常会发生 QueueFullException

如何解决

首先先进行判断生产者是否能够降低生产速率,如果生产者不能阻止这种情况,为了处理增加的负载,用户需要添加足够的 Broker。或者选择生产阻塞,设置Queue.enQueueTimeout.ms 为 -1,通过这样处理,如果队列已满的情况,生产者将组织而不是删除消息。或者容忍这种异常,进行消息丢弃。

18、Consumer 如何消费指定分区消息

Cosumer 消费消息时,想Broker 发出 fetch 请求去消费特定分区的消息,Consumer 可以通过指定消息在日志中的偏移量 offset,就可以从这个位置开始消息消息,Consumer 拥有了 offset 的控制权,也可以向后回滚去重新消费之前的消息。

也可以使用 seek(Long topicPartition) 来指定消费的位置。

19、Replica、Leader 和 Follower 三者的概念

Kafka 中的 Partition 是有序消息日志,为了实现高可用性,需要采用备份机制,将相同的数据复制到多个Broker上,而这些备份日志就是 Replica,目的是为了 防止数据丢失

所有Partition 的副本默认情况下都会均匀地分布到所有 Broker 上,一旦领导者副本所在的Broker宕机,Kafka 会从追随者副本中选举出新的领导者继续提供服务。

Leader**:** 副本中的领导者。负责对外提供服务,与客户端进行交互。生产者总是向 Leader副本些消息,消费者总是从 Leader 读消息

Follower**:** 副本中的追随者。被动地追随 Leader,不能与外界进行交付。只是向Leader发送消息,请求Leader把最新生产的消息发给它,进而保持同步。

20、Replica 的重要性

Replica 可以确保发布的消息不会丢失,保证了Kafka的高可用性。并且可以在发生任何机器错误、程序错误或软件升级、扩容时都能生产使用。

21、Kafka 中的 Geo-Replication 是什么

Kafka官方提供了MirrorMaker组件,作为跨集群的流数据同步方案。借助MirrorMaker,消息可以跨多个数据中心或云区域进行复制。您可以在主动/被动场景中将其用于备份和恢复,或者在主动/主动方案中将数据放置得更靠近用户,或支持数据本地化要求。

它的实现原理比较简单,就是通过从源集群消费消息,然后将消息生产到目标集群,即普通的消息生产和消费。用户只要通过简单的Consumer配置和Producer配置,然后启动Mirror,就可以实现集群之间的准实时的数据同步.

22、Kafka 中 AR、ISR、OSR 三者的概念

  • AR:分区中所有副本称为 AR
  • ISR:所有与主副本保持一定程度同步的副本(包括主副本)称为 ISR
  • OSR:与主副本滞后过多的副本组成 OSR

23、分区副本什么情况下会从 ISR 中剔出

Leader 会维护一个与自己基本保持同步的Replica列表,该列表称为ISR,每个Partition都会有一个ISR,而且是由Leader动态维护。所谓动态维护,就是说如果一个Follower比一个Leader落后太多,或者超过一定时间未发起数据复制请求,则Leader将其从ISR中移除。当ISR中所有Replica都向Leader发送ACK(Acknowledgement确认)时,Leader才commit。

24、分区副本中的 Leader 如果宕机但 ISR 却为空该如何处理

可以通过配置unclean.leader.election :

  • true:允许 OSR 成为 Leader,但是 OSR 的消息较为滞后,可能会出现消息不一致的问题
  • false:会一直等待旧 leader 恢复正常,降低了可用性

25、如何判断一个 Broker 是否还有效

  1. Broker必须可以维护和ZooKeeper的连接,Zookeeper通过心跳机制检查每个结点的连接。
  2. 如果Broker是个Follower,它必须能及时同步Leader的写操作,延时不能太久。

26、Kafka 可接收的消息最大默认多少字节,如何修改

Kafka可以接收的最大消息默认为1000000字节,如果想调整它的大小,可在Broker中修改配置参数:Message.max.bytes的值

但要注意的是,修改这个值,还要同时注意其他对应的参数值是正确的,否则就可能引发一些系统异常。首先这个值要比消费端的fetch.Message.max.bytes(默认值1MB,表示消费者能读取的最大消息的字节数)参数值要小才是正确的设置,否则Broker就会因为消费端无法使用这个消息而挂起。

27、Kafka 的 ACK 机制

Kafka的Producer有三种ack机制,参数值有0、1 和 -1

  • 0**:** 相当于异步操作,Producer 不需要Leader给予回复,发送完就认为成功,继续发送下一条(批)Message。此机制具有最低延迟,但是持久性可靠性也最差,当服务器发生故障时,很可能发生数据丢失。
  • 1**:** Kafka 默认的设置。表示 Producer 要 Leader 确认已成功接收数据才发送下一条(批)Message。不过 Leader 宕机,Follower 尚未复制的情况下,数据就会丢失。此机制提供了较好的持久性和较低的延迟性。
  • -1**:** Leader 接收到消息之后,还必须要求ISR列表里跟Leader保持同步的那些Follower都确认消息已同步,Producer 才发送下一条(批)Message。此机制持久性可靠性最好,但延时性最差。

28、Kafka 的 consumer 如何消费数据

在Kafka中,Producers将消息推送给Broker端,在Consumer和Broker建立连接之后,会主动去 Pull(或者说Fetch)消息。这种模式有些优点,首先Consumer端可以根据自己的消费能力适时的去fetch消息并处理,且可以控制消息消费的进度(offset);此外,消费者可以控制每次消费的数,实现批量消费。

29、Kafka 提供的API有哪些

Kafka 提供了两套 Consumer API,分为 High-level APISample API

Sample API

这是一个底层API,它维持了一个与单一 Broker 的连接,并且这个API 是完全无状态的,每次请求都需要指定 offset 值,因此这套 API 也是最灵活的。

High-level API

该API封装了对集群中一系列Broker的访问,可以透明地消费下一个Topic,它自己维护了已消费消息的状态,即每次消费的都会下一个消息。High-level API 还支持以组的形式消费Topic,如果 Consumers 有同一个组名,那么Kafka就相当于一个队列消息服务,而各个 Consumer 均衡地消费相应Partition中的数据。若Consumers有不同的组名,那么此时Kafka就相当于一个广播服务,会把Topic中的所有消息广播到每个Consumer

30、Kafka 的Topic中 Partition 数据是怎么存储到磁盘的

Topic 中的多个 Partition 以文件夹的形式保存到 Broker,每个分区序号从0递增,且消息有序。Partition 文件下有多个Segment(xxx.index,xxx.log),Segment文件里的大小和配置文件大小一致。默认为1GB,但可以根据实际需要修改。如果大小大于1GB时,会滚动一个新的Segment并且以上一个Segment最后一条消息的偏移量命名。

31、Kafka 创建Topic后如何将分区放置到不同的 Broker 中

Kafka创建Topic将分区放置到不同的Broker时遵循以下规则:

  1. 副本因子不能大于Broker的个数。
  2. 第一个分区(编号为0)的第一个副本放置位置是随机从Broker List中选择的。
  3. 其他分区的第一个副本放置位置相对于第0个分区依次往后移。也就是如果有3个Broker,3个分区,假设第一个分区放在第二个Broker上,那么第二个分区将会放在第三个Broker上;第三个分区将会放在第一个Broker上,更多Broker与更多分区依此类推。剩余的副本相对于第一个副本放置位置其实是由nextReplicaShift决定的,而这个数也是随机产生的。

32、Kafka 的日志保留期与数据清理策略

概念

保留期内保留了Kafka群集中的所有已发布消息,超过保期的数据将被按清理策略进行清理。默认保留时间是7天,如果想修改时间,在server.properties里更改参数log.retention.hours/minutes/ms 的值便可。

清理策略

  • 删除: log.cleanup.policy=delete 表示启用删除策略,这也是默认策略。一开始只是标记为delete,文件无法被索引。只有过了log.Segment.delete.delay.ms这个参数设置的时间,才会真正被删除。
  • 压缩: log.cleanup.policy=compact 表示启用压缩策略,将数据压缩,只保留每个Key最后一个版本的数据。首先在Broker的配置中设置log.cleaner.enable=true 启用 cleaner,这个默认是关闭的。

33、Kafka 日志存储的Message是什么格式

Kafka一个Message由固定长度的**header一个变长的消息体**body组成。将Message存储在日志时采用不同于Producer发送的消息格式。每个日志文件都是一个log entries(日志项)序列:

  1. 每一个log entry包含一个四字节整型数(Message长度,值为1+4+N)。
  2. 1个字节的magic,magic表示本次发布Kafka服务程序协议版本号。
  3. 4个字节的CRC32值,CRC32用于校验Message。
  4. 最终是N个字节的消息数据。每条消息都有一个当前Partition下唯一的64位offset。

Kafka没有限定单个消息的大小,但一般推荐消息大小不要超过1MB,通常一般消息大小都在1~10KB之间。

34、Kafka 是否支持多租户隔离

多租户技术(multi-tenancy technology)是一种软件架构技术,它是实现如何在多用户的环境下共用相同的系统或程序组件,并且仍可确保各用户间数据的隔离性。

解决方案

通过配置哪个主题可以生产或消费数据来启用多租户,也有对配额的操作支持。管理员可以对请求定义和强制配额,以控制客户端使用的Broker资源。

35、Kafka 的日志分段策略与刷新策略

日志分段(**Segment**)策略

  1. log.roll.hours/ms:日志滚动的周期时间,到达指定周期时间时,强制生成一个新的Segment,默认值168h(7day)。
  2. log.Segment.bytes:每个Segment的最大容量。到达指定容量时,将强制生成一个新的Segment。默认值1GB(-1代表不限制)。
  3. log.retention.check.interval.ms:日志片段文件检查的周期时间。默认值60000ms。

日志刷新策略

Kafka的日志实际上是开始是在缓存中的,然后根据实际参数配置的策略定期一批一批写入到日志文件中,以提高吞吐量。

  1. log.flush.interval.Messages:消息达到多少条时将数据写入到日志文件。默认值为10000。
  2. log.flush.interval.ms:当达到该时间时,强制执行一次flush。默认值为null。
  3. log.flush.scheduler.interval.ms:周期性检查,是否需要将信息flush。默认为很大的值。

36、Kafka 中如何进行主从同步

Kafka动态维护了一个同步状态的副本的集合(a set of In-SyncReplicas),简称ISR,在这个集合中的结点都是和Leader保持高度一致的,任何一条消息只有被这个集合中的每个结点读取并追加到日志中,才会向外部通知“这个消息已经被提交”。

kafka 通过配置 producer.type 来确定是异步还是同步,默认是同步

同步复制

Producer 会先通过Zookeeper识别到Leader,然后向 Leader 发送消息,Leader 收到消息后写入到本地 log文件。这个时候Follower 再向 Leader Pull 消息,Pull 回来的消息会写入的本地 log 中,写入完成后会向 Leader 发送 Ack 回执,等到 Leader 收到所有 Follower 的回执之后,才会向 Producer 回传 Ack。

异步复制

Kafka 中 Producer 异步发送消息是基于同步发送消息的接口来实现的,异步发送消息的实现很简单,客户端消息发送过来以后,会先放入一个 BlackingQueue 队列中然后就返回了。Producer 再开启一个线程 ProducerSendTread 不断从队列中取出消息,然后调用同步发送消息的接口将消息发送给 Broker。

Producer的这种在内存缓存消息,当累计达到阀值时批量发送请求,小数据I/O太多,会拖慢整体的网络延迟,批量延迟发送事实上提升了网络效率。但是如果在达到阀值前,Producer不可用了,缓存的数据将会丢失。

37、Kafka 中什么情况下会出现消息丢失/不一致的问题

消息发送时

消息发送有两种方式:同步 - sync 和 异步 - async。默认是同步的方式,可以通过 producer.type 属性进行配置,kafka 也可以通过配置 acks 属性来确认消息的生产

  • 0:表示不进行消息接收是否成功的确认
  • 1:表示当 leader 接收成功时的确认
  • -1:表示 leader 和 follower 都接收成功的确认

当 acks = 0 时,不和 Kafka 进行消息接收确认,可能会因为网络异常,缓冲区满的问题,导致消息丢失

当 acks = 1 时,只有 leader 同步成功而 follower 尚未完成同步,如果 leader 挂了,就会造成数据丢失

消息消费时

Kafka 有两个消息消费的 consumer 接口,分别是 low-level 和 hign-level

  1. low-level:消费者自己维护 offset 等值,可以实现对 kafka 的完全控制
  2. high-level:封装了对 partition 和 offset,使用简单

如果使用高级接口,可能存在一个消费者提取了一个消息后便提交了 offset,那么还没来得及消费就已经挂了,下次消费时的数据就是 offset + 1 的位置,那么原先 offset 的数据就丢失了。

38、Kafka 作为流处理平台的特点

流处理就是连续、实时、并发和以逐条记录的方式处理数据的意思。Kafka 是一个分布式流处理平台,它的高吞吐量、低延时、高可靠性、容错性、高可扩展性都使得Kafka非常适合作为流式平台。

  1. 它是一个简单的、轻量级的Java类库,能够被集成到任何Java应用中
  2. 除了Kafka之外没有任何其他的依赖,利用Kafka的分区模型支持水平扩容和保证顺序性
  3. 支持本地状态容错,可以执行非常快速有效的有状态操作
  4. 支持 eexactly-once 语义
  5. 支持一次处理一条记录,实现 ms 级的延迟

39、消费者故障,出现活锁问题如何解决

活锁的概念:消费者持续的维持心跳,但没有进行消息处理。

为了预防消费者在这种情况一直持有分区,通常会利用 max.poll.interval.ms活跃检测机制,如果调用 Poll 的频率大于最大间隔,那么消费者将会主动离开消费组,以便其他消费者接管该分区

40、Kafa 中如何保证顺序消费

Kafka 的消费单元是 Partition,同一个 Partition 使用 offset 作为唯一标识保证顺序性,但这只是保证了在 Partition 内部的顺序性而不是 Topic 中的顺序,因此我们需要将所有消息发往统一 Partition 才能保证消息顺序消费,那么可以在发送的时候指定 MessageKey,同一个 key 的消息会发到同一个 Partition 中。

41、讲讲 kafka 维护消费状态跟踪的方法

大部分消息系统在 broker 端的维护消息被消费的记录:一个消息被分发到consumer 后 broker 就马上进行标记或者等待 customer 的通知后进行标记。这样也可以在消息在消费后立马就删除以减少空间占用。
但是这样会不会有什么问题呢?如果一条消息发送出去之后就立即被标记为消费过的,旦 consumer 处理消息时失败了(比如程序崩溃)消息就丢失了。为了解决这个问题,很多消息系统提供了另外一个个功能:当消息被发送出去之后仅仅被标记为已发送状态,当接到 consumer 已经消费成功的通知后才标记为已被消费的状态。这虽然解决了消息丢失的问题,但产生了新问题,首先如果 consumer处理消息成功了但是向 broker 发送响应时失败了,这条消息将被消费两次。第二个问题时,broker 必须维护每条消息的状态,并且每次都要先锁住消息然后更改状态然后释放锁。这样麻烦又来了,且不说要维护大量的状态数据,比如如果消息发送出去但没有收到消费成功的通知,这条消息将一直处于被锁定的状态,Kafka 采用了不同的策略。Topic 被分成了若干分区,每个分区在同一时间只被一个 consumer 消费。这意味着每个分区被消费的消息在日志中的位置仅仅是一个简单的整数:offset。这样就很容易标记每个分区消费状态就很容易了,仅仅需要一个整数而已。这样消费状态的跟踪就很简单了。
这带来了另外一个好处:consumer 可以把 offset 调成一个较老的值,去重新消费老的消息。这对传统的消息系统来说看起来有些不可思议,但确实是非常有用的,谁规定了一条消息只能被消费一次呢?

42、讲一下主从同步

Kafka允许topic的分区拥有若干副本,这个数量是可以配置的,你可以为每个topci配置副本的数量。Kafka会自动在每个个副本上备份数据,所以当一个节点down掉时数据依然是可用的。
Kafka的副本功能不是必须的,你可以配置只有一个副本,这样其实就相当于只有一份数据。

43、为什么需要消息系统,mysql 不能满足需求吗?

(1)解耦:
允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
(2)冗余:
消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。许多消息队列所采用的”插入-获取-删除”范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。
(3)扩展性:
因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可。
(4)灵活性 & 峰值处理能力:
在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。
(5)可恢复性:
系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。
(6)顺序保证:
在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。(Kafka 保证一个 Partition 内的消息的有序性)
(7)缓冲:
有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。
(8)异步通信:
很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。

44、Zookeeper 对于 Kafka 的作用是什么?

Zookeeper 是一个开放源码的、高性能的协调服务,它用于 Kafka 的分布式应用。
Zookeeper 主要用于在集群中不同节点之间进行通信
在 Kafka 中,它被用于提交偏移量,因此如果节点在任何情况下都失败了,它都可以从之前提交的偏移量中获取除此之外,它还执行其他活动,如: leader 检测、分布式同步、配置管理、识别新节点何时离开或连接、集群、节点实时状态等等。

45、数据传输的事务定义有哪三种?

和 MQTT 的事务定义一样都是 3 种。
(1)最多一次: 消息不会被重复发送,最多被传输一次,但也有可能一次不传输
(2)最少一次: 消息不会被漏发送,最少被传输一次,但也有可能被重复传输.
(3)精确的一次(Exactly once): 不会漏传输也不会重复传输,每个消息都传输被一次而且仅仅被传输一次,这是大家所期望的

46、Kafka 判断一个节点是否还活着有那两个条件?

(1)节点必须可以维护和 ZooKeeper 的连接,Zookeeper 通过心跳机制检查每个节点的连接
(2)如果节点是个 follower,他必须能及时的同步 leader 的写操作,延时不能太久

47、Kafka 与传统 MQ 消息系统之间有三个关键区别

(1).Kafka 持久化日志,这些日志可以被重复读取和无限期保留
(2).Kafka 是一个分布式系统:它以集群的方式运行,可以灵活伸缩,在内部通过复制数据提升容错能力和高可用性
(3).Kafka 支持实时的流式处理

48、讲一讲 kafka 的 ack 的三种机制

request.required.acks 有三个值 0 1 -1(all)
0:生产者不会等待 broker 的 ack,这个延迟最低但是存储的保证最弱当 server 挂掉的时候就会丢数据。
1:服务端会等待 ack 值 leader 副本确认接收到消息后发送 ack 但是如果 leader挂掉后他不确保是否复制完成新 leader 也会导致数据丢失。
-1(all):服务端会等所有的 follower 的副本受到数据后才会受到 leader 发出的ack,这样数据不会丢失

49、消费者如何不自动提交偏移量,由应用提交?

将 auto.commit.offset 设为 false,然后在处理一批消息后 commitSync() 或者异步提交 commitAsync()
即:

ConsumerRecords<> records = consumer.poll();
for (ConsumerRecord<> record : records){
    。。。
    tyr{
        consumer.commitSync()
    }
    。。。
}

50、消费者故障,出现活锁问题如何解决?

出现“活锁”的情况,是它持续的发送心跳,但是没有处理。为了预防消费者在这种情况下一直持有分区,我们使用 max.poll.interval.ms 活跃检测机制。 在此基础上,如果你调用的 poll 的频率大于最大间隔,则客户端将主动地离开组,以便其他消费者接管该分区。 发生这种情况时,你会看到 offset 提交失败(调用commitSync()引发的 CommitFailedException)。这是一种安全机制,保障只有活动成员能够提交 offset。所以要留在组中,你必须持续调用 poll。
消费者提供两个配置设置来控制 poll 循环:
max.poll.interval.ms:增大 poll 的间隔,可以为消费者提供更多的时间去处理返回的消息(调用 poll(long)返回的消息,通常返回的消息都是一批)。缺点是此值越大将会延迟组重新平衡。
max.poll.records:此设置限制每次调用 poll 返回的消息数,这样可以更容易的预测每次 poll 间隔要处理的最大值。通过调整此值,可以减少 poll 间隔,减少重新平衡分组的
对于消息处理时间不可预测地的情况,这些选项是不够的。 处理这种情况的推荐方法是将消息处理移到另一个线程中,让消费者继续调用 poll。 但是必须注意确保已提交的 offset 不超过实际位置。另外,你必须禁用自动提交,并只有在线程完成处理后才为记录手动提交偏移量(取决于你)。 还要注意,你需要 pause 暂停分区,不会从 poll 接收到新消息,让线程处理完之前返回的消息(如果你的处理能力比拉取消息的慢,那创建新线程将导致你机器内存溢出)。

51、如何控制消费的位置

kafka 使用 seek(TopicPartition, long)指定新的消费位置。用于查找服务器保留的最早和最新的 offset 的特殊的方法也可用(seekToBeginning(Collection) 和seekToEnd(Collection))

52、kafka 分布式(不是单机)的情况下,如何保证消息的顺序消费?

Kafka 分布式的单位是 partition,同一个 partition 用一个 write ahead log 组织,所以可以保证 FIFO 的顺序。不同 partition 之间不能保证顺序。但是绝大多数用户都可以通过 message key 来定义,因为同一个 key 的 message 可以保证只发送到同一个 partition。
Kafka 中发送 1 条消息的时候,可以指定(topic, partition, key) 3 个参数。partiton 和 key 是可选的。如果你指定了 partition,那就是所有消息发往同 1个 partition,就是有序的。并且在消费端,Kafka 保证,1 个 partition 只能被1 个 consumer 消费。或者你指定 key( 比如 order id),具有同 1 个 key 的所有消息,会发往同 1 个 partition。

53、kafka 的高可用机制是什么?

这个问题比较系统,回答出 kafka 的系统特点,leader 和 follower 的关系,消息读写的顺序即可。

54、kafka 如何减少数据丢失

Kafka到底会不会丢数据(data loss)? 通常不会,但有些情况下的确有可能会发生。下面的参数配置及Best practice列表可以较好地保证数据的持久性(当然是trade-off,牺牲了吞吐量)。
block.on.buffer.full = true
acks = all
retries = MAX_VALUE
max.in.flight.requests.per.connection = 1
使用KafkaProducer.send(record, callback)
callback逻辑中显式关闭producer:close(0)
unclean.leader.election.enable=false
replication.factor = 3
min.insync.replicas = 2
replication.factor > min.insync.replicas
enable.auto.commit=false
消息处理完成之后再提交位移

55、kafka 如何不消费重复数据?比如扣款,我们不能重复的扣。

其实还是得结合业务来思考,我这里给几个思路:
比如你拿个数据要写库,你先根据主键查一下,如果这数据都有了,你就别插入了,update 一下好吧。
比如你是写 Redis,那没问题了,反正每次都是 set,天然幂等性。
比如你不是上面两个场景,那做的稍微复杂一点,你需要让生产者发送每条数据的时候,里面加一个全局唯一的 id,类似订单 id 之类的东西,然后你这里消费到了之后,先根据这个 id 去比如 Redis 里查一下,之前消费过吗?如果没有消费过,你就处理,然后这个 id 写 Redis。如果消费过了,那你就别处理了,保证别重复处理相同的消息即可。

#56、谈谈 Kafka 吞吐量为何如此高?

多分区、batch send、kafka Reator 网络模型、pagecache、sendfile 零拷贝、数据压缩

1>顺序读写

上图就展示了Kafka是如何写入数据的, 每一个Partition其实都是一个文件 ,收到消息后Kafka会把数据插入到文件末尾(虚框部分)。

这种方法有一个缺陷—— 没有办法删除数据 ,所以Kafka是不会删除数据的,它会把所有的数据都保留下来,每个消费者(Consumer)对每个Topic都有一个offset用来表示 读取到了第几条数据 。

2>Page Cache

为了优化读写性能,Kafka利用了操作系统本身的Page Cache,就是利用操作系统自身的内存而不是JVM空间内存。

3>零拷贝

零拷贝就是一种避免 CPU 将数据从一块存储拷贝到另外一块存储的技术。

linux操作系统 “零拷贝” 机制使用了sendfile方法, 允许操作系统将数据从Page Cache 直接发送到网络,只需要最后一步的copy操作将数据复制到 NIC 缓冲区, 这样避免重新复制数据 。示意图如下:

通过这种 “零拷贝” 的机制,Page Cache 结合 sendfile 方法,Kafka消费端的性能也大幅提升。这也是为什么有时候消费端在不断消费数据时,我们并没有看到磁盘io比较高,此刻正是操作系统缓存在提供数据。

4>分区分段+索引

Kafka的message是按topic分类存储的,topic中的数据又是按照一个一个的partition即分区存储到不同broker节点。每个partition对应了操作系统上的一个文件夹,partition实际上又是按照segment分段存储的。这也非常符合分布式系统分区分桶的设计思想。

5>批量读写

Kafka数据读写也是批量的而不是单条的。

6>批量压缩

如果每个消息都压缩,但是压缩率相对很低,所以Kafka使用了批量压缩,即将多个消息一起压缩而不是单个消息压缩

57、请谈一谈 Kafka 数据一致性原理

一致性就是说不论是老的 Leader 还是新选举的 Leader,Consumer 都能读到一样的数据。

假设分区的副本为3,其中副本0是 Leader,副本1和副本2是 follower,并且在 ISR 列表里面。虽然副本0已经写入了 Message4,但是 Consumer 只能读取到 Message2。因为所有的 ISR 都同步了 Message2,只有 High Water Mark 以上的消息才支持 Consumer 读取,而 High Water Mark 取决于 ISR 列表里面偏移量最小的分区,对应于上图的副本2,这个很类似于木桶原理。

这样做的原因是还没有被足够多副本复制的消息被认为是“不安全”的,如果 Leader 发生崩溃,另一个副本成为新 Leader,那么这些消息很可能丢失了。如果我们允许消费者读取这些消息,可能就会破坏一致性。试想,一个消费者从当前 Leader(副本0) 读取并处理了 Message4,这个时候 Leader 挂掉了,选举了副本1为新的 Leader,这时候另一个消费者再去从新的 Leader 读取消息,发现这个消息其实并不存在,这就导致了数据不一致性问题。

当然,引入了 High Water Mark 机制,会导致 Broker 间的消息复制因为某些原因变慢,那么消息到达消费者的时间也会随之变长(因为我们会先等待消息复制完毕)。延迟时间可以通过参数 replica.lag.time.max.ms 参数配置,它指定了副本在复制消息时可被允许的最大延迟时间。

58、LEO、HW、LSO、LW等分别代表什么

LEO:是 LogEndOffset 的简称,代表当前日志文件中下一条

HW:水位或水印(watermark)一词,也可称为高水位(high watermark),通常被用在流式处理领域(比如Apache Flink、Apache Spark等),以表征元素或事件在基于时间层面上的进度。在Kafka中,水位的概念反而与时间无关,而是与位置信息相关。严格来说,它表示的就是位置信息,即位移(offset)。取 partition 对应的 ISR中 最小的 LEO 作为 HW,consumer 最多只能消费到 HW 所在的位置上一条信息。

LSO:是 LastStableOffset 的简称,对未完成的事务而言,LSO 的值等于事务中第一条消息的位置(firstUnstableOffset),对已完成的事务而言,它的值同 HW 相同

LW:Low Watermark 低水位, 代表 AR 集合中最小的 logStartOffset 值。

59、Kafka 在什么情况下会出现消息丢失?

尽管 Kafka 在设计上致力于保证数据的可靠性,但在某些情况下仍可能出现消息丢失的情况,主要包括以下几种情况:

生产者发送消息失败:如果生产者在发送消息时发生错误,并且没有进行适当的错误处理,消息可能会丢失。例如,如果生产者发送消息的网络连接中断或发生故障,而没有进行重试或错误处理,消息可能会丢失。

ISR 中的副本数量不足:ISR 是与主副本保持同步的一组副本,如果 ISR 中的副本数量不足,可能会导致消息丢失。如果某个分区的 ISR 中的副本数量低于配置的最小副本数,Kafka 将无法保证消息的可靠性。

消费者位移的提交失败:Kafka 中的消费者通过提交位移来跟踪消费进度。如果消费者在处理消息后未能正确提交位移,例如发生错误或异常导致提交失败,那么在重新启动消费者时可能会导致消息重复消费或丢失。

数据丢失的副本未能及时替换:当副本发生故障时,Kafka 会自动将其替换为其他正常的副本。然而,如果替换过程出现延迟或失败,可能会导致数据丢失。例如,如果替换过程中发生网络故障或副本同步失败,可能会导致数据丢失。

为了尽量避免消息丢失,可以采取以下措施:

生产者应该实现错误处理和重试机制,确保消息发送的可靠性。
配置适当的 ISR 复制因子,保证 ISR 中的副本数量足够。
消费者应该正确处理位移的提交,确保消费进度的正确跟踪。
监控 Kafka 集群的运行状态,及时发现并解决副本替换等故障情况。

60、Kafka 如何保证可靠性和一致性

写入确认机制:在 Kafka 中,生产者发送消息时,会等待消息被成功写入到至少一个副本中,并收到服务器的确认响应,才会认为消息写入成功。只有当所有的副本都写入成功后,生产者才会收到确认响应。这样可以确保消息写入的可靠性。

复制机制:Kafka 使用副本机制来保证数据的可靠性。每个分区都可以配置多个副本,每个副本都保存了完整的消息数据。如果某个副本发生故障,Kafka 会自动将其替换为其他正常的副本,确保数据的可用性和一致性。

ISR(In-Sync Replicas)机制:ISR 是指与主副本保持同步的一组副本。Kafka 只会将消息发送给 ISR 中的副本,当 ISR 中的副本都成功写入消息后,才会给生产者发送确认响应。这样可以确保数据的一致性,因为只有在 ISR 中的副本都写入成功后,才会认为消息写入成功。

举例来说,假设一个 Kafka 主题有三个分区,每个分区都配置了三个副本。当生产者发送一条消息时,它会等待消息被至少一个副本写入,并收到确认响应。然后,这个消息会被发送给 ISR 中的副本进行复制。当 ISR 中的副本都成功写入消息后,生产者才会收到确认响应。

如果某个副本发生故障,Kafka 会将其替换为其他正常的副本,并确保新的副本与主副本保持同步。只有当新的副本成功与主副本同步后,才会将其加入到 ISR 中,继续参与消息复制和确认流程。

通过这些机制,Kafka 可以保证数据的可靠性和一致性,即使在发生故障的情况下也能保证消息的正确传递和处理。

61、数据传输的事务有几种?

数据传输的事务定义通常有以下三种级别:
(1)最多一次: 消息不会被重复发送,最多被传输一次,但也有可能一次不传输
(2)最少一次: 消息不会被漏发送,最少被传输一次,但也有可能被重复传输.
(3)精确的一次(Exactly once): 不会漏传输也不会重复传输,每个消息都传输被

62、Kafka 消费者是否可以消费指定分区消息?

Kafa consumer消费消息时,向broker发出fetch请求去消费特定分区的消息,consumer指定消息在日志中的偏移量(offset),就可以消费从这个位置开始的消息,customer拥有了offset的控制权,可以向后回滚去重新消费之前的消息,这是很有意义的

63、Kafka 高效文件存储设计特点

Kafka把topic中一个parition大文件分成多个小文件段,通过多个小文件段,就容易定期清除或删除已经消费完文件,减少磁盘占用。
通过索引信息可以快速定位message和确定response的最大大小。
通过index元数据全部映射到memory,可以避免segment file的IO磁盘操作。
通过索引文件稀疏存储,可以大幅降低index文件元数据占用空间大小

64、Kafka创建Topic时如何将分区放置到不同的Broker中

副本因子不能大于 Broker 的个数;
第一个分区(编号为0)的第一个副本放置位置是随机从 brokerList 选择的;
其他分区的第一个副本放置位置相对于第0个分区依次往后移。也就是如果我们有5个 Broker,5个分区,假设第一个分区放在第四个 Broker 上,那么第二个分区将会放在第五个 Broker 上;第三个分区将会放在第一个 Broker 上;第四个分区将会放在第二个 Broker 上,依次类推;
剩余的副本相对于第一个副本放置位置其实是由 nextReplicaShift 决定的,而这个数也是随机产生的
具体可以参见 Kafka创建Topic时如何将分区放置到不同的Broker中。

65、Kafka新建的分区会在哪个目录下创建

在启动 Kafka 集群之前,我们需要配置好 log.dirs 参数,其值是 Kafka 数据的存放目录,这个参数可以配置多个目录,目录之间使用逗号分隔,通常这些目录是分布在不同的磁盘上用于提高读写性能。
当然我们也可以配置 log.dir 参数,含义一样。只需要设置其中一个即可。
如果 log.dirs 参数只配置了一个目录,那么分配到各个 Broker 上的分区肯定只能在这个目录下创建文件夹用于存放数据。
但是如果 log.dirs 参数配置了多个目录,那么 Kafka 会在哪个文件夹中创建分区目录呢?答案是:Kafka 会在含有分区目录最少的文件夹中创建新的分区目录,分区目录名为 Topic名+分区ID。注意,是分区文件夹总数最少的目录,而不是磁盘使用量最少的目录!也就是说,如果你给 log.dirs 参数新增了一个新的磁盘,新的分区目录肯定是先在这个新的磁盘上创建直到这个新的磁盘目录拥有的分区目录不是最少为止。

66、谈一谈 Kafka 的再均衡

在Kafka中,当有新消费者加入或者订阅的topic数发生变化时,会触发Rebalance(再均衡:在同一个消费者组当中,分区的所有权从一个消费者转移到另外一个消费者)机制,Rebalance顾名思义就是重新均衡消费者消费。Rebalance的过程如下:
第一步:所有成员都向coordinator发送请求,请求入组。一旦所有成员都发送了请求,coordinator会从中选择一个consumer担任leader的角色,并把组成员信息以及订阅信息发给leader。
第二步:leader开始分配消费方案,指明具体哪个consumer负责消费哪些topic的哪些partition。一旦完成分配,leader会将这个方案发给coordinator。coordinator接收到分配方案之后会把方案发给各个consumer,这样组内的所有成员就都知道自己应该消费哪些分区了。
所以对于Rebalance来说,Coordinator起着至关重要的作用

67、谈谈 Kafka 分区分配策略

参见我这篇文章 Kafka分区分配策略(Partition Assignment Strategy)

68、Kafka Producer 是如何动态感知主题分区数变化的?

参见我这篇文章:Kafka Producer是如何动态感知Topic分区数变化

69、 Kafka 是如何实现高吞吐率的?

Kafka是分布式消息系统,需要处理海量的消息,Kafka的设计是把所有的消息都写入速度低容量大的硬盘,以此来换取更强的存储能力,但实际上,使用硬盘并没有带来过多的性能损失。kafka主要使用了以下几个方式实现了超高的吞吐率:

顺序读写;
零拷贝
文件分段
批量发送
数据压缩。
具体参见:Kafka是如何实现高吞吐率的

70、Kafka 监控都有哪些?

参见我另外几篇文章:Apache Kafka监控之KafkaOffsetMonitor
雅虎开源的Kafka集群管理器(Kafka Manager)
Apache Kafka监控之Kafka Web Console
还有 JMX

71、如何为Kafka集群选择合适的Topics/Partitions数量

参见我另外几篇文章:如何为Kafka集群选择合适的Topics/Partitions数量

72、谈谈你对 Kafka 事务的了解?

参见这篇文章: http://www.jasongj.com/kafka/transaction/

73、谈谈你对 Kafka 幂等的了解?

参见这篇文章: https://www.jianshu.com/p/b1599f46229b

74、Kafka 缺点?

由于是批量发送,数据并非真正的实时;
对于mqtt协议不支持;
不支持物联网传感数据直接接入;
仅支持统一分区内消息有序,无法实现全局消息有序;
监控不完善,需要安装插件;
依赖zookeeper进行元数据管理;

75、Kafka 新旧消费者的区别

旧的 Kafka 消费者 API 主要包括:SimpleConsumer(简单消费者) 和 ZookeeperConsumerConnectir(高级消费者)。SimpleConsumer 名字看起来是简单消费者,但是其实用起来很不简单,可以使用它从特定的分区和偏移量开始读取消息。高级消费者和现在新的消费者有点像,有消费者群组,有分区再均衡,不过它使用 ZK 来管理消费者群组,并不具备偏移量和再均衡的可操控性。
现在的消费者同时支持以上两种行为,所以为啥还用旧消费者 API 呢?

76、Kafka 分区数可以增加或减少吗?为什么?

我们可以使用 bin/kafka-topics.sh 命令对 Kafka 增加 Kafka 的分区数据,但是 Kafka 不支持减少分区数。
Kafka 分区数据不支持减少是由很多原因的,比如减少的分区其数据放到哪里去?是删除,还是保留?删除的话,那么这些没消费的消息不就丢了。如果保留这些消息如何放到其他分区里面?追加到其他分区后面的话那么就破坏了 Kafka 单个分区的有序性。如果要保证删除分区数据插入到其他分区保证有序性,那么实现起来逻辑就会非常复杂

77、 Kafka Replicas是怎么管理的?

  • AR:分区中的所有 Replica 统称为 AR
  • ISR:所有与 Leader 副本保持一定程度同步的Replica(包括 Leader 副本在内)组成 ISR
  • OSR:与 Leader 副本同步滞后过多的 Replica 组成了 OSR

Leader 负责维护和跟踪 ISR 集合中所有 Follower 副本的滞后状态,当 Follower 副本落后过多时,就会将其放入 OSR 集合,当 Follower 副本追上了 Leader 的进度时,就会将其放入 ISR 集合。

默认情况下,只有 ISR 中的副本才有资格晋升为 Leader

78、如何确定当前能读到哪一条消息?

这个问题要先了解上一个问题的概念

分区相当于一个日志文件,我们先简单介绍几个概念

如上图是一个分区日志文件

  • 标识共有**7**条消息,offset (消息偏移量)分别是0~6
  • 0 代表这个日志文件的开始
  • HW(High Watermark) 为4,0~3 代表这个日志文件可以消费的区间,消费者只能消费到这四条消息
  • LEO 代表即将要写入消息的偏移量 offset

分区 ISR 集合中的每个副本都会维护自己的 LEO**,而** ISR 集合中最小的**LEO** 即为分区的 HW

如上图: 三个分区副本都是 ISR集合当中的,最小的 LEO 为 3,就代表分区的 HW 为3,所以当前分区只能消费到 0~2 之间的三条数据,如下图

79、 发送消息的分区策略有哪些?

· 1.轮询:依次将消息发送该topic下的所有分区,如果在创建消息的时候 key 为 null,Kafka 默认采用这种策略。

· 2.key 指定分区:在创建消息是 key 不为空,并且使用默认分区器,Kafka 会将 key 进行 hash,然后根据**hash**值映射到指定的分区上。这样的好处是 key 相同的消息会在一个分区下,Kafka 并不能保证全局有序,但是在每个分区下的消息是有序的,按照顺序存储,按照顺序消费。在保证同一个 key 的消息是有序的,这样基本能满足消息的顺序性的需求。但是如果 partation 数量发生变化,那就很难保证 key 与分区之间的映射关系了

· 3.自定义策略:实现 Partitioner 接口就能自定义分区策略。

· 4.指定 Partiton 发送

80、 Kafka 的可靠性是怎么保证的?

1.acks

这个参数用来指定分区中有多少个副本收到这条消息,生产者才认为这条消息是写入成功的,这个参数有三个值:

  • 1.acks = 1,默认为1。生产者发送消息,只要 leader 副本成功写入消息,就代表成功。这种方案的问题在于,当返回成功后,如果 leader 副本和 follower 副本还没有来得及同步,leader 就崩溃了,那么在选举后新的 leader 就没有这条消息,也就丢失了
  • 2.acks = 0。生产者发送消息后直接算写入成功,不需要等待响应。这个方案的问题很明显,只要服务端写消息时出现任何问题,都会导致消息丢失
  • 3.acks = -1 或 acks = all。生产者发送消息后,需要等待 ISR 中的所有副本都成功写入消息后才能收到服务端的响应。毫无疑问这种方案的可靠性是最高的,但是如果 ISR 中只有leader 副本,那么就和 acks = 1 毫无差别了。

2.**消息发送的方式**

第6问中我们提到了生产者发送消息有三种方式,发完即忘,同步和异步。我们可以通过同步或者异步获取响应结果,失败做重试来保证消息的可靠性。

3.**手动提交位移**

默认情况下,当消费者消费到消息后,就会自动提交位移。但是如果消费者消费出错,没有进入真正的业务处理,那么就可能会导致这条消息消费失败,从而丢失。我们可以开启手动提交位移,等待业务正常处理完成后,再提交offset。

4.**通过副本** LEO 来确定分区 HW

可参考第四问

#81、 分区再分配是做什么的?解决了什么问题?

分区再分配主要是用来维护 kafka 集群的负载均衡

既然是分区再分配,那么 kafka 分区有什么问题呢?

问题**1**:当集群中的一个节点下线了

  • 如果该节点的分区是单副本的,那么分区将会变得不可用
  • 如果是多副本的,就会进行 leader 选举,在其他机器上选举出新的 leader

kafka 并不会将这些失效的分区迁移到其他可用的 broker ,这样就会影响集群的负载均衡,甚至也会影响服务的可靠性和可用性

问题**2:集群新增 broker 时,只有新的主题分区会分配在该 broker 上,而老的主题分区不会分配在该 broker 上,就造成了老节点和新节点之间的负载不均衡**。

为了解决该问题就出现了分区再分配,它可以在集群扩容,broker 失效的场景下进行分区迁移。

分区再分配的原理就是通化控制器给分区新增新的副本,然后通过网络把旧的副本数据复制到新的副本上,在复制完成后,将旧副本清除。 当然,为了不影响集群正常的性能,在此复制期间还会有一系列保证性能的操作,比如复制限流

82、 Kafka Partition 副本 leader 是怎么选举的?

这个问题设计的点比较多,拓展的也更多一点,建议耐心阅读。

常用选主机制的缺点:

split-brain (脑裂):

这是由ZooKeeper的特性引起的,虽然ZooKeeper能保证所有Watch按顺序触发,但是网络延迟,并不能保证同一时刻所有Replica“看”到的状态是一样的,这就可能造成不同Replica的响应不一致,可能选出多个领导“大脑”,导致“脑裂”。

herd effect (羊群效应):

如果宕机的那个Broker上的Partition比较多, 会造成多个Watch被触发,造成集群内大量的调整,导致大量网络阻塞。

ZooKeeper负载过重:

每个Replica都要为此在ZooKeeper上注册一个Watch,当集群规模增加到几千个Partition时ZooKeeper负载会过重。

优势:

Kafka的Leader Election方案解决了上述问题,它在所有broker中选出一个controller,所有Partition的Leader选举都由controller决定。 controller会将Leader的改变直接通过RPC的方式(比ZooKeeper Queue的方式更高效)通知需为此作为响应的Broker。

没有使用 zk,所以无 2.3 问题;也没有注册 watch无 2.2 问题 leader 失败了,就通过 controller 继续重新选举即可,所以克服所有问题。

Kafka partition leader**的选举:**

由 controller 执行:

  • 从Zookeeper中读取当前分区的所有ISR(in-sync replicas)集合
  • 调用配置的分区选择算法选择分区的leader

分区选择算法

上面五种分区算法都是选择PreferredReplica(优先副本选举)作为当前Partition的leader。区别仅仅是选择leader之后的操作有所不同。

#83、 分区数越多越好吗?吞吐量就会越高吗?

般类似于这种问题的答案,都是持否定态度的。

但是可以说,在一定条件下,分区数的数量是和吞吐量成正比的,分区数和性能也是成正比的

那么为什么说超过了一定限度,就会对性能造成影响呢?原因如下:

1.**客户端/服务器端需要使用的内存就越多**

服务端在很多组件中都维护了分区级别的缓存,分区数越大,缓存成本也就越大。 消费端的消费线程数是和分区数挂钩的,分区数越大消费线程数也就越多,线程的开销成本也就越大 生产者发送消息有缓存的概念,会为每个分区缓存消息,当积累到一定程度或者时间时会将消息发送到分区,分区越多,这部分的缓存也就越大

2.**文件句柄的开销**

每个 partition 都会对应磁盘文件系统的一个目录。在 Kafka 的数据日志文件目录中,每个日志数据段都会分配两个文件,一个索引文件和一个数据文件。每个 broker 会为每个日志段文件打开一个 index 文件句柄和一个数据文件句柄。因此,随着 partition 的增多,所需要保持打开状态的文件句柄数也就越多,最终可能超过底层操作系统配置的文件句柄数量限制。

3.**越多的分区可能增加端对端的延迟**

Kafka 会将分区 HW 之前的消息暴露给消费者。分区越多则副本之间的同步数量就越多,在默认情况下,每个 broker 从其他 broker 节点进行数据副本复制时,该 broker 节点只会为此工作分配一个线程,该线程需要完成该 broker 所有 partition 数据的复制。

4.**降低高可用性**

在第 7 问我们提到了分区再分配,会将数据复制到另一份副本当中,分区数量越多,那么恢复时间也就越长,而如果发生宕机的 broker 恰好是 controller 节点时:在这种情况下,新 leader 节点的选举过程在 controller 节点恢复到新的 broker 之前不会启动。controller 节点的错误恢复将会自动地进行,但是新的 controller 节点需要从 zookeeper 中读取每一个 partition 的元数据信息用于初始化数据。例如,假设一个Kafka 集群存在 10000个partition,从 zookeeper 中恢复元数据时每个 partition 大约花费 2 ms,则 controller 的恢复将会增加约 20 秒的不可用时间窗口。

84、kafka 为什么这么快?

· 1.顺序读写磁盘分为顺序读写与随机读写,基于磁盘的随机读写确实很慢,但磁盘的顺序读写性能却很高,kafka 这里采用的就是顺序读写。

· 2.Page Cache为了优化读写性能,Kafka 利用了操作系统本身的 Page Cache,就是利用操作系统自身的内存而不是JVM空间内存。

· 3.零拷贝Kafka使用了零拷贝技术,也就是直接将数据从内核空间的读缓冲区直接拷贝到内核空间的 socket 缓冲区,然后再写入到 NIC 缓冲区,避免了在内核空间和用户空间之间穿梭。

· 4.分区分段**+**索引Kafka 的 message 是按 topic分 类存储的,topic 中的数据又是按照一个一个的 partition 即分区存储到不同 broker 节点。每个 partition 对应了操作系统上的一个文件夹,partition 实际上又是按照segment分段存储的。通过这种分区分段的设计,Kafka 的 message 消息实际上是分布式存储在一个一个小的 segment 中的,每次文件操作也是直接操作的 segment。为了进一步的查询优化,Kafka 又默认为分段后的数据文件建立了索引文件,就是文件系统上的.index文件。这种分区分段+索引的设计,不仅提升了数据读取的效率,同时也提高了数据操作的并行度。

· 5.批量读写**Kafka** 数据读写也是批量的而不是单条的,这样可以避免在网络上频繁传输单个消息带来的延迟和带宽开销。假设网络带宽为10MB/S,一次性传输10MB的消息比传输1KB的消息10000万次显然要快得多。

6.批量压缩Kafka 把所有的消息都变成一个批量的文件,并且进行合理的批量压缩,减少网络 IO 损耗,通过 mmap 提高 I/O 速度,写入数据的时候由于单个Partion是末尾添加所以速度最优;读取数据的时候配合 sendfile 进行直接读取。

85、 如何解决消息的重复消费?

· 1,首先分析消息队列本身都是保证的”至少一次(At least Once)”的语义,所以如果要通过MQServer配置和业务代码配合来解决。

另外其他几种常见的语义,我们需要保证Exactly Once语义。

    • 最多一次(**At most once**)
    • 至少一次(**At least once**)
    • 仅一次( Exactly once**)**

· 2,分析出现重复消费原因

o (1),生产者发送了重复的消息

一般为了保证消息的可靠性,生产者在发送了信息后需要等待Broker的响应,此时如果出现网络波动等情况,响应超出了时间之后,会导致数据的重复发送,

o (2),消费者在消费消息的重复的

当消费者在消费消息的时候,虽然基本的业务逻辑已经走完了,但是在提交Offset的时候,消费者服务挂了,那么这条被消费但是没有被提交的消息会发送到其他消费者中,导致这条消息被重复消费。

    • (3),补充“幂等性”的概念:通俗的说,同样的接口或者数据去调用同一个接口的时候,无论重复调用多少次,总能保证数据的正确性,不能出错,这里特指的是接口的幂等性。

· 3,解决重复的消费的问题

o (1),基于业务侧的调整:

      • 生产者:可以在发送消息的时候,添加一个唯一的字段,在插入数据的时候在数据库中进行唯一校验即可;但是这样在一定的程度上会影响数据库的性能,一般是需要强检验的场景中会使用。
      • 消费者:首先是开始手动提交**Offset,单独创建一个消费记录表,将提交Offset、执行插入动作的sql和插入消费记录表唯一key的操作都放在同一个事务**中,那么当插入之前先判断这个唯一的key是否已经在消费记录表中,只有不存在的才继续消费。

86、如果出现消息积压,应该怎么办?

· 1,分析:出现消息积压一般是是消息的消费速度比不上生产的速度,从而导致了消息堆积。

o (1),导致出现消息积压的情况可能有以下集中情况:

      • 磁盘写满了,导致数据写入磁盘的时候无法写入,进而导致的消息的积压。
      • 数据写入MySQL,但是此时MySQL的服务出现了异常,也可能直接服务down掉等等,导致的消息积压。
      • 有可能是因为程序中某步操作时时候,线程耗时过长,导致的消息堆积

· 2,解决方法

o 消费者

§ (1),可以适当增加消费者组的机器数量,以提升整体的消费能力;如果是线上的紧急任务,我们可以通过创建一个Topic,Partition是原来的十倍,然后临时写一个Consumer程序,并启动多个线程去进行消费,消费的数据只是临时存储,等待处理消费完积压的数据之后,恢复原先部署的架构,重新用原先Consumer机器来消费消息。

如果可以的话,尽量保留部分现场环境,便于排查消费能力下降的原因。

      • (2),排查出问题之后,我们不仅要解决问题,也要对集群的消费能力再进行一次评估,避免是因为消费能力不够引起的消息堆积,尤其是针对一些顺发的流量,比如大促活动之类的。
      • (3),优化每条消息的消费过程,从业务的角度考虑优化。
    • 生产者:生产者要能及时感知到消费者的能力不足,出现消息积压的时候,可以适当放缓消息放入的速度,可以直接给前端页面提示排队等等。

· 3,补充:数据积压的时间太长了,导致是消息队列中设置了过期时间的的数据丢失问题

答:如果出现消息的丢失问题,想办法找到丢失的部分数据,重新发送到MQ集群里。

87、如何保证数据的一致性问题?

关于数据的一致性问题,这里通过“解耦”场景举例:比如电商场景中,下单成功之后,再通知库房扣减库存。

那么这个时候,我们在同一个事务空间中,先处理下单的数据库操作,然后发送MQ消息;剩下的扣减库存的操作交给消费者进行。

另外在消费的环节,也可能会出现数据不一致的情况,那么我们可以采用最终一致性原则,增加重试的机制。

88、MQ框架是如何实现高吞吐量的?

  • 消息可以批量处理
  • 对消息体进行压缩,从而节省传输的带宽和存储空间
  • 顺序写入磁盘(Kafka):每个分区内是有序的。
  • 零拷贝(Kafka):直接在内核层将消息的内容传递给网络Socket,从而避免了应用层之间的拷贝。
  • 采用页缓存(Page Cache),使用操作的系统的内存,而不是使用JVM的内存,能够避免占用堆内存和GC问题。
  • 采用分区的设计,在每个分区中保持是有序的,另外每个分区可以针对不同的机器消费信息,可以用于并发处理。

89、MQ事务消息是如何实现的?

这里的实现方式类似于“两阶段提交”,在MySQL的事务中也是处理的

90、Kafka为什么不支持读写分离?

如果使用读写分离的策略,必然会有主和副本之间数据同步,要保证其一致性,另外副本在同步的时候如何保证实时性。

  • 数据一致性:如果采用一主多从的方式,Leader副本的数据在同步到Follower副本的时候会存在一定的延迟,那么Follower副本的消息位移也不一样,但是消费者需要通过消费位移来控制消息拉取的进度,多个副本之间要维护统一消费位移的一致性。那么如果要解决这个问题,就需要引入分布式锁,保证锁的安全,非常耗费性能。
  • 实时性:如果网络延迟比较大,在同步的过程中难免会影响效率,从而可能无法满足实时性业务的需求。

91、MQ如何实现高可用?

这里直接以Kafka举例的,其他基本是类似的。

简单来说,就是几个节点之间,选举出主节点(Leader),那么这个时候如果主节点宕机了,可以从其他的节点中进行重新选举。另外每个节点在保存的数据的时候,会在从节点(Follower)中保存相应的副本,通过多副本机制,又是另一个高可用的体现。

92、如何保证MQ的消息是有序的?

这里的讲解主要是以Kafka为例进行讲解的。

  • 方式1,可以强制只有一个分区,那么在一个分区中就是有序的,那么整体就是有序的。但是只有一个分区kafka的吞吐量就不高了。

· 方式2,从业务的角度考虑,可以通过自定义分区的策略(org.apache.kafka.clients.Partitioner)将满足指定规则的数据存储在同一个分区中,从而实现有序:

    • (1),比如,同一个订单的不同状态的消息存储在同一个分区中
    • (2),或者,同一个登录的用户的各类操作存储在同一个分区中

93、如何解决MQ的消息丢失?

这里是针对Kafka进行分析的,其他的框架也会有类似的:

· 1,分析可能出现消息丢失的几种情况:

    • (1),消息队列本身:在写数据的过程中,我们如果只保证写入Leader节点,而不管副本是否同步成功就算写入成功的话,这种情况下是存在单点故障的,即如果Leader节点挂了那么就会出现丢失数据的情况;
    • (2),生产者:由于网络的延迟,导致数据出现发送失败情况,也可以理解为数据丢失的一种情况;
    • (3),消费者:使用自动提交Offset的方式,会出现数据在处理完成之前就把Offset提交了,这样也会出现数据丢失的情况;

· 2,针对以上几种情况提出具体的解决方案:

o 生产者:

      • (1),配置acks=1 `或者 acks=all`参数,在Leader节点写入成功之后,将消息同步到副本列表中
      • (2),数据发送失败了之后,可以指定重试(retries)的次数,在一些强校验的场景下可以设置为Integer.MAX_VALUE

o 可以从Broker的角度考虑:

保证消费者消费到数据之后,再删除Broker中的暂存的信息。

如果是kafka的话,在Broker层面,使用到了ISR`列表+ HW高水位 + Leader Epoch `来防止数据丢失。

o 消费者端:

关闭自动提交,根据回调函数合理处理消息,并手动提交Offset。

94、为什么 Follower 副本不提供读服务?

这个问题,本质上来说是对性能和一致性的取舍。假设follower也提供读写服务,固然会提高性能,但是同时也会出现类似于数据库中幻读、脏读等问题。出现这一情况,主要是因为他们之间的同步的不一定是完全一致的。

95、Leader 和 Follower 是什么?

Partition分区中,分为两种节点:Leader、Follower,这两者之间是主备关系,当Leader节点挂了的时候,会通过选择在Follower节点中生成新的Leader节点。

  • Leader:所有的读写操作都发生在Leader分区上。
  • Follower:所有的Follower节点都需要从Leader节点上同步消息,并做为Leader的备份节点。

96、Kafka 的数据是放在磁盘上还是内存上,为什么速度会快?

kafka 使用的是磁盘存储。

速度快是因为:

顺序写入:因为硬盘是机械结构,每次读写都会寻址->写入,其中寻址是一个“机械动作”,它是耗时的。所以硬盘 “讨厌”随机 I/O, 喜欢顺序 I/O。为了提高读写硬盘的速度,Kafka 就是使用顺序 I/O。

Memory Mapped Files(内存映射文件):64 位操作系统中一般可以表示 20G 的数据文件,它的工作原理是直接利用操作系统的 Page 来实现文件到物理内存的直接映射。完成映射之后你对物理内存的操作会被同步到硬盘上。

Kafka 高效文件存储设计:Kafka 把 topic 中一个 parition 大文件分成多个小文件段,通过多个小文件段,就容易定期清除或删除已经消费完文件,减少磁盘占用。通过索引信息可以快速定位 message 和确定 response 的 大 小。通过 index 元数据全部映射到 memory(内存映射文件), 可以避免 segment file 的 IO 磁盘操作。通过索引文件稀疏存储,可以大幅降低 index 文件元数据占用空间大小。

注:
Kafka 解决查询效率的手段之一是将数据文件分段,比如有 100 条 Message,它们的 offset 是从 0 到 99。假设将数据文件分成 5 段,第一段为 0-19,第二段为 20-39,以此类推,每段放在一个单独的数据文件里面,数据文件以该段中 小的 offset 命名。这样在查找指定 offset 的 Message 的时候,用二分查找就可以定位到该 Message 在哪个段中。

为数据文件建 索引数据文件分段 使得可以在一个较小的数据文件中查找对应 offset 的 Message 了,但是这依然需要顺序扫描才能找到对应 offset 的 Message。为了进一步提高查找的效率,Kafka 为每个分段后的数据文件建立了索引文件,文件名与数据文件的名字是一样的,只是文件扩展名为.index

97、kafka 的消费分区分配策略

一个 consumer group 中有多个 consumer,一个 topic 有多个 partition,所以必然会涉及到 partition 的分配问题,即确定那个 partition 由哪个 consumer 来消费 Kafka 有三种分配策略,一是 RoundRobin,一是 Range。高版本还有一个 StickyAssignor 策略 将分区的所有权从一个消费者移到另一个消费者称为重新平衡(rebalance)。当以下事件发生时,Kafka 将会进行一次分区分配:

同一个 Consumer Group 内新增消费者。

消费者离开当前所属的 Consumer Group,包括 shuts down 或 crashes。

  • Range 分区分配策略

Range 是对每个 Topic 而言的(即一个 Topic 一个 Topic 分),首先对同一个 Topic 里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。然后用 Partitions 分区的个数除以消费者线程的总数来决定每个消费者线程消费几个分区。如果除不尽,那么前面几个消费者线程将会多消费一个分区。假设 n=分区数/消费者数量,m=分区数 %消费者数量,那么前 m 个消费者每个分配 n+1 个分区,后面的(消费者数量-m)个消费者每个分配 n 个分区。假如有 10 个分区,3 个消费者线程,把分区按照序号排列

0,1,2,3,4,5,6,7,8,9

消费者线程为

C1-0,C2-0,C2-1

那么用 partition 数除以消费者线程的总数来决定每个消费者线程消费几个 partition,如果除不尽,前面几个消费者将会多消费一个分区。在我们的例子里面,我们有 10 个分区,3 个消费者线程,10/3 = 3,而且除除不尽,那么消费者线程 C1-0 将会多消费一个分区,所以最后分区分配的结果看起来是这样的:

C1-0:0,1,2,3

C2-0:4,5,6

C2-1:7,8,9

如果有 11 个分区将会是:

C1-0:0,1,2,3

C2-0:4,5,6,7

C2-1:8,9,10

假如我们有两个主题 T1,T2,分别有 10 个分区,最后的分配结果将会是这样:

C1-0:T1(0,1,2,3) T2(0,1,2,3)

C2-0:T1(4,5,6) T2(4,5,6)

C2-1:T1(7,8,9) T2(7,8,9)

  • RoundRobinAssignor 分区分配策略

RoundRobinAssignor 策略的原理是将消费组内所有消费者以及消费者所订阅的所有 topic 的 partition 按照字典序排序,然后通过轮询方式逐个将分区以此分配给每个消费者. 使用 RoundRobin 策略有两个前提条件必须满足:

同一个消费者组里面的所有消费者的 num.streams(消费者消费线程数)必须相等;每个消费者订阅的主题必须相同。加入按照 hashCode 排序完的 topic-partitions 组依次为

T1-5, T1-3, T1-0, T1-8, T1-2, T1-1, T1-4, T1-7, T1-6, T1-9

我们的消费者线程排序为

C1-0, C1-1, C2-0, C2-1

最后分区分配的结果为:

C1-0 将消费 T1-5, T1-2, T1-6 分区

C1-1 将消费 T1-3, T1-1, T1-9 分区

C2-0 将消费 T1-0, T1-4 分区

C2-1 将消费 T1-8, T1-7 分区

  • StickyAssignor 分区分配策略

Kafka 从 0.11.x 版本开始引入这种分配策略,它主要有两个目的:

分区的分配要尽可能的均匀,分配给消费者者的主题分区数最多相差一个 分区的分配尽可能的与上次分配的保持相同。当两者发生冲突时,第一个目标优先于第二个目标。鉴于这两个目的,StickyAssignor 策略的具体实现要比 RangeAssignor 和 RoundRobinAssignor 这两种分配策略要复杂很多。

假设消费组内有 3 个消费者

C0、C1、C2

它们都订阅了 4 个主题:

t0、t1、t2、t3

并且每个主题有 2 个分区,也就是说整个消费组订阅了

t0p0、t0p1、t1p0、t1p1、t2p0、t2p1、t3p0、t3p1 这 8 个分区

最终的分配结果如下:

消费者 C0:t0p0、t1p1、t3p0

消费者 C1:t0p1、t2p0、t3p1

消费者 C2:t1p0、t2p1

这样初看上去似乎与采用 RoundRobinAssignor 策略所分配的结果相同

此时假设消费者 C1 脱离了消费组,那么消费组就会执行再平衡操作,进而消费分区会重新分配。如果采用 RoundRobinAssignor 策略,那么此时的分配结果如下:

消费者 C0:t0p0、t1p0、t2p0、t3p0

消费者 C2:t0p1、t1p1、t2p1、t3p1

如分配结果所示,RoundRobinAssignor 策略会按照消费者 C0 和 C2 进行重新轮询分配。而如果此时使用的是 StickyAssignor 策略,那么分配结果为:

消费者 C0:t0p0、t1p1、t3p0、t2p0

消费者 C2:t1p0、t2p1、t0p1、t3p1

可以看到分配结果中保留了上一次分配中对于消费者 C0 和 C2 的所有分配结果,并将原来消费者 C1 的“负担”分配给了剩余的两个消费者 C0 和 C2,最终 C0 和 C2 的分配还保持了均衡。

如果发生分区重分配,那么对于同一个分区而言有可能之前的消费者和新指派的消费者不是同一个,对于之前消费者进行到一半的处理还要在新指派的消费者中再次复现一遍,这显然很浪费系统资源。StickyAssignor 策略如同其名称中的“sticky”一样,让分配策略具备一定的“粘性”,尽可能地让前后两次分配相同,进而减少系统资源的损耗以及其它异常情况的发生。

到目前为止所分析的都是消费者的订阅信息都是相同的情况,我们来看一下订阅信息不同的情况下的处理。

举例,同样消费组内有 3 个消费者:

C0、C1、C2

集群中有 3 个主题:

t0、t1、t2

这 3 个主题分别有

1、2、3 个分区

也就是说集群中有

t0p0、t1p0、t1p1、t2p0、t2p1、t2p2 这 6 个分区

消费者 C0 订阅了主题 t0

消费者 C1 订阅了主题 t0 和 t1

消费者 C2 订阅了主题 t0、t1 和 t2

如果此时采用 RoundRobinAssignor 策略:

消费者 C0:t0p0

消费者 C1:t1p0

消费者 C2:t1p1、t2p0、t2p1、t2p2

如果此时采用的是 StickyAssignor 策略:

消费者 C0:t0p0

消费者 C1:t1p0、t1p1

消费者 C2:t2p0、t2p1、t2p2

此时消费者 C0 脱离了消费组,那么 RoundRobinAssignor 策略的分配结果为:

消费者 C1:t0p0、t1p1

消费者 C2:t1p0、t2p0、t2p1、t2p2

StickyAssignor 策略,那么分配结果为:

消费者 C1:t1p0、t1p1、t0p0

消费者 C2:t2p0、t2p1、t2p2

可以看到 StickyAssignor 策略保留了消费者 C1 和 C2 中原有的 5 个分区的分配:

t1p0、t1p1、t2p0、t2p1、t2p2。

从结果上看 StickyAssignor 策略比另外两者分配策略而言显得更加的优异,这个策略的代码实现也是异常复杂

98、kafka 事务是怎么实现的

Kafka 从 0.11 版本开始引入了事务支持。事务可以保证 Kafka 在 Exactly Once 语义的基础上,生产和消费可以跨分区和会话,要么全部成功,要么全部失败。

  • Producer 事务

为了实现跨分区跨会话的事务,需要引入一个全局唯一的 Transaction ID,并将 Producer 获得的 PID 和 Transaction ID 绑定。这样当 Producer 重启后就可以通过正在进行的 Transaction ID 获得原来的 PID。为了管理 Transaction,Kafka 引入了一个新的组件 Transaction Coordinator。Producer 就是通过和 Transaction Coordinator 交互获得 Transaction ID 对应的任务状态。Transaction Coordinator 还负责将事务所有写入 Kafka 的一个内部 Topic,这样即使整个服务重启,由于事务状态得到保存,进行中的事务状态可以得到恢复,从而继续进行。

  • Consumer 事务

对于 Consumer 而言,事务的保证就会相对较弱,尤其时无法保证 Commit 的信息被精确消费。这是由于 Consumer 可以通过 offset 访问任意信息,而且不同的 Segment File 生命周期不同,同一事务的消息可能会出现重启后被删除的情况。

Exactly Once 语义

将服务器的 ACK 级别设置为-1,可以保证 Producer 到 Server 之间不会丢失数据,即 At Least Once 语义。相对的,将服务器 ACK 级别设置为 0,可以保证生产者每条消息只会被发送一次,即 At Most Once 语义。

At Least Once 可以保证数据不丢失,但是不能保证数据不重复;

相对的,At Most Once 可以保证数据不重复,但是不能保证数据不丢失。

但是,对于一些非常重要的信息,比如说交易数据,下游数据消费者要求数据既不重复也不丢失,即 Exactly Once 语义。在 0.11 版本以前的 Kafka,对此是无能为力的,只能保证数据不丢失,再在下游消费者对数据做全局去重。对于多个下游应用的情况,每个都需要单独做全局去重,这就对性能造成了很大影响。

0.11 版本的 Kafka,引入了一项重大特性:幂等性。

开启幂等性 enable.idempotence=true。

所谓的幂等性就是指 Producer 不论向 Server 发送多少次重复数据,Server 端都只会持久化一条。幂等性结合 At Least Once 语义,就构成了 Kafka 的 Exactly Once 语义。即:

At Least Once + 幂等性 = Exactly Once

Kafka 的幂等性实现其实就是将原来下游需要做的去重放在了数据上游。开启幂等性的 Producer 在初始化的时候会被分配一个 PID,发往同一 Partition 的消息会附带 Sequence Number。而 Broker 端会对< PID, Partition, SeqNumber>做缓存,当具有相同主键的消息提交时,Broker 只会持久化一条。

但是 PID 重启就会变化,同时不同的 Partition 也具有不同主键,所以幂等性无法保证跨分区跨会话的 Exactly Once。

补充,在流式计算中怎么 Exactly Once 语义?以 flink 为例

  1. souce:使用执行 ExactlyOnce 的数据源,比如 kafka 等

内部使用 FlinkKafakConsumer,并开启 CheckPoint,偏移量会保存到 StateBackend 中,并且默认会将偏移量写入到 topic 中去,即 _ consumer_offsets Flink 设置 CheckepointingModel.EXACTLY_ONCE

  1. sink

存储系统支持覆盖也即幂等性:如 Redis,Hbase,ES 等 存储系统不支持覆:需要支持事务(预写式日志或者两阶段提交),两阶段提交可参考 Flink 集成的 kafka sink 的实现。

99、Kafka 为什么不支持读写分离?

在 Kafka 中,生产者写入消息、消费者读取消息的操作都是与 leader 副本进行交互的,从 而实现的是一种主写主读的生产消费模型。Kafka 并不支持主写从读,因为主写从读有 2 个很明显的缺点:

数据一致性问题:数据从主节点转到从节点必然会有一个延时的时间窗口,这个时间 窗口会导致主从节点之间的数据不一致。某一时刻,在主节点和从节点中 A 数据的值都为 X, 之后将主节点中 A 的值修改为 Y,那么在这个变更通知到从节点之前,应用读取从节点中的 A 数据的值并不为最新的 Y,由此便产生了数据不一致的问题。

延时问题:类似 Redis 这种组件,数据从写入主节点到同步至从节点中的过程需要经历 网络→主节点内存→网络→从节点内存 这几个阶段,整个过程会耗费一定的时间。而在 Kafka 中,主从同步会比 Redis 更加耗时,它需要经历 网络→主节点内存→主节点磁盘→网络→从节 点内存→从节点磁盘 这几个阶段。对延时敏感的应用而言,主写从读的功能并不太适用。

100、在生产者中,何时发生QueueFullException

每当Kafka生产者试图以代理的身份在当时无法处理的速度发送消息时,通常都会发生QueueFullException。但是,为了协作处理增加的负载,用户需要添加足够的代理,因为生产者不会阻止。

101、分区Leader选举策略有几种?

分区的Leader副本选举对用户是完全透明的,它是由Controller独立完成的。你需要回答的是,在哪些场景下,需要执行分区Leader选举。每一种场景对应于一种选举策略。

1**、** OfflinePartition Leader选举:每当有分区上线时,就需要执行Leader选举。所谓的分区上线,可能是创建了新分区,也可能是之前的下线分区重新上线。这是最常见的分区Leader选举场景。

2、 ReassignPartition Leader选举:当你手动运行Kafka-reassign-partitions命令,或者是调用Admin的alterPartitionReassignments方法执行分区副本重分配时,可能触发此类选举。假设原来的AR是[1,2,3],Leader是1,当执行副本重分配后,副本集合AR被设置成[4,5,6],显然,Leader必须要变更,此时会发生Reassign Partition Leader选举。

3、PreferredReplicaPartition Leader选举:当你手动运行Kafka-preferred-replica-election命令,或自动触发了Preferred Leader选举时,该类策略被激活。所谓的Preferred Leader,指的是AR中的第一个副本。比如AR是[3,2,1],那么,Preferred Leader就是3。

4、 ControlledShutdownPartition Leader选举:当Broker正常关闭时,该Broker上的所有Leader副本都会下线,因此,需要为受影响的分区执行相应的Leader选举。

这4类选举策略的大致思想是类似的,即从AR中挑选首个在ISR中的副本,作为新Leader

102、Kafka Producer如何优化写入速度?

  1. 增加线程

  2. 提高 batch.size

  3. 增加更多 producer 实例

  4. 增加 partition 数

  5. 设置 acks=-1 时,如果延迟增大:可以增大 num.replica.fetchers(follower 同步数据的线程数)来调解;

  6. 跨数据中心的传输:增加 socket 缓冲区设置以及 OS tcp 缓冲区设置。

作者:admin  创建时间:2023-06-27 22:47
最后编辑:admin  更新时间:2024-04-07 15:40