• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

RocketMQ消息不均衡的和解决

武飞扬头像
书唐瑞
帮助2



此篇文章不仅仅在于分析RocketMQ的消息不均衡原因与解决思路, 更在于分析MQ消息不均衡的原因和解决思路, 只是拿RocketMQ作为例子.

从生产者角度考虑,是不是生产者发送消息的时候,只钟爱某几个Queue,只将消息发送到个别Queue中,才造成消息倾斜呢?
这种情况可能会发生,但概率比较低. 而且也不是生产者只钟爱某几个Queue, 多半情况是某些条件不满足了, 才导致把消息发送给了某些Queue中(例如个别broker延迟比较大,则生产者把消息发送给了延时较小的broker上的Queue). RocketMQ是按照Queue队列轮询的方式将消息发送出去的, 保证生产者(皇帝)能临幸所有的嫔妃(指Queue), 当然RocketMQ也考虑了发送消息延迟的情况, 尽量将消息发送到消息延迟低的broker上. 即便发送顺序消息, 基于key的哈希值, 也基本可以认为消息是均衡发送到各个Queue中的.

假如我们有2个broker,topic有4个队列, 那么我们看下,作为生产者它拿到的队列信息长啥样.
学新通
因为每个broker有4个队列, 生产者获取到了8个队列. 而且队列的顺序不是杂乱的, 是按照相同的broker,不同的queueId的顺序排列的.

假如不存在消息发送延迟的情况, TOPIC-A有4个队列, 则broker1和broker2各有4个队列, 那么生产者发送消息的时候,先发送给broker1的4个队列,然后再发送给broker2的4个队列,之后再次发送给broker1的4个队列,循环往复进行…

【正常发送消息情况】

学新通

如上图所示,生产者依次将消息发送给broker1-0,broker1-1,broker1-2,broker1-3,broker2-0,broker2-1,broker2-2,broker2-3的Queue队列中,循环往复进行…


【发送失败情况】

学新通

如上图所示,假如向broker2的某个队列发送消息的时候,发送失败了,根据重试机制,那么生产者会从broker1中选择一个队列进行发送,暂时不再向broker2发送消息. 这种情况,似乎会造成消息倾斜的可能,毕竟生产者把消息发送到了一个broker上,接下来讲解消费端的时候再说如何应对这种情况.

如果只有一个broker2, 没有broker1,即便发送消息给broker2失败了,也只能从broker2中再选择一个其他的Queue进行重试发送.



【延迟容错情况】

生产者每次发送消息之后,都会记录消息发送给每个broker的延迟情况. 假如准备向broker2的某个队列发送消息的时候, 发现broker2的延迟比较大, 那么生产者会暂时舍弃broker2, 而是从broker1中选择一个队列进行发送, 和上图一样, 这种情况,似乎也会造成消息倾斜的可能,毕竟生产者把消息发送到了一个broker上,接下来讲解消费端的时候再说如何应对这种情况.

【总结】

总之,由于生产者导致消息不均衡的概率很低, 而且也不是生产者有意为之让消息不均衡,肯定是某些异常情况,才导致生产者临时叛变, 改变了发送消息的方向. 相对于消费端, 可以忽略生产者导致消息不均衡的情况了. 可以认为生产者是均匀的将消息发送到不同的Queue中的.

在排查消息不均衡的情况时, 可以排查一下是否存在消息发送失败发送延迟的情况. 而且即便由于这2个原因造成发送消息时出现消息发送不均衡的情况, 在消费端也可以避免.

生产者尽量会保证消息发送,从全局的Queue队列来看都是均衡的, 消息会均衡的发送到每个Queue. 即便某个broker出现了问题, 也会保证消息发送到其他broker的队列也是均衡的.

RocketMQ消息投递的具体策略在 org.apache.rocketmq.client.latency.MQFaultStrategy#selectOneMessageQueue 方法中. 生产者发送消息的时候, 会调用到这个方法, 选择对应的队列.

【特别注意】

