• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

Java连接rabbitMQ三步

武飞扬头像
井上泷奈
帮助1

rabbitMQ安装教程网上特别多就不多赘述,这里主要说一下怎么去连接

第一步,创建工程添加依赖

创建一个Maven项目,打开pom.xml,添加两个依赖,并更新Maven。

  1.  
    <?xml version="1.0" encoding="UTF-8"?>
  2.  
    <project xmlns="http://maven.apache.org/POM/4.0.0"
  3.  
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4.  
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5.  
    <modelVersion>4.0.0</modelVersion>
  6.  
     
  7.  
    <groupId>org.example</groupId>
  8.  
    <artifactId>rabbitMQdemo</artifactId>
  9.  
    <version>1.0-SNAPSHOT</version>
  10.  
     
  11.  
    ------------添加下面两个依赖------------
  12.  
     
  13.  
    <dependencies>
  14.  
    <dependency>
  15.  
    <groupId>com.rabbitmq</groupId>
  16.  
    <artifactId>amqp-client</artifactId>
  17.  
    <version>5.14.0</version>
  18.  
    </dependency>
  19.  
    <dependency>
  20.  
    <groupId>org.slf4j</groupId>
  21.  
    <artifactId>slf4j-nop</artifactId>
  22.  
    <version>1.7.25</version>
  23.  
    </dependency>
  24.  
    </dependencies>
  25.  
     
  26.  
    </project>
学新通

第二步,配置连接

在src->main->java中新建一个文件夹utils,在此文件夹中添加class:rabbitMQUtils

  1.  
    package utils;
  2.  
     
  3.  
    import com.rabbitmq.client.Channel;
  4.  
    import com.rabbitmq.client.Connection;
  5.  
    import com.rabbitmq.client.ConnectionFactory;
  6.  
     
  7.  
    public class RabbitMQUtils {
  8.  
     
  9.  
    private static final ConnectionFactory factory;
  10.  
     
  11.  
    static {
  12.  
    factory = new ConnectionFactory();
  13.  
    factory.setHost("126.239.25.24"); // 换成自己的ip
  14.  
    factory.setPort(5672); // 一般默认端口为5672
  15.  
    factory.setUsername("root");
  16.  
    factory.setPassword("123456");
  17.  
    factory.setAutomaticRecoveryEnabled(true); // 开启Connection自动恢复功能
  18.  
    factory.setNetworkRecoveryInterval(5000);
  19.  
    factory.setVirtualHost("/");
  20.  
    factory.setConnectionTimeout(30 * 1000);
  21.  
    factory.setHandshakeTimeout(30 * 1000);
  22.  
    factory.setShutdownTimeout(0);
  23.  
    }
  24.  
     
  25.  
    // 定义提供连接对象的方法
  26.  
    public static Connection getConnection() {
  27.  
    try {
  28.  
    return factory.newConnection();
  29.  
    } catch (Exception e) {
  30.  
    e.printStackTrace();
  31.  
    }
  32.  
    return null;
  33.  
    }
  34.  
     
  35.  
    // 定义关闭通道和连接的方法
  36.  
    public static void closeAll(Channel chan, Connection conn) {
  37.  
    try{
  38.  
    if(chan != null) chan.close();
  39.  
    if(conn != null) conn.close();
  40.  
    } catch (Exception e) {
  41.  
    e.printStackTrace();
  42.  
    }
  43.  
    }
  44.  
    }
学新通

需要注意的是要将ip、用户名、密码更换成自己的,如果是在服务器上装的MQ记得在安全组中把端口放行,另外记得设置虚拟用户"/",或改成自己的虚拟用户名。

第三步,测试连接

使用直连测试连接效果,在src->main->java创建文件夹direct,添加两个class:consumer和producer。

  1.  
    package direct;
  2.  
     
  3.  
    import com.rabbitmq.client.*;
  4.  
    import utils.RabbitMQUtils;
  5.  
    import java.io.IOException;
  6.  
     
  7.  
    public class producer {
  8.  
     
  9.  
    public static void main(String[] args) throws IOException {
  10.  
     
  11.  
    Connection connection = RabbitMQUtils.getConnection();
  12.  
    assert connection != null;
  13.  
    final Channel channel = connection.createChannel();
  14.  
     
  15.  
    // 声明交换机
  16.  
    channel.exchangeDeclare("direct_router", "direct", true, false, false, null);
  17.  
     
  18.  
    // 声明消息队列
  19.  
    // 参数1: 队列名称
  20.  
    // 参数2: 定义是否需要持久化队列,true为持久化
  21.  
    // 参数3: 定义是否让当前连接独占队列,true为独占
  22.  
    // 参数4: 是否在消费完成后自动删除队列,true为删除
  23.  
    // 参数5: 额外附加参数,传入HashMap<String, Object>
  24.  
    channel.queueDeclare("direct_queue", false, false, false, null);
  25.  
     
  26.  
    // 交换机与消息队列绑定
  27.  
    channel.queueBind("direct_queue", "direct_router", "Dir-RQ");
  28.  
     
  29.  
    // 发送消息
  30.  
    // 参数1: 交换机名称
  31.  
    // 参数2: routingKey
  32.  
    // 参数3: 若为true,则当消息无法送达指定队列时会触发channel.BasicReturn事件否则broker会将消息直接丢弃
  33.  
    // 参数4: 传递消息的额外设置
  34.  
    // 参数5: 发送消息内容
  35.  
    String msg="direct producer message";
  36.  
    channel.basicPublish("direct_router", "Dir-RQ", true, null, msg.getBytes());
  37.  
     
  38.  
    RabbitMQUtils.closeAll(channel, connection);
  39.  
    }
  40.  
    }
学新通
  1.  
    package direct;
  2.  
     
  3.  
    import com.rabbitmq.client.*;
  4.  
    import utils.RabbitMQUtils;
  5.  
    import java.io.IOException;
  6.  
     
  7.  
    public class consumer {
  8.  
     
  9.  
    public static void main(String[] args) throws IOException, InterruptedException {
  10.  
     
  11.  
    Connection connection = RabbitMQUtils.getConnection();
  12.  
    assert connection != null;
  13.  
    final Channel channel = connection.createChannel();
  14.  
     
  15.  
    DefaultConsumer consumer = new DefaultConsumer(channel){
  16.  
    @Override
  17.  
    public void handleDelivery(
  18.  
    String consumerTag,
  19.  
    Envelope envelope,
  20.  
    AMQP.BasicProperties properties,
  21.  
    byte[] body
  22.  
    ) throws IOException {
  23.  
     
  24.  
    // 打印消息
  25.  
    String msg= new String(body, "utf-8");
  26.  
    System.out.println(msg);
  27.  
     
  28.  
    // 应答机制 将队列中的消息删除掉,第二个参数为是否需要确认多个ACK
  29.  
    channel.basicAck(envelope.getDeliveryTag(), false);
  30.  
    }
  31.  
    };
  32.  
     
  33.  
    // 接收消息
  34.  
    // 参数1: 队列名称
  35.  
    // 参数2: 是否自动签收消息,对应上面的应答机制,最好手动删,否则消费者多了可能会出问题
  36.  
    channel.basicConsume("direct_queue", false, "ConsumerTag", consumer);
  37.  
     
  38.  
    Thread.sleep(20000);
  39.  
    RabbitMQUtils.closeAll(channel, connection);
  40.  
    }
  41.  
    }
学新通

右键运行consumer,20s内运行producer就能看见控制台中接收到了producer发出的消息。

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhgghjei
系列文章
更多 icon
同类精品
更多 icon
继续加载