• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

RabbitMQ基础组件笔记

武飞扬头像
缩缩北行鸟
帮助1

就像编写定时任务中间件,这里想要编写一个 RabbitMQ 发送消息的基础组件。
希望它具备以下功能:

  • 发送迅速消息(消息发送之后就算了)
  • 发送确认消息(消息发送后回调,在日志中会输出成功或失败)
  • 可靠性消息(消息发送之后,会检查是否发送成功,如果失败了,会定时重新发送,直至发送成功或者超出重试次数,注意保障数据库和所发的消息是原子性的)

构建思路

第一步,不知所措的时候我们可以由简做起,可以先构思 API 接口,构思创建哪些接口,它主要满足什么功能,后续再逐一实现这些接口,初步构思以下接口:

  • 发送一条消息
  • 发送多条消息
  • 发送一条消息后,自定义回调函数

这里我第一反应是构思三个接口,直接就是迅速消息、确认消息、可靠性消息

第二步,在定义每个接口的时候,还要设想它是否需要返回值和参数,这里需要构思一个Java对象,它包含所有发送该消息所需的信息
设想一下发送一条消息最重要需要什么信息?

  • 交换机名称
  • 路由名称
  • 消息内容,即消息体
  • 消息唯一 ID
  • 消息类型(迅速消息、确认消息、可靠性消息),默认为迅速消息

此外自定义一些附加的属性信息,比如想要实现延迟发送消息,就可以添加延迟发送时间(毫秒)

第三步,就是实现最简单的 “发送迅速消息” 接口,并以此为基点,通过 RabbitTemplatesetConfirmCallback 方法可以设置发送消息回调函数,进而实现 “发送确认消息” 接口,而 “可靠性消息” 就是以此为基点,在发送消息之前先在数据库中记录下该消息的发送状态并设置为 等待(包含已经尝试发送次数,下次尝试发送时间等信息),在消息回调函数中,如果是成功回调,则将数据库中的消息的状态设置为 发送成功,同时创建一个定时器,它的任务就是将数据库中所有发送状态为 等待 的消息提取出来,重新发送,将发送次数超过规定次数的消息状态设置为 发送失败

1 知识点

1.1 建造者模式

建造者模式 Bulider Pattern 是将一个复杂对象的构建过程与它的实现表示分离。在 建造者模式 里,开发者可以在用户定义工具类创建对应实体的过程中,编写一些自定义的规则,从而让整个项目更加健壮、安全。
创建 Message 对象建造者模式的 工具类,思路可以分为三步:

  • 尽量让用户使用诸如 create 方法来初始化工具类,而避免用户直接使用构造函数创建工具类,即需要将工具类的默认构造函数私有化
  • 用户在初始化完工具类后,就可以使用一系列的方法给工具类声明 Message 对象的属性值,这个过程是可以链式调用的,即每个方法需要返回该工具类实例 thisMessage 对象的属性值,工具类也需要一个一个地声明这些属性值)
  • 最后调用诸如 build 方法,根据之前链式调用时设置 工具类 的属性值,从而构建对应的 Message 对象

1.2 编写自定义错误

异常可以分为 非运行时异常 Exception运行时异常 RuntimeException

  • 非运行时异常也叫作编译时异常,如果不对这种异常进行捕获则无法编译,Java编译器要求程序员必须对这种异常捕获,Java认为这种异常都是可以被修复的异常,所以 Java 程序要求显式处理这种异常
  • 运行时异常是不用进行捕获的,当这种异常发生时,JVM 会进行处理。比如:
    • ClassCastException (类转换异常)
    • IndexOutOfBoundsException (数组越界)
    • NullPointerException (空指针)
    • ArrayStoreException (数据存储异常,操作数组时类型不一致)

创建自定义异常时,非运行时异常继承 Exception,而运行时异常继承 RuntimeException,然后重新声明一下构造函数即可。

1.3 默认初始化数据表

