Broker主从同步
主从同步概述
众所周知,Broker中有两种角色:Master和Slave。Master主要用于处理生产者、消费者的请求和存储数据,Slave从Broker中同步所有的数据到本地,具体体现以下两个作用:
- 保证Broker服务高可用:当Broker宕机时,消费者可以通过连接Slave继续消费,这样可以保证服务的高可用
- 提高服务性能:消费者从Master Broker拉取消息时,发现拉取消息的offset和CommitLog的物理offset相差太多,会转向Slave拉取消息,这样可以减轻Master的压力,从而提高性能。
Broker同步数据的方式有两种:
- 同步复制:指客户端发送消息到Master,Master将消息同步复制到Slave的过程,可以通过设置参数
brokerRole = BrokerRole.SYNC_MASTER
来实现,可靠性高但效率比较低。 - 异步复制:指客户端发送消息到Master,再由异步线程HAService异步同步到Slvae的过程,可以通过设置参数
brokerRole = BrokerRole.ASYNC_MASTER
来实现,效率非常高,但是可靠性比同步复制差。
Broker同步的数据有两种:配置数据和消息数据。配置数据主要包含Topic配置、消费者位点信息,延迟消息位点信息、订阅关系配置等。
主从同步流程
名词解释
服务名 | 功能 |
---|---|
SlaveSynchronize | Slave从Master同步配置数据的服务 |
HAService | Slave从Master同步CommitLog数据 |
HAConnection | Slave连接信息 |
HAConnection.WriteSocketService | 将CommitLog写入网络,发送给Slave |
HAConnection.ReadSocketService | 读取Slave发送的offset请求 |
HAClient | Slave处理与Master通信的客户端封装 |
GroupTransferService | 主从同步通知类,实现同步、异步复制提供新数据通知服务 |
AcceptSocketService | Master接受Slave发送的上报offset请求的服务 |
配置数据同步流程
配置数据的同步包含4中类型:Topic配置(TopicConfigManager)、消费者位点(ConsumerOffsetManager)、延迟位点(ScheduleMessageService)、订阅关系配置(SubscriptionGroupManager),它们都继承自ConfigManager抽象类。
当Slave Broker(brokerRole = BrokerRole.SLAVE
)启动时,会初始化SlaveSynchronize方法,调用org.apache.rocketmq.broker.BrokerController#handleSlaveSynchronize方法,每10s调用一次org.apache.rocketmq.broker.slave.SlaveSynchronize.syncAll方法:
private void handleSlaveSynchronize(BrokerRole role) {
if (role == BrokerRole.SLAVE) {
if (null != slaveSyncFuture) {
// 取消正在运行的slave同步任务,不强制中断
slaveSyncFuture.cancel(false);
}
this.slaveSynchronize.setMasterAddr(null);
slaveSyncFuture = this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.slaveSynchronize.syncAll();
}
catch (Throwable e) {
log.error("ScheduledTask SlaveSynchronize syncAll error.", e);
}
}
}, 1000 * 3, 1000 * 10, TimeUnit.MILLISECONDS);
} else {
//handle the slave synchronise
if (null != slaveSyncFuture) {
slaveSyncFuture.cancel(false);
}
this.slaveSynchronize.setMasterAddr(null);
}
}
syncAll方法一次调用四种配置数据(Topic配置、消费者位点、同步延迟位点、订阅关系配置)的同步方法同步全量数据:
public void syncAll() {
// 同步Topic配置
this.syncTopicConfig();
// 同步消费者位点
this.syncConsumerOffset();
// 同步延迟位点
this.syncDelayOffset();
// 同步订阅关系配置
this.syncSubscriptionGroupConfig();
}
syncAll方法执行的4个方法都是通过调用BrokerOuterAPI中的方法,根据RequestCode.GET_ALL_TOPIC_CONFIG、RequestCode.GET_ALL_CONSUMER_OFFSET、RequestCode.GET_ALL_DELAY_OFFSET、RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG
使用Remoting模块远程调用Master Broker,而Master Broker初始化过程中注册了org.apache.rocketmq.broker.processor.AdminBrokerProcessor处理器接受对应的请求,根据RequestCode
的值进行不同方法的调用(四种配置数据信息获取的方法很简单,不在赘述)。
Slave根据上面说的流程获取到Master中的配置数据信息同步持久化到磁盘中。
CommitLog数据同步流程
CommitLog的数据同步分为同步复制和异步复制两种。同步复制是生产者生产消息后,等待Master Broker将数据同步到Slave Broker后,再返回生产者数据存储状态;异步复制是生产者在生产消息后,不用等待Slave同步,直接返回Master存储结果。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-oWDHoVxA-1649378441486)(C:\Users\liaozhirong\Desktop\learning\Images\webp.webp)]
异步复制
Master Broker启动时,会初始化和启动DefaultMessageStore存储服务,而DefaultMessageStore存储服务会初始化org.apache.rocketmq.store.ha.HAService服务,HAService服务会启动org.apache.rocketmq.store.ha.HAService。AcceptSocketService服务,监听10912端口,用于接收来自一个或多个Slave的注册请求,当有Slave注册请求进来时,会创建一个HAConnection,同时HAConnection会创建WriteSocketService和ReadSocketService服务并启动,开始主从数据同步。
Slave -> Master
ReadSocketService:Master接收Slave同步数据的请求,并将这些信息保存在HAConnection中。
public void run() {
HAConnection.log.info(this.getServiceName() " service started");
while (!this.isStopped()) {
try {
this.selector.select(1000);
// 接收Slave的上报请求
boolean ok = this.processReadEvent();
if (!ok) {
HAConnection.log.error("processReadEvent error");
break;
}
long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastReadTimestamp;
// 判断slave心跳时间是否超过同步存活时间,表示Slave不可用(可能宕机或者网络不可用)
// 终止获取Slave的上报请求循环,删除Slave的连接
if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) {
log.warn("ha housekeeping, found this connection[" HAConnection.this.clientAddr "] expired, " interval);
break;
}
} catch (Exception e) {
HAConnection.log.error(this.getServiceName() " service has exception.", e);
break;
}
}
// 暂停该Slave的read、write服务,并移除该Slave连接
this.makeStop();
writeSocketService.makeStop();
haService.removeConnection(HAConnection.this);
HAConnection.this.haService.getConnectionCount().decrementAndGet();
SelectionKey sk = this.socketChannel.keyFor(this.selector);
if (sk != null) {
sk.cancel();
}
try {
this.selector.close();
this.socketChannel.close();
} catch (IOException e) {
HAConnection.log.error("", e);
}
HAConnection.log.info(this.getServiceName() " service end");
}
private boolean processReadEvent() {
// 读取到0的次数
int readSizeZeroTimes = 0;
// 读取到最大子节,flip重置position为0,processPosition=0
if (!this.byteBufferRead.hasRemaining()) {
this.byteBufferRead.flip();
this.processPosition = 0;
}
// byteBufferRead是否可以还可以写入数据
while (this.byteBufferRead.hasRemaining()) {
try {
// 读取数据到缓存中
int readSize = this.socketChannel.read(this.byteBufferRead);
// readSize>0表示读取到子节数据
if (readSize > 0) {
// 设置读取到0的次数为0、最近读取数据的时间
readSizeZeroTimes = 0;
this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
// 读取的位点-已读位点 大于等于 8,表示读取到同步请求的偏移位点信息
if ((this.byteBufferRead.position() - this.processPosition) >= 8) {
// slave上报的最大偏移量占8个子节,正常情况下byteBufferRead的position是8的倍数,但是不能确定出现粘包情况的出现
// 所以pos重新计算,规避该情况的出现,保证获取的位置是正确的
// 获取Slave请求同步的最新偏移位点信息
int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
long readOffset = this.byteBufferRead.getLong(pos - 8);
// 设置已读位点 = pos
this.processPosition = pos;
// 设置slave应答同步完成的偏移位点
HAConnection.this.slaveAckOffset = readOffset;
// 设置slave请求同步开始的偏移位点
HAConnection.this.slaveRequestOffset = readOffset;
if (HAConnection.this.slaveRequestOffset < 0) {
log.info("slave[" HAConnection.this.clientAddr "] request offset " readOffset);
}
// 比较更新push2SlaveMaxOffset,并唤醒GroupTransferService服务
// GroupTransferService服务则比较push2SlaveMaxOffset判断是否完成同步复制返回future结果
HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
}
// 连续读取三次 0 则结束本次读取,返回true
} else if (readSize == 0) {
if ( readSizeZeroTimes >= 3) {
break;
}
// 读取到负数,发生异常,关闭Slave连接
} else {
log.error("read socket[" HAConnection.this.clientAddr "] < 0");
return false;
}
} catch (IOException e) {
log.error("processReadEvent exception", e);
return false;
}
}
return true;
}
}
因为slave上报最大偏移量时,占用8个字节,byteBufferRead
的长度为1M的长度,肯定能存储完整的上报请求包,所以不会出现不完整的包,直接重置byteBufferRead
的position。
而且master获取slave请求的上报位点总是获取读取到的最新的请求位点:int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);,读取到的salve请求位点会被当做slave存储的数据也是持久化成功的确认位置slaveAckOffset
和slave需要同步消息的开始位置slaveRequestOffset
。WriteSocketService服务则根据slaveRequestOffset
同步消息。
当master读取到slave的请求上报位点后,就会调用notifyTransferSome方法,尝试唤醒可能在同步等待的GroupTransferService服务确认同步结果。
public void notifyTransferSome(final long offset) {
// 比较更新push2SlaveMaxOffset
// 循环的原因是存在多个Slave,可能会导致push2SlaveMaxOffset的compareAndSet失败,尝试重新获取value再次比较更新
for (long value = this.push2SlaveMaxOffset.get(); offset > value; ) {
boolean ok = this.push2SlaveMaxOffset.compareAndSet(value, offset);
if (ok) {
// 尝试唤醒groupTransferService
this.groupTransferService.notifyTransferSome();
break;
} else {
value = this.push2SlaveMaxOffset.get();
}
}
}
因为一个master同时保持多个slave连接,并且每个slave上报的偏移量可能都不太一致,所以该放入通过原子长整型对象push2SlaveMaxOffset
,来保证原子性。当offset确实大于了value值,并且push2SlaveMaxOffset
更新成功,那么才能执行groupTransferService.notifyTransferSome();方法。如果更新失败,那么就会重新获取push2SlaveMaxOffset
再进行比较更新。
而唤醒GroupTransferService服务则可以让master存储消息后同步消息等待同步结果:详情看 Broker存储机制详解.md
WriteSocketService:Master根据HAConnection中保存的Slave同步请求,从CommitLog中查询数据,并发送给Slave。
源码解析可以分为以下几步
-
准备阶段
slaveRequestOffset
等于-1说明slave还没有上报请求偏移量,master无法确定同步消息的偏移量,无法同步所以等待10ms后再尝试同步。nextTransferFromWhere
等于-1表示master还没同步消息,需要确认slaveRequestOffset
,如果slaveRequestOffset
等于0表示slave还没同步过的新服务,那么master只会从最新的MappedFile开头位置开始同步消息。如果slaveRequestOffset
大于0表示ReadSocketService服务有接收到Slave的上报请求,master会以slaveRequestOffset
作为同步的消息开始位点。// slave请求的同步偏移量 = -1 表示Slave没有上报位点信息,等待10ms继续下次 if (-1 == HAConnection.this.slaveRequestOffset) { Thread.sleep(10); continue; } // 等于-1表示master还没有推送过,需要确认slaveRequestOffset if (-1 == this.nextTransferFromWhere) { // slave请求的同步偏移量 = 0 表示之前没同步过,salve是新服务 // 则尝试同步最新的MappedFile的消息或者从0同步 if (0 == HAConnection.this.slaveRequestOffset) { // 获取Master CommitLog最大偏移量所在的MappedFile的开始偏移量 long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset(); masterOffset = masterOffset - (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig() .getMappedFileSizeCommitLog()); if (masterOffset < 0) { masterOffset = 0; } // 设置下次同步的开始偏移量为0或者CommitLog最大偏移量所在的MappedFile的开始偏移量 this.nextTransferFromWhere = masterOffset; } else { // slaveRequestOffset不为0 // 设置下次同步的开始偏移量为slave请求的同步偏移量 this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset; } log.info("master transfer data from " this.nextTransferFromWhere " to slave[" HAConnection.this.clientAddr "], and slave request " HAConnection.this.slaveRequestOffset); }
-
同步数据
需要知道的是:master同步给slave的消息按照消息头(8byte 消息物理偏移量 4byte 消息体大小) 消息体格式且分开同步给slave,所以slave会先读取到消息头信息,然后再读取到消息体信息。
而
lastWriteOver
表示是否同步完完整的消息头或者消息(消息头 消息体),只有lastWriteOver = true
时表示同步完了,才会继续读取新的消息或者没有消息则等待100ms。这里我们可以看到同步了消息体大小为0的消息头,且没有消息体信息,作用就是为了维持master与slave心跳长连接,当消息体大小为0时,slave不会写入任何数据,避免长连接由于空闲被关闭。
传输完一条消息头或者消息数据,就可以根据
nextTransferFromWhere
获取最大32K的消息内容,并设置消息头和消息体调用transferData同步给Slaver。如果没有消息,则等待100ms后再同步。// 传输完完整消息头或者消息(消息头 消息体) if (this.lastWriteOver) { // 当前距离最近一次Slave请求同步Master的时间间隔 long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp; // interval > 发送心跳的时间间隔默认5s if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig() .getHaSendHeartbeatInterval()) { // Build Header // 发送消息物理位置为nextTransferFromWhere 消息体大小size = 0的消息头信息,用于维持Slave长连接 this.byteBufferHeader.position(0); this.byteBufferHeader.limit(headerSize); this.byteBufferHeader.putLong(this.nextTransferFromWhere); this.byteBufferHeader.putInt(0); this.byteBufferHeader.flip(); // 传输数据 this.lastWriteOver = this.transferData(); if (!this.lastWriteOver) continue; } // Master } else { // 没传输完完整的,则继续传输 this.lastWriteOver = this.transferData(); if (!this.lastWriteOver) continue; } // 获取nextTransferFromWhere对应的MappedFile的记录的消息内容 SelectMappedBufferResult selectResult = HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere); if (selectResult != null) { // 默认每次同步数据最大 1024 * 32 即32K int size = selectResult.getSize(); if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) { size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize(); } long thisOffset = this.nextTransferFromWhere; // 更新下次同步的偏移位点 this.nextTransferFromWhere = size; // 设置消息体 selectResult.getByteBuffer().limit(size); this.selectMappedBufferResult = selectResult; // 设置消息头 // Build Header this.byteBufferHeader.position(0); this.byteBufferHeader.limit(headerSize); this.byteBufferHeader.putLong(thisOffset); this.byteBufferHeader.putInt(size); this.byteBufferHeader.flip(); // 立即同步消息 this.lastWriteOver = this.transferData(); } else { // 没有消息则等待100ms HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100); }
tansferData方法主要逻辑就是首先同步消息头,保证同步完消息头之后,才能同步消息内容且保证消息内容同步完全。有个注意点,就是将selectMappedBufferResult进行release释放。因为从commitLog中获取selectMappedBufferResult时,标记为使用状态。如果用完,需要主动释放。假设此次没有写完时,在下个循环会继续执行。
private boolean transferData() throws Exception { // 写入大小为0的次数 int writeSizeZeroTimes = 0; // Write Header Slave // byteBufferHeader存在字节信息 while (this.byteBufferHeader.hasRemaining()) { // 传输消息头 int writeSize = this.socketChannel.write(this.byteBufferHeader); // writeSize > 0 表示传输数据成功 if (writeSize > 0) { writeSizeZeroTimes = 0; // 更新Salve最近请求Master的时间戳 this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now(); // writeSize == 0 出现3次退出本次请求 } else if (writeSize == 0) { if ( writeSizeZeroTimes >= 3) { break; } } else { throw new Exception("ha master write header error < 0"); } } // selectMappedBufferResult=null表示没有消息要同步,只需要同步消息头 // 如果byteBufferHeader有剩余则直接跳过后续进入下次循环进来直到传输完所有消息头数据 // 如果byteBufferHeader没有剩余字节则进行后续同步新的消息 if (null == this.selectMappedBufferResult) { return !this.byteBufferHeader.hasRemaining(); } writeSizeZeroTimes = 0; // Write Body Master // 如果byteBufferHeader中的数据传输完,且selectMappedBufferResult有消息要同步 if (!this.byteBufferHeader.hasRemaining()) { // 传输完所有的消息体信息 while (this.selectMappedBufferResult.getByteBuffer().hasRemaining()) { // 向Slave写入同步的消息 int writeSize = this.socketChannel.write(this.selectMappedBufferResult.getByteBuffer()); // 同上 if (writeSize > 0) { writeSizeZeroTimes = 0; this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now(); } else if (writeSize == 0) { if ( writeSizeZeroTimes >= 3) { break; } } else { throw new Exception("ha master write body error < 0"); } } } // result = true表示消息头和消息体都已经同步完,则进行后续同步新的消息 // 如果任一个一个地个没同步完,返回false,则会跳过后续,直到同步完为止 boolean result = !this.byteBufferHeader.hasRemaining() && !this.selectMappedBufferResult.getByteBuffer().hasRemaining(); // 发送完CommitLog同步的消息,释放对应mappedFile if (!this.selectMappedBufferResult.getByteBuffer().hasRemaining()) { this.selectMappedBufferResult.release(); this.selectMappedBufferResult = null; } return result; }
从上面可以知道master规定了每次同步消息内容最大32K,假设slave是全新的服务器或者长时间没有同步,导致slave与master服务的最大偏移量差会很大。那么每次推送的数据中肯定都不是非常完整的消息内容了。所以只要偏差小于32k,基本上就能推送完整的消息了。
Master -> Slave
HAClient:Slave接收Master的同步消息数据,并将这些数据保存在CommitLog中。并且适时地给Master上报本地同步的最大偏移位点。
HAClient服务在HAService中创建并启动,HAClient会执行run方法,接收Master同步消息并上报本地offset,源码解析如下:
@Override
public void run() {
log.info(this.getServiceName() " service started");
while (!this.isStopped()) {
try {
// 1、尝试与Master建立连接
if (this.connectMaster()) {
// 2、是否到时间上报请求位点,默认每隔5s
if (this.isTimeToReportOffset()) {
// Slave上报当前CommitLog最大的偏移位点
boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
// 上报3次后仍然没有上报成功(发生异常或者没有发送完所有的子节信息)
if (!result) {
// 关闭当前master连接
this.closeMaster();
}
}
this.selector.select(1000);
// 4、读取Master传输的数据
boolean ok = this.processReadEvent();
if (!ok) {
// 关闭Master连接
this.closeMaster();
}
// 尝试上报当前Slave当前CommitLog最大的偏移位点
// 失败直接下次循环,因为里面已经关闭了master连接
if (!reportSlaveMaxOffsetPlus()) {
continue;
}
// 超过同步连接存活时间,关闭master连接
long interval =
HAService.this.getDefaultMessageStore().getSystemClock().now()
- this.lastWriteTimestamp;
if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig()
.getHaHousekeepingInterval()) {
log.warn("HAClient, housekeeping, found this connection[" this.masterAddress
"] expired, " interval);
this.closeMaster();
log.warn("HAClient, master not response some time, so close connection");
}
} else {
// 与master连接失败等待5s
this.waitForRunning(1000 * 5);
}
} catch (Exception e) {
// 发生异常等待5s
log.warn(this.getServiceName() " service has exception. ", e);
this.waitForRunning(1000 * 5);
}
}
log.info(this.getServiceName() " service end");
}
大致分为以下几步:
- 首先尝试建立与Master的Socket连接,如果过建立成功,注册一个读
OP_READ
的SelectionKey,然后设置当前Slave上报的请求位点currentReportedOffset
为Slave的当前CommitLog最大的偏移位点与最近同步消息的时间lastWriteTimestamp
。
private boolean connectMaster() throws ClosedChannelException {
if (null == socketChannel) {
String addr = this.masterAddress.get();
if (addr != null) {
// 建立master socket连接
SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
if (socketAddress != null) {
this.socketChannel = RemotingUtil.connect(socketAddress);
if (this.socketChannel != null) {
this.socketChannel.register(this.selector, SelectionKey.OP_READ);
}
}
}
// 设置当前Slave上报的请求位点为Slave的当前CommitLog最大的偏移位点
this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
// 最近同步消息的时间
this.lastWriteTimestamp = System.currentTimeMillis();
}
return this.socketChannel != null;
}
这里的masterAddress
是通过以下两种方式进行更新的:
- 在BrokerController初始化方法initialize中,如果当前Broker是Slave,则会从配置信息中获取配置的Master Broker地址设置HAClient的
masterAddress
:
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
this.updateMasterHAServerAddrPeriodically = false;
} else {
this.updateMasterHAServerAddrPeriodically = true;
}
- 当启动Broker进行Broker注册、注册Broker信息的定时任务、Broker角色变更(变成Slave或者Master)、更新Broker配置、自动创建了新的Topic时,都会执行registerBrokerAll方法注册Broker信息,在registerBrokerAll方法出现Broker信息不一致,或者需要强制注册Broker 信息时都会尝试去更新HAClient的master address
if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
}
this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());
- 判断是否到达上报最大请求位点的时间,默认是每隔5s上报,达到时间则将当前最大的上报位放进
reposrtOffset
缓存中,并将其写入连接Master的socketChannel
,更新上报时间lastWriteTimestamp
。如果上报失败,则关闭master连接,重置记录的参数
如果
currentReportedOffset=0
则表示Slave没有上报过请求位点
private boolean isTimeToReportOffset() {
// 判断距离上次同步消息的时间是否大于同步心跳时间
long interval =
HAService.this.defaultMessageStore.getSystemClock().now() - this.lastWriteTimestamp;
boolean needHeart = interval > HAService.this.defaultMessageStore.getMessageStoreConfig()
.getHaSendHeartbeatInterval();
return needHeart;
}
private boolean reportSlaveMaxOffset(final long maxOffset) {
this.reportOffset.position(0);
this.reportOffset.limit(8);
// 8byte
this.reportOffset.putLong(maxOffset);
// 重设position、limit便于写入信息于Socket通道
this.reportOffset.position(0);
this.reportOffset.limit(8);
// 尝试将同步请求的位点信息传输给master,最多重试2次
for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i ) {
try {
this.socketChannel.write(this.reportOffset);
} catch (IOException e) {
log.error(this.getServiceName()
"reportSlaveMaxOffset this.socketChannel.write exception", e);
return false;
}
}
// 更新最近的上报时间
lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now();
return !this.reportOffset.hasRemaining();
}
private void closeMaster() {
if (null != this.socketChannel) {
try {
SelectionKey sk = this.socketChannel.keyFor(this.selector);
if (sk != null) {
sk.cancel();
}
this.socketChannel.close();
this.socketChannel = null;
} catch (IOException e) {
log.warn("closeMaster exception. ", e);
}
this.lastWriteTimestamp = 0;
this.dispatchPosition = 0;
this.byteBufferBackup.position(0);
this.byteBufferBackup.limit(READ_MAX_BUFFER_SIZE);
this.byteBufferRead.position(0);
this.byteBufferRead.limit(READ_MAX_BUFFER_SIZE);
}
}
-
执行processReadEvent方法,接收Master传输的消息数据,并将消息写入CommitLog中。
processReadEvent方法中,根据
byteBufferRead
是否还有可写入空间判断循环读入Master传输过来的数据,如果读入的数据大小大于0表示有数据读入,则执行dispatchReadRequest进行解析。如果连续读入3次0表明读取到了末尾,退出循环。进到dispatchReadRequest方法,通过记录
dispatchPosition
分配位点,计算byteBufferRead
与dispatchPosition
的差值diff
,当diff
大于等于msgHeaderSize(8位CommitLog物理偏移量 4位消息大小)
时,表示读取到了一个消息的消息头,通过继续这个消息的消息头,我们可以知道这个消息在Master CommitLog的物理偏移量和消息的大小,比较slave当前最大的CommitLog偏移量和消息的偏移量,不相等则表明出错,停止读取信息,关闭master连接,这样的目的是保证slave与master数据同步一致性,保证不会丢失数据。如果
diff
还大于消息头大小 消息体大小,表示读取到整条消息数据,解析消息数据,保存到CommitLog中。并且唤醒reputMessageService服务,生成新添加的消息对应的索引。完成消息存储后,更新
dispatchPosition
为下一个消息的开始位点。并且执行reportSlaveMaxOffsetPlus方法,判断当前Slave CommitLog最大的物理偏移量如果大于当前上报请求的偏移量(这里表示成功存储一条数据)则更新当前请求上报的偏移量并上报给Master,如果上报失败则关闭master连接。执行完processReadEvent方法后,程序也会立即进行上报保持心跳连接,如果距离上一次上报时间相隔
haHousekeepingInterval(默认20s)
时间,则关闭master连接
private boolean processReadEvent() {
// 连续读取到0的次数
int readSizeZeroTimes = 0;
// byteBufferRead是否还有写入数据的空间
while (this.byteBufferRead.hasRemaining()) {
try {
// 从Master读取接收字节信息到byteBufferRead中
int readSize = this.socketChannel.read(this.byteBufferRead);
if (readSize > 0) {
readSizeZeroTimes = 0;
// 分派读取CommitLog信息
boolean result = this.dispatchReadRequest();
if (!result) {
log.error("HAClient, dispatchReadRequest error");
return false;
}
} else if (readSize == 0) {
if ( readSizeZeroTimes >= 3) {
break;
}
} else {
log.info("HAClient, processReadEvent read socket < 0");
return false;
}
} catch (IOException e) {
log.info("HAClient, processReadEvent read socket exception", e);
return false;
}
}
return true;
}
private boolean dispatchReadRequest() {
// 消息header头的大小
final int msgHeaderSize = 8 4; // phyoffset size
while (true) {
// 当前读入的消息位点与已分派的位点差
int diff = this.byteBufferRead.position() - this.dispatchPosition;
// diff大于等于msgHeaderSize 表示读取到了消息头信息
// 小于msgHeaderSize表示读取完byteBufferRead缓存中的所有消息头
if (diff >= msgHeaderSize) {
// 获取Master消息在CommitLog的物理偏移位点
long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPosition);
// 获取消息大小
int bodySize = this.byteBufferRead.getInt(this.dispatchPosition 8);
// 获取Slave当前CommitLog存储的最大物理偏移位点
long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
// 如果slave最大偏移量和master的推送的消息的偏移量不一致,返回false
if (slavePhyOffset != 0) {
if (slavePhyOffset != masterPhyOffset) {
log.error("master pushed offset not equal the max phy offset in slave, SLAVE: "
slavePhyOffset " MASTER: " masterPhyOffset);
return false;
}
}
// 如果diff大于等于msgHeaderSize bodySize 表示读取到了整个消息内容
if (diff >= (msgHeaderSize bodySize)) {
byte[] bodyData = byteBufferRead.array();
// 消息内容开始的数组位点
int dataStart = this.dispatchPosition msgHeaderSize;
// 添加进Slave CommitLog
HAService.this.defaultMessageStore.appendToCommitLog(
masterPhyOffset, bodyData, dataStart, bodySize);
// 更新下次分派的位点
this.dispatchPosition = msgHeaderSize bodySize;
// 如果存储完消息后最大的CommitLog偏移量已经大于slave上报的偏移量,立即上报offset
if (!reportSlaveMaxOffsetPlus()) {
return false;
}
// 继续读取下一条同步的消息
continue;
}
}
// 如果byteBufferRead中都是完整的消息且都已读取完
if (!this.byteBufferRead.hasRemaining()) {
// 交换byteBufferBackup和byteBufferRead,这样byteBufferRead可以继续下次读
this.reallocateByteBuffer();
}
break;
}
return true;
}
public boolean appendToCommitLog(long startOffset, byte[] data, int dataStart, int dataLength) {
if (this.shutdown) {
log.warn("message store has shutdown, so appendToPhyQueue is forbidden");
return false;
}
// 将data添加到fileChannel中
boolean result = this.commitLog.appendData(startOffset, data, dataStart, dataLength);
if (result) {
// 唤醒reputMessageService服务,生成新添加的消息对应的索引
this.reputMessageService.wakeup();
} else {
log.error("appendToPhyQueue failed " startOffset " " data.length);
}
return result;
}
public boolean appendData(long startOffset, byte[] data, int dataStart, int dataLength) {
putMessageLock.lock();
try {
// 根据startOffset获取最新的MappedFile或者创建一个新的MappedFile
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(startOffset);
if (null == mappedFile) {
log.error("appendData getLastMappedFile error " startOffset);
return false;
}
// 写入消息
return mappedFile.appendMessage(data, dataStart, dataLength);
} finally {
putMessageLock.unlock();
}
}
private boolean reportSlaveMaxOffsetPlus() {
boolean result = true;
// 当前Slave CommitLog最大物理偏移量
long currentPhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
// 大于当前Slave上报的偏移量,更新Slave上报偏移量值,并再次上报
if (currentPhyOffset > this.currentReportedOffset) {
this.currentReportedOffset = currentPhyOffset;
// 立即再次上报
result = this.reportSlaveMaxOffset(this.currentReportedOffset);
// 上报失败则关闭master连接
if (!result) {
this.closeMaster();
log.error("HAClient, reportSlaveMaxOffset error, " this.currentReportedOffset);
}
}
return result;
}
private void reallocateByteBuffer() {
int remain = READ_MAX_BUFFER_SIZE - this.dispatchPosition;
// 如果byteBufferRead存在剩余没使用的空间
if (remain > 0) {
this.byteBufferRead.position(this.dispatchPosition);
this.byteBufferBackup.position(0);
this.byteBufferBackup.limit(READ_MAX_BUFFER_SIZE);
// byteBufferBackup写入 byteBufferRead dispatchPosition 到 READ_MAX_BUFFER_SIZE的数据
// byteBufferRead 的position会增加remain
this.byteBufferBackup.put(this.byteBufferRead);
}
// byteBufferRead 与 byteBufferBackup交换
this.swapByteBuffer();
// 交换后。byteBufferRead的position应该是remain
this.byteBufferRead.position(remain);
this.byteBufferRead.limit(READ_MAX_BUFFER_SIZE);
// 重置分派的位点为0
this.dispatchPosition = 0;
}
byteBufferRead最为存储通信数据的载体,长度为4M。在目前数据获取时,byteBufferRead是一直在增加,即pos在增加,即有读取到新得数据,byteBufferRead的pos一直变大,直到limit=pos时无法添加内容。为保证数据持续获取,肯定需要将byteBufferRead重置,然后再读取。所以才会有reallocateByteBuffer方法。
当byteBufferRead数据存满,但是仍然会出现最后一个数据包是不完整的。肯定需要将最后一段数据进行保存起来,下次继续使用。在这里,他采用了byteBufferBackup一个备份的字节缓存。remain是指一个数据包的部分数据长度。当remain大于0,说明包不完整,就会将byteBufferRead剩余部分复制给重置后的byteBufferBackup,此时byteBufferBackup是存在数据,并且当前pos为remain。swapByteBuffer方法就是将byteBufferRead和byteBufferBackup执行对象互相交换,即现在的byteBufferRead就是原来的byteBufferBackup对象。然后又重置了byteBufferRead的limit值,并且dispatchPosition的位置也变成0了。下次byteBufferRead再从socketChannel中读取的位置就是从remain开始了。
同步复制
同步复制和异步复制的区别只在于存储消息后,同步消息到Slave时,根据BrokerRole
判断进行同步还是异步复制。
如果是BrokerRole.SYNC_MASTER则表示同步复制,创建GroupCommitRequest请求对象,放到GroupTransferService服务中requestsWrite
列表中,等待获取PutMessageSpinLock锁将请求转换到requestsRead
列表中。GroupTransferService服务会根据requestsRead
中的请求在同步超时时间内(默认5s)中循环判断当前master记录的多个Slave中最大的同步偏移量push2SlaveMaxOffset(该值通过master的每个Slave连接的
ReadSocketService服务上报每个Slave同步最大位点,比较并只设置当前同步最大的)
是否大于请求对象的同步位置nextOffset
,如果大于则直接返回同步成功,返回future结果PutMessageStatus.PUT_OK
结束future的同步阻塞;如果超时则返回PutMessageStatus.FLUSH_SLAVE_TIMEOUT
结束future同步阻塞,超时返回不会阻断消息存储后续流程,只会打印报错返回同步结果。并且会通过调用WaitNotifyObject().wakeupAll()方法主动尝试唤醒可能在等待的Master的多个Slave连接的WriteSocketService服务线程,让各个Slave连接的WriteSocketService复制消息服务复制消息给各个Slave。
而如果是异步复制,则submitReplicaRequest方法则不会管复制结果,默认复制成功,直接返回PutMessageStatus.PUT_OK
,至于何时复制给Slave,决定于各个Slave连接的WriteSocketService服务线程是否处于等待过程中(最大100ms)还是在运行中。
部分相关源码处于存储消息的方法中:org.apache.rocketmq.store.CommitLog#asyncPutMessage方法中:
// 复制操作
CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);
return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
if (flushStatus != PutMessageStatus.PUT_OK) {
putMessageResult.setPutMessageStatus(flushStatus);
}
if (replicaStatus != PutMessageStatus.PUT_OK) {
putMessageResult.setPutMessageStatus(replicaStatus);
if (replicaStatus == PutMessageStatus.FLUSH_SLAVE_TIMEOUT) {
log.error("do sync transfer other node, wait return, but failed, topic: {} tags: {} client address: {}",
msg.getTopic(), msg.getTags(), msg.getBornHostNameString());
}
}
return putMessageResult;
});
通过调用submitReplicaRequest方法实现上述流程逻辑:
public CompletableFuture<PutMessageStatus> submitReplicaRequest(AppendMessageResult result, MessageExt messageExt) {
// brokerRole = BrokerRole.SYNC_MASTER 表示同步复制
if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
HAService service = this.defaultMessageStore.getHaService();
// 是否等待消息存储完成,默认true
if (messageExt.isWaitStoreMsgOK()) {
// wroteOffset 表示消息开始写入的位置,wroteBytes 表示消息字节长度
// 判断Slave是否可连接
if (service.isSlaveOK(result.getWroteBytes() result.getWroteOffset())) {
// syncFlushTimeout 同步刷盘超时时间,默认5s
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() result.getWroteBytes(),
this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
// 激活GroupTransferService线程,用于异步指定时间内循环判断已经同步的所有Slave中最大的偏移量是否大于request中nextOffset(CommitLog最大偏移量)
service.putRequest(request);
// 一个Master Broker可以配置多个Slave,当需要同步数据时,通过wakeupAll来唤醒全部的Slave复制线程
service.getWaitNotifyObject().wakeupAll();
return request.future();
} else {
return CompletableFuture.completedFuture(PutMessageStatus.SLAVE_NOT_AVAILABLE);
}
}
}
return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
}
GroupTransferService服务负责在规定时间内监听push2SlaveMaxOffset
是否大于req.getNextOffset()
,返回future结果停止future的阻塞等待。
class GroupTransferService extends ServiceThread {
private final WaitNotifyObject notifyTransferObject = new WaitNotifyObject();
private final PutMessageSpinLock lock = new PutMessageSpinLock();
private volatile LinkedList<CommitLog.GroupCommitRequest> requestsWrite = new LinkedList<>();
private volatile LinkedList<CommitLog.GroupCommitRequest> requestsRead = new LinkedList<>();
public void putRequest(final CommitLog.GroupCommitRequest request) {
lock.lock();
try {
this.requestsWrite.add(request);
} finally {
lock.unlock();
}
// 激活当前GroupTransferService线程
this.wakeup();
}
public void notifyTransferSome() {
this.notifyTransferObject.wakeup();
}
private void swapRequests() {
lock.lock();
try {
LinkedList<CommitLog.GroupCommitRequest> tmp = this.requestsWrite;
this.requestsWrite = this.requestsRead;
this.requestsRead = tmp;
} finally {
lock.unlock();
}
}
private void doWaitTransfer() {
if (!this.requestsRead.isEmpty()) {
// 遍历requestsRead
for (CommitLog.GroupCommitRequest req : this.requestsRead) {
// push2SlaveMaxOffset表示所有Slave同步的最大偏移量 >= Master CommitLog下次写入的开始偏移量表示同步成功
boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
// 等待同步刷盘超时的时间戳
long waitUntilWhen = HAService.this.defaultMessageStore.getSystemClock().now()
HAService.this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout();
while (!transferOK && HAService.this.defaultMessageStore.getSystemClock().now() < waitUntilWhen) {
// CAS尝试获取锁
this.notifyTransferObject.waitForRunning(1000);
// 是否已经同步成功
transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
}
if (!transferOK) {
log.warn("transfer messsage to slave timeout, " req.getNextOffset());
}
// future任务完成并设置完成状态
req.wakeupCustomer(transferOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
}
this.requestsRead = new LinkedList<>();
}
}
public void run() {
log.info(this.getServiceName() " service started");
while (!this.isStopped()) {
try {
this.waitForRunning(10);
this.doWaitTransfer();
} catch (Exception e) {
log.warn(this.getServiceName() " service has exception. ", e);
}
}
log.info(this.getServiceName() " service end");
}
@Override
protected void onWaitEnd() {
this.swapRequests();
}
@Override
public String getServiceName() {
return GroupTransferService.class.getSimpleName();
}
}
从上面可以知道:
- 同步复制方式是等Master和Slave均写 成功后才反馈给客户端写成功状态;在同步复制方式下,如果Master出故障,Slave上有全部的备份数据,容易恢复,但是同步复制会增大数据写入延迟,降低系统吞吐量。
- 异步复制方式是只要Master写成功 即可反馈给客户端写成功状态。在异步复制方式下,系统拥有较低的延迟和较高的吞吐量。
当slave复制超时,如果刷盘成功,则不会丢失消息,否则在还没完成刷盘前master宕机,就会有消息丢失。
而且RocketMQ主从同步复制中,只确保至少一个Slave同步了消息(只要Master的多个Slave中最大的同步偏移量大于给定的偏移量)
参考文档
-
RocketMQ分布式消息中间件核心原理与最佳实践 —— 李伟 著
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanhiaecee
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
TikTok加速器哪个好免费的TK加速器推荐
TK小达人 10-01 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
excel下划线不显示怎么办
PHP中文网 06-23 -
excel打印预览压线压字怎么办
PHP中文网 06-22 -
微信运动停用后别人还能看到步数吗
PHP中文网 07-22