1、获得 Topic-Broker 的映射关系。
Producer 启动时,也需要指定 Namesrv 的地址,从 Namesrv 集群中选一台建立长连接。如果该Namesrv 宕机,会自动连其他 Namesrv ,直到有可用的 Namesrv 为止。
生产者每 30 秒从 Namesrv 获取 Topic 跟 Broker 的映射关系,更新到本地内存中。然后再跟 Topic 涉及的所有 Broker 建立长连接,每隔 30 秒发一次心跳。
在 Broker 端也会每 10 秒扫描一次当前注册的 Producer ,如果发现某个 Producer 超过 2 分钟都没有发心跳,则断开连接。2、生产者端的负载均衡。
生产者发送时,会自动轮询当前所有可发送的broker,一条消息发送成功,下次换另外一个broker发送,以达到消息平均落到所有的broker上。
这里需要注意一点:假如某个 Broker 宕机,意味生产者最长需要 30 秒才能感知到。在这期间会向宕机的 Broker 发送消息。当一条消息发送到某个 Broker 失败后,会自动再重发 2 次,假如还是发送失败,则抛出发送失败异常。
客户端里会自动轮询另外一个 Broker 重新发送,这个对于用户是透明的。
Producer 发送消息有几种方式?
Producer 发送消息,有三种方式:1. 同步方式2. 异步方式3. Oneway 方式
其中,方式 1 和 2 比较常见,具体使用哪一种方式需要根据业务情况来判断。而方式 3 ,适合大数据场景,允许有一定消息丢失的场景。
请说说你对 Consumer 的了解?
1、获得 Topic-Broker 的映射关系。
Consumer 启动时需要指定 Namesrv 地址,与其中一个 Namesrv 建立长连接。消费者每隔 30 秒从Namesrv 获取所有Topic 的最新队列情况,这意味着某个 Broker 如果宕机,客户端最多要 30 秒才能感知。连接建立后,从 Namesrv 中获取当前消费 Topic 所涉及的 Broker,直连 Broker 。
Consumer 跟 Broker 是长连接,会每隔 30 秒发心跳信息到Broker 。Broker 端每 10 秒检查一次当前存活的 Consumer ,若发现某个 Consumer 2 分钟内没有心跳,就断开与该 Consumer 的连接,并且向该消费组的其他实例发送通知,触发该消费者集群的负载均衡。2、消费者端的负载均衡。根据消费者的消费模式不同,负载均衡方式也不同。消费者有两种消费模式:集群消费和广播消费。
集群消费:一个 Topic 可以由同一个消费这分组( Consumer Group )下所有消费者分担消费。 具体例子:假如 TopicA 有 6 个队列,某个消费者分组起了 2 个消费者实例,那么每个消费者负责消费 3 个队列。如果再增加一个消费者分组相同消费者实例,即当前共有 3 个消费者同时消费 6 个队列,那每个消费者负责 2 个队列的消费。广播消费:每个消费者消费 Topic 下的所有队列。
消费者消费模式有几种?
消费者消费模式有两种:集群消费和广播消费。
?? 1. 集群消费
消费者的一种消费模式。一个 Consumer Group 中的各个 Consumer 实例分摊去消费消息,即一条消息只会投递到一个 Consumer Group 下面的一个实例。
实际上,每个 Consumer 是平均分摊 Message Queue 的去做拉取消费。例如某个 Topic 有 3 个队列,其中一个 Consumer Group 有 3 个实例(可能是 3 个进程,或者 3 台机器),那么每个实例只消费其中的 1 个队列。
而由 Producer 发送消息的时候是轮询所有的队列,所以消息会平均散落在不同的队列上,可以认为队列上的消息是平均的。那么实例也就平均地消费消息了。这种模式下,消费进度的存储会持久化到 Broker 。
当新建一个 Consumer Group 时,默认情况下,该分组的消费者会从 min o?set 开始重新消费消息。?? 2. 广播消费
消费者的一种消费模式。消息将对一 个Consumer Group 下的各个 Consumer 实例都投递一遍。即即使这些Consumer 属于同一个Consumer Group ,消息也会被 Consumer Group 中的每个 Consumer 都消费一次。
实际上,是一个消费组下的每个消费者实例都获取到了 Topic 下面的每个 Message Queue 去拉取消费。所以消息会投递到每个消费者实例。
这种模式下,消费进度会存储持久化到实例本地。
消费者获取消息有几种模式?
消费者获取消息有两种模式:推送模式和拉取模式。?? 1. PushConsumer
推送模式(虽然 RocketMQ 使用的是长轮询)的消费者。消息的能及时被消费。使用非常简单,内部已处理如线程池消费、流控、负载均衡、异常处理等等的各种场景。
长轮询,就是我们在 《 精尽【消息队列 】面试题》 提到的,push + pull 模式结合的方式。?? 2. PullConsumer
拉取模式的消费者。应用主动控制拉取的时机,怎么拉取,怎么消费等。主动权更高。但要自己处理各种场景。
决绝绝大多数场景下,我们只会使用 PushConsumer 推送模式。?? 至少目前,暂时还没用过 PullConsumer 。
如何对消息进行重放?
消费位点就是一个数字,把 Consumer O?set 改一下,就可以达到重放的目的了。
什么是顺序消息?如何实现?
消费消息的顺序要同发送消息的顺序一致。由于 Consumer 消费消息的时候是针对 Message Queue 顺序拉取并开始消费,且一条 Message Queue 只会给一个消费者(集群模式下),所以能够保证同一个消费者实例对于Queue 上消息的消费是顺序地开始消费(不一定顺序消费完成,因为消费可能并行)。
Consumer :在 RocketMQ 中,顺序消费主要指的是都是 Queue 级别的局部顺序。这一类消息为满足顺序
性,必须 Producer 单线程顺序发送,且发送到同一个队列,这样 Consumer 就可以按照 Producer 发送的顺序去消费消息。
Producer :生产者发送的时候可以用 MessageQueueSelector 为某一批消息(通常是有相同的唯一标示id)选择同一个 Queue ,则这一批消息的消费将是顺序消息(并由同一个consumer完成消息)。或者 MessageQueue 的数量只有 1 ,但这样消费的实例只能有一个,多出来的实例都会空跑。当然,上面的文字比较绕,总的来说,RocketMQ 提供了两种顺序级别:
普通顺序消息 :Producer 将相关联的消息发送到相同的消息队列。严格顺序消息 :在【普通顺序消息】的基础上,Consumer 严格顺序消费。也就说,顺序消息包括两块:Producer 的顺序发送,和 Consumer 的顺序消费。
?? 1. 普通顺序消息
顺序消息的一种,正常情况下可以保证完全的顺序消息,但是一旦发生异常,Broker 宕机或重启,由于队列总数发生发化,消费者会触发负载均衡,而默认地负载均衡算法采取哈希取模平均,这样负载均衡分配到定位的队列会发化,使得队列可能分配到别的实例上,则会短暂地出现消息顺序不一致。
如果业务能容忍在集群异常情况(如某个 Broker 宕机或者重启)下,消息短暂的乱序,使用普通顺序方式比较合适。
?? 2. 严格顺序消息
顺序消息的一种,无论正常异常情况都能保证顺序,但是牺牲了分布式 Failover 特性,即 Broker 集群中只要有一台机器不可用,则整个集群都不可用,服务可用性大大降低。
如果服务器部署为同步双写模式,此缺陷可通过备机自动切换为主避免,不过仍然会存在几分钟的服务不可用。(依赖同步双写,主备自动切换,自动切换功能目前并未实现)?? 小结
目前已知的应用只有数据库 binlog 同步强依赖严格顺序消息,其他应用绝大部分都可以容忍短暂乱序,推荐使用普通的顺序消息。?? 实现原理
顺序消息的实现,相对比较复杂,想要深入理解的胖友,可以看看 《RocketMQ 源码分析 —— Message 顺序发送与消费》 。
顺序消息扩容的过程中,如何在不停写的情况下保证消息顺序?
1. 成倍扩容,实现扩容前后,同样的 key,hash 到原队列,或者 hash 到新扩容的队列。2. 扩容前,记录旧队列中的最大位点。
3. 对于每个 Consumer Group ,保证旧队列中的数据消费完,再消费新队列,也即:先对新队列进行禁读即可。
什么是定时消息?如何实现?
定时消息,是指消息发到 Broker 后,不能立刻被 Consumer 消费,要到特定的时间点或者等待特定的时间后才能被消费。
目前,开源版本的 RocketMQ 只支持固定延迟级别的延迟消息,不支持任一时刻的延迟消息。如下表格:
延迟级别123456789101112131415161718
时间1s5s10s30s1m2m3m4m5m6m7m8m9m10m20m30m1h2h
可通过配置文件,自定义每个延迟级别对应的延迟时间。当然,这是全局的。
如果胖友想要实现任一时刻的延迟消息,比较简单的方式是插入延迟消息到数据库中,然后通过定时任务轮询,到达指定时间,发送到 RocketMQ 中。?? 实现原理
1、 定时消息发送到 Broker 后,会被存储 Topic 为 SCHEDULE_TOPIC_XXXX 中,并且所在 Queue 编号为延迟级别 - 1 。
需要 -1 的原因是,延迟级别是从 1 开始的。如果延迟级别为 0 ,意味着无需延迟。
2、Broker 针对每个 SCHEDULE_TOPIC_XXXX 的队列,都创建一个定时任务,顺序扫描到达时间的延迟消息,重新存储到延迟消息原始的 Topic 的原始 Queue 中,这样它就可以被 Consumer 消费到。此处会有两个问题:
为什么是“顺序扫描到达时间的延迟消息”?因为先进 SCHEDULE_TOPIC_XXXX 的延迟消息,在其所在的队列,意味着先到达延迟时间。
会不会存在重复扫描的情况?每个 SCHEDULE_TOPIC_XXXX 的扫描进度,会每 10s 存储到
config/delayOffset.json 文件中,所以正常情况下,不会存在重复扫描。如果异常关闭,则可能导
致重复扫描。详细的,胖友可以看看 《RocketMQ 源码分析 —— 定时消息与消息重试》 。
什么是消息重试?如何实现?
消息重试,Consumer 消费消息失败后,要提供一种重试机制,令消息再消费一次。
Consumer 会将消费失败的消息发回 Broker,进入延迟消息队列。即,消费失败的消息,不会立即消费。也就是说,消息重试是构建在定时消息之上的功能。?? 消息重试的主要流程
1. Consumer 消费失败,将消息发送回 Broker 。2. Broker 收到重试消息之后置换 Topic ,存储消息。3. Consumer 会拉取该 Topic 对应的 retryTopic 的消息。
4. Consumer 拉取到 retryTopic 消息之后,置换到原始的 Topic ,把消息交给 Listener 消费。这里,可能有几个点,胖友会比较懵逼,简单解释下:
1. Consumer 消息失败后,会将消息的 Topic 修改为 %RETRY% + Topic 进行,添加 \ 属性为原始 Topic ,然后再返回给 Broker 中。
2. Broker 收到重试消息之后,会有两次修改消息的 Topic 。
首先,会将消息的 Topic 修改为 %RETRY% + ConsumerGroup ,因为这个消息是当前消费这分组消费失败,只能被这个消费组所重新消费。?? 注意噢,消费者会默认订阅 Topic 为 %RETRY% +ConsumerGroup 的消息。
然后,会将消息的 Topic 修改为 SCHEDULE_TOPIC_XXXX ,添加 \ 属性为 %RETRY% +ConsumerGroup ,因为重试消息需要延迟消费。3. Consumer 会拉取该 Topic 对应的 retryTopic 的消息,此处的 retryTopic 为 %RETRY% + ConsumerGroup。
4. Consumer 拉取到 retryTopic 消息之后,置换到原始的 Topic ,因为有消息的 \ 属性是原始Topic ,然后把消息交给 Listener 消费。
?? 有一丢丢复杂,胖友可以在思考思考~详细的,胖友可以看看 《RocketMQ 源码分析 —— 定时消息与消息重试》 。
多次消费失败后,怎么办?
默认情况下,当一条消息被消费失败 16 次后,会被存储到 Topic 为 \ + ConsumerGroup 到死信队列。为什么 Topic 是 \ + ConsumerGroup 呢?因为,是这个 ConsumerGroup 对消息的消费失败,所以Topic 里要以 ConsumerGroup 为维度。
后续,我们可以通过订阅 \ + ConsumerGroup ,做相应的告警。
什么是事务消息?如何实现?
关于事务消息的概念和原理,胖友可以看看官方对这块的解答,即 《RocketMQ 4.3 正式发布,支持分布式事务》的 「四 事务消息」 小节。