• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

Broker主从同步

武飞扬头像
Forever Happy
帮助1

主从同步概述

众所周知,Broker中有两种角色:Master和Slave。Master主要用于处理生产者、消费者的请求和存储数据,Slave从Broker中同步所有的数据到本地,具体体现以下两个作用:

  • 保证Broker服务高可用:当Broker宕机时,消费者可以通过连接Slave继续消费,这样可以保证服务的高可用
  • 提高服务性能:消费者从Master Broker拉取消息时,发现拉取消息的offset和CommitLog的物理offset相差太多,会转向Slave拉取消息,这样可以减轻Master的压力,从而提高性能。

Broker同步数据的方式有两种:

  • 同步复制:指客户端发送消息到Master,Master将消息同步复制到Slave的过程,可以通过设置参数 brokerRole = BrokerRole.SYNC_MASTER来实现,可靠性高但效率比较低。
  • 异步复制:指客户端发送消息到Master,再由异步线程HAService异步同步到Slvae的过程,可以通过设置参数brokerRole = BrokerRole.ASYNC_MASTER来实现,效率非常高,但是可靠性比同步复制差。

Broker同步的数据有两种:配置数据和消息数据。配置数据主要包含Topic配置、消费者位点信息,延迟消息位点信息、订阅关系配置等。

主从同步流程

名词解释

服务名 功能
SlaveSynchronize Slave从Master同步配置数据的服务
HAService Slave从Master同步CommitLog数据
HAConnection Slave连接信息
HAConnection.WriteSocketService 将CommitLog写入网络,发送给Slave
HAConnection.ReadSocketService 读取Slave发送的offset请求
HAClient Slave处理与Master通信的客户端封装
GroupTransferService 主从同步通知类,实现同步、异步复制提供新数据通知服务
AcceptSocketService Master接受Slave发送的上报offset请求的服务

配置数据同步流程

配置数据的同步包含4中类型:Topic配置(TopicConfigManager)、消费者位点(ConsumerOffsetManager)、延迟位点(ScheduleMessageService)、订阅关系配置(SubscriptionGroupManager),它们都继承自ConfigManager抽象类。

当Slave Broker(brokerRole = BrokerRole.SLAVE)启动时,会初始化SlaveSynchronize方法,调用org.apache.rocketmq.broker.BrokerController#handleSlaveSynchronize方法,每10s调用一次org.apache.rocketmq.broker.slave.SlaveSynchronize.syncAll方法:

    private void handleSlaveSynchronize(BrokerRole role) {
        if (role == BrokerRole.SLAVE) {
            if (null != slaveSyncFuture) {
                // 取消正在运行的slave同步任务,不强制中断
                slaveSyncFuture.cancel(false);
            }
            this.slaveSynchronize.setMasterAddr(null);
            slaveSyncFuture = this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.slaveSynchronize.syncAll();
                    }
                    catch (Throwable e) {
                        log.error("ScheduledTask SlaveSynchronize syncAll error.", e);
                    }
                }
            }, 1000 * 3, 1000 * 10, TimeUnit.MILLISECONDS);
        } else {
            //handle the slave synchronise
            if (null != slaveSyncFuture) {
                slaveSyncFuture.cancel(false);
            }
            this.slaveSynchronize.setMasterAddr(null);
        }
    }
学新通

syncAll方法一次调用四种配置数据(Topic配置、消费者位点、同步延迟位点、订阅关系配置)的同步方法同步全量数据:

    public void syncAll() {
        // 同步Topic配置
        this.syncTopicConfig();
        // 同步消费者位点
        this.syncConsumerOffset();
        // 同步延迟位点
        this.syncDelayOffset();
        // 同步订阅关系配置
        this.syncSubscriptionGroupConfig();
    }

syncAll方法执行的4个方法都是通过调用BrokerOuterAPI中的方法,根据RequestCode.GET_ALL_TOPIC_CONFIG、RequestCode.GET_ALL_CONSUMER_OFFSET、RequestCode.GET_ALL_DELAY_OFFSET、RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG使用Remoting模块远程调用Master Broker,而Master Broker初始化过程中注册了org.apache.rocketmq.broker.processor.AdminBrokerProcessor处理器接受对应的请求,根据RequestCode的值进行不同方法的调用(四种配置数据信息获取的方法很简单,不在赘述)。

Slave根据上面说的流程获取到Master中的配置数据信息同步持久化到磁盘中。

CommitLog数据同步流程

CommitLog的数据同步分为同步复制和异步复制两种。同步复制是生产者生产消息后,等待Master Broker将数据同步到Slave Broker后,再返回生产者数据存储状态;异步复制是生产者在生产消息后,不用等待Slave同步,直接返回Master存储结果。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-oWDHoVxA-1649378441486)(C:\Users\liaozhirong\Desktop\learning\Images\webp.webp)]

异步复制

Master Broker启动时,会初始化和启动DefaultMessageStore存储服务,而DefaultMessageStore存储服务会初始化org.apache.rocketmq.store.ha.HAService服务,HAService服务会启动org.apache.rocketmq.store.ha.HAService。AcceptSocketService服务,监听10912端口,用于接收来自一个或多个Slave的注册请求,当有Slave注册请求进来时,会创建一个HAConnection,同时HAConnection会创建WriteSocketServiceReadSocketService服务并启动,开始主从数据同步。

