• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

RocketMQ:消费者顺序消费源码

武飞扬头像
我神级欧文
帮助2

RocketMQ对于消费者端顺序消费来说只能保证局部顺序,并不能保证全局顺序消费,局部顺序的意思就是只能对于一个mq的消息达到顺序消费,所以若是想要达到全局顺序消费的效果,对于一个topic来说可以值设置一个mq。

RocketMQ实现顺序消费的原理:因为要保证一个mq的消息能够被顺序消费,第一首先这个mq的消息必须要保证不能被一个以上的消费者所消费(并发消费时如果发生mq的负载均衡就可能会产生一个mq的消息被重复消费了,并且还是乱序消费的),那么其实问题就在于消费者之间是不同的系统而产生的,按照正常的逻辑去想就可以给需要被顺序消费的mq加上分布式锁,而RocketMQ也是这样解决的,不过它并没有使用其他的如redis这些第三方中间件去实现redis锁,而是直接在broker中加锁(因为消费者与broker之间就是分布式的,所以broker可以直接充当redis这些角色去给mq加分布式锁);第二,在第一点的基础上可以保证了一个mq的消息只能被一个消费者所消费,那么消费者自己消费的时候也必须要保证是顺序的,所以在顺序消费的消费线程池中一个线程值对应消费一个mq的消息

1.消费者对分配到的mq加上broker锁

org.apache.rocketmq.client.impl.consumer.RebalanceImpl#updateProcessQueueTableInRebalance

我们来到mq产生负载均衡的时候的代码逻辑,部分代码如下:

  1.  
    for (MessageQueue mq : mqSet) {
  2.  
    // 条件成立: 说明这个mq是新分配给当前消费者的
  3.  
    if (!this.processQueueTable.containsKey(mq)) {
  4.  
     
  5.  
    // 如果当前消费者实例是顺序消费,那么就会先对新分配到的mq进行broker端加锁,如果加锁不成功,直接跳过
  6.  
    if (isOrder && !this.lock(mq)) {
  7.  
    log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
  8.  
    continue;
  9.  
    }
  10.  
     
  11.  
    // 内存中可能有该队列的一些脏数据,所以要把这些脏数据移除
  12.  
    this.removeDirtyOffset(mq);
  13.  
    // 给新分配的mq创建一个对应的队列快照对象
  14.  
    ProcessQueue pq = new ProcessQueue();
  15.  
    // 根据用户设置的ConsumeFromWhere去获取新分配的mq下一次起始消费偏移量,ConsumeFromWhere根据setConsumeFromWhere()方法进行设置
  16.  
    long nextOffset = this.computePullFromWhere(mq);
  17.  
    // 如果起始消费偏移量 >= 0, 就创建拉取消息的任务
  18.  
    if (nextOffset >= 0) {
  19.  
    ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
  20.  
    if (pre != null) {
  21.  
    log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
  22.  
    } else {
  23.  
    log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
  24.  
    PullRequest pullRequest = new PullRequest();
  25.  
    pullRequest.setConsumerGroup(consumerGroup);
  26.  
    pullRequest.setNextOffset(nextOffset);
  27.  
    pullRequest.setMessageQueue(mq);
  28.  
    pullRequest.setProcessQueue(pq);
  29.  
    pullRequestList.add(pullRequest);
  30.  
    changed = true;
  31.  
    }
  32.  
    }
  33.  
    // 如果起始消费偏移量 < 0,就什么都不做,只打印个日志
  34.  
    else {
  35.  
    log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
  36.  
    }
  37.  
    }
  38.  
    }
学新通

