WinddSnow

RabbitMQ

字数统计: 4.9k阅读时长: 17 min
2022/09/08

RabbitMQ

微服务通信

微服务间通讯有同步和异步两种方式:

  • 同步通讯:就像打电话,需要实时响应。-Feign调用
  • 异步通讯:就像发邮件,不需要马上回复 -消息队列
    两种方式各有优劣,打电话可以立即得到响应,但是你却不能跟多个人同时通话。发送邮件可以同时与多个人收发邮件,但是往往响应会有延迟。

事件模式

  • 在事件模式中,支付服务是事件发布者(publisher),在支付完成后只需要发布一个支付成功的事件(event),事件中带上订单id。
  • 订单服务和物流服务是事件订阅者(Consumer),订阅支付成功的事件,监听到事件后完成自己业务即可。
  • 为了解除事件发布者与订阅者之间的耦合,两者并不是直接通信,而是有一个中间人(Broker)。发布者发布事件到Broker,不关心谁来订阅事件。订阅者从Broker订阅事件,不关心谁发来的消息。

MQ对比

几种常见MQ的对比:

RabbitMQ ActiveMQ RocketMQ Kafka
公司/社区 Rabbit Apache 阿里(apache) Apache
开发语言 Erlang Java Java Scala&Java
协议支持 AMQP,XMPP,SMTP,STOMP OpenWire,STOMP,REST,XMPP,AMQP 自定义协议 自定义协议
可用性 一般
单机吞吐量 一般 1万 高 10W 非常高 百万级
消息延迟 微秒级 毫秒级 毫秒级 毫秒以内
消息可靠性 一般
  • 追求可用性:Kafka、 RocketMQ 、RabbitMQ
  • 追求可靠性:RabbitMQ、RocketMQ
  • 追求吞吐能力:RocketMQ、Kafka
  • 追求消息低延迟:RabbitMQ、Kafka

为什么要使用 MQ?

  • 解耦
  • 异步
  • 削峰

RocketMQ 消费模式有几种?

消费模型由 Consumer 决定,消费维度为 Topic。

  • 集群消费
    • 一条消息只会被同 Group 中的一个 Consumer 消费
    • 多个 Group 同时消费一个 Topic 时,每个 Group 都会有一个 Consumer 消费到数据
  • 广播消费
    • 消息将对一 个 Consumer Group 下的各个 Consumer 实例都消费一遍。即即使这些 Consumer 属于同一个 Consumer Group ,消息也会被 Consumer Group 中的每个Consumer 都消费一次。

RocketMQ 如何做负载均衡?

通过 Topic 在多 Broker 中分布式存储实现。

  1. producer 端

    发送端指定 message queue 发送消息到相应的 broker,来达到写入时的负载均衡

    • 提升写入吞吐量,当多个 producer 同时向一个 broker 写入数据的时候,性能会下降
    • 消息分布在多 broker 中,为负载消费做准备

    默认策略是随机选择:

    • producer 维护一个 index
    • 每次取节点会自增
    • index 向所有 broker 个数取余
    • 自带容错策略

    其他实现:

    • SelectMessageQueueByHash
    • hash 的是传入的 args
    • SelectMessageQueueByRandom
    • SelectMessageQueueByMachineRoom 没有实现
    • 也可以自定义实现 MessageQueueSelector 接口中的 select 方法
  2. consumer 端

    采用的是平均分配算法来进行负载均衡。

    其他负载均衡算法

    • 平均分配策略(默认)(AllocateMessageQueueAveragely)
    • 环形分配策略(AllocateMessageQueueAveragelyByCircle)
    • 手动配置分配策略(AllocateMessageQueueByConfig)
    • 机房分配策略(AllocateMessageQueueByMachineRoom)
    • 一致性哈希分配策略(AllocateMessageQueueConsistentHash)
    • 靠近机房策略(AllocateMachineRoomNearby)