Slave -> Master
ReadSocketService:Master接收Slave同步数据的请求,并将这些信息保存在HAConnection中。
    public void run() {
      HAConnection.log.info(this.getServiceName()   " service started");

      while (!this.isStopped()) {
        try {
          this.selector.select(1000);
          // 接收Slave的上报请求
          boolean ok = this.processReadEvent();
          if (!ok) {
            HAConnection.log.error("processReadEvent error");
            break;
          }

          long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastReadTimestamp;
          // 判断slave心跳时间是否超过同步存活时间,表示Slave不可用(可能宕机或者网络不可用)
          // 终止获取Slave的上报请求循环,删除Slave的连接
          if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) {
            log.warn("ha housekeeping, found this connection["   HAConnection.this.clientAddr   "] expired, "   interval);
            break;
          }
        } catch (Exception e) {
          HAConnection.log.error(this.getServiceName()   " service has exception.", e);
          break;
        }
      }
      // 暂停该Slave的read、write服务,并移除该Slave连接
      this.makeStop();

      writeSocketService.makeStop();

      haService.removeConnection(HAConnection.this);

      HAConnection.this.haService.getConnectionCount().decrementAndGet();

      SelectionKey sk = this.socketChannel.keyFor(this.selector);
      if (sk != null) {
        sk.cancel();
      }

      try {
        this.selector.close();
        this.socketChannel.close();
      } catch (IOException e) {
        HAConnection.log.error("", e);
      }

      HAConnection.log.info(this.getServiceName()   " service end");
    }

    private boolean processReadEvent() {
      // 读取到0的次数
      int readSizeZeroTimes = 0;
      // 读取到最大子节,flip重置position为0,processPosition=0
      if (!this.byteBufferRead.hasRemaining()) {
        this.byteBufferRead.flip();
        this.processPosition = 0;
      }
      // byteBufferRead是否可以还可以写入数据
      while (this.byteBufferRead.hasRemaining()) {
        try {
          // 读取数据到缓存中
          int readSize = this.socketChannel.read(this.byteBufferRead);
          // readSize>0表示读取到子节数据
          if (readSize > 0) {
            // 设置读取到0的次数为0、最近读取数据的时间
            readSizeZeroTimes = 0;
            this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
            // 读取的位点-已读位点 大于等于 8,表示读取到同步请求的偏移位点信息
            if ((this.byteBufferRead.position() - this.processPosition) >= 8) {
              // slave上报的最大偏移量占8个子节,正常情况下byteBufferRead的position是8的倍数,但是不能确定出现粘包情况的出现
              // 所以pos重新计算,规避该情况的出现,保证获取的位置是正确的
              // 获取Slave请求同步的最新偏移位点信息
              int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
              long readOffset = this.byteBufferRead.getLong(pos - 8);
              // 设置已读位点 = pos
              this.processPosition = pos;
              // 设置slave应答同步完成的偏移位点
              HAConnection.this.slaveAckOffset = readOffset;
              // 设置slave请求同步开始的偏移位点
              HAConnection.this.slaveRequestOffset = readOffset;
              if (HAConnection.this.slaveRequestOffset < 0) {
                log.info("slave["   HAConnection.this.clientAddr   "] request offset "   readOffset);
              }
              // 比较更新push2SlaveMaxOffset,并唤醒GroupTransferService服务
              // GroupTransferService服务则比较push2SlaveMaxOffset判断是否完成同步复制返回future结果
              HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
            }
            // 连续读取三次 0 则结束本次读取,返回true
          } else if (readSize == 0) {
            if (  readSizeZeroTimes >= 3) {
              break;
            }
            // 读取到负数,发生异常,关闭Slave连接
          } else {
            log.error("read socket["   HAConnection.this.clientAddr   "] < 0");
            return false;
          }
        } catch (IOException e) {
          log.error("processReadEvent exception", e);
          return false;
        }
      }

      return true;
    }
    }
学新通

因为slave上报最大偏移量时,占用8个字节,byteBufferRead的长度为1M的长度,肯定能存储完整的上报请求包,所以不会出现不完整的包,直接重置byteBufferRead的position。

而且master获取slave请求的上报位点总是获取读取到的最新的请求位点:int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);,读取到的salve请求位点会被当做slave存储的数据也是持久化成功的确认位置slaveAckOffset和slave需要同步消息的开始位置slaveRequestOffsetWriteSocketService服务则根据slaveRequestOffset同步消息。

当master读取到slave的请求上报位点后,就会调用notifyTransferSome方法,尝试唤醒可能在同步等待的GroupTransferService服务确认同步结果。

    public void notifyTransferSome(final long offset) {
        // 比较更新push2SlaveMaxOffset
        // 循环的原因是存在多个Slave,可能会导致push2SlaveMaxOffset的compareAndSet失败,尝试重新获取value再次比较更新
        for (long value = this.push2SlaveMaxOffset.get(); offset > value; ) {
            boolean ok = this.push2SlaveMaxOffset.compareAndSet(value, offset);
            if (ok) {
                // 尝试唤醒groupTransferService
                this.groupTransferService.notifyTransferSome();
                break;
            } else {
                value = this.push2SlaveMaxOffset.get();
            }
        }
    }