在上述思路中,可以知道在实现 可靠性消息 接口时,需要操作数据库,用于存储消息状态。明显的,用户在使用这个工具包时无需自行在数据库创建该数据表,那么如何如何在用户使用本工具包时,自动的为用户初始化数据表呢,这里可以使用 spring.factories 自动注入自定义配置类,该配置类扫描特定的配置,其中就包括初始化数据库表。
由于还是需要用户配置数据库连接信息,所以这里使用 @ConditionalOnProperty 注释来检测是否包含数据库连接信息,再进行导入。
而除了连接数据库的主要信息外,其它诸如数据库驱动之类的就使用本工具包配置文件提供默认的配置了,即使用到 @PropertySource@ConfigurationProperties,最后将数据源 Bean 注册到 Spring 容器中即可。(@ConfigurationProperties 注释)
接着就是编写创建数据表的逻辑了,只要在 Spring 容器中注入 DataSourceInitializer 类型的 Bean,在这个 Bean 里添加数据源和构建数据表 sql 脚本,spring 就会自动地在数据源加载完成后调用它

1.3.1 @ConditionalOnProperty

在 Spring boot 中有时候需要根据 application.ymlapplication.properties 文件设置某个属性,控制配置类是否生效。此时可以使用 @ConditionalOnProperty 注解来控制 @Configuration 是否生效(条件注释不能声明在 spring.factories 自动注入类之上)

  • prefix 属性:设置属性名的前缀
  • name 属性:具体属性名的名称
  • hadingValue 属性:设置该属性的属性值,明确表明,仅当该属性设置为对应的值时,才加载 @Configuration 配置文件
  • matchIfMissing 属性:默认是 false,设置为 false 则表示没有设置该属性,则不生效该配置文件

1.3.2 @PropertySource@ConfigurationProperties@value

参考文章:《Spring读取配置获取配置yml文件的 3 种方法》

@PropertySource 加载指定的属性文件 *.properties 到 Spring 的容器中。可以配合 @Value
@ConfigurationProperties 使用。

  • @Value(“${xxxx}”)超级详细的参考文章,从加载指定配置文件读取对应的属性值,配置文件中设置了 rabbit.producer.druid.type=com.zaxxer.hikari.HikariDataSource,然后按 @Value("{rabbit.producer.druid.type}") 注释可以直接赋值给一个 Class 类型变量,如:
@Value("${rabbit.producer.druid.type}")
private Class<? extends DataSource> dataSourceType;

同时也可以使用 @Value("classpath:initTable.sql") 的形式导入外部资源文件,如:

@Value("classpath:initTable.sql")
private Resource schemaScript;
  • @ConfigurationProperties:它是实现了 BeanPostProcessor 接口,在 bean 被 实例化后,会调用后置处理,递归的查找属性文件的属性,通过反射注入值(在没有使用 @PropertySource 指定属性文件下,它默认加载的是 application.xmlapplication.properties 文件),对大多数属性而言强制需提供其 settergetter 方法。

注意:@PropertySource 这个指定加载的属性文件可以是本工具包内的,也可以是调用本工具包的父项目中的,同理 @ConfigurationProperties 在没有使用 @PropertySource 指定属性文件下,它默认加载的是 application.xmlapplication.properties 文件来自于这个工具包中以及调用这个工具包的父项目**

1.3.3 DataSourceInitializer

DataSourceInitializer 的作用是用于启动项目时执行一些自定义 SQL 脚本,一些数据表的初始化就可以在这里执行了。
在 Spring 容器中注册一个 DataSourceInitializer 类型的 Bean:

  • 使用 @Value 导入编辑好了 SQL 脚本资源文件 Resource
  • 导入配置好的 DataSource 数据源 Bean
  • 初始化 DataSourceInitializer 对象,设置对应的数据源和 SQL 脚本资源属性,最后将这个 DataSourceInitializer 对象注册到 Spring 容器中即可

1.4 配置 Mybatis

数据库的操作同样少不了配置 Mybatis:

1.4.1 SqlSessionFactory

SqlSessionFactory 是 MyBatis 的核心对象,用于初始化 MyBatis,读取配置文件,创建 SqlSession 对象,SqlSessionFactory 是全局对象,为保证其在应用中全局唯一SqlSession 是 MyBatis 操作数据库的核心对象,SqlSession 使用 JDBC 方式与数据库交互,同时提供了数据表的 CRUD(增删改查)对应的 api 方法。
这就导致一个问题就是:这里配置了 SqlSessionFactory ,外部调用的父项目该如何配置数据库?如果外头使用 Spring Boot 的正常方式去配置数据库,最后会报期待 SqlSessionFactory 是单例,Spring 容器中却发现有两个,不知道选用哪个的错误。
这里就在父项目中使用一下注解,先取消 Spring Boot 的默认数据库配置:

