• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

微服务实用篇4-消息队列MQ

武飞扬头像
nuist__NJUPT
帮助1

今天主要来学习异步通讯技术MQ,主要包括初识MQ,RabbitMQ快速入门,SpringAMQP三大部分,下面就来一起学习吧。路漫漫其修远兮,吾将上下而求索,继续加油吧,少年。

目录

一、初识MQ

1.1、同步通讯和异步通讯的优缺点

1.2、MQ常见技术介绍

 1.3、RabbitMQ介绍与安装

1.4、RabbitMQ消息队列模型

二、SpringAMQP

2.1、基本介绍

2.2、SpringAMQP入门案例之消息发送

2.3、SpringAMQP入门案例之消息接收

2.4、SpringAMQP之工作队列Work Queue

2.5、SpringAMQP之发布-订阅模型广播交换机

2.6、SpringAMQP之发布-订阅模型路由交换机

2.7、SpringAMQP之发布-订阅模型主题交换机

2.8、SpringAMQP之消息转换器


一、初识MQ

1.1、同步通讯和异步通讯的优缺点

我们先看一下同步调用的优缺点,同步调用是实时响应,可以立即得到结果。但是同步调用一般耦合度较高,性能偏低,还存在级联失败等问题。

学新通

下面我们看一下微服务之间的异步调用,异步调用主要是解决了同步调用存在的一些问题,异步调用的优点:发布订阅的模式,耦合度低,不不要等待,吞吐量高,故障隔离,不会出现级联失败的问题,流量销峰,broker缓存,微服务根据自己的能力从broker 中获取。

学新通

1.2、MQ常见技术介绍

MQ即Message Queue,消息队列,就是存放消息的队列,也就是事件驱动架构中的Broker,常用的四种消息队列:RabbitMQ,ActiveMQ,RocketMQ,Kafka。对于稳定性要求较高的情况下,一般使用RabbitMQ或RocketMQ,对于数据量比较大,性能要求比较高的一般用Kafka。

学新通

 1.3、RabbitMQ介绍与安装

我们先看一下RabbitMQ的架构,首先发布者发布消息到交换即,交换即通过哦队列进行缓存消息,最后消费者通过订阅从队列中取消息。

学新通

我们先了解一下RabbitMQ中的一些概念,channel用于操作MQ,exchange是交换机,用来路由消息到队列中,队列queue用于缓存消息,virtual host是虚拟主机,用于对队列和交换即等进行逻辑分组。

学新通下面我们进行RabbitMQ的安装过程如下,我们在centos7虚拟机中使用docker进行安装。

有两种方式下载MQ,第一种是在线拉取镜像包,如下:

docker pull rabbitmq:3-management

第二种,下载方式,是本地已经有了镜像包,通过Xftp进行上传到虚拟机,然后使用dokcer命令加载镜像即可。

docker load -i mq.tar

镜像加载完成后,可以使用docker images进行查看,具体如下,可以发现MQ镜像导入成功:

学新通

加载好MQ的镜像后,就需要使用指令进行安装,指令如下:

该指令设置了MQ的用户名和密码变量,设置名称和主机名,设置两个端口,一个MQ管理平台的端口,另外一个是作消息通信的一个端口。

  1.  
    docker run \
  2.  
    -e RABBITMQ_DEFAULT_USER=root \
  3.  
    -e RABBITMQ_DEFAULT_PASS=123456 \
  4.  
    --name mq \
  5.  
    --hostname mq1 \
  6.  
    -p 15672:15672 \
  7.  
    -p 5672:5672 \
  8.  
    -d \
  9.  
    rabbitmq:3-management

安装完成后,通过设置的用户名和密码进行MQ的管理页面,如下:

学新通

1.4、RabbitMQ消息队列模型

RabbitMQ常见的5种消息队列模型如下,主要分为两大类,第一类是基本消息队列和工作消息队列,不包含交换机,第二种是发布订阅模式的,根据交换机的不同,分为三种类型:广播、路由和主题。

学新通

下面先来看一下简单的消息队列模型,只包括三个部分,即发布者、消息队列、订阅者。

