Kafka 是什么?
Kafka 是一款分布式流处理框架,用于实时构建流处理应用。它有一个核心 的功能广为人知,即作为企业级的消息引擎被广泛使用。
明确Kafka 的流处理框架地位
Kafka 的特点?
- 高吞吐量、低延迟:kafka 每秒可以处理几十万条消息,它的延迟最低只有几毫秒
- 可扩展性:kafka 集群支持热扩展
- 持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失
- 容错性:允许集群中节点失败(若副本数量为 n,则允许 n-1 个节点失败)
- 高并发:支持数千个客户端同时读写
什么是消费者组?
消费者组是 Kafka 独有的概念,如果面试官问这 个,就说明他对此是有一定了解的。我先给出标准答案:
- 定义:即消费者组是 Kafka 提供的可扩展且具有容错性的消费者机制。
- 原理:在 Kafka 中,消费者组是一个由多个消费者实例 构成的组。多个实例共同订阅若干个主题,实现共同消费。同一个组下的每个实例都配置有 相同的组 ID,被分配不同的订阅分区。当某个实例挂掉的时候,其他实例会自动地承担起 它负责消费的分区。
面试小技巧:消费者组的题目,能够帮你在某种程度上掌控下面的面试方向:
- 如果你擅长位移值原理,就不妨再提一下消费者组的位移提交机制;
- 如果你擅长 Kafka Broker,可以提一下消费者组与 Broker 之间的交互;
- 如果你擅长与消费者组完全不相关的 Producer,那么就可以这么说:“消费者组要消费的数据完全来自于 Producer 端生产的消息,我对 Producer 还是比较熟悉的。”
Kafka位移提交详解
什么是位移提交,定义
- Consumer需要向kafka汇报自己的位移数据,这个汇报过程称为位移提交。因为Consumer可以同时消费多个分区。所以位移提交是按照分区的粒度进行的。即Consumer需要为分配给他的每个分区提交各自的位移数据。
- 作用是:位移提交表示了Consumer的消费进度。这样当Consumer发生故障重启后,就可以读取之前提交的位移,然后从指定位移开始继续消费。类似于书签。
- 我们要知道,位移的提交完全由我们掌控,我们可以提交任何位移值,但是我们也就需要为我们提交的位移负责。
位移提交的分类
- 用户角度分类
- 自动提交
- 手动提交
- Consumer角度
- 同步提交
- 异步提交
自动提交和手动提交:
- 我们将Consumer端的参数:enable.auto.commit设置为true,就开启了自动提交。如果我们开启了自动提价哪还有一个参数:auto.commit.interval.ms 它是自动提交的时间间隔,默认是5秒钟。
- 我们开启了自动提交之后就不需要对提交位移进行管理了,并且kafka会保证在开始调用Poll方法时,提交上一次poll返回的所有消息。从顺序上说,是先提价上一批消息的位移,再处理新的消息。因此他能保证消息不丢失,但是会出现重复的情况。
- 也就是,Consumer是每5秒提价一次位移,如果,Rebalance发生在第三秒的时候。这样当重启之后,前三秒已经消费过的消息就会被重新消费,因为位移数据还是三秒前的位移数据。虽然你可以通过减少提交位移的时间间隔,但是并不能完全避免重复消费,只能减少重复消费的数据。
- 手动提交:只要将上面的自动提交的参数设置成false就可以了。
同步提交和异步提交
- 手动提交更加灵活,但是,有一个缺陷:调用commitSync()同步提交时,Consumer会处于阻塞状态。会影响应用程序的TPS。因为这个问题,又出现了另一个API:commitAsync()异步提交。
- 异步提交不会阻塞Consumer但是它没有重试机制,不能保证提交一定会成功。所以通常情况下是异步提交和同步提交一起使用,先用异步提交进行阶段性提交避免阻塞,然后在Consumer要关闭时再用同步提交最后提交一次,确保Consumer位移的正确性。
- kafka Consumer API还为手动提交提供了这样的方法:
commitSync(Map<TopicPartition,OffsetAndMetadata>)和commitAsync(Map<TopicParttion,OffsetAndMetadata>)它们的参数是一个Map对象,键是TopicPartition(消费的分区),值是:OffsetAndMetadata对象(主要保存的是位移数据)。 - 它可以将一个大的事务分割成若干个小的事务分别提交,能有效的减少错误恢复的时间。
Kafka 中消费者组与 Broker 之间的交互
Kafka中消费者的消费方式
consumer采用pull(拉)模式从broker中读取数据。 拉取模式也有不足,如果kafka没有数据,消费者可能会陷入循环中,一直返回空数据。针对这一点,kafka消费者在消费数据时会传入一个时长参数timeout,如果当前没有数据可供消费,consumer会等待一段时间后再返回,这段时长即为timeout
Kafka的分区分配策略
一个消费者组中有多个消费者,一个broker有多个分区,所有必然会涉及到分区分配问题,即确定哪一个分区由哪一个consumer来消费。kafka有两种分区分配策略:RoundRobin和Range
- RoundRobinAssignor分配策略(轮询分区)
- RangeAssignor 分配策略(范围分区) 默认采用 Range 范围分区
offset的维护
由于kafka可能出现故障,故障之后要恢复到上一次消费的位置,往下继续进行消费。因此consumer需要实时记录自己消费到了哪一个offset,从0.9版本开始,consumer默认将offset保存在kafka的内置topic中,该topic为_consumer_offsets(0.9版本前放在zk中)
Kafka的Producer深入
消息发送流程 【面试】
- Kafka 的 Producer 发送消息采用的是异步发送的方式。在消息发送的过程中,涉及到了两个线程——main 线程和 Sender 线程,以及一个线程共享变量——RecordAccumulator。
- main 线程将消息发送给 RecordAccumulator,Sender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka broker。
相关参数:
- batch.size:只有数据积累到 batch.size 之后,sender 才会发送数据。(默认 16384byte, 16KB)
- linger.ms:如果数据迟迟未达到 batch.size,sender 等待 linger.time 之后就会发送数据。(默认为0)
参数详解
acks
指的是producer的消息发送确认机制
retries
生产者从服务器收到的错误有可能是临时性错误,在这种情况下,retries参数的值决定了生产者可以重发消息的次数,如果达到这个次数,生产者会放弃重试返回错误,默认情况下,生产者会在每次重试之间等待100ms
消息发送
异步发送
不带回调函数的 API
带回调函数的 API
回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是RecordMetadata 和 Exception,如果 Exception 为 null,说明消息发送成功,如果Exception 不为 null,说明消息发送失败。注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试
同步发送
- 同步发送的意思就是,一条消息发送之后,会阻塞当前线程,直至返回 ack。
- 由于 send 方法返回的是一个 Future 对象,根据 Futrue 对象的特点,我们也可以实现同步发送的效果,只需在调用 Future 对象的 get 方发即可。
消息发送流程:
producer->interceptor->serializer->partitioner -> RecordAccumulator(累加消息,批量batch.size, liner.ms)-> send -> kafka broker
acks: 生产者确认机制
0: fire and forgot,快速放下,短信通知。消息发送完成不报错即可
1: 快递需要签收,等leader 确认。消息到了leader, 还没同步给集群
all: 快递跟亲戚 确认,leader把消息同步给集群,之后才返回确认给生产者。 消息不会丢失
retries: 发送失败时会自动重试次数
异步:异常时处理
Kafka的Consumer深入
工作原理 【面试】
- Consumer 消费数据时的可靠性是很容易保证的,因为数据在 Kafka 中是持久化的,故不用担心数据丢失问题。
- 由于 consumer 在消费过程中可能会出现断电宕机等故障,consumer 恢复后,需要从故障前的位置的继续消费,所以 consumer 需要实时记录自己消费到了哪个 offset,以便故障恢复后继续消费。
- 所以 offset 的维护是 Consumer 消费数据是必须考虑的问题。
offset(偏移量)
offset消费者消费到了第几个消息
- 每次调用poll()方法,它会返回由生产者写入kafka但还没有被消费者读取过来的记录,我们由此可以追踪到哪些记录是被群组里的哪个消费者读取的,kafka不会像其他JMS队列那样需要得到消费者的确认,这是kafka的一个独特之处,相反,消费者可以使用kafka来追踪消息在分区的位置(偏移量)
- 消费者会往一个叫做
_consumer_offset
的特殊主题发送消息,消息里包含了每个分区的偏移量。如果消费者一直处于运行状态,那么偏移量就没有什么用处。不过,如果消费者发生崩溃或有新的消费者加入群组,就会触发再均衡,完成再均衡之后,每个消费者可能分配到新的分区,消费者需要读取每个分区最后一次提交的偏移量,然后从偏移量指定的地方继续处理。 - 如果提交偏移量小于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息就会被重复处理。
- 消费者处理完消息后再提交offset,有可能导致消息重复消费(解决:幂等)
- 如果提交的偏移量大于客户端的最后一个消息的偏移量,那么处于两个偏移量之间的消息将会丢失。
- 拉取到消息还没消费完就提交offset, 导致可能消息丢失。解决(消息回溯)
配置参数详解
enable.auto.commit
- 当
enable.auto.commit
被设置为true,提交方式就是让消费者自动提交偏移量,每隔5秒消费者会自动把从poll()方法接收的最大偏移量提交上去。提交时间间隔有auto.commot.interval.ms
控制,默认值是5秒。
- 当
需要注意到,这种方式可能会导致消息重复消费。假如,某个消费者poll消息后,应用正在处理消息,在3秒后Kafka进行了重平衡,那么由于没有更新位移导致重平衡后这部分消息重复消费。
auto.offset.reset
earliest
当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
latest 默认
当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
none
topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
anything else
向consumer抛出异常
自动提交offset
手动提交offset
虽然自动提交 offset 十分简介便利,但由于其是基于时间提交的,开发人员难以把握offset 提交的时机。因此 Kafka 还提供了手动提交 offset 的 API。手动提交 offset 的方法有两种:
分别是 commitSync(同步提交)和 commitAsync(异步提交)。
- 两者的相同点是,都会将本次 poll 的一批数据最高的偏移量提交;
- 不同点是,commitSync 阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);
- 而 commitAsync 则没有失败重试机制,故有可能提交失败。
- 同步提交 offset:由于同步提交 offset 有失败重试机制,故更加可靠
- 异步提交 offset:虽然同步提交 offset 更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会收到很大的影响。因此更多的情况下,会选用异步提交 offset 的方式。
消息重复消费处理
幂等:不管执行多少次,结果都一样
- 判断数据的状态, status=1 需要审核, 如果status!=1不处理
- 只有一个消费者组,一个消息只会被一个消费者消费
- 加乐观锁,取消息记录版本,更新数据时判断版本是否一致,一致则更新,不一致则不更新数据, affected rows: 0代表没更新成功,其它消费者已经处理过了
- 发送消息时,给消息唯一标识, 当消费消息时,先检查这个唯一的id是否存在,存在则不处理,不存在则存入redis|mysql,开始处理。
发送及消费小结
消费流程:
- client去请求消息数据
- 消息存入一个队列里
- 消费者另一线程去队列poll获取,最大500条消息
- 反序列化
- 拦截器
- 业务处理, 提交offset
offset: 记录消费者消费到第几个消息
offset: 提交:auto, 手工
- 如果提交的offset小于服务器维护的offset,则消息重复消费
- 如果提交的offset大于服务器维护的offset,则消息丢失
消费者重平衡(消费者数量在组内有变化时, 加入新的组,消费者挂了重启)
earliest, latest, none, anythiny else报错
Kafka 中,ZooKeeper 的作用是什么?
目前,Kafka 使用 ZooKeeper 存放集群元数据、成员管理、Controller 选举,以及其他一些管理类任务。之后,等 KIP-500 提案完成后,Kafka 将完全不再依赖 于 ZooKeeper。突出“目前”,以彰显你非常了解社区的演进计划。
- 存放元数据”是指主题 分区的所有数据都保存在 ZooKeeper 中,且以它保存的数据为权威,其他“人” 都要与它 保持对齐。
- 成员管理”是指 Broker 节点的注册、注销以及属性变更等
- Controller 选举”是指选举集群 Controller
- 其他管理类任务包括但不限于主题 删除、参数配置等。
不过,抛出 KIP-500 也可能是个双刃剑。碰到非常资深的面试官,他可能会进一步追问你 KIP-500 是做的。一言以蔽之:KIP-500 思想,是使用社区自研的基于 Raft 的共识算法, 替代 ZooKeeper,实现 Controller 自选举。
如何估算 Kafka 集群的机器数量?
这道题目考查的是机器数量和所用资源之间的关联关系。所谓资源,也就是 CPU、内存、磁盘和带宽。
- 通常来说,CPU 和内存资源的充足是比较容易保证的,因此,你需要从磁盘空间和带宽占用两个维度去评估机器数量。
- 在预估磁盘的占用时,你一定不要忘记计算副本同步的开销。如果一条消息占用1KB 的磁 盘空间,那么,在有 3 个副本的主题中,你就需要 3KB 的总空间来保存这条消息。显式地 将这些考虑因素答出来,能够彰显你考虑问题的全面性,是一个难得的加分项
- 对于评估带宽来说,常见的带宽有 1Gbps 和 10Gbps,但你要切记,这两个数字仅仅是最大值。因此,你最好和面试官确认一下给定的带宽是多少。然后,明确阐述出当带宽占用接 近总带宽的 90% 时,丢包情形就会发生。这样能显示出你的网络基本功。
Kafka 为什么不支持读写分离?
这道题目考察的是你对 Leader/Follower 模型的思考。
- Leader/Follower 模型并没有规定 Follower 副本不可以对外提供读服务。很多框架都是允 许这么做的,只是 Kafka 最初为了避免不一致性的问题,而采用了让 Leader 统一提供服 务的方式。
- 不过,在开始回答这道题时,你可以率先亮出观点:自 Kafka 2.4 之后,Kafka 提供了有限度的读写分离,也就是说,Follower 副本能够对外提供读服务。
说完这些之后,你可以再给出之前的版本不支持读写分离的理由:
- 场景不适用:读写分离适用于那种读负载很大,而写操作相对不频繁的场景,可Kafka 不属于这样的场景。
- 同步机制:Kafka 采用 PULL 方式实现 Follower 的同步,因此,Follower 与 Leader存在不一致性窗口。如果允许读 Follower 副本,就势必要处理消息滞后(Lagging)的问题。