WinddSnow

Java面试题15Kafka

字数统计: 4.4k阅读时长: 15 min
2022/10/22

Kafka 是什么?

Kafka 是一款分布式流处理框架,用于实时构建流处理应用。它有一个核心 的功能广为人知,即作为企业级的消息引擎被广泛使用。

明确Kafka 的流处理框架地位

Kafka 的特点?

  • 高吞吐量、低延迟:kafka 每秒可以处理几十万条消息,它的延迟最低只有几毫秒
  • 可扩展性:kafka 集群支持热扩展
  • 持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失
  • 容错性:允许集群中节点失败(若副本数量为 n,则允许 n-1 个节点失败)
  • 高并发:支持数千个客户端同时读写

什么是消费者组?

消费者组是 Kafka 独有的概念,如果面试官问这 个,就说明他对此是有一定了解的。我先给出标准答案:

  1. 定义:即消费者组是 Kafka 提供的可扩展且具有容错性的消费者机制。
  2. 原理:在 Kafka 中,消费者组是一个由多个消费者实例 构成的组。多个实例共同订阅若干个主题,实现共同消费。同一个组下的每个实例都配置有 相同的组 ID,被分配不同的订阅分区。当某个实例挂掉的时候,其他实例会自动地承担起 它负责消费的分区。

面试小技巧:消费者组的题目,能够帮你在某种程度上掌控下面的面试方向:

  • 如果你擅长位移值原理,就不妨再提一下消费者组的位移提交机制;
  • 如果你擅长 Kafka Broker,可以提一下消费者组与 Broker 之间的交互;
  • 如果你擅长与消费者组完全不相关的 Producer,那么就可以这么说:“消费者组要消费的数据完全来自于 Producer 端生产的消息,我对 Producer 还是比较熟悉的。

Kafka位移提交详解

什么是位移提交,定义

  •  Consumer需要向kafka汇报自己的位移数据,这个汇报过程称为位移提交。因为Consumer可以同时消费多个分区。所以位移提交是按照分区的粒度进行的。即Consumer需要为分配给他的每个分区提交各自的位移数据。
  •  作用是:位移提交表示了Consumer的消费进度。这样当Consumer发生故障重启后,就可以读取之前提交的位移,然后从指定位移开始继续消费。类似于书签。
  •  我们要知道,位移的提交完全由我们掌控,我们可以提交任何位移值,但是我们也就需要为我们提交的位移负责。

位移提交的分类

  • 用户角度分类
    1. 自动提交
    2. 手动提交
  • Consumer角度
    1. 同步提交
    2. 异步提交

自动提交和手动提交:

  • 我们将Consumer端的参数:enable.auto.commit设置为true,就开启了自动提交。如果我们开启了自动提价哪还有一个参数:auto.commit.interval.ms 它是自动提交的时间间隔,默认是5秒钟。
  • 我们开启了自动提交之后就不需要对提交位移进行管理了,并且kafka会保证在开始调用Poll方法时,提交上一次poll返回的所有消息。从顺序上说,是先提价上一批消息的位移,再处理新的消息。因此他能保证消息不丢失,但是会出现重复的情况。
  • 也就是,Consumer是每5秒提价一次位移,如果,Rebalance发生在第三秒的时候。这样当重启之后,前三秒已经消费过的消息就会被重新消费,因为位移数据还是三秒前的位移数据。虽然你可以通过减少提交位移的时间间隔,但是并不能完全避免重复消费,只能减少重复消费的数据。
  • 手动提交:只要将上面的自动提交的参数设置成false就可以了。

同步提交和异步提交

  • 手动提交更加灵活,但是,有一个缺陷:调用commitSync()同步提交时,Consumer会处于阻塞状态。会影响应用程序的TPS。因为这个问题,又出现了另一个API:commitAsync()异步提交。
  • 异步提交不会阻塞Consumer但是它没有重试机制,不能保证提交一定会成功。所以通常情况下是异步提交和同步提交一起使用,先用异步提交进行阶段性提交避免阻塞,然后在Consumer要关闭时再用同步提交最后提交一次,确保Consumer位移的正确性。
  • kafka Consumer API还为手动提交提供了这样的方法:
    commitSync(Map<TopicPartition,OffsetAndMetadata>)和commitAsync(Map<TopicParttion,OffsetAndMetadata>)它们的参数是一个Map对象,键是TopicPartition(消费的分区),值是:OffsetAndMetadata对象(主要保存的是位移数据)。
  • 它可以将一个大的事务分割成若干个小的事务分别提交,能有效的减少错误恢复的时间。