学新通

下面总结一下基本消息队列的发送和接收流程,基本消息的发送首先需要建立连接,然后建立channel通道,利用通道进行声明队列和消息发送,将消息发送到队列中;基本消息队列的接收流程为:建立连接,创建通道,利用通道声明队列,先定义消费者的消费行为,然后利用通道channel将消费者与队列绑定,消费者就可以消费队列中的消息了。

学新通

二、SpringAMQP

2.1、基本介绍

我们先看一下什么是AMQP,AMQP是高级消息队列协议,是用于应用程序或传递业务消息的开放标准,SpringAMQP是Spring基于AMQP协议的一套API规范。

学新通

SpringAMQP的官方地址如下:https://spring.io/projects/spring-amqp

我们进入官方业面查看,可以发现,该项目主要包含两个部分,一个是基础抽象,另一个是RabbitAQ的基础实现。

可以发现amqp主要包含三个特征,第一个是用于异步处理消息的侦听器容器, 用于接收和发送给消息的RabbitTemplate,用于自动声明队列以及交换和绑定的RabbitAdmin。

学新通

2.2、SpringAMQP入门案例之消息发送

先看一下发送消息的基本案例,具体如如下,首先引入amqp依赖,然后利用RabbitTemplate对象发送消息到队列,在消费者中绑定该队列。

学新通首先需要引入amqp的依赖,具体如下:

  1.  
    <!--AMQP依赖,包含RabbitMQ-->
  2.  
    <dependency>
  3.  
    <groupId>org.springframework.boot</groupId>
  4.  
    <artifactId>spring-boot-starter-amqp</artifactId>
  5.  
    </dependency>

在发布者中创建一个类,用于创建队列和消息并把消息发送到队列中。

  1.  
    import org.junit.Test;
  2.  
    import org.junit.runner.RunWith;
  3.  
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
  4.  
    import org.springframework.beans.factory.annotation.Autowired;
  5.  
    import org.springframework.boot.test.context.SpringBootTest;
  6.  
    import org.springframework.test.context.junit4.SpringRunner;
  7.  
     
  8.  
    @RunWith(SpringRunner.class)
  9.  
    @SpringBootTest()
  10.  
    public class SpringTests {
  11.  
    @Autowired
  12.  
    private RabbitTemplate rabbitTemplate ;
  13.  
     
  14.  
    @Test
  15.  
    public void testSend(){
  16.  
    String queueName = "simple.queue" ;
  17.  
    String message = "Spring amqp" ;
  18.  
    rabbitTemplate.convertAndSend(queueName,message);
  19.  
    }
  20.  
     
  21.  
    }

当然需要配置MQ的ip地址和端口号,以及登录名和密码,之前设置的。

  1.  
    logging:
  2.  
    pattern:
  3.  
    dateformat: MM-dd HH:mm:ss:SSS
  4.  
    spring:
  5.  
    rabbitmq:
  6.  
    host: 192.168.102.130 # rabbitMQ的ip地址
  7.  
    port: 5672 # 端口
  8.  
    username: root
  9.  
    password: 123456
  10.  
    virtual-host: /

可以在浏览器的rabbitMQ管理界面查看到发送到队列的消息,如下所示。

学新通

 总结一下,SpringAMQP发送消息的流程,具体如下:

学新通

2.3、SpringAMQP入门案例之消息接收

首引入starter依赖,指定MQ地址,然后定义类使用@Component注解成bean交给Spring管理,然后在类的方法中@RabbitListener去监听消息队列,并获取消息。

  1.  
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
  2.  
    import org.springframework.stereotype.Component;
  3.  
     
  4.  
    import java.time.LocalTime;
  5.  
    import java.util.Map;
  6.  
     
  7.  
    @Component
  8.  
    public class SpringRabbitListener {
  9.  
     
  10.  
    @RabbitListener(queues = "simple.queue")
  11.  
    public void listenSimpleQueue(String msg) {
  12.  
    System.out.println("消费者接收到simple.queue的消息:【" msg "】");
  13.  
    }
  14.  
    }

2.4、SpringAMQP之工作队列Work Queue