当消费负载均衡 consumer 和 queue 不对等的时候会发生什么?

  • Consumer 和 queue 会优先平均分配,如果 Consumer 少于 queue 的个数,则会存在部分 Consumer 消费多个 queue 的情况,
  • 如果 Consumer 等于 queue 的个数,那就是一个 Consumer 消费一个 queue,
  • 如果 Consumer 个数大于 queue 的个数,那么会有部分 Consumer 空余出来,白白的浪费了。

消息重复消费如何解决?

影响消息正常发送和消费的重要原因是网络的不确定性。

正常情况下在 consumer 真正消费完消息后应该发送 ack,通知 broker 该消息已正常消费,从 queue 中剔除

当 ack 因为网络原因无法发送到 broker,broker 会认为词条消息没有被消费,此后会开启消息重投机制把消息再次投递到 consumer。
消费模式:在 CLUSTERING 模式下,消息在 broker 中会保证相同 group 的 consumer 消费一次,但是针对不同 group 的 consumer 会推送多次

解决方案

  • 数据库表:处理消息前,使用消息主键在表中带有约束的字段中 insert
  • Map:单机时可以使用 map 做限制,消费时查询当前消息 id 是不是已经存在
  • Redis:使用分布式锁。

如何让 RocketMQ 保证消息的顺序消费

  1. 首先多个 queue 只能保证单个 queue 里的顺序,queue 是典型的 FIFO,天然顺序。
  2. 多个 queue 同时消费是无法绝对保证消息的有序性的。所以总结如下:
    • 同一 topic,同一个 QUEUE,发消息的时候一个线程去发送消息,消费的时候 一个线程去消费一个 queue 里的消息。

怎么保证消息发到同一个 queue?

  • Rocket MQ 给我们提供了 MessageQueueSelector 接口,可以自己重写里面的接口,实现自己的算法,举个最简单的例子:判断 i % 2 == 0,那就都放到 queue1 里,否则放到 queue2 里。

角色

  • publisher:生产者 发布者
  • consumer:消费者 订阅者
  • exchange个:交换机,负责消息路由,转发消息给队列,注:如果找不到队列,则消息会丢失
  • queue:队列,存储消息,一旦被消费,消息就会被删除
  • virtualHost:虚拟主机,隔离不同用户|环境的exchange、queue、消息的隔离

publisher

  • 建立连接
  • 创建Channel
  • 声明队列
  • 发送消息
  • 关闭连接和channel

consumer

  • 建立连接
  • 创建Channel
  • 声明队列
  • 订阅消息

综述

基本消息队列的消息发送流程:

  1. 建立connection
  2. 创建channel
  3. 利用channel声明队列
  4. 利用channel向队列发送消息

基本消息队列的消息接收流程:

  1. 建立connection
  2. 创建channel
  3. 利用channel声明队列
  4. 定义consumer的消费行为handleDelivery()
  5. 利用channel将消费者与队列绑定

SpringAMQP

  • 自动声明队列、交换机及其绑定关系
  • 基于注解的监听器模式,异步接收消息
  • 封装了RabbitTemplate工具,用于发送消息

发布/订阅

  • Publisher:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
  • Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有以下3种类型:
    • Fanout:广播,将消息交给所有绑定到交换机的队列
    • Direct:定向,把消息交给符合指定routing key 的队列
    • Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
  • Consumer:消费者,与以前一样,订阅队列,没有变化
  • Queue:消息队列也与以前一样,接收消息、缓存消息。
    Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

Fanout(分裂模式)

在广播模式下,消息发送流程是这样的:

  • 可以有多个队列
  • 每个队列都要绑定到Exchange(交换机)
  • 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定
  • 交换机把消息发送给绑定过的所有队列
  • 订阅队列的消费者都能拿到消息

Direct(路由模式)

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
  • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

Topic(主题模式)

Topic类型的ExchangeDirect相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!

Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割

消息可靠性

  • 发送时丢失:
    • 生产者发送的消息未送达exchange
    • 消息到达exchange后未到达queue
  • MQ宕机,queue将消息丢失
  • consumer接收到消息后未消费就宕机