因为一个master同时保持多个slave连接,并且每个slave上报的偏移量可能都不太一致,所以该放入通过原子长整型对象push2SlaveMaxOffset,来保证原子性。当offset确实大于了value值,并且push2SlaveMaxOffset更新成功,那么才能执行groupTransferService.notifyTransferSome();方法。如果更新失败,那么就会重新获取push2SlaveMaxOffset再进行比较更新。

而唤醒GroupTransferService服务则可以让master存储消息后同步消息等待同步结果:详情看 Broker存储机制详解.md

WriteSocketService:Master根据HAConnection中保存的Slave同步请求,从CommitLog中查询数据,并发送给Slave。

源码解析可以分为以下几步

  • 准备阶段

    slaveRequestOffset等于-1说明slave还没有上报请求偏移量,master无法确定同步消息的偏移量,无法同步所以等待10ms后再尝试同步。

    nextTransferFromWhere等于-1表示master还没同步消息,需要确认slaveRequestOffset,如果slaveRequestOffset等于0表示slave还没同步过的新服务,那么master只会从最新的MappedFile开头位置开始同步消息。如果slaveRequestOffset大于0表示ReadSocketService服务有接收到Slave的上报请求,master会以slaveRequestOffset作为同步的消息开始位点。

        // slave请求的同步偏移量 = -1 表示Slave没有上报位点信息,等待10ms继续下次
        if (-1 == HAConnection.this.slaveRequestOffset) {
          Thread.sleep(10);
          continue;
        }
    		// 等于-1表示master还没有推送过,需要确认slaveRequestOffset
        if (-1 == this.nextTransferFromWhere) {
          // slave请求的同步偏移量 = 0 表示之前没同步过,salve是新服务
          // 则尝试同步最新的MappedFile的消息或者从0同步
          if (0 == HAConnection.this.slaveRequestOffset) {
            // 获取Master CommitLog最大偏移量所在的MappedFile的开始偏移量
            long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();
            masterOffset =
              masterOffset
              - (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
                 .getMappedFileSizeCommitLog());
    
            if (masterOffset < 0) {
              masterOffset = 0;
            }
            // 设置下次同步的开始偏移量为0或者CommitLog最大偏移量所在的MappedFile的开始偏移量
            this.nextTransferFromWhere = masterOffset;
          } else {
            // slaveRequestOffset不为0
            // 设置下次同步的开始偏移量为slave请求的同步偏移量
            this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;
          }
    
          log.info("master transfer data from "   this.nextTransferFromWhere   " to slave["   HAConnection.this.clientAddr
                     "], and slave request "   HAConnection.this.slaveRequestOffset);
        }
    
    学新通
  • 同步数据

    需要知道的是:master同步给slave的消息按照消息头(8byte 消息物理偏移量 4byte 消息体大小) 消息体格式且分开同步给slave,所以slave会先读取到消息头信息,然后再读取到消息体信息。

    lastWriteOver表示是否同步完完整的消息头或者消息(消息头 消息体),只有lastWriteOver = true时表示同步完了,才会继续读取新的消息或者没有消息则等待100ms。

    这里我们可以看到同步了消息体大小为0的消息头,且没有消息体信息,作用就是为了维持master与slave心跳长连接,当消息体大小为0时,slave不会写入任何数据,避免长连接由于空闲被关闭。

    传输完一条消息头或者消息数据,就可以根据nextTransferFromWhere获取最大32K的消息内容,并设置消息头和消息体调用transferData同步给Slaver。如果没有消息,则等待100ms后再同步。

        // 传输完完整消息头或者消息(消息头 消息体)
        if (this.lastWriteOver) {
          // 当前距离最近一次Slave请求同步Master的时间间隔
          long interval =
            HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
          // interval > 发送心跳的时间间隔默认5s
          if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
              .getHaSendHeartbeatInterval()) {
    
            // Build Header
            // 发送消息物理位置为nextTransferFromWhere 消息体大小size = 0的消息头信息,用于维持Slave长连接
            this.byteBufferHeader.position(0);
            this.byteBufferHeader.limit(headerSize);
            this.byteBufferHeader.putLong(this.nextTransferFromWhere);
            this.byteBufferHeader.putInt(0);
            this.byteBufferHeader.flip();
            // 传输数据
            this.lastWriteOver = this.transferData();
            if (!this.lastWriteOver)
              continue;
          }
          // Master
        } else {
          // 没传输完完整的,则继续传输
          this.lastWriteOver = this.transferData();
          if (!this.lastWriteOver)
            continue;
        }
        // 获取nextTransferFromWhere对应的MappedFile的记录的消息内容
        SelectMappedBufferResult selectResult =
          HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
        if (selectResult != null) {
          // 默认每次同步数据最大 1024 * 32 即32K
          int size = selectResult.getSize();
          if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {
            size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
          }
    
          long thisOffset = this.nextTransferFromWhere;
          // 更新下次同步的偏移位点
          this.nextTransferFromWhere  = size;
          // 设置消息体
          selectResult.getByteBuffer().limit(size);
          this.selectMappedBufferResult = selectResult;
          // 设置消息头
          // Build Header
          this.byteBufferHeader.position(0);
          this.byteBufferHeader.limit(headerSize);
          this.byteBufferHeader.putLong(thisOffset);
          this.byteBufferHeader.putInt(size);
          this.byteBufferHeader.flip();
          // 立即同步消息
          this.lastWriteOver = this.transferData();
        } else {
          // 没有消息则等待100ms
          HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);
        }
    
    学新通

    tansferData方法主要逻辑就是首先同步消息头,保证同步完消息头之后,才能同步消息内容且保证消息内容同步完全。有个注意点,就是将selectMappedBufferResult进行release释放。因为从commitLog中获取selectMappedBufferResult时,标记为使用状态。如果用完,需要主动释放。假设此次没有写完时,在下个循环会继续执行。

        private boolean transferData() throws Exception {
          // 写入大小为0的次数
          int writeSizeZeroTimes = 0;
          // Write Header Slave
          // byteBufferHeader存在字节信息
          while (this.byteBufferHeader.hasRemaining()) {
            // 传输消息头
            int writeSize = this.socketChannel.write(this.byteBufferHeader);
            // writeSize > 0 表示传输数据成功
            if (writeSize > 0) {
              writeSizeZeroTimes = 0;
              // 更新Salve最近请求Master的时间戳
              this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
              // writeSize == 0 出现3次退出本次请求
            } else if (writeSize == 0) {
              if (  writeSizeZeroTimes >= 3) {
                break;
              }
            } else {
              throw new Exception("ha master write header error < 0");
            }
          }
          // selectMappedBufferResult=null表示没有消息要同步,只需要同步消息头
          // 如果byteBufferHeader有剩余则直接跳过后续进入下次循环进来直到传输完所有消息头数据
          // 如果byteBufferHeader没有剩余字节则进行后续同步新的消息
          if (null == this.selectMappedBufferResult) {
            return !this.byteBufferHeader.hasRemaining();
          }
    
          writeSizeZeroTimes = 0;
    
          // Write Body Master
          // 如果byteBufferHeader中的数据传输完,且selectMappedBufferResult有消息要同步
          if (!this.byteBufferHeader.hasRemaining()) {
            // 传输完所有的消息体信息
            while (this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
              // 向Slave写入同步的消息
              int writeSize = this.socketChannel.write(this.selectMappedBufferResult.getByteBuffer());
              // 同上
              if (writeSize > 0) {
                writeSizeZeroTimes = 0;
                this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
              } else if (writeSize == 0) {
                if (  writeSizeZeroTimes >= 3) {
                  break;
                }
              } else {
                throw new Exception("ha master write body error < 0");
              }
            }
          }
          // result = true表示消息头和消息体都已经同步完,则进行后续同步新的消息
          // 如果任一个一个地个没同步完,返回false,则会跳过后续,直到同步完为止
          boolean result = !this.byteBufferHeader.hasRemaining() && !this.selectMappedBufferResult.getByteBuffer().hasRemaining();
          // 发送完CommitLog同步的消息,释放对应mappedFile
          if (!this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
            this.selectMappedBufferResult.release();
            this.selectMappedBufferResult = null;
          }
    
          return result;
        }
    
    学新通

    从上面可以知道master规定了每次同步消息内容最大32K,假设slave是全新的服务器或者长时间没有同步,导致slave与master服务的最大偏移量差会很大。那么每次推送的数据中肯定都不是非常完整的消息内容了。所以只要偏差小于32k,基本上就能推送完整的消息了。