学新通如上图所示, 在RocketMQ中可以配置读写队列, 这是个很关键的配置. 当写队列数量=8, 大于读队列数量=5时, 由于生产者会将消息发送到8个写队列中, 消费者会从5个读队列中读取消息, 就会出现下图惨案

学新通


有3个队列的消息永远无法被消费. 所以一定要保证读队列数量>=写队列数量, 最佳实践是读队列数量=写队列数量. 如果读队列数量>写队列数量就会出现消息不均衡的情况, 如下图所示
学新通

5个写队列, 8个读队列, 虽然2个消费者平均分配8个读队列, 各4个队列, 但是消费者2分配的4个读队列中, 有3个队列永远也不会有数据, 无形之中就造成了消息不均衡的惨状.

RocketMQ设置读写队列数的目的在于方便队列的缩容和扩容, 但是在我们系统稳定运行过程中, 需要把读写队列数量设置成相等.

【总结】
1.写队列数量大于读队列数量, 会导致部分消息永远无法被消费
2.写队列数量小于读队列数量, 会导致消息不均衡
3.因此必须配置写队列数量等于读队列数量(在不需要缩容或扩容的前提下)



以上介绍了生产者发送消息和读写队列数量的配置, 可能会导致消息不均衡, 以及解决办法, 接下来分析消费端如何造成消息不均衡的情况.

我们三假设
1.假设生产者发送的消息都是均衡的发送到不同的Queue队列中.
2.假设每个消费者的消费能力没有太大差别.
3.假设读写队列数量相等.

说明一点的是,消息不均衡,即便增加消费者,也无法解决消息不均衡.

我亲自经历的,以及我知道别人咨询阿里的结论, 阿里工程师针对消息不均衡的情况,是让你增加消费者的消费能力,其实是不全对的,甚至是无法解决消息不均衡的.

假如我们有2个broker,topic有4个队列, 有2个消费者,如下图所示

学新通


如上图,共有8个队列,那么这8个队列该如何分配给2个消费者呢?
常规的方案就是平均分配, 8个队列,2个消费者, 那么每个消费者分配4个队列, 如下图所示

学新通

上面说的这个策略对应的类是AllocateMessageQueueAveragely

在RocketMQ的源码中,默认就是指定了AllocateMessageQueueAveragely策略.

public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsumer {

	/**
     * Queue allocation algorithm
     */
    private AllocateMessageQueueStrategy allocateMessageQueueStrategy = new AllocateMessageQueueAveragely();

}

看似完美,而且这也是RocketMQ消费端默认的分配队列的情况. 然而它却有着天生的缺陷. 假如此时生产者有4条消息需要发送, 按照轮询的策略,生产者会把消息发送给broker1的queueId=0,1,2,3 这4个队列, 意味着只有消费者1可以消费到消息, 而消费者2一直空闲. 过了一会, 生产者再发送4条消息, 会把消息发送给broker2的queueId=0,1,2,3 这4个队列, 意味着只有消费者2可以消费到消息, 而消费者1变成了一直空闲. 就会造成一个消费者忙碌一个消费者空闲的情况, 间接出现了消息不均衡情况. 如果出现了上面说的生产者发送失败或发送延迟的情况, 可能消息都会发送到broker1上, 造成消费者1一直忙碌, 而消费者2无所事事, 间接出现了消息不均衡情况.

AllocateMessageQueueAveragely策略, 从全局看的话, 队列是平均分配给每个消费者的, 但是从局部看的话, 队列没有平均分配给每个消费者. 这就是关键所在.


其实生产者没有过失, 在队列可用,消息发送延迟时间可控的情况下, 生产者一直是致力于把消息均衡发送给每个队列, 即便broker2已经不可使用了,在局部看来, 生产者依然是将消息均衡的发送到broker1的每个队列上. 之所以导致消息不均衡的罪魁祸首其实是消费端的分配队列的策略. 按照上面的分配队列策略, 是不完美的, 天生会造成消息不均衡. 只要生产者的生产消息的速度足够快, 消费者消费消息的速度明显跟不上, 这种消息不均衡的现象就会明显出现.