在消费者获取到新分配的mq之后,如果这个消费者是顺序消费的,那么就需要对这个mq进行加broker锁,具体加锁的代码如下:

  1.  
    /**
  2.  
    * 当前消费者实例对目标mq进行加锁
  3.  
    * @param mq 目标mq
  4.  
    * @return true表示加broker锁成功,反之不成功
  5.  
    */
  6.  
    public boolean lock(final MessageQueue mq) {
  7.  
    // 找到master节点的地址
  8.  
    FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);
  9.  
    if (findBrokerResult != null) {
  10.  
     
  11.  
    // 加锁请求体对象
  12.  
    LockBatchRequestBody requestBody = new LockBatchRequestBody();
  13.  
    // 请求加锁的消费者组
  14.  
    requestBody.setConsumerGroup(this.consumerGroup);
  15.  
    // 请求加锁的客户端
  16.  
    requestBody.setClientId(this.mQClientFactory.getClientId());
  17.  
    // 请求加锁的客户端所分配到的mq
  18.  
    requestBody.getMqSet().add(mq);
  19.  
     
  20.  
    try {
  21.  
    // 当前消费者实例对新分配到的mq进行加锁,如果成功,返回该mq
  22.  
    Set<MessageQueue> lockedMq =
  23.  
    this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
  24.  
    for (MessageQueue mmqq : lockedMq) {
  25.  
    ProcessQueue processQueue = this.processQueueTable.get(mmqq);
  26.  
    if (processQueue != null) {
  27.  
    processQueue.setLocked(true);
  28.  
    processQueue.setLastLockTimestamp(System.currentTimeMillis());
  29.  
    }
  30.  
    }
  31.  
     
  32.  
    // 加锁成功,lockOK == true
  33.  
    boolean lockOK = lockedMq.contains(mq);
  34.  
    log.info("the message queue lock {}, {} {}",
  35.  
    lockOK ? "OK" : "Failed",
  36.  
    this.consumerGroup,
  37.  
    mq);
  38.  
    return lockOK;
  39.  
    } catch (Exception e) {
  40.  
    log.error("lockBatchMQ exception, " mq, e);
  41.  
    }
  42.  
    }
  43.  
     
  44.  
    return false;
  45.  
    }
学新通

如果这个mq加锁成功就会返回true,否则就返回false

2.消费者本地队列快照ProcessQueue加锁

上面负载均衡的mq加了broker锁之后消费者端就应该开始拉取消息消费了吧,但是由于消费者是直接消费的是快照队列中的消息,所以队列快照也需要根据mq的加锁情况去进行标记是否已经加锁

org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage

  1.  
    if (processQueue.isLocked()) {
  2.  
    if (!pullRequest.isLockedFirst()) {
  3.  
    // 计算出将要从该mq的哪个位置开始消费
  4.  
    final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
  5.  
    boolean brokerBusy = offset < pullRequest.getNextOffset();
  6.  
    log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
  7.  
    pullRequest, offset, brokerBusy);
  8.  
    if (brokerBusy) {
  9.  
    log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
  10.  
    pullRequest, offset);
  11.  
    }
  12.  
     
  13.  
    pullRequest.setLockedFirst(true);
  14.  
    pullRequest.setNextOffset(offset);
  15.  
    }
  16.  
    }
  17.  
    // 代码走到这里是什么情况?说明这个mq是刚通过负载均衡新分配到的,并且此时这个新分配的mq在broker端加锁了,而对应的ProcessQueue并没有加锁
  18.  
    // 因为此时消费者实例还没有对此mq对应的ProcessQueue进行加锁(broker端已经加锁),那什么时候这个会对新分配的mq对应的ProcessQueue进行加锁?
  19.  
    // 答案就是顺序消费服务ConsumeMessageOrderlyService每20s会通过负载均衡服务rebalanceImpl去对当前消费者实例分配到的mq进行broker端的加锁(或者续约),而同时也会对mq对应的ProcessQueue进行加锁
  20.  
    // 而在mq对应的ProcessQueue加锁之前,拉取任务都会延迟3s再执行,直到ProcessQueue进行了加锁之后就可以走上面的顺序拉取逻辑分支了
  21.  
    else {
  22.  
    this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
  23.  
    log.info("pull message later because not locked in broker, {}", pullRequest);
  24.  
    return;
  25.  
    }
学新通

可以看到在消费者拉取消息之前会去根据ProcessQueue的加锁状态去判断对应的mq有没有加了broker锁,如果加了broker锁就先去获取到mq的消费起始位置,设置好了消费起始位置之后就开始拉取消息了。这里有一个问题,就是ProcessQueue是什么时候开始加锁的?