Master -> Slave

HAClient:Slave接收Master的同步消息数据,并将这些数据保存在CommitLog中。并且适时地给Master上报本地同步的最大偏移位点。

HAClient服务在HAService中创建并启动,HAClient会执行run方法,接收Master同步消息并上报本地offset,源码解析如下:

    @Override
    public void run() {
      log.info(this.getServiceName()   " service started");

      while (!this.isStopped()) {
        try {
          // 1、尝试与Master建立连接
          if (this.connectMaster()) {
            // 2、是否到时间上报请求位点,默认每隔5s
            if (this.isTimeToReportOffset()) {
              // Slave上报当前CommitLog最大的偏移位点
              boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
              // 上报3次后仍然没有上报成功(发生异常或者没有发送完所有的子节信息)
              if (!result) {
                // 关闭当前master连接
                this.closeMaster();
              }
            }

            this.selector.select(1000);
            // 4、读取Master传输的数据
            boolean ok = this.processReadEvent();
            if (!ok) {
              // 关闭Master连接
              this.closeMaster();
            }
            // 尝试上报当前Slave当前CommitLog最大的偏移位点
            // 失败直接下次循环,因为里面已经关闭了master连接
            if (!reportSlaveMaxOffsetPlus()) {
              continue;
            }
            // 超过同步连接存活时间,关闭master连接
            long interval =
              HAService.this.getDefaultMessageStore().getSystemClock().now()
              - this.lastWriteTimestamp;
            if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig()
                .getHaHousekeepingInterval()) {
              log.warn("HAClient, housekeeping, found this connection["   this.masterAddress
                         "] expired, "   interval);
              this.closeMaster();
              log.warn("HAClient, master not response some time, so close connection");
            }
          } else {
            // 与master连接失败等待5s
            this.waitForRunning(1000 * 5);
          }
        } catch (Exception e) {
          // 发生异常等待5s
          log.warn(this.getServiceName()   " service has exception. ", e);
          this.waitForRunning(1000 * 5);
        }
      }

      log.info(this.getServiceName()   " service end");
    }