RocketMQ还提供了一个分配队列的策略, 它是AllocateMessageQueueAveragelyByCircle , 这个策略是环形平均分配, 如下图所示

学新通就好像消费者1和消费者2轮流从8个队列里面挨个拿礼物一样, 不管从全局,还是局部来看, 队列都是平均的分配给2个消费者. 即便broker2此时不可用, 消息都被发送到了broker1上, 依然是有2个消费者来消费消息的, 而不像之前的那样, 只有一个消费者消费消息, 避免了部分消费者处于饥饿的状态, 间接避免了消息不均衡的问题.

RocketMQ中的RebalanceImpl类负责消费端的负载均衡, 用于给消费端分配对应的队列,源码如下

// 源码位置 : org.apache.rocketmq.client.impl.consumer.RebalanceImpl#rebalanceByTopic

private void rebalanceByTopic(final String topic, final boolean isOrder) {
    switch (messageModel) {
        case BROADCASTING: {
            ...
        }
        case CLUSTERING: {
            if (mqSet != null && cidAll != null) {

                AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

                List<MessageQueue> allocateResult = null;
                try {
                    // 调用具体的分配队列策略
                    allocateResult = strategy.allocate(
                        this.consumerGroup,
                        this.mQClientFactory.getClientId(),
                        mqAll,
                        cidAll);
                } catch (Throwable e) { }
				...
            }
            break;
        }
        default:
            break;
    }
}



学新通

【说明】
1.在原生RocketMQ v4.5.2源码中并没有看到可以手动指定不同分配队列策略的入口, 也许是我没找到
2.如果使用的是商业版的RocketMQ, 那么使用的ons-client包, 在这个包的1.8.7.4.Final中,如果实例化Consumer的时候指定了AllocateMessageQueueAveragelyByCircle策略的话, 那么就可以使用它.

ons-client底层代码,根据程序员的配置, 使用不同的分配队列的策略,如下
学新通

而我们能做的,就是在实例化Consumer的时候,指定AVG_BY_CIRCLE属性, 那么商业版RocketMQ底层就是使用AllocateMessageQueueAveragelyByCircle策略用来给每个消费者分配队列.
学新通

读者朋友只需要知道采用什么策略可以达到什么效果即可, 具体的实现与具体的版本和写法有关,根据实际情况调整下即可.

3.当然RocketMQ还有其他的分配队列的策略,这里就没有介绍.
4.脚本工具, 由于工作中使用的是阿里云商业版的RocketMQ, 有时候需要模拟发送和接收消息,基于官方API文档,写了一个HTTP协议的Python版的生产者和消费者, 以及TCP协议的Java版的生产者和消费者, 其实没有啥技术含量,就是把官网的API拿过来,改吧改吧而已.

虽然我们可以采取环形平均分配的策略, 但是还需要考虑一个问题,如下图

学新通当消费者数量和队列数量不匹配的时候, 总会有一些消费者比其他消费者多分配几个队列, 一旦生产者生产的消息速度比较快, 而消费者消费消息的速度比较慢的话, 那么多分配的几个队列就会造成消息倾斜消息不均衡的悲惨状况.

导致消费者消费慢的原因,比如RPC调用慢, 数据库查询慢, 硬件等原因

【总结】
第一点消费端需要采取合适的队列分配策略,比如按照环形平均分配策略(AllocateMessageQueueAveragelyByCircle),

第二点消费者的数量和队列的数量尽量保持公平平均分配, 前面这两点就是在说, 不仅每个消费者要分配到相同数量的队列, 而且这些队列还需要按照环形的方式分配给消费者.

第三点就是消费者的消费速度不能太慢, 尽量把消息尽快消费掉, 而且消费者彼此之间消费能力差别不大.

第四点就是读写队列数量必须相等





个人站点
语雀

公众号

学新通

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhiacjea
系列文章
更多 icon
同类精品
更多 icon
继续加载