• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

Redis核心技术和-学习笔记十五消息队列Redis的解决方案

武飞扬头像
孔汤姆
帮助1

一.消息队列

消息队列:分布式系统必备的一个基础软件,能支持组件通信消息快速读写

Redis本身支持数据的快速访问,满足消息队列的读写性能需求

二.Redis适合做消息队列吗?

消息队列的消息存取需求

消息队列存取消息的过程

  • 在分布式系统中,两个组件要基于消息队列进行通信,一个组件就会把要处理的数据以消息的形式传递给消息队列,然后这个组件就可以继续执行其他操作;
  • 远端的另一个组件从消息队列中把消息读取出来,在本地进行处理。

需求:

  • 组件1需要对采集到的数据进行求和计算,并写入数据库
  • 消息到达速度很快,组件1没有办法及时既做采集又做计算,并写入数据库

解决方案:

消息队列:

  • 组件1把数据x和y保存为JSON格式的消息,再把它发送到消息队列,这样就可以继续接受新的数据。
  • 组件2从消息队列中 把数据读取出来在服务器2上进行求和计算上,再写入数据库。

学新通

通用的消息队列的架构模型:

      学新通

消息队列存取消息时候,必须要满足的三个需求:

  •     消息顺序性 
  •     消息幂等性
  •     保证消息的可靠性

消息的顺序性

     消息顺序被消费者异步处理,但是消费者仍然按照生产者发送消息的顺序来处理消息,避免后被发送的消息先被处理了。

     需求:对于消息顺序性的场景来看,一旦出现消息乱序处理时,会导致业务逻辑被错误执行,给业务方造成损失。

重复消息处理

     消费者从 消息队列读取消息时,有时候会因为网络堵塞出现消息重传的情况。此时,消费者可能会收到多条重复消息。对于重复消息,消费者如果多次处理的话,可能造成一个业务逻辑被多次执行,如果业务逻辑正好要修改数据,就会出现数据被多次修改的问题。

消息可靠性

      消费者在处理消息的时候,可能出现因为故障 或者宕机导致消息没有处理完就丢失的情况。当消费者重启时候,可以重新读取消息再次进行处理,否则就会 出现消息漏处理的问题。

Redis如何实现消息队列的需求

     基于List消息队列解决方案

     List本身就是按照先进先出的顺序对数据进行存取,所以如果使用List作为消息队列保存 消息的话,就可以满足消息的顺序性

    生产者使用LPUSH命令要把发送的消息依次写入list,消费者通过RPOP命令从LIST的另一端按照消息的写入顺序,依次读取消息并处理。   

   存在问题:

     生产者往list写入数据时,List并不会主动通知消费者有新消息写入,如果消费者想要及时处理消息,就需要程序不断调用RPOP命令(比如使用一个while(1)循环),如果新消息写入,RPOP就会返回结果,否则,RPOP命令返回空值,再继续循环

     危害:

        没有新消息写入LIST消费者也要不停的调用RPOP命令,这就会导致消费者程序cpu一直消耗在执行RPOP命令上,带来不必要的性能损失

    解决:

         Redis提供了BRPOP命令。BRPOP命令,也称为阻塞式读取客户端在没有读取到队列数据时,自动阻塞,知道有新的数据写入队列,再开始读取新数据,和消费者程序在自己不停调用RPOP命令相比,这种方式能节省CPU开销。

        

重复消息的处理:消息的幂等性

       消费者程序本身可以对重复消息进行判断

      消息队列要能给每个消息提供全局唯一的ID号;另一方面,消费者程序要把已经处理过的消息ID记录下来。当收到一条消息后,消费者程序可以对比收到的消息ID和记录处理过的消息ID。来判断当前收到的消息有么有经过处理。

     如果已经处理 过了就不再处理了。这种处理特性被称为消息 幂等性。

     幂等性:对于同一消息,消费者收到生成一次的处理结果和收到多次的处理结果是一致的。

不过List本身不会为每个消息生成ID号的,所以,消息的全局唯一ID号就需要生产者程序发送消息前自行生成,生成之后,我们在用LPUSH命令把消息插入List中,需要在消息中包含这个全局唯一ID。

