• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

二、activemq简介、安装和入门| 8月更文挑战

武飞扬头像
MrYangZCh
帮助7

一、activemq简介与安装

 ​

1. ActiveMQ 简介

ActiveMQ是Apache所提供的一个开源的消息系统,完全采用Java来实现,因此,它能很好地支持J2EE提出的JMS(Java Message Service,即Java消息服务)规范。JMS是一组Java应用程序接口,它提供消息的创建、发送、读取等一系列服务。JMS提供了一组公共应用程序接口和响应的语法,类似于Java数据库的统一访问接口JDBC,它是一种与厂商无关的API,使得Java程序能够与不同厂商的消息组件很好地进行通信。

消息队列中间件是分布式系统中的重要组件,主要解决异步消息,应用解耦,流量削锋等问题,从而实现高性能,高可用,可伸缩和最终一致性的架构

使用较多的消息队列有ActiveMQ,RabbitMQ,Kafka,MetaMQ等

2.activemq下载与安装

  1. activemq.apache.org/download.ht…

2.目录结构

启动activemq步骤: bin -->win64(或者win32) -->activemq.bat 点击运行即可

3.启动完毕,http://localhost:8161/admin/index.jsp 用户名:admin 密码 admin

activemq 入门级代码

 ​

可以启动多个消费者 来区分 队列和 主题模式  

1.queue (session会话 )

  1. 生产者
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @author Yang
 * 描述: 生产者
 */
public class Producer {

    /**
     * 用户名
     */
    private static final String userName = ActiveMQConnection.DEFAULT_USER;
    /**
     * 密码
     */
    private static final String passWord = ActiveMQConnection.DEFAULT_PASSWORD;
    /**
     * url
     */
    private static final String brokerUrl = ActiveMQConnection.DEFAULT_BROKER_URL;

    public void send(String message) {


        try {
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(userName, passWord, brokerUrl);

            final Connection connection = connectionFactory.createConnection();

            connection.start();
            // true 为开启会话 需要提交 
            Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);

            //创建队列
            final Queue queue = session.createQueue("test");
            final MessageProducer producer = session.createProducer(queue);

            TextMessage textMessage = session.createTextMessage(message);

            //发送
            producer.send(textMessage);

            // 开启会话 需要提交 
            // session.commit(); 
            producer.close();
            session.close();
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        Producer producer = new Producer();
        producer.send("hello world");
    }
}

2.消费者

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @author Yang
 * 描述: 消费者
 */

public class Consumer {

    /**
     * 用户名
     */
    private static final String userName = ActiveMQConnection.DEFAULT_USER;
    /**
     * 密码
     */
    private static final String passWord = ActiveMQConnection.DEFAULT_PASSWORD;
    /**
     * url
     */
    private static final String brokerUrl = ActiveMQConnection.DEFAULT_BROKER_URL;

    public void receive() {


        try {
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(userName, passWord, brokerUrl);

            final Connection connection = connectionFactory.createConnection();

            connection.start();

            Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);

            //创建队列
            final Queue queue = session.createQueue("test");

            final MessageConsumer messageConsumer = session.createConsumer(queue);


            messageConsumer.setMessageListener(n -> {

                try {
                    TextMessage msg = (TextMessage) n;

                    final String text = msg.getText();

                    if (text.equalsIgnoreCase("hello world")) {
                        System.out.println("               接受信息:         "   msg.getText());
                    } else {//默认为6次
                        System.out.println("     测试重发次数 " );
                        int i = 1 / 0;
                    }


                } catch (JMSException e) {
//                    e.printStackTrace();
                }

            });

        } catch (JMSException e) {
//            e.printStackTrace();
        }

    }

    public static void main(String[] args) {
        Consumer consumer = new Consumer();
        consumer.receive();
    }

2.topic  首先要启动 消费者进行订阅

1.producer 生产者

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @author Yang
 * 描述: 生产者
 */
public class TopicProducer {

    /**
     * 用户名
     */
    private static final String userName = ActiveMQConnection.DEFAULT_USER;
    /**
     * 密码
     */
    private static final String passWord = ActiveMQConnection.DEFAULT_PASSWORD;
    /**
     * url
     */
    private static final String brokerUrl = ActiveMQConnection.DEFAULT_BROKER_URL;

    public void send(String message) {


        try {
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(userName, passWord, brokerUrl);

            final Connection connection = connectionFactory.createConnection();

            connection.start();

            Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);


            //创建队列
            final Topic topic = session.createTopic("topic-test");
            final MessageProducer producer = session.createProducer(topic);

            TextMessage textMessage = session.createTextMessage(message);

            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
//            producer.setTimeToLive(10);

//            发送
            producer.send(textMessage);
//            producer.send(textMessage, DeliveryMode.PERSISTENT, 1, 60 * 60 * 24);

//            session.commit();

            producer.close();

            session.close();

            connection.close();

        } catch (JMSException e) {
            e.printStackTrace();
        }

    }

    public static void main(String[] args) {
        TopicProducer producer = new TopicProducer();
        producer.send("hello world");

    }

2.consumer 消费者 

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @author Yang
 * 描述: 消费者
 */


public class TopicConsumer {

    /**
     * 用户名
     */
    private static final String userName = ActiveMQConnection.DEFAULT_USER;
    /**
     * 密码
     */
    private static final String passWord = ActiveMQConnection.DEFAULT_PASSWORD;
    /**
     * url
     */
    private static final String brokerUrl = ActiveMQConnection.DEFAULT_BROKER_URL;

    public void receive() {


        try {
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(userName, passWord, brokerUrl);

            final Connection connection = connectionFactory.createConnection();
             //设置客户端id
          //  connection.setClientID("client-1");

            connection.start();

            Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);

            //创建队列
            final Topic topic = session.createTopic("topic-test");

            final MessageConsumer messageConsumer = session.createConsumer(topic);//普通订阅
//            MessageConsumer consumer = session.createDurableSubscriber(topic,"bb"); //持久订阅

            messageConsumer.setMessageListener(n -> {

                try {
                    TextMessage msg = (TextMessage) n;

                    final String text = msg.getText();

                    if (text.equalsIgnoreCase("hello world")) {
                        System.out.println("               接受信息:         "   msg.getText());
                    } else {
                        System.out.println("     测试重发次数 ");
                        int i = 1 / 0;
                    }


                } catch (JMSException e) {
//                    e.printStackTrace();
                }

            });

        } catch (JMSException e) {
//            e.printStackTrace();
        }

    }

    public static void main(String[] args) {
        TopicConsumer consumer = new TopicConsumer();
        consumer.receive();
    }


}

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanggfkgi
系列文章
更多 icon
同类精品
更多 icon
继续加载