org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService#start

  1.  
    public void start() {
  2.  
    if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
  3.  
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
  4.  
    @Override
  5.  
    public void run() {
  6.  
    // 延迟1s执行,每20s对当前消费者实例所分配到的mq进行broker端加锁或者续约锁
  7.  
    ConsumeMessageOrderlyService.this.lockMQPeriodically();
  8.  
    }
  9.  
    }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
  10.  
    }
  11.  
    }
  1.  
    public synchronized void lockMQPeriodically() {
  2.  
    if (!this.stopped) {
  3.  
    this.defaultMQPushConsumerImpl.getRebalanceImpl().lockAll();
  4.  
    }
  5.  
    }
  1.  
    /**
  2.  
    * 对于顺序消费来说,对当前消费者实例所分配到的mq进行broker端加锁
  3.  
    */
  4.  
    public void lockAll() {
  5.  
    // 对当前消费者实例所分配到的所有mq根据brokerName进行分组
  6.  
    HashMap<String, Set<MessageQueue>> brokerMqs = this.buildProcessQueueTableByBrokerName();
  7.  
     
  8.  
     
  9.  
    Iterator<Entry<String, Set<MessageQueue>>> it = brokerMqs.entrySet().iterator();
  10.  
    while (it.hasNext()) {
  11.  
    Entry<String, Set<MessageQueue>> entry = it.next();
  12.  
    final String brokerName = entry.getKey();
  13.  
    final Set<MessageQueue> mqs = entry.getValue();
  14.  
     
  15.  
    if (mqs.isEmpty())
  16.  
    continue;
  17.  
     
  18.  
    FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, true);
  19.  
    if (findBrokerResult != null) {
  20.  
    LockBatchRequestBody requestBody = new LockBatchRequestBody();
  21.  
    requestBody.setConsumerGroup(this.consumerGroup);
  22.  
    requestBody.setClientId(this.mQClientFactory.getClientId());
  23.  
    requestBody.setMqSet(mqs);
  24.  
     
  25.  
    try {
  26.  
    // 对mq集合进行broker端加锁,返回加锁成功的mq
  27.  
    Set<MessageQueue> lockOKMQSet =
  28.  
    this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
  29.  
     
  30.  
    // 遍历在broker端加锁成功的mq,然后在消费者端也对该mq对应的ProcessQueue进行加锁
  31.  
    for (MessageQueue mq : lockOKMQSet) {
  32.  
    ProcessQueue processQueue = this.processQueueTable.get(mq);
  33.  
    if (processQueue != null) {
  34.  
    if (!processQueue.isLocked()) {
  35.  
    log.info("the message queue locked OK, Group: {} {}", this.consumerGroup, mq);
  36.  
    }
  37.  
    // ProcessQueue标记已加broker锁
  38.  
    processQueue.setLocked(true);
  39.  
    // 更新加broker锁的时间
  40.  
    processQueue.setLastLockTimestamp(System.currentTimeMillis());
  41.  
    }
  42.  
    }
  43.  
     
  44.  
    for (MessageQueue mq : mqs) {
  45.  
    // 找到在broker端加锁未成功的mq
  46.  
    if (!lockOKMQSet.contains(mq)) {
  47.  
    ProcessQueue processQueue = this.processQueueTable.get(mq);
  48.  
    if (processQueue != null) {
  49.  
    // 把ProcessQueue设置成未加锁状态
  50.  
    processQueue.setLocked(false);
  51.  
    log.warn("the message queue locked Failed, Group: {} {}", this.consumerGroup, mq);
  52.  
    }
  53.  
    }
  54.  
    }
  55.  
    } catch (Exception e) {
  56.  
    log.error("lockBatchMQ exception, " mqs, e);
  57.  
    }
  58.  
    }
  59.  
    }
  60.  
    }
学新通

在顺序消费服务启动的时候会开始一个定时任务,这个定时任务每隔20s就会把当前消费者所分配到的所有mq向broker申请加锁,broker端返回加锁结果,加锁成功的mq会把对应的ProcessQueue的加锁标记设置为true,同样的加锁不成功的mq对应的ProcessQueue加锁标记就会设置为false