// springboot 默认会自动装配数据库,因此需要配置数据库信息,否则会报错,添加 exclude,取消自动装配数据库
@SpringBootApplication(exclude={DataSourceAutoConfiguration.class, HibernateJpaAutoConfiguration.class})

使用 SqlSessionFactoryBean 创建 SqlSessionFactory,这 SqlSessionFactoryBean 需要设置数据源,设置 Mapper 的 XML 文件路径,设置数据表实体类包的路径。

1.4.2 MapperScannerConfigurer

除了配置数据源,Mapper 的 XML 文件路径,数据表实体类包的路径外,还需要配置扫描 DAO 层路径,这里就使用到了 MapperScannerConfigurer(我这里使用的是工具类 tk.mybatis.spring.mapper.MapperScannerConfigurer),设置扫描 DAO 层路径和设置上面设置的 SqlSessionFactory
然后我出现一个问题,在这里就卡了半天。开始我不解教程里为何将 SqlSessionFactoryMapperScannerConfigurer 拆分到两个配置类中,同时也不解为什么在 MapperScannerConfigurer 设置 SqlSessionFactory 的时候,不是直接使用 @Autowired 之类的形式导入实例类,而是使用设置 BeanName 的形式(不过这个当时没有细究),于是自作主张地只写一个配置类,并将 SqlSessionFactoryMapperScannerConfigurer 的注册 Bean 写在这里面,然后打包测试使用的时候,就报错找不到数据源之类的,那时候眼看着明明使用 @Resource 注入进来的数据源,百思不得其解。
最后迫不得已将项目模仿着教程一步步地敲打测试,最终将 SqlSessionFactoryMapperScannerConfigurer 拆分到两个配置类中,项目才得以顺利执行,原来问题都出在 MapperScannerConfigurer 这里。

参考文章:
《static关键字真能提高Bean的优先级吗?》(重点)
《Spring之BeanPostProcessor(后置处理器)介绍》
《【小家Spring】Spring IOC容器启动流程 AbstractApplicationContext#refresh()方法源码分析(一)》
《BeanDefinitionRegistryPostProcessor用法-注册自己新bean》

这个 MapperScannerConfigurer 实现 BeanDefinitionRegistryPostProcessor 接口,BeanDefinitionRegistryPostProcessor 接口又继承自 BeanFactoryPostProcessor 接口。而在 Spring Bean 的生命周期中,BeanDefinitionRegistryPostProcessor 会最先实例化(比 BeanFactoryPostProcessor 还要靠前),而 BeanFactoryPostProcessor 可以称为 Bean 工厂的后置处理器,通俗一些就是可以管理我们的 bean 工厂内所有的 beandefinition(未实例化)数据,可以随心所欲的修改属性。
也就是说实现 BeanDefinitionRegistryPostProcessor 接口的 MapperScannerConfigurer 在 Spring Bean 的初始化中会被最先实例化,这就导致了 它所在的配置类 需要比 MapperScannerConfigurer 还要靠前地实例化,那么此时 @Autowired@Resource 等注解还没能生效,也就是说此时使用 @Resource 注入进来的数据源是根本无法注入,这也导致了 SqlSessionFactory 无法实例化,从而出错了,而此时处理方法有两个:

  • 分开两个配置类去写
  • 将对 MapperScannerConfigurer 添加 static 关键字提升优先级,此时相当于将其定义成类的静态方法,这样就无需先实例化 它所在的配置类

1.5 异步执行

为了提高发送消息的效率,将消息修改成异步发送。

1.5.1 ExecutorService

参考文章:
《ExecutorService详解》

ExecutorService 是 Java 提供的线程池,每次需要使用线程的时候,可以通过 ExecutorService 获得线程。它可以有效控制最大并发线程数,提高系统资源的使用率,同时避免过多资源竞争,避免堵塞。
创建 ExecutorService

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)
  • corePoolSize:核心线程数,一旦创建将不会再释放。如果创建的线程数还没有达到指定的核心线程数量,将会继续创建新的核心线程,直到达到最大核心线程数后,核心线程数将不在增加;如果没有空闲的核心线程,同时又未达到最大线程数,则将继续创建非核心线程;如果核心线程数等于最大线程数,则当核心线程都处于激活状态时,任务将被挂起,等待空闲线程来执行。
  • maximumPoolSize:最大线程数,允许创建的最大线程数量。如果最大线程数等于核心线程数,则无法创建非核心线程;如果非核心线程处于空闲时,超过设置的空闲时间,则将被回收,释放占用的资源。
  • keepAliveTime:也就是当线程空闲时,所允许保存的最大时间,超过这个时间,线程将被释放销毁,但只针对于非核心线程。
  • unit:时间单位,TimeUnit.SECONDS 等。
  • workQueue:任务队列,存储暂时无法执行的任务,等待空闲线程来执行任务。BlockingQueue 它的特性是在任意时刻只有一个线程可以进行 take 或者 put 操作
  • threadFactory:线程工程,用于创建线程。
  • handler:当线程边界和队列容量已经达到最大时,用于处理阻塞时的程序