消息可靠性:

      List 类型是如何保证消息可靠性--- 备份

     背景:  消费者List中读取一条消息后,List就不会存留这条消息,所以如果消费者程序在处理消息的过程中出现了故障或者宕机,就会导致消息没有处理完成,那么消费者程序再次启动就会导致消息丢失。

    解决方案:为了存留消息,list提供了BRPOPLUSH命令,这个命令的作用就是让消费者从一个List中读取消息,同时Redis会把这个消息再插入到另一个List(可以叫作备份 List)留存。

      如果消费者程序读取了消息但是没能正常处理,等它重启以后就可以从备份List中重新读取消息并进行处理。

学新通

      生产者消息发送很快,而消费者处理消息的速度缓慢,这就导致List中消息堆积的很多,给Redis内存带来压力

     启动多个消费者程序组成消费组,一起分担处理 List中消息的消息。但是List类型并不支持消费组的实现。

基于Stream消息队列解决方案

streams是Redis专门为消息队列设计 的数据类型:

  • XADD插入消息,保证有序,可以自动生成全局唯一ID;
  • XREAD用于读取消息,可以按ID读取数据;
  • XREADGROUP按消费组的形式读取消息;
  • XPENDING和XACK: XPENDING查询每个消费组内所有消费者已读取但是尚未确认消息,ASCK命令用于向消息队列确认消息处理已经完成。

XADD命令

可以往消息队列中插入新消息,消息的格式 是键-值对形式。对于插入的每一条消息,Streams可以自动为其生成一个全局唯一ID。

  1.  
    XADD mqstream * repo 5
  2.  
    "1599203861727-0"

可以往名称为mqstream的消息队列插入一条消息,消息的键为 repo, 值为5;

消息队列中的* ,表示让Redis为插入数据自动生成一个全局唯一的ID,例如"1599203861727-0"

也可以自行设定一个ID号,保证这个ID号是全局唯一的就行。不过使用*号会更加方便高效。

消息的全局唯一ID由两部分组成

  •    第一部分"1599203861727"是指当前时间戳 毫秒级
  •    第二部分表示插入消息在当前毫秒内的消息序列,这是从0开始编号的,
  •   “1599203861727-0”就表示在“1599203861727”毫秒内的第 1 条消息。

XREAD 命令

       使用XREAD命令从消息队列读取

        XREAD在读取消息时候,可以指定一个消息ID,并从这个消息ID的下一条消息开始进行读取。例如我们可以执行下面的命令,从ID号为 1599203861727-0 的消息开始,读取后续的所有消息:

  1.  
    XREAD BLOCK 100 STREAMS mqstream 1599203861727-0
  2.  
    1) 1) "mqstream"
  3.  
    2) 1) 1) "1599274912765-0"
  4.  
    2) 1) "repo"
  5.  
    2) "3"
  6.  
    2) 1) "1599274925823-0"
  7.  
    2) 1) "repo"
  8.  
    2) "2"
  9.  
    3) 1) "1599274927910-0"
  10.  
    2) 1) "repo"
  11.  
    2) "1"

消息者也可以在调用XREAD时设定block配置项,实现类似于BRPOP的阻塞读取操作。

当消息队列中没有消息时,一旦设置了block配置项,XREAD就会阻塞;

阻塞的时长可以在block配置项进行设置。

  1.  
    XREAD block 10000 streams mqstream $
  2.  
    (nil)
  3.  
    (10.00s)

       ,命令最后的$符号表示读取最新消息,同时设置block 10000配置项,1000的单位是毫秒,表示XREAD 在读取最新消息时,如果没有消息到来,XREAD 将阻塞 10000 毫秒(即 10 秒),然后再返回。上面命令中XREAD执行后,消息队列命令中mqstream 中一直没有消息XREAD 在 10 秒后返回空值(nil)。

     

消费组

      Stream本身可以使用XGROUP创建消费组,创建消费组后,Stream可以使用XREADGROUP命令让消费组内的消费者读取消息

      

  1.  
    XGROUP create mqstream group1 0
  2.  
    ok

   我们再执行一段命令,让GROUP1消费组中的消费者consumer1 从 mqstream 中读取所有消息

  1.  
    XREADGROUP group group1 cinsumer1 streams mqstream >
  2.  
    1) 1) "mqstream"
  3.  
    2) 1) 1) "1599203861727-0"
  4.  
    2) 1) "repo"
  5.  
    2) "5"
  6.  
    2) 1) "1599274912765-0"
  7.  
    2) 1) "repo"
  8.  
    2) "3"
  9.  
    3) 1) "1599274925823-0"
  10.  
    2) 1) "repo"
  11.  
    2) "2"
  12.  
    4) 1) "1599274927910-0"
  13.  
    2) 1) "repo"
  14.  
    2) "1"