学新通

大致分为以下几步:

  1. 首先尝试建立与Master的Socket连接,如果过建立成功,注册一个读OP_READ的SelectionKey,然后设置当前Slave上报的请求位点currentReportedOffset为Slave的当前CommitLog最大的偏移位点与最近同步消息的时间lastWriteTimestamp
    private boolean connectMaster() throws ClosedChannelException {
      if (null == socketChannel) {
        String addr = this.masterAddress.get();
        if (addr != null) {
          // 建立master socket连接
          SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
          if (socketAddress != null) {
            this.socketChannel = RemotingUtil.connect(socketAddress);
            if (this.socketChannel != null) {
              this.socketChannel.register(this.selector, SelectionKey.OP_READ);
            }
          }
        }
        // 设置当前Slave上报的请求位点为Slave的当前CommitLog最大的偏移位点
        this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
        // 最近同步消息的时间
        this.lastWriteTimestamp = System.currentTimeMillis();
      }

      return this.socketChannel != null;
    }
学新通

这里的masterAddress是通过以下两种方式进行更新的:

  • BrokerController初始化方法initialize中,如果当前Broker是Slave,则会从配置信息中获取配置的Master Broker地址设置HAClientmasterAddress
    if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
      if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
        this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
        this.updateMasterHAServerAddrPeriodically = false;
      } else {
        this.updateMasterHAServerAddrPeriodically = true;
      }
  • 当启动Broker进行Broker注册、注册Broker信息的定时任务、Broker角色变更(变成Slave或者Master)、更新Broker配置、自动创建了新的Topic时,都会执行registerBrokerAll方法注册Broker信息,在registerBrokerAll方法出现Broker信息不一致,或者需要强制注册Broker 信息时都会尝试去更新HAClient的master address
    if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
      this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
    }

    this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());
  1. 判断是否到达上报最大请求位点的时间,默认是每隔5s上报,达到时间则将当前最大的上报位放进reposrtOffset缓存中,并将其写入连接Master的socketChannel,更新上报时间lastWriteTimestamp。如果上报失败,则关闭master连接,重置记录的参数

如果currentReportedOffset=0则表示Slave没有上报过请求位点

    private boolean isTimeToReportOffset() {
      // 判断距离上次同步消息的时间是否大于同步心跳时间
      long interval =
        HAService.this.defaultMessageStore.getSystemClock().now() - this.lastWriteTimestamp;
      boolean needHeart = interval > HAService.this.defaultMessageStore.getMessageStoreConfig()
        .getHaSendHeartbeatInterval();

      return needHeart;
    }

    private boolean reportSlaveMaxOffset(final long maxOffset) {
      this.reportOffset.position(0);
      this.reportOffset.limit(8);
      // 8byte
      this.reportOffset.putLong(maxOffset);
      // 重设position、limit便于写入信息于Socket通道
      this.reportOffset.position(0);
      this.reportOffset.limit(8);
      // 尝试将同步请求的位点信息传输给master,最多重试2次
      for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i  ) {
        try {
          this.socketChannel.write(this.reportOffset);
        } catch (IOException e) {
          log.error(this.getServiceName()
                      "reportSlaveMaxOffset this.socketChannel.write exception", e);
          return false;
        }
      }
      // 更新最近的上报时间
      lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now();
      return !this.reportOffset.hasRemaining();
    }

    private void closeMaster() {
      if (null != this.socketChannel) {
        try {

          SelectionKey sk = this.socketChannel.keyFor(this.selector);
          if (sk != null) {
            sk.cancel();
          }

          this.socketChannel.close();

          this.socketChannel = null;
        } catch (IOException e) {
          log.warn("closeMaster exception. ", e);
        }

        this.lastWriteTimestamp = 0;
        this.dispatchPosition = 0;

        this.byteBufferBackup.position(0);
        this.byteBufferBackup.limit(READ_MAX_BUFFER_SIZE);

        this.byteBufferRead.position(0);
        this.byteBufferRead.limit(READ_MAX_BUFFER_SIZE);
      }
    }