Kafka 中消费者组与 Broker 之间的交互

Kafka中消费者的消费方式

consumer采用pull(拉)模式从broker中读取数据。 拉取模式也有不足,如果kafka没有数据,消费者可能会陷入循环中,一直返回空数据。针对这一点,kafka消费者在消费数据时会传入一个时长参数timeout,如果当前没有数据可供消费,consumer会等待一段时间后再返回,这段时长即为timeout

Kafka的分区分配策略

一个消费者组中有多个消费者,一个broker有多个分区,所有必然会涉及到分区分配问题,即确定哪一个分区由哪一个consumer来消费。kafka有两种分区分配策略:RoundRobin和Range

  • RoundRobinAssignor分配策略(轮询分区)
  • RangeAssignor 分配策略(范围分区) 默认采用 Range 范围分区

offset的维护

由于kafka可能出现故障,故障之后要恢复到上一次消费的位置,往下继续进行消费。因此consumer需要实时记录自己消费到了哪一个offset,从0.9版本开始,consumer默认将offset保存在kafka的内置topic中,该topic为_consumer_offsets(0.9版本前放在zk中)

Kafka的Producer深入

消息发送流程 【面试】

  • Kafka 的 Producer 发送消息采用的是异步发送的方式。在消息发送的过程中,涉及到了两个线程——main 线程和 Sender 线程,以及一个线程共享变量——RecordAccumulator。
  • main 线程将消息发送给 RecordAccumulator,Sender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka broker。

相关参数:

  • batch.size:只有数据积累到 batch.size 之后,sender 才会发送数据。(默认 16384byte, 16KB)
  • linger.ms:如果数据迟迟未达到 batch.size,sender 等待 linger.time 之后就会发送数据。(默认为0)

参数详解

  • acks

    指的是producer的消息发送确认机制

  • retries

    生产者从服务器收到的错误有可能是临时性错误,在这种情况下,retries参数的值决定了生产者可以重发消息的次数,如果达到这个次数,生产者会放弃重试返回错误,默认情况下,生产者会在每次重试之间等待100ms

消息发送

  • 异步发送

    • 不带回调函数的 API

    • 带回调函数的 API

      回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是RecordMetadata 和 Exception,如果 Exception 为 null,说明消息发送成功,如果Exception 不为 null,说明消息发送失败。注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试

  • 同步发送
    • 同步发送的意思就是,一条消息发送之后,会阻塞当前线程,直至返回 ack。
    • 由于 send 方法返回的是一个 Future 对象,根据 Futrue 对象的特点,我们也可以实现同步发送的效果,只需在调用 Future 对象的 get 方发即可。

消息发送流程:

producer->interceptor->serializer->partitioner -> RecordAccumulator(累加消息,批量batch.size, liner.ms)-> send -> kafka broker

  • acks: 生产者确认机制

    • 0: fire and forgot,快速放下,短信通知。消息发送完成不报错即可

    • 1: 快递需要签收,等leader 确认。消息到了leader, 还没同步给集群

  • all: 快递跟亲戚 确认,leader把消息同步给集群,之后才返回确认给生产者。 消息不会丢失

  • retries: 发送失败时会自动重试次数

  • 异步:异常时处理

Kafka的Consumer深入

工作原理 【面试】

  • Consumer 消费数据时的可靠性是很容易保证的,因为数据在 Kafka 中是持久化的,故不用担心数据丢失问题。
  • 由于 consumer 在消费过程中可能会出现断电宕机等故障,consumer 恢复后,需要从故障前的位置的继续消费,所以 consumer 需要实时记录自己消费到了哪个 offset,以便故障恢复后继续消费。
  • 所以 offset 的维护是 Consumer 消费数据是必须考虑的问题。