创建了 ExecutorService 还需要委托任务:

  • execute(Runnable):方法 execute(Runnable) 接收 java.lang.Runnable 对象作为参数,并且以异步的方式执行它,不返回结果
  • submit(Runnable):方法 submit(Runnable) 同样接收 java.lang.Runnable 的作为参数,但是会返回一个 Future 对象,这个 Future 对象可以用于判断 Runnable 是否结束执行,如果任务结束执行,调用 future.get() 会返回 null,如:
Future future = executorService.submit(new Runnable() {  
    public void run() {  
        System.out.println("Asynchronous task");  
    }  
});  
//如果任务结束执行则返回 null  
System.out.println("future.get()="   future.get());  
  • submit(Callable):方法 submit(Callable) 接收的是 Callable 作为参数,Callablecall() 方法可以返回结果,而此时 Future 对象就可以读取到这个结果,如:
Future future = executorService.submit(new Callable(){  
    public Object call() throws Exception { 
        return "Callable Result";  
    }  
});  
//如果任务结束执行则返回 null  
System.out.println("future.get()="   future.get());  

当使用 ExecutorService 完毕之后,我们应该关闭它,这样才能保证线程不会继续保持运行状态。
举例来说,如果项目是通过 main() 方法启动的,并且主线程退出了你的程序,此时假如 ExecutorService 存在于你的程序中,那么程序将会继续保持运行状态。存在于 ExecutorService 中的活动线程会阻止 Java 虚拟机关闭。
为了关闭在 ExecutorService 中的线程,你需要调用 shutdown() 方法。ExecutorService 并不会马上关闭,而是不再接收新的任务,当所有的线程结束执行当前任务ExecutorServie 才会真的关闭。所有在调用 shutdown() 方法之前提交到 ExecutorService 的任务都会执行。
如果希望立即关闭 ExecutorService,需要调用 shutdownNow() 方法。该方法会尝试马上关闭所有正在执行的任务,并且跳过所有已经提交但是还没有运行的任务。但是对于正在执行的任务,是否能够成功关闭它是无法保证的,有可能他们真的被关闭掉了,也有可能它会一直执行到任务结束。

1.6 RabbitTemplate

直接用注入的形式注入 RabbitTemplate 然后使用,由于它是一个单例模式,所以在发送迅速消息时无需设置回调函数,而在发送可靠性消息时,则需要设置回调函数,这就会导致报错,因为它只能设置一次回调函数,同时考虑到不同的发送方式都有可能对 RabbitTemplate 设置、修改,所以这里就创建一个 RabbitTemplate 池。
思路其实就是建立一个全局变量,根据消息类型的分类地建立各个 RabbitTemplate 并放置在这个全局变量中,执行发送消息逻辑时,从中去除对应的 RabbitTemplate 实例。

1.6.1 ConcurrentMap

在上述的思路中,装载 RabbitTemplate 的应该是一个 线性安全 的集合对象。在 java 中存在这样一个概念:线性安全,线性安全必然也会涉及到集合对象,对于集合对象中,存在两种类型,即为 线性安全线性不安全
理解线性安全之前,还需要理解一个概念:线性同步

  • 线性同步:当一个程序对语句(或线性安全的方法)进行访问过程中,其他的将不能对其进行其他相关的操作,必须要等到本次访问结束之后才能对这个语句(或线性安全的方法)进行访问。
  • 线性安全:如果现在代码中有多个线程同时运行,而这些线程可能在同一个时刻运行这段代码,那么如果运行结果个单线程运行的结果是一致的,而且其他的变量值跟预期是一模一样的。那么我们就称之为线性安全。