学新通
  1. 执行processReadEvent方法,接收Master传输的消息数据,并将消息写入CommitLog中。

    processReadEvent方法中,根据byteBufferRead是否还有可写入空间判断循环读入Master传输过来的数据,如果读入的数据大小大于0表示有数据读入,则执行dispatchReadRequest进行解析。如果连续读入3次0表明读取到了末尾,退出循环。

    进到dispatchReadRequest方法,通过记录dispatchPosition分配位点,计算byteBufferReaddispatchPosition的差值diff,当diff大于等于msgHeaderSize(8位CommitLog物理偏移量 4位消息大小)时,表示读取到了一个消息的消息头,通过继续这个消息的消息头,我们可以知道这个消息在Master CommitLog的物理偏移量和消息的大小,比较slave当前最大的CommitLog偏移量和消息的偏移量,不相等则表明出错,停止读取信息,关闭master连接,这样的目的是保证slave与master数据同步一致性,保证不会丢失数据。

    如果diff还大于消息头大小 消息体大小,表示读取到整条消息数据,解析消息数据,保存到CommitLog中。并且唤醒reputMessageService服务,生成新添加的消息对应的索引。

    完成消息存储后,更新dispatchPosition为下一个消息的开始位点。并且执行reportSlaveMaxOffsetPlus方法,判断当前Slave CommitLog最大的物理偏移量如果大于当前上报请求的偏移量(这里表示成功存储一条数据)则更新当前请求上报的偏移量并上报给Master,如果上报失败则关闭master连接。

    执行完processReadEvent方法后,程序也会立即进行上报保持心跳连接,如果距离上一次上报时间相隔haHousekeepingInterval(默认20s)时间,则关闭master连接

    private boolean processReadEvent() {
      // 连续读取到0的次数
      int readSizeZeroTimes = 0;
      // byteBufferRead是否还有写入数据的空间
      while (this.byteBufferRead.hasRemaining()) {
        try {
          // 从Master读取接收字节信息到byteBufferRead中
          int readSize = this.socketChannel.read(this.byteBufferRead);
          if (readSize > 0) {
            readSizeZeroTimes = 0;
            // 分派读取CommitLog信息
            boolean result = this.dispatchReadRequest();
            if (!result) {
              log.error("HAClient, dispatchReadRequest error");
              return false;
            }
          } else if (readSize == 0) {
            if (  readSizeZeroTimes >= 3) {
              break;
            }
          } else {
            log.info("HAClient, processReadEvent read socket < 0");
            return false;
          }
        } catch (IOException e) {
          log.info("HAClient, processReadEvent read socket exception", e);
          return false;
        }
      }

      return true;
    }

    private boolean dispatchReadRequest() {
      // 消息header头的大小
      final int msgHeaderSize = 8   4; // phyoffset   size

      while (true) {
        // 当前读入的消息位点与已分派的位点差
        int diff = this.byteBufferRead.position() - this.dispatchPosition;
        // diff大于等于msgHeaderSize 表示读取到了消息头信息
        // 小于msgHeaderSize表示读取完byteBufferRead缓存中的所有消息头
        if (diff >= msgHeaderSize) {
          // 获取Master消息在CommitLog的物理偏移位点
          long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPosition);
          // 获取消息大小
          int bodySize = this.byteBufferRead.getInt(this.dispatchPosition   8);
          // 获取Slave当前CommitLog存储的最大物理偏移位点
          long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
          // 如果slave最大偏移量和master的推送的消息的偏移量不一致,返回false
          if (slavePhyOffset != 0) {
            if (slavePhyOffset != masterPhyOffset) {
              log.error("master pushed offset not equal the max phy offset in slave, SLAVE: "
                          slavePhyOffset   " MASTER: "   masterPhyOffset);
              return false;
            }
          }
          // 如果diff大于等于msgHeaderSize   bodySize 表示读取到了整个消息内容
          if (diff >= (msgHeaderSize   bodySize)) {
            byte[] bodyData = byteBufferRead.array();
            // 消息内容开始的数组位点
            int dataStart = this.dispatchPosition   msgHeaderSize;
            // 添加进Slave CommitLog
            HAService.this.defaultMessageStore.appendToCommitLog(
              masterPhyOffset, bodyData, dataStart, bodySize);
            // 更新下次分派的位点
            this.dispatchPosition  = msgHeaderSize   bodySize;
            // 如果存储完消息后最大的CommitLog偏移量已经大于slave上报的偏移量,立即上报offset
            if (!reportSlaveMaxOffsetPlus()) {
              return false;
            }
            // 继续读取下一条同步的消息
            continue;
          }
        }
        // 如果byteBufferRead中都是完整的消息且都已读取完
        if (!this.byteBufferRead.hasRemaining()) {
          // 交换byteBufferBackup和byteBufferRead,这样byteBufferRead可以继续下次读
          this.reallocateByteBuffer();
        }

        break;
      }

      return true;
    }

    public boolean appendToCommitLog(long startOffset, byte[] data, int dataStart, int dataLength) {
        if (this.shutdown) {
            log.warn("message store has shutdown, so appendToPhyQueue is forbidden");
            return false;
        }
        // 将data添加到fileChannel中
        boolean result = this.commitLog.appendData(startOffset, data, dataStart, dataLength);
        if (result) {
            // 唤醒reputMessageService服务,生成新添加的消息对应的索引
            this.reputMessageService.wakeup();
        } else {
            log.error("appendToPhyQueue failed "   startOffset   " "   data.length);
        }

        return result;
    }

    public boolean appendData(long startOffset, byte[] data, int dataStart, int dataLength) {
        putMessageLock.lock();
        try {
            // 根据startOffset获取最新的MappedFile或者创建一个新的MappedFile
            MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(startOffset);
            if (null == mappedFile) {
                log.error("appendData getLastMappedFile error  "   startOffset);
                return false;
            }
            // 写入消息
            return mappedFile.appendMessage(data, dataStart, dataLength);
        } finally {
            putMessageLock.unlock();
        }
    }

    private boolean reportSlaveMaxOffsetPlus() {
      boolean result = true;
      // 当前Slave CommitLog最大物理偏移量
      long currentPhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
      // 大于当前Slave上报的偏移量,更新Slave上报偏移量值,并再次上报
      if (currentPhyOffset > this.currentReportedOffset) {
        this.currentReportedOffset = currentPhyOffset;
        // 立即再次上报
        result = this.reportSlaveMaxOffset(this.currentReportedOffset);
        // 上报失败则关闭master连接
        if (!result) {
          this.closeMaster();
          log.error("HAClient, reportSlaveMaxOffset error, "   this.currentReportedOffset);
        }
      }

      return result;
    }

    private void reallocateByteBuffer() {
      int remain = READ_MAX_BUFFER_SIZE - this.dispatchPosition;
      // 如果byteBufferRead存在剩余没使用的空间
      if (remain > 0) {
        this.byteBufferRead.position(this.dispatchPosition);

        this.byteBufferBackup.position(0);
        this.byteBufferBackup.limit(READ_MAX_BUFFER_SIZE);
        // byteBufferBackup写入 byteBufferRead dispatchPosition 到 READ_MAX_BUFFER_SIZE的数据
        // byteBufferRead 的position会增加remain
        this.byteBufferBackup.put(this.byteBufferRead);
      }
      // byteBufferRead 与 byteBufferBackup交换
      this.swapByteBuffer();
      // 交换后。byteBufferRead的position应该是remain
      this.byteBufferRead.position(remain);
      this.byteBufferRead.limit(READ_MAX_BUFFER_SIZE);
      // 重置分派的位点为0
      this.dispatchPosition = 0;
    }
