• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

SpringBoot 集成RabbitMQ

武飞扬头像
Julywhj
帮助1

目录

SpringBoot 集成RabbitMQ

一、Docker安装Rabbit MQ

二、SpringBoot项目初始化

三、SpringBoot配置RabbitMQ

3.1、创建RabbitMqConfig

3.2、创建队列常量类`SimpleMqConstant`

3.3、创建Simple对象

3.4、创建队列消费者SimpleConsumer

3.5、创建队列生产者SimpleProducer

3.6、创建单元测试类SimpleMqTest

四、思考:我们这样写会存在什么问题?


SpringBoot 集成RabbitMQ

一、Docker安装Rabbit MQ

运行下面命令,docker 可自动拉取镜像,并启动mq。

docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management

我们执行完成后可以运行docker ps查看下mq运行情况

学新通

我们看到RabbitMq已经启动成功,我们在浏览器中打开ip:15672显示如下:

学新通

输入用户名密码,默认用户名guest/guest;登录成功后显示如下界面。

学新通

至此,RabbitMQ安装完成。

二、SpringBoot项目初始化

我们使用Spring initalizr初始化SpringBoot 项目,Spring initalizr

学新通

这里我们通过Spring官网初始化项目,并添加RabbitMQ的依赖,我们直接点击生成,代码会自动下载下来,我们将下载的代码导入到idea中(我这里的idea是社区版不支持Spring,故在官网初始化项目)。

学新通

项目导入到idea后,我们新创建个controller包,在包中创建IndexController.class。我们使用创建的Controller测试下我们的工程,在IndexController.class中我们添加一下内容:

  1.  
    @RestController
  2.  
    public class IndexController {
  3.  
        @GetMapping("/index")
  4.  
        public String index() {
  5.  
            return "Hello RabbitMQ";
  6.  
        }
  7.  
    }

启动工程,在浏览器中访问127.0.0.1:8080/index可以看到浏览器中出现“Hello RabbitMQ”,说明我们的工程初始化没有问题。

学新通

三、SpringBoot配置RabbitMQ

3.1、创建RabbitMqConfig

默认RabbitMQ序列化方式是SerializerMessageConverter序列化器,这么我们使用Jackson2JsonMessageConverter序列化器。我们需要设置下,内容如下:

  1.  
    @Configuration
  2.  
    public class RabbitMqTemplateConfig {
  3.  
     
  4.  
        @Bean
  5.  
        public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
  6.  
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
  7.  
            rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
  8.  
            return rabbitTemplate;
  9.  
        }
  10.  
     
  11.  
        @Bean
  12.  
        public MessageConverter jackson2JsonMessageConverter() {
  13.  
            return new Jackson2JsonMessageConverter();
  14.  
        }
  15.  
    }
学新通

完善SpringBoot配置文件,配置文件内容如下:

  1.  
    spring.rabbitmq.host=110.40.141.168
  2.  
    spring.rabbitmq.port=5672
  3.  
    spring.rabbitmq.virtual-host=/

这里我们使用的是application.properties,而非yaml。使用yaml的可以自行转换下。

3.2、创建队列常量类`SimpleMqConstant`

这里只做简单的功能演示,我们把队列的名称统一定义在常量类SimpleMqConstant类中,后续我们扩展其他队列方便维护。

  1.  
    /**
  2.  
     * @Author julyWhj
  3.  
     * @Description 默认的交换机测试$
  4.  
     * @Date 2021/10/7 10:52 上午
  5.  
     **/
  6.  
    public class SimpleMqConstant {
  7.  
     
  8.  
        /**
  9.  
         * 处理对象的MQ队列
  10.  
         */
  11.  
        public static final String HANDLER_OBJECT_QUEUE_NAME = "com.july.mq.simple.object";
  12.  
     
  13.  
    }

这里我们定义队列名称叫:com.july.mq.simple.object;

3.3、创建Simple对象

这里我们创建一个Simple对象,使用该对象进行序列化发送。

  1.  
    /**
  2.  
     * @Author julyWhj
  3.  
     * @Description Simple对象$
  4.  
     * @Date 2021/10/7 10:55 上午
  5.  
     **/
  6.  
    @Data
  7.  
    @Builder
  8.  
    @AllArgsConstructor
  9.  
    @NoArgsConstructor
  10.  
    public class Simple implements Serializable {
  11.  
        private String name;
  12.  
        private String no;
  13.  
        private int age;
  14.  
        private String phone;
  15.  
        private Date createTime;
  16.  
    }