工作队列模型有两个消费者,可以很好低提高消息处理的速度,避免队列消息堆积。

学新通

我们看下面的work queue的案例,实现一个队列绑定两个消费者。

学新通

 设置publisher发送消息,每20ms发送一次,1s发送50次消息,具体如下:

  1.  
    import org.junit.Test;
  2.  
    import org.junit.runner.RunWith;
  3.  
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
  4.  
    import org.springframework.beans.factory.annotation.Autowired;
  5.  
    import org.springframework.boot.test.context.SpringBootTest;
  6.  
    import org.springframework.test.context.junit4.SpringRunner;
  7.  
     
  8.  
    @RunWith(SpringRunner.class)
  9.  
    @SpringBootTest()
  10.  
    public class SpringTests {
  11.  
    @Autowired
  12.  
    private RabbitTemplate rabbitTemplate ;
  13.  
     
  14.  
    @Test
  15.  
    public void testWorkQueueSend() throws InterruptedException {
  16.  
    String queueName = "simple.queue" ;
  17.  
    String message = "hello message" ;
  18.  
    for(int i=0; i<50; i ){ //1s发送50次
  19.  
    rabbitTemplate.convertAndSend(queueName,message i);
  20.  
    Thread.sleep(20); //每发送一次消息,间隔20ms
  21.  
    }
  22.  
     
  23.  
    }
  24.  
     
  25.  
     
  26.  
    }

使用两个消费者来消费消息,通过设置睡眠时间模拟消费者的消费能力,如下:

  1.  
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
  2.  
    import org.springframework.stereotype.Component;
  3.  
     
  4.  
    import java.time.LocalTime;
  5.  
    import java.util.Map;
  6.  
     
  7.  
    @Component
  8.  
    public class SpringRabbitListener {
  9.  
    /*
  10.  
    @RabbitListener(queues = "simple.queue")
  11.  
    public void listenSimpleQueue(String msg) {
  12.  
    System.out.println("消费者接收到simple.queue的消息:【" msg "】");
  13.  
    }*/
  14.  
     
  15.  
    @RabbitListener(queues = "simple.queue") //监听队列
  16.  
    public void listenWorkQueue1(String msg) throws InterruptedException {
  17.  
    System.out.println("消费者1接收到消息:【" msg "】" LocalTime.now());
  18.  
    Thread.sleep(20);
  19.  
    }
  20.  
     
  21.  
    @RabbitListener(queues = "simple.queue")
  22.  
    public void listenWorkQueue2(String msg) throws InterruptedException {
  23.  
    System.err.println("消费者2........接收到消息:【" msg "】" LocalTime.now());
  24.  
    Thread.sleep(200);
  25.  
    }
  26.  
    }

对于消费者,配置prefetch可以设置每次只能取一个进行消费,消费完成再取,防止消费能力弱的消费者一次取多个,导致性能差。

  1.  
    logging:
  2.  
    pattern:
  3.  
    dateformat: MM-dd HH:mm:ss:SSS
  4.  
    spring:
  5.  
    rabbitmq:
  6.  
    host: 192.168.102.131 # rabbitMQ的ip地址
  7.  
    port: 5672 # 端口
  8.  
    username: root
  9.  
    password: 123456
  10.  
    virtual-host: /
  11.  
    listener:
  12.  
    simple:
  13.  
    prefetch: 1

2.5、SpringAMQP之发布-订阅模型广播交换机

发布订阅模型是允许将一个消息发送给多个消费者,通过交换机实现,三种常用的交换机分别为广播、路由和话题。不过需要注意的是交换机负责消息路由,而不存储消息,如果路由失败,则消息丢失。

学新通

我们先看第一种发布订阅模式,广播的方式,交换机将收到的消息路由给每个与其绑定的队列。

学新通

我们使用SpringAMQP实现广播交换机的案例,具体如下:

学新通

学新通