学新通

byteBufferRead最为存储通信数据的载体,长度为4M。在目前数据获取时,byteBufferRead是一直在增加,即pos在增加,即有读取到新得数据,byteBufferRead的pos一直变大,直到limit=pos时无法添加内容。为保证数据持续获取,肯定需要将byteBufferRead重置,然后再读取。所以才会有reallocateByteBuffer方法。

当byteBufferRead数据存满,但是仍然会出现最后一个数据包是不完整的。肯定需要将最后一段数据进行保存起来,下次继续使用。在这里,他采用了byteBufferBackup一个备份的字节缓存。remain是指一个数据包的部分数据长度。当remain大于0,说明包不完整,就会将byteBufferRead剩余部分复制给重置后的byteBufferBackup,此时byteBufferBackup是存在数据,并且当前pos为remain。swapByteBuffer方法就是将byteBufferRead和byteBufferBackup执行对象互相交换,即现在的byteBufferRead就是原来的byteBufferBackup对象。然后又重置了byteBufferRead的limit值,并且dispatchPosition的位置也变成0了。下次byteBufferRead再从socketChannel中读取的位置就是从remain开始了。

同步复制

同步复制和异步复制的区别只在于存储消息后,同步消息到Slave时,根据BrokerRole判断进行同步还是异步复制。

如果是BrokerRole.SYNC_MASTER则表示同步复制,创建GroupCommitRequest请求对象,放到GroupTransferService服务中requestsWrite列表中,等待获取PutMessageSpinLock锁将请求转换到requestsRead列表中。GroupTransferService服务会根据requestsRead中的请求在同步超时时间内(默认5s)中循环判断当前master记录的多个Slave中最大的同步偏移量push2SlaveMaxOffset(该值通过master的每个Slave连接的ReadSocketService服务上报每个Slave同步最大位点,比较并只设置当前同步最大的)是否大于请求对象的同步位置nextOffset,如果大于则直接返回同步成功,返回future结果PutMessageStatus.PUT_OK结束future的同步阻塞;如果超时则返回PutMessageStatus.FLUSH_SLAVE_TIMEOUT结束future同步阻塞,超时返回不会阻断消息存储后续流程,只会打印报错返回同步结果。并且会通过调用WaitNotifyObject().wakeupAll()方法主动尝试唤醒可能在等待的Master的多个Slave连接的WriteSocketService服务线程,让各个Slave连接的WriteSocketService复制消息服务复制消息给各个Slave。

而如果是异步复制,则submitReplicaRequest方法则不会管复制结果,默认复制成功,直接返回PutMessageStatus.PUT_OK,至于何时复制给Slave,决定于各个Slave连接的WriteSocketService服务线程是否处于等待过程中(最大100ms)还是在运行中。

部分相关源码处于存储消息的方法中:org.apache.rocketmq.store.CommitLog#asyncPutMessage方法中:

        // 复制操作
        CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);
        return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
            if (flushStatus != PutMessageStatus.PUT_OK) {
                putMessageResult.setPutMessageStatus(flushStatus);
            }
            if (replicaStatus != PutMessageStatus.PUT_OK) {
                putMessageResult.setPutMessageStatus(replicaStatus);
                if (replicaStatus == PutMessageStatus.FLUSH_SLAVE_TIMEOUT) {
                    log.error("do sync transfer other node, wait return, but failed, topic: {} tags: {} client address: {}",
                            msg.getTopic(), msg.getTags(), msg.getBornHostNameString());
                }
            }
            return putMessageResult;
        });