RabbitMQ解决方案:

  • 生产者确认机制
  • mq持久化
  • 消费者确认机制
  • 失败重试机制

生产者消息确认

RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。这种机制必须给每个消息指定一个唯一ID。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。
返回结果有两种方式:

  • publisher-confirm,发送者确认
    • 消息成功投递到交换机,返回ack
    • 消息未投递到交换机,返回nack
  • publisher-return,发送者回执
    • 消息投递到交换机了,但是没有路由到队列。返回ACK,及路由失败原因。

消息持久化

生产者确认可以确保消息投递到RabbitMQ的队列中,但是消息发送到RabbitMQ以后,如果突然宕机,也可能导致消息丢失。
要想确保消息在RabbitMQ中安全保存,必须开启消息持久化机制。

  • 交换机持久化
  • 队列持久化
  • 消息持久化

消费者消息确认

RabbitMQ是阅后即焚机制,RabbitMQ确认消息被消费者消费后会立刻删除。
而RabbitMQ是通过消费者回执来确认消费者是否成功处理消息的:消费者获取消息后,应该向RabbitMQ发送ACK回执,表明自己已经处理消息。

  • SpringAMQP则允许配置三种确认模式:
    •manual:手动ack,需要在业务代码结束后,调用api发送ack。
    •auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack
    •none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除

如何确保RabbitMQ消息的可靠性?

  • 开启生产者确认机制,确保生产者的消息能到达队列
    • returncallback, RabbitTemplate.setReturnCallback 交换机到队列时失败
    • callback, correlatedData.getFuture().addCallback() 生产者到交换机失败, 异常
  • 开启持久化功能,确保消息未消费前在队列中不会丢失
    • 交换机 springAMQP 默认
    • 队列 springAMQP 默认
    • 消息 springAMQP 默认持久化 MessageBuilder.withBody(xxxx).setDeliverMode(xxx.PERSISTENT)
  • 开启消费者确认机制为auto,由spring确认消息处理成功后完成ack
    • none 关闭,丢弃
    • auto 出异常,重入。辅以重试策略
    • manual 手工确认。channel.basicAck|basicNack
  • 开启消费者失败重试机制,并设置MessageRecoverer,多次重试失败后将消息投递到异常交换机,交由人工处理

RocketMQ 如何保证消息不丢失

首先在如下三个部分都可能会出现丢失消息的情况:

  • Producer 端
  • Broker 端
  • Consumer 端
  1. Producer 端如何保证消息不丢失
    • 采取 send()同步发消息,发送结果是同步感知的。
    • 发送失败后可以重试,设置重试次数。默认 3 次。
    • producer.setRetryTimesWhenSendFailed(10);
    • 集群部署,比如发送失败了的原因可能是当前 Broker 宕机了,重试的时候会发送到其他 Broker 上。
  2. Broker 端如何保证消息不丢失
  • 修改刷盘策略为同步刷盘。默认情况下是异步刷盘的。
  • flushDiskType = SYNC_FLUSH
  • 集群部署,主从模式,高可用。
  1. Consumer 端如何保证消息不丢失
    • 完全消费正常后在进行手动 ack 确认。

RocketMQ 的消息堆积如何处理

  1. 如果可以添加消费者解决,就添加消费者的数据量.
  2. 如果出现了 queue,但是消费者多的情况。可以使用准备一个临时的 topic,同时创建一些 queue,在临时创建一个消费者来把这些消息转移到 topic 中,让消费者消费。