3.往顺序消费服务ConsumeMessageOrderlyService中提交消费任务

org.apache.rocketmq.client.consumer.PullCallback#onSuccess

  1.  
    // 把从broker端拉取下来并且经过客户端过滤之后的消息放到ProcessQueue中
  2.  
    // 可以根据返回的dispatchToConsume去决定是否需要向消费线程池中提交消费任务,该变量用于保证顺序消费
  3.  
    boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
  4.  
    // 把消费者自定义过滤后的消息交给消费服务(并发消费或者顺序消费)去进行处理,里面包括回调执行我们自己的消费业务逻辑
  5.  
    DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
  6.  
    pullResult.getMsgFoundList(),
  7.  
    processQueue,
  8.  
    pullRequest.getMessageQueue(),
  9.  
    dispatchToConsume);
  1.  
    public void submitConsumeRequest(
  2.  
    final List<MessageExt> msgs,
  3.  
    final ProcessQueue processQueue,
  4.  
    final MessageQueue messageQueue,
  5.  
    final boolean dispathToConsume) {
  6.  
    if (dispathToConsume) {
  7.  
    ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
  8.  
    this.consumeExecutor.submit(consumeRequest);
  9.  
    }
  10.  
    }

在消费者从mq中成功拉取到消息之后会存放到对应的队列快照中,并且会返回dispatchToConsume这个变量,这个变量能够控制是否给线程池提交消费任务,因为我们上面也说过了消费者端要保证一个mq的顺序消费的话,必须是一个消费线程对应一个mq才能够保证,那么dispatchToConsume变量什么时候才能为true呢?我们去到processQueue.putMessage方法看看

  1.  
    /**
  2.  
    * 把从broker端拉取下来并且经过客户端过滤之后的消息放到msgTreeMap
  3.  
    * @param msgs 从broker端拉取下来并且经过客户端过滤之后的msg
  4.  
    * @return 该返回值值针对于顺序消费,当dispatchToConsume == true,表示顺序消费服务需要提交一个消费任务到消费线程池,反之不提交
  5.  
    * 而控制dispatchToConsume == true的是当consuming == false的时候,也就是当顺序消费服务把msgTreeMap的消息都消息完了之后才会再次向消费线程池中提交消费任务,这样就可以保证消息的顺序消费
  6.  
    */
  7.  
    public boolean putMessage(final List<MessageExt> msgs) {
  8.  
    // 是否让顺序消费服务去消费消息
  9.  
    boolean dispatchToConsume = false;
  10.  
    try {
  11.  
    // 加写锁
  12.  
    this.lockTreeMap.writeLock().lockInterruptibly();
  13.  
    try {
  14.  
    int validMsgCnt = 0;
  15.  
    // 遍历拉取到的所有消息
  16.  
    for (MessageExt msg : msgs) {
  17.  
    // 把消息放到msgTreeMap中
  18.  
    MessageExt old = msgTreeMap.put(msg.getQueueOffset(), msg);
  19.  
    if (null == old) {
  20.  
    validMsgCnt ;
  21.  
    // 得到消费队列的最大消息偏移量
  22.  
    this.queueOffsetMax = msg.getQueueOffset();
  23.  
    // 递增记录还未被消费的消息大小
  24.  
    msgSize.addAndGet(msg.getBody().length);
  25.  
    }
  26.  
    }
  27.  
    // 递增记录还未被消费的消息数量
  28.  
    msgCount.addAndGet(validMsgCnt);
  29.  
     
  30.  
    // 如果msgTreeMap中有未被消费的消息并且该ProcessQueue还没有被消费者去消费, 那么dispatchToConsume = true, consuming = true
  31.  
    if (!msgTreeMap.isEmpty() && !this.consuming) {
  32.  
    dispatchToConsume = true;
  33.  
    this.consuming = true;
  34.  
    }
  35.  
     
  36.  
    if (!msgs.isEmpty()) {
  37.  
    MessageExt messageExt = msgs.get(msgs.size() - 1);
  38.  
    String property = messageExt.getProperty(MessageConst.PROPERTY_MAX_OFFSET);
  39.  
    if (property != null) {
  40.  
    // 计算出还有多少消息未被拉取下来
  41.  
    long accTotal = Long.parseLong(property) - messageExt.getQueueOffset();
  42.  
    if (accTotal > 0) {
  43.  
    this.msgAccCnt = accTotal;
  44.  
    }
  45.  
    }
  46.  
    }
  47.  
    } finally {
  48.  
    this.lockTreeMap.writeLock().unlock();
  49.  
    }
  50.  
    } catch (InterruptedException e) {
  51.  
    log.error("putMessage exception", e);
  52.  
    }
  53.  
     
  54.  
    return dispatchToConsume;
  55.  
    }