通过调用submitReplicaRequest方法实现上述流程逻辑:

    public CompletableFuture<PutMessageStatus> submitReplicaRequest(AppendMessageResult result, MessageExt messageExt) {
        // brokerRole = BrokerRole.SYNC_MASTER 表示同步复制
        if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
            HAService service = this.defaultMessageStore.getHaService();
            // 是否等待消息存储完成,默认true
            if (messageExt.isWaitStoreMsgOK()) {
                // wroteOffset 表示消息开始写入的位置,wroteBytes 表示消息字节长度
                // 判断Slave是否可连接
                if (service.isSlaveOK(result.getWroteBytes()   result.getWroteOffset())) {
                    // syncFlushTimeout 同步刷盘超时时间,默认5s
                    GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset()   result.getWroteBytes(),
                            this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
                    // 激活GroupTransferService线程,用于异步指定时间内循环判断已经同步的所有Slave中最大的偏移量是否大于request中nextOffset(CommitLog最大偏移量)
                    service.putRequest(request);
                    // 一个Master Broker可以配置多个Slave,当需要同步数据时,通过wakeupAll来唤醒全部的Slave复制线程
                    service.getWaitNotifyObject().wakeupAll();
                    return request.future();
                } else {
                    return CompletableFuture.completedFuture(PutMessageStatus.SLAVE_NOT_AVAILABLE);
                }
            }
        }
        return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
    }
学新通

GroupTransferService服务负责在规定时间内监听push2SlaveMaxOffset是否大于req.getNextOffset(),返回future结果停止future的阻塞等待。

    class GroupTransferService extends ServiceThread {

        private final WaitNotifyObject notifyTransferObject = new WaitNotifyObject();
        private final PutMessageSpinLock lock = new PutMessageSpinLock();
        private volatile LinkedList<CommitLog.GroupCommitRequest> requestsWrite = new LinkedList<>();
        private volatile LinkedList<CommitLog.GroupCommitRequest> requestsRead = new LinkedList<>();

        public void putRequest(final CommitLog.GroupCommitRequest request) {
            lock.lock();
            try {
                this.requestsWrite.add(request);
            } finally {
                lock.unlock();
            }
            // 激活当前GroupTransferService线程
            this.wakeup();
        }

        public void notifyTransferSome() {
            this.notifyTransferObject.wakeup();
        }

        private void swapRequests() {
            lock.lock();
            try {
                LinkedList<CommitLog.GroupCommitRequest> tmp = this.requestsWrite;
                this.requestsWrite = this.requestsRead;
                this.requestsRead = tmp;
            } finally {
                lock.unlock();
            }
        }

        private void doWaitTransfer() {
            if (!this.requestsRead.isEmpty()) {
                // 遍历requestsRead
                for (CommitLog.GroupCommitRequest req : this.requestsRead) {
                    // push2SlaveMaxOffset表示所有Slave同步的最大偏移量 >= Master CommitLog下次写入的开始偏移量表示同步成功
                    boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
                    // 等待同步刷盘超时的时间戳
                    long waitUntilWhen = HAService.this.defaultMessageStore.getSystemClock().now()
                              HAService.this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout();
                    while (!transferOK && HAService.this.defaultMessageStore.getSystemClock().now() < waitUntilWhen) {
                        // CAS尝试获取锁
                        this.notifyTransferObject.waitForRunning(1000);
                        // 是否已经同步成功
                        transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
                    }

                    if (!transferOK) {
                        log.warn("transfer messsage to slave timeout, "   req.getNextOffset());
                    }
                    // future任务完成并设置完成状态
                    req.wakeupCustomer(transferOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
                }

                this.requestsRead = new LinkedList<>();
            }
        }

        public void run() {
            log.info(this.getServiceName()   " service started");

            while (!this.isStopped()) {
                try {
                    this.waitForRunning(10);
                    this.doWaitTransfer();
                } catch (Exception e) {
                    log.warn(this.getServiceName()   " service has exception. ", e);
                }
            }

            log.info(this.getServiceName()   " service end");
        }

        @Override
        protected void onWaitEnd() {
            this.swapRequests();
        }

        @Override
        public String getServiceName() {
            return GroupTransferService.class.getSimpleName();
        }
    }
学新通

从上面可以知道:

  1. 同步复制方式是等Master和Slave均写 成功后才反馈给客户端写成功状态;在同步复制方式下,如果Master出故障,Slave上有全部的备份数据,容易恢复,但是同步复制会增大数据写入延迟,降低系统吞吐量。
  2. 异步复制方式是只要Master写成功 即可反馈给客户端写成功状态。在异步复制方式下,系统拥有较低的延迟和较高的吞吐量。

当slave复制超时,如果刷盘成功,则不会丢失消息,否则在还没完成刷盘前master宕机,就会有消息丢失。

而且RocketMQ主从同步复制中,只确保至少一个Slave同步了消息(只要Master的多个Slave中最大的同步偏移量大于给定的偏移量)

参考文档

  • rocketMQ HA实现

  • RocketMQ分布式消息中间件核心原理与最佳实践 —— 李伟 著

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhiaecee
系列文章
更多 icon
同类精品
更多 icon
继续加载