offset(偏移量)

offset消费者消费到了第几个消息

  • 每次调用poll()方法,它会返回由生产者写入kafka但还没有被消费者读取过来的记录,我们由此可以追踪到哪些记录是被群组里的哪个消费者读取的,kafka不会像其他JMS队列那样需要得到消费者的确认,这是kafka的一个独特之处,相反,消费者可以使用kafka来追踪消息在分区的位置(偏移量)
  • 消费者会往一个叫做_consumer_offset的特殊主题发送消息,消息里包含了每个分区的偏移量。如果消费者一直处于运行状态,那么偏移量就没有什么用处。不过,如果消费者发生崩溃或有新的消费者加入群组,就会触发再均衡,完成再均衡之后,每个消费者可能分配到新的分区,消费者需要读取每个分区最后一次提交的偏移量,然后从偏移量指定的地方继续处理。
  • 如果提交偏移量小于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息就会被重复处理。
  • 消费者处理完消息后再提交offset,有可能导致消息重复消费(解决:幂等)
  • 如果提交的偏移量大于客户端的最后一个消息的偏移量,那么处于两个偏移量之间的消息将会丢失。
  • 拉取到消息还没消费完就提交offset, 导致可能消息丢失。解决(消息回溯)

配置参数详解

  • enable.auto.commit

    • enable.auto.commit被设置为true,提交方式就是让消费者自动提交偏移量,每隔5秒消费者会自动把从poll()方法接收的最大偏移量提交上去。提交时间间隔有auto.commot.interval.ms控制,默认值是5秒。
  • 需要注意到,这种方式可能会导致消息重复消费。假如,某个消费者poll消息后,应用正在处理消息,在3秒后Kafka进行了重平衡,那么由于没有更新位移导致重平衡后这部分消息重复消费。

  • auto.offset.reset

    • earliest

      当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费

    • latest 默认

      当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据

    • none

      topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常

    • anything else

      向consumer抛出异常

自动提交offset

手动提交offset

虽然自动提交 offset 十分简介便利,但由于其是基于时间提交的,开发人员难以把握offset 提交的时机。因此 Kafka 还提供了手动提交 offset 的 API。手动提交 offset 的方法有两种:

分别是 commitSync(同步提交)和 commitAsync(异步提交)。

  • 两者的相同点是,都会将本次 poll 的一批数据最高的偏移量提交;
  • 不同点是,commitSync 阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);
  • 而 commitAsync 则没有失败重试机制,故有可能提交失败。
    • 同步提交 offset:由于同步提交 offset 有失败重试机制,故更加可靠
    • 异步提交 offset:虽然同步提交 offset 更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会收到很大的影响。因此更多的情况下,会选用异步提交 offset 的方式。

消息重复消费处理

幂等:不管执行多少次,结果都一样

  1. 判断数据的状态, status=1 需要审核, 如果status!=1不处理
  2. 只有一个消费者组,一个消息只会被一个消费者消费
  3. 加乐观锁,取消息记录版本,更新数据时判断版本是否一致,一致则更新,不一致则不更新数据, affected rows: 0代表没更新成功,其它消费者已经处理过了
  4. 发送消息时,给消息唯一标识, 当消费消息时,先检查这个唯一的id是否存在,存在则不处理,不存在则存入redis|mysql,开始处理。

发送及消费小结

  1. 消费流程:

    • client去请求消息数据
    • 消息存入一个队列里
    • 消费者另一线程去队列poll获取,最大500条消息
    • 反序列化
    • 拦截器
    • 业务处理, 提交offset
  2. offset: 记录消费者消费到第几个消息

  3. offset: 提交:auto, 手工

    • 如果提交的offset小于服务器维护的offset,则消息重复消费
    • 如果提交的offset大于服务器维护的offset,则消息丢失
  4. 消费者重平衡(消费者数量在组内有变化时, 加入新的组,消费者挂了重启)

    earliest, latest, none, anythiny else报错

Kafka 中,ZooKeeper 的作用是什么?