RocketMQ 如何实现分布式事务?

  1. 生产者向 MQ 服务器发送 half 消息。
  2. half 消息发送成功后,MQ 服务器返回确认消息给生产者。
  3. 生产者开始执行本地事务。
  4. 根据本地事务执行的结果(UNKNOW、commit、rollback)向 MQ Server 发送提交或回滚消息。
  5. 如果错过了(可能因为网络异常、生产者突然宕机等导致的异常情况)提交/回滚消息,则 MQ 服务器将向同一组中的每个生产者发送回查消息以获取事务状态。
  6. 回查生产者本地事物状态。
  7. 生产者根据本地事务状态发送提交/回滚消息。
  8. MQ 服务器将丢弃回滚的消息,但已提交(进行过二次确认的 half 消息)的消息将投递给消费者进行消费。
  • Half Message : 预 处 理 消 息 , 当 broker 收 到 此 类 消 息 后 , 会 存 储 到RMQ_SYS_TRANS_HALF_TOPIC 的消息消费队列中
  • 检查事务状态:Broker 会开启一个定时任务,消费 RMQ_SYS_TRANS_HALF_TOPIC 队列中的消息,每次执行任务会向消息发送者确认事务执行状态(提交、回滚、未知),如果是未知,Broker 会定时去回调在重新检查。
  • 超时:如果超过回查次数,默认回滚消息。
    也就是他并未真正进入 Topic 的 queue,而是用了临时 queue 来放所谓的 half message,等提交事务后才会真正的将 half message 转移到 topic 下的 queue。

任何一台 Broker 突然宕机了怎么办?

Broker 主从架构以及多副本策略。Master 收到消息后会同步给 Slave,这样一条消息就不止一份了,Master 宕机了还有 slave 中的消息可用,保证了 MQ 的可靠性和高可用性。而且 Rocket MQ4.5.0 开始就支持了 Dlegder 模式,基于 raft 的,做到了真正意义的 HA。

死信交换机

什么是死信?

当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):

  • 消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false
  • 消息是一个过期消息,超时无人消费
  • 要投递的队列消息满了,无法投递
    如果这个包含死信的队列配置了dead-letter-exchange属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机称为死信交换机(Dead Letter Exchange,检查DLX)。

什么样的消息会成为死信?

  • 消息被消费者reject或者返回nack
  • 消息超时未消费
  • 队列满了
    死信交换机的使用场景是什么?
  • 如果队列绑定了死信交换机,死信会投递到死信交换机;
  • 可以利用死信交换机收集所有消费者处理失败的消息(死信),交由人工处理,进一步提高消息队列的可靠性。

消息超时的两种方式是?

  • 给队列设置ttl属性,进入队列后超过ttl时间的消息变为死信
    QueueBuilder.ttl()毫秒
  • 给消息设置ttl属性,队列接收到消息超过ttl时间后变为死信
    MessageBuilder.setExpiration(字符串数值) 毫秒

如何实现发送一个消息20秒后消费者才收到消息?

  • 给消息的目标队列指定死信交换机 -> 让消息变成死信
  • 将消费者监听的队列绑定到死信交换机绑定的队列
  • 发送消息时给消息设置超时时间为20秒 【推荐】

延迟队列

利用TTL结合死信交换机,我们实现了消息发出后,消费者延迟收到消息的效果。这种消息模式就称为延迟队列(Delay Queue)模式。
延迟队列的使用场景包括:

  • 延迟发送短信
  • 用户下单,如果用户在15 分钟内未支付,则自动取消
  • 预约工作会议,20分钟后自动通知所有参会人员

惰性队列

消息堆积问题

  • 当生产者发送消息的速度超过了消费者处理消息的速度,就会导致队列中的消息堆积,直到队列存储消息达到上限。之后发送的消息就会成为死信,可能会被丢弃,这就是消息堆积问题。
    解决消息堆积有两种思路:
  • 增加更多消费者,提高消费速度。也就是我们之前说的work queue模式
  • 扩大队列容积,提高堆积上限
  • 队列过长的话会占用系统较多内存,RabbitMQ为了释放内存,会将队列消息转储到硬盘,称为 page out 。 如果队列很长,Page out 操作会消耗较长时间,且page out 过程中队列不能处理消息。因此会出现间歇性的暂停状态、并发时出现波浪性的忽高忽低现象

  • 队列过长同时会加长RabbitMQ重启时间,因为启动时候需要重建索引。

  • 队列过长还会导致集群之间节点同步消息时间变长。

Lazy Queues的概念,也就是惰性队列