学新通

可以看到当第一次放入消息在队列快照中时,!msgTreeMap.isEmpty() == true,consuming默认等于false,所以dispatchToConsume = true,this.consuming = true,但是当第二次放入消息的时候,由于this.consuming = true,所以dispatchToConsume = false,也就是不会给线程池添加消费任务,那么consuming这个变量什么时候会变成false呢?我们现在去看takeMessage方法

  1.  
    /**
  2.  
    * 该方法只针对顺序消费服务去使用
  3.  
    * 从msgTreeMap中获取msg进行消费,如果msgTreeMap中的msg被消费完了,那么会把consuming属性设置为false
  4.  
    * @param batchSize 从msgTreeMap中获取多少个msg去消费
  5.  
    * @return 返回获取到的msg集合
  6.  
    */
  7.  
    public List<MessageExt> takeMessages(final int batchSize) {
  8.  
    List<MessageExt> result = new ArrayList<MessageExt>(batchSize);
  9.  
    final long now = System.currentTimeMillis();
  10.  
    try {
  11.  
    // 加写锁
  12.  
    this.lockTreeMap.writeLock().lockInterruptibly();
  13.  
    this.lastConsumeTimestamp = now;
  14.  
    try {
  15.  
    if (!this.msgTreeMap.isEmpty()) {
  16.  
    for (int i = 0; i < batchSize; i ) {
  17.  
    // 获取到最小偏移量的消息,并在msgTreeMap中移除
  18.  
    Map.Entry<Long, MessageExt> entry = this.msgTreeMap.pollFirstEntry();
  19.  
    if (entry != null) {
  20.  
    result.add(entry.getValue());
  21.  
    consumingMsgOrderlyTreeMap.put(entry.getKey(), entry.getValue());
  22.  
    } else {
  23.  
    break;
  24.  
    }
  25.  
    }
  26.  
    }
  27.  
     
  28.  
    if (result.isEmpty()) {
  29.  
    consuming = false;
  30.  
    }
  31.  
    } finally {
  32.  
    this.lockTreeMap.writeLock().unlock();
  33.  
    }
  34.  
    } catch (InterruptedException e) {
  35.  
    log.error("take Messages exception", e);
  36.  
    }
  37.  
     
  38.  
    return result;
  39.  
    }
学新通

首先先说明一下takeMessage方法是在消费消息的时候被调用的,其中我们可以看到每次消费的时候都会从msgTreeMap中去获取到消息并从其中移除掉,当msgTreeMap中的消息都被消费完了的时候,此时consuming就重置为false,那么当下一次调用putMessage方法的时候返回的dispatchToConsume变量值就等于true了。总结地也就是说,为了保证顺序消费,消费者每拉取到一批数据放入到队列快照中的时候都会检查一下队列快照中是否还存在未消费的消息,如果有则不会往线程池中添加消费任务,反之则添加。

4.执行顺序消费任务