目前,Kafka 使用 ZooKeeper 存放集群元数据、成员管理、Controller 选举,以及其他一些管理类任务。之后,等 KIP-500 提案完成后,Kafka 将完全不再依赖 于 ZooKeeper。突出“目前”,以彰显你非常了解社区的演进计划。

  • 存放元数据”是指主题 分区的所有数据都保存在 ZooKeeper 中,且以它保存的数据为权威,其他“人” 都要与它 保持对齐。
  • 成员管理”是指 Broker 节点的注册、注销以及属性变更等
  • Controller 选举”是指选举集群 Controller
  • 其他管理类任务包括但不限于主题 删除、参数配置等。

不过,抛出 KIP-500 也可能是个双刃剑。碰到非常资深的面试官,他可能会进一步追问你 KIP-500 是做的。一言以蔽之:KIP-500 思想,是使用社区自研的基于 Raft 的共识算法, 替代 ZooKeeper,实现 Controller 自选举。

如何估算 Kafka 集群的机器数量?

这道题目考查的是机器数量和所用资源之间的关联关系。所谓资源,也就是 CPU、内存、磁盘和带宽。

  • 通常来说,CPU 和内存资源的充足是比较容易保证的,因此,你需要从磁盘空间和带宽占用两个维度去评估机器数量。
  • 在预估磁盘的占用时,你一定不要忘记计算副本同步的开销。如果一条消息占用1KB 的磁 盘空间,那么,在有 3 个副本的主题中,你就需要 3KB 的总空间来保存这条消息。显式地 将这些考虑因素答出来,能够彰显你考虑问题的全面性,是一个难得的加分项
  • 对于评估带宽来说,常见的带宽有 1Gbps 和 10Gbps,但你要切记,这两个数字仅仅是最大值。因此,你最好和面试官确认一下给定的带宽是多少。然后,明确阐述出当带宽占用接 近总带宽的 90% 时,丢包情形就会发生。这样能显示出你的网络基本功。

Kafka 为什么不支持读写分离?

这道题目考察的是你对 Leader/Follower 模型的思考。

  • Leader/Follower 模型并没有规定 Follower 副本不可以对外提供读服务。很多框架都是允 许这么做的,只是 Kafka 最初为了避免不一致性的问题,而采用了让 Leader 统一提供服 务的方式。
  • 不过,在开始回答这道题时,你可以率先亮出观点:自 Kafka 2.4 之后,Kafka 提供了有限度的读写分离,也就是说,Follower 副本能够对外提供读服务。

说完这些之后,你可以再给出之前的版本不支持读写分离的理由:

  • 场景不适用:读写分离适用于那种读负载很大,而写操作相对不频繁的场景,可Kafka 不属于这样的场景。
  • 同步机制:Kafka 采用 PULL 方式实现 Follower 的同步,因此,Follower 与 Leader存在不一致性窗口。如果允许读 Follower 副本,就势必要处理消息滞后(Lagging)的问题。
CATALOG
  1. 1. Kafka 是什么?
  2. 2. Kafka 的特点?
  3. 3. 什么是消费者组?
  4. 4. Kafka位移提交详解
    1. 4.1. 什么是位移提交,定义
    2. 4.2. 位移提交的分类
    3. 4.3. 自动提交和手动提交:
    4. 4.4. 同步提交和异步提交
  5. 5. Kafka 中消费者组与 Broker 之间的交互
    1. 5.1. Kafka中消费者的消费方式
    2. 5.2. Kafka的分区分配策略
    3. 5.3. offset的维护
  6. 6. Kafka的Producer深入
    1. 6.1. 消息发送流程 【面试】
    2. 6.2. 消息发送
      1. 6.2.0.1. 同步发送
  7. 6.3. 消息发送流程:
  • 7. Kafka的Consumer深入
    1. 7.1. 工作原理 【面试】
    2. 7.2. offset(偏移量)
    3. 7.3. 配置参数详解
    4. 7.4. 自动提交offset
    5. 7.5. 手动提交offset
  • 8. 消息重复消费处理
  • 9. 发送及消费小结
  • 10. Kafka 中,ZooKeeper 的作用是什么?
  • 11. 如何估算 Kafka 集群的机器数量?
  • 12. Kafka 为什么不支持读写分离?