惰性队列的特征如下:

  • 接收到消息后直接存入磁盘而非内存
  • 消费者要消费消息时才会从磁盘中读取并加载到内存
  • 支持数百万条的消息存储

惰性队列的优点缺点?

  • 基于磁盘存储,消息上限高
  • 没有间歇性的page-out,性能比较稳定
    缺点:
  • 基于磁盘存储,消息时效性会降低
  • 性能受限于磁盘的IO

MQ集群

RabbitMQ的集群有两种模式:

普通集群:是一种分布式集群,将队列分散到集群的各个节点,从而提高整个集群的并发能力。
镜像集群:是一种主从集群,普通集群的基础上,添加了主从备份功能,提高集群的数据可用性。

普通集群

或者叫标准集群(classic cluster),具备下列特征:

  • 会在集群的各个节点间共享部分数据,包括:交换机、队列元信息。不包含队列中的消息。
  • 当访问集群某节点时,如果队列不在该节点,会从数据所在节点传递到当前节点并返回
  • 队列所在节点宕机,队列中的消息就会丢失

镜像集群

本质是主从模式,具备下面的特征:

  • 交换机、队列、队列中的消息会在各个mq的镜像节点之间同步备份。
  • 创建队列的节点被称为该队列的主节点,备份到的其它节点叫做该队列的镜像节点。
  • 一个队列的主节点可能是另一个队列的镜像节点
  • 所有操作都是主节点完成,然后同步给镜像节点
  • 主宕机后,镜像节点会替代成新的主

仲裁队列

仲裁队列是3.8版本以后才有的新功能,用来替代镜像队列,具备下列特征:

  • 与镜像队列一样,都是主从模式,支持主从数据同步
  • 使用非常简单,没有复杂的配置
  • 主从同步基于Raft协议,强一致
CATALOG
  1. 1. RabbitMQ
    1. 1.1. 微服务通信
    2. 1.2. 事件模式
    3. 1.3. MQ对比
    4. 1.4. 为什么要使用 MQ?
    5. 1.5. RocketMQ 消费模式有几种?
    6. 1.6. RocketMQ 如何做负载均衡?
    7. 1.7. 当消费负载均衡 consumer 和 queue 不对等的时候会发生什么?
    8. 1.8. 消息重复消费如何解决?
    9. 1.9. 如何让 RocketMQ 保证消息的顺序消费
    10. 1.10. 角色
    11. 1.11. publisher
    12. 1.12. consumer
    13. 1.13. 综述
    14. 1.14. SpringAMQP
    15. 1.15. 发布/订阅
    16. 1.16. Fanout(分裂模式)
    17. 1.17. Direct(路由模式)
    18. 1.18. Topic(主题模式)
    19. 1.19. 消息可靠性
      1. 1.19.1. 生产者消息确认
      2. 1.19.2. 消息持久化
      3. 1.19.3. 消费者消息确认
    20. 1.20. 如何确保RabbitMQ消息的可靠性?
    21. 1.21. RocketMQ 如何保证消息不丢失
    22. 1.22. RocketMQ 的消息堆积如何处理
    23. 1.23. RocketMQ 如何实现分布式事务?
    24. 1.24. 任何一台 Broker 突然宕机了怎么办?
    25. 1.25. 死信交换机
      1. 1.25.1. 什么是死信?
      2. 1.25.2. 什么样的消息会成为死信?
      3. 1.25.3. 消息超时的两种方式是?
      4. 1.25.4. 如何实现发送一个消息20秒后消费者才收到消息?
    26. 1.26. 延迟队列
    27. 1.27. 惰性队列
      1. 1.27.1. 消息堆积问题
      2. 1.27.2. Lazy Queues的概念,也就是惰性队列
      3. 1.27.3. 惰性队列的优点缺点?
    28. 1.28. MQ集群
      1. 1.28.1. RabbitMQ的集群有两种模式:
      2. 1.28.2. 普通集群
      3. 1.28.3. 镜像集群
      4. 1.28.4. 仲裁队列