学新通

3.4、创建队列消费者SimpleConsumer

我们创建SimpleConsumer类,做为队列的消费者,内容如下:

  1.  
    /**
  2.  
     * @Author julyWhj
  3.  
     * @Description 消费者$
  4.  
     * @Date 2021/10/7 10:57 上午
  5.  
     **/
  6.  
    @Component
  7.  
    @Slf4j
  8.  
    public class SimpleConsumer {
  9.  
     
  10.  
     
  11.  
        @RabbitListener(queuesToDeclare = @Queue(SimpleMqConstant.HANDLER_OBJECT_QUEUE_NAME))
  12.  
        @RabbitHandler
  13.  
        public void receiveObject(Simple simple) throws JsonProcessingException {
  14.  
            ObjectMapper objectMapper = new ObjectMapper();
  15.  
            String message = objectMapper.writeValueAsString(simple);
  16.  
            log.info("simple consumer receive the object:{}", message);
  17.  
        }
  18.  
    }
学新通

这里我们使用@RabbitListener(queuesToDeclare = @Queue(SimpleMqConstant.HANDLER_OBJECT_QUEUE_NAME)) 其中queuesToDeclare它可以在队列SimpleMqConstant.HANDLER_OBJECT_QUEUE_NAME不存在的时候自动创建队列,不会出现reply-code=404, reply-text=NOT_FOUND - no exchange 'XXX' in vhost '/', class-id=50, method-id=的异常。

这里我们接收到消息后,只做打印处理,不做其他处理。

3.5、创建队列生产者SimpleProducer

队列生产者SimpleProducer内容如下:

  1.  
    /**
  2.  
     * @Author julyWhj
  3.  
     * @Description 生产者$
  4.  
     * @Date 2021/10/7 10:54 上午
  5.  
     **/
  6.  
    @Component
  7.  
    public class SimpleProducer {
  8.  
        @Autowired
  9.  
        private RabbitTemplate rabbitTemplate;
  10.  
     
  11.  
        /**
  12.  
         * 消息体为对象。配置MessageConverter为Jackson2JsonMessageConverter即可
  13.  
         *
  14.  
         * @param simple
  15.  
         */
  16.  
        public void sendOrderMessage(Simple simple) {
  17.  
            rabbitTemplate.convertAndSend(SimpleMqConstant.HANDLER_OBJECT_QUEUE_NAME, simple);
  18.  
        }
  19.  
    }
学新通

生产者内容很简单,接收Simple对象,调用convertAndSend 方法发送对象。

3.6、创建单元测试类SimpleMqTest

这里我们使用单元测试进行消息的发送和接收测试,测试类内容如下:

  1.  
    /**
  2.  
     * @Author julyWhj
  3.  
     * @Description $
  4.  
     * @Date 2021/10/7 10:58 上午
  5.  
     **/
  6.  
    @SpringBootTest
  7.  
    @Slf4j
  8.  
    public class SimpleMqTest {
  9.  
        @Autowired
  10.  
        private SimpleProducer simpleProducer;
  11.  
     
  12.  
        @Test
  13.  
        public void testSimple() throws Exception {
  14.  
            for (int i = 0; i < 10; i ) {
  15.  
                simpleProducer.sendOrderMessage(Simple.builder()
  16.  
                        .createTime(new Date())
  17.  
                        .name("JulyWhj")
  18.  
                        .age(i)
  19.  
                        .no("ID-0001")
  20.  
                        .phone("138XXXXXXXX")
  21.  
                        .build());
  22.  
            }
  23.  
        }
  24.  
    }
学新通

我们运行单元测试,看下执行结果:

学新通

可以看到,消费者成功接收到10条数据,并成功打印出来。

四、思考:我们这样写会存在什么问题?

我们这样写会存在一个致命的问题,消息丢失

如何造成的消息丢失,我们应该怎么处理保证消息不丢失。后续的文章会为大家逐一分析。这里我们先简单的使用SpringBoot连接MQ,进行收发消息的Demo。

源码我上传github中,需要的可自行下载。

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhghabfa
系列文章
更多 icon
同类精品
更多 icon
继续加载