让group1消费组里的消费者consumer1从mqstream中读取所有消息,

命令">"表示从第一天尚未被消费的消息开始读取。

因为在consumer1读取消息前,group1并没有其他消费者读取过消息,所以consumer1就得到了mqstream消息队列中的所有消息。

消息队列中的消息一旦被消费组里的一个消息读取了,就不能再被该消费组内的其他消费者读取。

我们继续执行下面命令

  1.  
     
  2.  
    XREADGROUP group group1 consumer2 streams mqstream 0
  3.  
    1) 1) "mqstream"
  4.  
    2) (empty list or set)

比如说,我们执行完刚才的 XREADGROUP 命令后,再执行下面的命令,让 group1 内的 consumer2 读取消息时,consumer2 读到的就是空值,因为消息已经被 consumer1 读取完了 

消费组的目的 

    让组内多个消费者共同分担读取消息,通常会让每个消费者读取部分消息,从而实现让组内的多个消费者共同分担读取消息,实现消息读取负载在多个消费者间是均衡分布的。例如,我们执行下列命令,让 group2 中的 consumer1、2、3 各自读取一条消息。

  1.  
    XREADGROUP group group2 consumer1 count 1 streams mqstream >
  2.  
    1) 1) "mqstream"
  3.  
    2) 1) 1) "1599203861727-0"
  4.  
    2) 1) "repo"
  5.  
    2) "5"
  6.  
     
  7.  
    XREADGROUP group group2 consumer2 count 1 streams mqstream >
  8.  
    1) 1) "mqstream"
  9.  
    2) 1) 1) "1599274912765-0"
  10.  
    2) 1) "repo"
  11.  
    2) "3"
  12.  
     
  13.  
    XREADGROUP group group2 consumer3 count 1 streams mqstream >
  14.  
    1) 1) "mqstream"
  15.  
    2) 1) 1) "1599274925823-0"
  16.  
    2) 1) "repo"
  17.  
    2) "2"
学新通

保证消费者在发生故障或者宕机再次重启时,让可以读取未处理完的消息,stream会自动使用内部队列(PENDING List)留存消费组里 每个消费者读取的消息;

直到消费者使用XACK命令通知Streams消息已经被处理完成。

如果消费者没有成功处理消息,他就不会给Stream发送XACK命令,消息仍然会留存。

此时消费者可以在重启后,用XPENDING 命令查看已读取、但尚未确认处理完成的消息。

  1.  
    XPEBDING mqstream group2
  2.  
    1) (integer) 3
  3.  
    2) "1599203861727-0"
  4.  
    3) "1599274925823-0"
  5.  
    4) 1) 1) "consumer1"
  6.  
    2) "1"
  7.  
    2) 1) "consumer2"
  8.  
    2) "1"
  9.  
    3) 1) "consumer3"
  10.  
    2) "1"

查看group2中各个消费者已读取,但是尚未确认的消息个数。其中,XPENDING返回结果的第二行第三行分别表示group2中所有消费者读取的消息最小ID和最大ID。

  1.  
    XACK mqstream group2 1599274912765-0
  2.  
    (integer) 1
  3.  
    XPENDING mqstream group2 - 10 consumer2
  4.  
    (empty list or set)

consumer2 就可以使用 XACK 命令通知 Streams,然后这条消息就会被删除。当我们再使用 XPENDING 命令查看时,就可以看到,consumer2 已经没有已读取、但尚未确认处理的消息了。

  基于List 基于Streams
消息顺序性 LPUSH/RPOP XADD/XREAD
阻塞读取 BRPOP XREAD block
重复消息处理 生产者自行实现全局唯一ID Streams自动生成全局唯一ID
消息可靠性 BRPOPLPUSH 使用PENDING List自动存留消息,使用XPENDING查看,使XACK确认
适用场景

Redis 5.0前版本

部署环境消息总量小

Redis 5.0以后版本

部署环境消息总量大,需要以消费组的形式读取数据

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhfhgibi
系列文章
更多 icon
同类精品
更多 icon
继续加载