线性安全问题都是由全局变量及静态变量引起的,其次,在实际的运行过程中,若每个线程中对全局变量,静态变量只有读操作,并无写操作,一般而言,这个全局变量是线程安全的,若有多个线程同时执行写操作,一般需要考虑到线程同步的问题,否则就可能影响到线程的安全问题。

ConcurrentMap 是一个能够支持并发访问的 java.util.map 集合,使用它存储 RabbitTemplate 即可。

1.7 序列化

参考文章:《RabbitMQ发送对象之消息序列化(必踩坑的一个点)》

涉及网络传输的应用序列化不可避免,发送端以某种规则将消息转成 byte 数组进行发送,接收端则以约定的规则进行 byte[] 数组的解析,RabbitMQ 的序列化是指 Messagebody 属性,即我们真正需要传输的内容,RabbitMQ 抽象出一个 MessageConvert 接口处理消息的序列化,其实现有 SimpleMessageConverter(默认)、Jackson2JsonMessageConverter
当调用了 convertAndSend 方法时会使用 MessageConvert 进行消息的序列化。
默认的 SimpleMessageConverter 对于要发送的消息体 bodybyte[] 时不进行处理,如果是 String 则转成字节数组,如果是 Java 对象,则使用 jdk 序列化将消息转成字节数组,转出来的结果较大,含 class 类名,类相应方法等信息,因此性能较差。
这时候就需要自定义序列化方式了。

RabbitTemplate 使用 setMessageConverter 可以设置自定义的序列化方法,传入的对象需要满足两个接口,分别是:

  • toMessage:将发送的消息转化为 RabbitMQ 可以识别的 org.springframework.amqp.core.Message 对象,这里我选用其传入字节数组的构造函数,即现阶段需要将早就定义好的如 top.seiei.pojo.Message 对象如何快速地转化为字节数组
  • fromMessage:接受到消息之后,将接收到的字节数组,反序列化转化为对应原先的 top.taka.pojo.Message 对象

至此使用 com.fasterxml.jackson.databind.ObjectMapper 工具类实现序列化和反序列化的功能。
此时消费者端也需要配置对应序列化方法才能解析获取到对应的 top.seiei.pojo.Message 对象,如:

@Configuration
public class RabbitMQConfig {
    @Bean
    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        SerializerImpl serializer = SerializerImpl.createParametricType(Message.class);
        GenericMessageConverter gmc = new GenericMessageConverter(serializer);
        factory.setMessageConverter(gmc);
        return factory;
    }
}

1.8 批量发送消息

1.8.1 ThreadLocal

参考文章:《ThreadLocal原理及使用场景》

用户批量发送消息时候,会调用批量发送消息的接口,传入批量消息集合,此时应该如何处理这个集合,直接把这个参数也传入真正处理发送消息逻辑的 Api 类方法吗?
这样也可以,但为了避免直接传递参数带来的代码耦合问题,这里想要使用一个全局变量,使用全局变量一般都会考虑到 static 静态全局变量,但这就会导致出另外一个问题就是,我们不清楚用户会如何调用这个批量发送消息的接口,假如用户是使用多线程发送批量消息,这里就需要考虑到 使用静态全局变量在多线程环境下不安全 这个问题。
所以借此就引入了 ThreadLocalThreadLocal 叫做线程变量,即 ThreadLocal 中填充的变量属于当前线程,该变量对其他线程而言是隔离的,也就是说该变量是当前线程独有的变量。
使用 ThreadLocal 的好处:

  • 保存每个线程绑定的数据,在需要的地方可以直接获取,避免直接传递参数带来的代码耦合问题;
  • 各个线程之间的数据相互隔离却又具备并发性,避免同步方式带来的性能损失。

1.9 导入 Elastic Job 自定义中间件

<dependency>
    <groupId>top.seiei</groupId>
    <artifactId>seiei-elastic-job</artifactId>
    <version>1.0</version> <!--版本号-->
    <scope>system</scope> <!--作用域-->
    <systemPath>${basedir}\src\lib\TakaRabbitMQ.jar</systemPath>
</dependency>

所以就这里就直接将这个中间件直接注册到本地 Maven 库中:

mvn install:install-file -Dfile=D:/idealFiles/seiei-elastic-job/target/seiei-elastic-job.jar -DgroupId=top.taka -DartifactId=seiei-elastic-job -Dversion=1.0 -Dpackaging=jar

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhggjgke
系列文章
更多 icon
同类精品
更多 icon
继续加载