org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService.ConsumeRequest#run

  1.  
    if (this.processQueue.isDropped()) {
  2.  
    log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
  3.  
    return;
  4.  
    }
  5.  
     
  6.  
    // 获取到mq对应的锁对象
  7.  
    final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
  8.  
    // 先加锁,避免有多个消费任务同时执行,保证消费消息的顺序性
  9.  
    synchronized (objLock) {
  10.  
    if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
  11.  
    || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
  12.  
    final long beginTime = System.currentTimeMillis();
  13.  
    for (boolean continueConsume = true; continueConsume; ) {
  14.  
     
  15.  
    // 如果该mq已经被dropped,那么停止对该mq的消费
  16.  
    if (this.processQueue.isDropped()) {
  17.  
    log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
  18.  
    break;
  19.  
    }
  20.  
     
  21.  
    // 条件成立:集群模式并且队列没有被加锁
  22.  
    if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
  23.  
    && !this.processQueue.isLocked()) {
  24.  
    log.warn("the message queue not locked, so consume later, {}", this.messageQueue);
  25.  
    // 尝试对队列进行重新加锁,并且重新往线程池中添加消费任务
  26.  
    ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
  27.  
    // 消费线程结束
  28.  
    break;
  29.  
    }
  30.  
     
  31.  
    // 条件成立:集群模式并且锁已过期
  32.  
    if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
  33.  
    && this.processQueue.isLockExpired()) {
  34.  
    log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);
  35.  
    // 尝试对队列进行重新加锁,并且重新往线程池中添加消费任务
  36.  
    ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
  37.  
    // 消费线程结束
  38.  
    break;
  39.  
    }
  40.  
     
  41.  
    // 如果该消费线程已经持续消费工作超过了1分钟,那么就结束当前消费线程重新提交一个消费任务
  42.  
    long interval = System.currentTimeMillis() - beginTime;
  43.  
    if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
  44.  
    ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
  45.  
    // 消费线程结束
  46.  
    break;
  47.  
    }
  48.  
     
  49.  
    final int consumeBatchSize =
  50.  
    ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
  51.  
     
  52.  
    // 从ProcessQueue中获取指定数量的msg
  53.  
    List<MessageExt> msgs = this.processQueue.takeMessages(consumeBatchSize);
  54.  
    defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
  55.  
    if (!msgs.isEmpty()) {
  56.  
    final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);
  57.  
     
  58.  
    ConsumeOrderlyStatus status = null;
  59.  
    ConsumeMessageContext consumeMessageContext = null;
  60.  
     
  61.  
    // 执行消息消费的前置钩子方法
  62.  
    if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
  63.  
    consumeMessageContext = new ConsumeMessageContext();
  64.  
    consumeMessageContext
  65.  
    .setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup());
  66.  
    consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());
  67.  
    consumeMessageContext.setMq(messageQueue);
  68.  
    consumeMessageContext.setMsgList(msgs);
  69.  
    consumeMessageContext.setSuccess(false);
  70.  
    // init the consume context type
  71.  
    consumeMessageContext.setProps(new HashMap<String, String>());
  72.  
    ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
  73.  
    }
  74.  
     
  75.  
    long beginTimestamp = System.currentTimeMillis();
  76.  
    ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
  77.  
    boolean hasException = false;
  78.  
    try {
  79.  
    this.processQueue.getLockConsume().lock();
  80.  
    if (this.processQueue.isDropped()) {
  81.  
    log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
  82.  
    this.messageQueue);
  83.  
    break;
  84.  
    }
  85.  
     
  86.  
    // 执行用户自定义的顺序消费回调
  87.  
    status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
  88.  
    } catch (Throwable e) {
  89.  
    log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
  90.  
    RemotingHelper.exceptionSimpleDesc(e),
  91.  
    ConsumeMessageOrderlyService.this.consumerGroup,
  92.  
    msgs,
  93.  
    messageQueue);
  94.  
    hasException = true;
  95.  
    } finally {
  96.  
    this.processQueue.getLockConsume().unlock();
  97.  
    }
  98.  
     
  99.  
    if (null == status
  100.  
    || ConsumeOrderlyStatus.ROLLBACK == status
  101.  
    || ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
  102.  
    log.warn("consumeMessage Orderly return not OK, Group: {} Msgs: {} MQ: {}",
  103.  
    ConsumeMessageOrderlyService.this.consumerGroup,
  104.  
    msgs,
  105.  
    messageQueue);
  106.  
    }
  107.  
     
  108.  
    long consumeRT = System.currentTimeMillis() - beginTimestamp;
  109.  
     
  110.  
    // 根据消费回调返回值去得到returnType
  111.  
    if (null == status) {
  112.  
    if (hasException) {
  113.  
    returnType = ConsumeReturnType.EXCEPTION;
  114.  
    } else {
  115.  
    returnType = ConsumeReturnType.RETURNNULL;
  116.  
    }
  117.  
    } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
  118.  
    returnType = ConsumeReturnType.TIME_OUT;
  119.  
    } else if (ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
  120.  
    returnType = ConsumeReturnType.FAILED;
  121.  
    } else if (ConsumeOrderlyStatus.SUCCESS == status) {
  122.  
    returnType = ConsumeReturnType.SUCCESS;
  123.  
    }
  124.  
     
  125.  
    if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
  126.  
    consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
  127.  
    }
  128.  
     
  129.  
    if (null == status) {
  130.  
    status = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
  131.  
    }
  132.  
     
  133.  
    // 执行消息消费的后置钩子方法
  134.  
    if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
  135.  
    consumeMessageContext.setStatus(status.toString());
  136.  
    consumeMessageContext
  137.  
    .setSuccess(ConsumeOrderlyStatus.SUCCESS == status || ConsumeOrderlyStatus.COMMIT == status);
  138.  
    ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
  139.  
    }
  140.  
     
  141.  
    ConsumeMessageOrderlyService.this.getConsumerStatsManager()
  142.  
    .incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
  143.  
     
  144.  
    // 根据消费结果执行不同的逻辑
  145.  
    continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
  146.  
    } else {
  147.  
    continueConsume = false;
  148.  
    }
  149.  
    }
  150.  
    } else {
  151.  
    if (this.processQueue.isDropped()) {
  152.  
    log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
  153.  
    return;
  154.  
    }
  155.  
     
  156.  
    ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
  157.  
    }
  158.  
    }
  159.  
    }