即将一个消费者绑定到两个队列,如下:

  1.  
    import org.springframework.amqp.core.Binding;
  2.  
    import org.springframework.amqp.core.BindingBuilder;
  3.  
    import org.springframework.amqp.core.FanoutExchange;
  4.  
    import org.springframework.amqp.core.Queue;
  5.  
    import org.springframework.context.annotation.Bean;
  6.  
    import org.springframework.context.annotation.Configuration;
  7.  
     
  8.  
    @Configuration
  9.  
    public class FanoutConfig {
  10.  
    //声明广播类型的交换机1
  11.  
    @Bean
  12.  
    public FanoutExchange fanoutExchange(){
  13.  
    return new FanoutExchange("fanout");
  14.  
    }
  15.  
     
  16.  
    //声明队列1
  17.  
    @Bean
  18.  
    public Queue fanoutQueue1(){
  19.  
    return new Queue("fanout.queue1");
  20.  
    }
  21.  
     
  22.  
    // 绑定队列1到交换机
  23.  
    @Bean
  24.  
    public Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
  25.  
    return BindingBuilder
  26.  
    .bind(fanoutQueue1)
  27.  
    .to(fanoutExchange);
  28.  
    }
  29.  
     
  30.  
    // 声明队列2
  31.  
    @Bean
  32.  
    public Queue fanoutQueue2(){
  33.  
    return new Queue("fanout.queue2");
  34.  
    }
  35.  
     
  36.  
    // 绑定队列2到交换机
  37.  
    @Bean
  38.  
    public Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
  39.  
    return BindingBuilder
  40.  
    .bind(fanoutQueue2)
  41.  
    .to(fanoutExchange);
  42.  
    }
  43.  
     
  44.  
    @Bean
  45.  
    public Queue objectQueue(){
  46.  
    return new Queue("object.queue");
  47.  
    }
  48.  
    }

学新通

在消费者的监听类中监听两个队列的消息,获取可以获取消息进行消费,如下:

  1.  
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
  2.  
    import org.springframework.stereotype.Component;
  3.  
     
  4.  
    import java.time.LocalTime;
  5.  
     
  6.  
    @Component
  7.  
    public class SpringRabbitListener {
  8.  
     
  9.  
     
  10.  
    @RabbitListener(queues = "simple.queue") //监听队列
  11.  
    public void listenWorkQueue1(String msg) throws InterruptedException {
  12.  
    System.out.println("消费者1接收到消息:【" msg "】" LocalTime.now());
  13.  
    Thread.sleep(20);
  14.  
    }
  15.  
     
  16.  
    @RabbitListener(queues = "simple.queue")
  17.  
    public void listenWorkQueue2(String msg) throws InterruptedException {
  18.  
    System.err.println("消费者2........接收到消息:【" msg "】" LocalTime.now());
  19.  
    Thread.sleep(200);
  20.  
    }
  21.  
     
  22.  
    }

学新通 最后,将消息发送到交换机,交换机会把消息路由给队列,这个交换是广播交换机,即广播给每个队列,如下:

  1.  
    import org.junit.Test;
  2.  
    import org.junit.runner.RunWith;
  3.  
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
  4.  
    import org.springframework.beans.factory.annotation.Autowired;
  5.  
    import org.springframework.boot.test.context.SpringBootTest;
  6.  
    import org.springframework.test.context.junit4.SpringRunner;
  7.  
     
  8.  
    @RunWith(SpringRunner.class)
  9.  
    @SpringBootTest()
  10.  
    public class SpringTests {
  11.  
    @Autowired
  12.  
    private RabbitTemplate rabbitTemplate ;
  13.  
     
  14.  
     
  15.  
    @Test
  16.  
    public void testExchange1(){
  17.  
    //交换机名称
  18.  
    String exchangeName = "fanout" ;
  19.  
    //消息
  20.  
    String message = "hello" ;
  21.  
    rabbitTemplate.convertAndSend(exchangeName,"",message);
  22.  
    }
  23.  
     
  24.  
    }

2.6、SpringAMQP之发布-订阅模型路由交换机

我们看一下路由交换机,这个的模式是根据规则将消息路由到指定的队列,即通过对比key的方式进行路由消息到相应的队列。

学新通

学新通