学新通

可以看到一开始会从messageQueueLock中根据mq去获取到对应的锁对象,这也进一步保证了一个mq被顺序消费。

接着会有下面几步的处理:

1.如果该mq已经被dropped,结束消费线程

2.如果是集群模式并且队列没有加锁,会尝试对队列重新加锁,延迟提交一个新的消费任务,结束当前消费线程

3.如果是集群模式并且队列的锁已经过期了,会尝试对队列重新加锁,延迟提交一个新的消费任务,结束当前消费线程

4.如果该消费线程已经持续消费工作超过了1分钟,那么就结束当前消费线程重新提交一个消费任务

5.从ProcessQueue中获取指定数量的消息

6.执行消息消费的前置钩子方法

7.执行用户自定义的监听回调方法

8.执行消息消费的后置钩子方法

9.根据监听回调方法返回值去执行不同的处理

其中第9步就是根据我们在监听回调方法的返回值去做出不同的处理,而对于顺序消费来说,我们一般都返回两个值,分别是ConsumeOrderlyStatus.SUCCESS以及ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT,下面我们来看下消费者在收到用户返回的这两种类型的返回值的时候会分别怎样去处理

  1.  
    public boolean processConsumeResult(
  2.  
    final List<MessageExt> msgs,
  3.  
    final ConsumeOrderlyStatus status,
  4.  
    final ConsumeOrderlyContext context,
  5.  
    final ConsumeRequest consumeRequest
  6.  
    ) {
  7.  
    // 控制外层的消费任务是否继续执行
  8.  
    boolean continueConsume = true;
  9.  
    // 已消费消息的最大偏移量
  10.  
    long commitOffset = -1L;
  11.  
    // 自动提交
  12.  
    if (context.isAutoCommit()) {
  13.  
    switch (status) {
  14.  
    case COMMIT:
  15.  
    case ROLLBACK:
  16.  
    log.warn("the message queue consume result is illegal, we think you want to ack these message {}",
  17.  
    consumeRequest.getMessageQueue());
  18.  
     
  19.  
    // 消息消费成功
  20.  
    case SUCCESS:
  21.  
    // 返回已消费的消息中最大的偏移量
  22.  
    commitOffset = consumeRequest.getProcessQueue().commit();
  23.  
    this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
  24.  
    break;
  25.  
     
  26.  
    // 需要挂起
  27.  
    case SUSPEND_CURRENT_QUEUE_A_MOMENT:
  28.  
    this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
  29.  
    if (checkReconsumeTimes(msgs)) {
  30.  
    // 把消费失败的消息重新放入到ProcessQueue中
  31.  
    consumeRequest.getProcessQueue().makeMessageToConsumeAgain(msgs);
  32.  
    // 延迟(默认1s)提交消费任务,保证消费失败的消息能够被重新消费
  33.  
    this.submitConsumeRequestLater(
  34.  
    consumeRequest.getProcessQueue(),
  35.  
    consumeRequest.getMessageQueue(),
  36.  
    context.getSuspendCurrentQueueTimeMillis());
  37.  
    // 本次消费任务结束
  38.  
    continueConsume = false;
  39.  
    }
  40.  
    // 消息已经超过最大重试次数了,直接提交,忽略该消息(当成消费成功)
  41.  
    else {
  42.  
    commitOffset = consumeRequest.getProcessQueue().commit();
  43.  
    }
  44.  
    break;
  45.  
    default:
  46.  
    break;
  47.  
    }
  48.  
    }
  49.  
    // 手动提交
  50.  
    else {
  51.  
    switch (status) {
  52.  
    case SUCCESS:
  53.  
    this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
  54.  
    break;
  55.  
    case COMMIT:
  56.  
    commitOffset = consumeRequest.getProcessQueue().commit();
  57.  
    break;
  58.  
    case ROLLBACK:
  59.  
    consumeRequest.getProcessQueue().rollback();
  60.  
    this.submitConsumeRequestLater(
  61.  
    consumeRequest.getProcessQueue(),
  62.  
    consumeRequest.getMessageQueue(),
  63.  
    context.getSuspendCurrentQueueTimeMillis());
  64.  
    continueConsume = false;
  65.  
    break;
  66.  
    case SUSPEND_CURRENT_QUEUE_A_MOMENT:
  67.  
    this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
  68.  
    if (checkReconsumeTimes(msgs)) {
  69.  
    consumeRequest.getProcessQueue().makeMessageToConsumeAgain(msgs);
  70.  
    this.submitConsumeRequestLater(
  71.  
    consumeRequest.getProcessQueue(),
  72.  
    consumeRequest.getMessageQueue(),
  73.  
    context.getSuspendCurrentQueueTimeMillis());
  74.  
    continueConsume = false;
  75.  
    }
  76.  
    break;
  77.  
    default:
  78.  
    break;
  79.  
    }
  80.  
    }
  81.  
     
  82.  
    if (commitOffset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
  83.  
    // 向broker提交最大的已消费偏移量
  84.  
    this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), commitOffset, false);
  85.  
    }
  86.  
     
  87.  
    return continueConsume;
  88.  
    }
学新通

我们只看自动提交就可以了,如果监听回调中返回的是SUCCESS,那么就调用commit方法返回已消费的消息中的最大偏移量,然后再向broker提交这个最大已消费偏移量;如果监听回调返回的是SUSPEND_CURRENT_QUEUE_A_MOMENT,那么此时就需要判断下该消息是否已超过重试次数,如果没有超过,把该消息重新放回到ProcessQueue中然后再次提交一个消费任务,然后就会结束当前的消费任务,等待新提交的消费任务去对消费失败的消息进行重试,这也是顺序消费与并发消费在处理消费失败时不同的一点,顺序消费为了保证消费是顺序被消费的,所以就不能像并发消费那样对消息进行回退重试。但是如果顺序消费时该消息重试次数满了呢?这种情况消费者会忽略该消息,也就是把该消息当做是成功消费了,然后再直接向broker提交最大消费偏移量

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhiaejeb
系列文章
更多 icon
同类精品
更多 icon
继续加载