需要先监听两个队列,然后 绑定队列到交换机,为每个队列设置相应的key。

  1.  
    @RabbitListener(queues = "fanout.queue1")
  2.  
    public void listenFanoutQueue1(String msg) {
  3.  
    System.out.println("消费者接收到fanout.queue1的消息:【" msg "】");
  4.  
    }
  5.  
    @RabbitListener(queues = "fanout.queue2")
  6.  
    public void listenFanoutQueue2(String msg) {
  7.  
    System.out.println("消费者接收到fanout.queue2的消息:【" msg "】");
  8.  
    }
  9.  
     
  10.  
    @RabbitListener(bindings = @QueueBinding(
  11.  
    value = @Queue(name = "direct.queue1"),
  12.  
    exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
  13.  
    key = {"red", "blue"}
  14.  
    ))
  15.  
    public void listenDirectQueue1(String msg){
  16.  
    System.out.println("消费者接收到direct.queue1的消息:【" msg "】");
  17.  
    }
  18.  
     
  19.  
    @RabbitListener(bindings = @QueueBinding(
  20.  
    value = @Queue(name = "direct.queue2"),
  21.  
    exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
  22.  
    key = {"red", "yellow"}
  23.  
    ))
  24.  
    public void listenDirectQueue2(String msg){
  25.  
    System.out.println("消费者接收到direct.queue2的消息:【" msg "】");
  26.  
    }

然后发送响应的消息到交换机就可以,需要指定key,通过对比key路由到指定的消息队列。

  1.  
     
  2.  
    @Test
  3.  
    public void testExchange2(){
  4.  
    //交换机名称
  5.  
    String exchangeName = "itcast.direct" ;
  6.  
    //消息
  7.  
    String message = "hello, blue" ;
  8.  
    rabbitTemplate.convertAndSend(exchangeName,"blue",message);
  9.  
    }

2.7、SpringAMQP之发布-订阅模型主题交换机

下面看一下主题交换机和路由交换机的区别,主要就是绑定的方式不一样,主题交换机可以通过通配符的形式进行绑定,比较方便。

学新通

 首先需要在消费者的监听器中绑定交换机和队列,并使用通配符的模式指定key,如下:

  1.  
     
  2.  
    @RabbitListener(bindings = @QueueBinding(
  3.  
    value = @Queue(name = "topic.queue1"),
  4.  
    exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
  5.  
    key = "china.#"
  6.  
    ))
  7.  
    public void listenTopicQueue1(String msg){
  8.  
    System.out.println("消费者接收到topic.queue1的消息:【" msg "】");
  9.  
    }
  10.  
     
  11.  
    @RabbitListener(bindings = @QueueBinding(
  12.  
    value = @Queue(name = "topic.queue2"),
  13.  
    exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
  14.  
    key = "#.news"
  15.  
    ))
  16.  
    public void listenTopicQueue2(String msg){
  17.  
    System.out.println("消费者接收到topic.queue2的消息:【" msg "】");
  18.  
    }

按照key发送消息到交换机,交换机会路由到相应的队列,如下:
 

  1.  
    @Test
  2.  
    public void testExchange3(){
  3.  
    //交换机名称
  4.  
    String exchangeName = "itcast.topic" ;
  5.  
    //消息
  6.  
    String message = "大厂offer" ;
  7.  
    rabbitTemplate.convertAndSend(exchangeName,"china.news",message);
  8.  
    }

2.8、SpringAMQP之消息转换器

SpringAMQP会将对象序列化成字节后发送,然后进行反序列化即可接收。

学新通

首先引入依赖,json的依赖,如下:

  1.  
    <dependency>
  2.  
    <groupId>com.fasterxml.jackson.core</groupId>
  3.  
    <artifactId>jackson-databind</artifactId>
  4.  
    </dependency>

在发布者和消费者的配置类中声明消息转换器,即序列化和反序列化,实现消息转换。

  1.  
    @Bean
  2.  
    public MessageConverter messageConverter(){
  3.  
    return new Jackson2JsonMessageConverter();
  4.  
    }

 剩下的就是发送消息,监听并消费消息即可。

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhibieii
系列文章
更多 icon
同类精品
更多 icon
继续加载