spring boot接入阿里云rabbitmq
接入文档
推荐博客
接入过程
附注:我这边就简单做个消费者接收,没有使用交换机,发送者就直接发送消息
消费者,直接创建了个class
-
<dependency>
-
<groupId>com.rabbitmq</groupId>
-
<artifactId>amqp-client</artifactId>
-
<version>5.5.0</version>
-
</dependency>
-
package cn.mn.ac.eff.client;
-
-
import cn.mn.ac.core.tool.jackson.JsonUtil;
-
import cn.mn.ac.core.tool.utils.BeanUtil;
-
import cn.mn.ac.core.tool.utils.Func;
-
import cn.mn.ac.core.tool.utils.StringUtil;
-
import cn.mn.ac.eff.constant.CommonAttributeConstant;
-
import cn.mn.ac.eff.constant.CommonConstant;
-
import cn.mn.ac.eff.entity.JdyInfo;
-
import cn.mn.ac.eff.entity.JdyRecord;
-
import cn.mn.ac.eff.entity.JdySprint;
-
import cn.mn.ac.eff.model.JdyReqData;
-
import cn.mn.ac.eff.model.JdySprintData;
-
import cn.mn.ac.eff.props.RabbitMqConfig;
-
import cn.mn.ac.eff.service.JdyInfoService;
-
import cn.mn.ac.eff.service.JdyRecordService;
-
import cn.mn.ac.eff.service.JdySprintService;
-
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
-
import com.fasterxml.jackson.databind.JsonNode;
-
import com.rabbitmq.client.*;
-
import lombok.AllArgsConstructor;
-
import lombok.extern.slf4j.Slf4j;
-
import org.apache.commons.lang.StringUtils;
-
import org.springframework.beans.factory.annotation.Autowired;
-
import org.springframework.stereotype.Component;
-
-
import javax.annotation.PostConstruct;
-
import java.io.IOException;
-
import java.nio.charset.StandardCharsets;
-
import java.util.concurrent.TimeoutException;
-
-
-
@Component
-
@AllArgsConstructor
-
@Slf4j
-
public class JdyConsumer {
-
private static JdyInfoService jdyInfoService; //业务service
-
private static JdyRecordService jdyRecordService; //业务service
-
private static JdySprintService jdySprintService; //业务service
-
private static RabbitMqConfig rabbitMqConfig; //rabbitmq配置参数文件
-
-
@Autowired
-
public void setJdyInfoService(JdyInfoService JdyInfoService) {
-
this.jdyInfoService = JdyInfoService; //自动注入
-
}
-
-
@Autowired
-
public void setJdyRecordService(JdyRecordService jdyRecordService) {
-
this.jdyRecordService = jdyRecordService; //自动注入
-
}
-
-
@Autowired
-
public void setJdySprintService(JdySprintService jdySprintService) {
-
this.jdySprintService = jdySprintService; //自动注入
-
}
-
-
@Autowired
-
public void setRabbitMqConfig(RabbitMqConfig rabbitMqConfig) {
-
this.rabbitMqConfig = rabbitMqConfig; //自动注入
-
}
-
-
@PostConstruct
-
public void receive() throws IOException, TimeoutException {
-
ConnectionFactory factory = new ConnectionFactory();
-
factory.setHost(rabbitMqConfig.getHost()); //rabbitmq阿里云服务器地址
-
factory.setUsername(rabbitMqConfig.getUsername()); //rabbitmq控制台静态用户名
-
factory.setPassword(rabbitMqConfig.getPassword()); //rabbitmq控制台静态密码页面密码
-
factory.setAutomaticRecoveryEnabled(true); //网络异常时连接断开后,开启Connection自动恢复功能
-
factory.setNetworkRecoveryInterval(5000); //网络恢复间隔时间
-
factory.setVirtualHost(rabbitMqConfig.getVhost()); //设置Vhost名称,请确保已在消息队列RabbitMQ版控制台上创建完成
-
factory.setPort(rabbitMqConfig.getPort()); //默认端口,非加密端口5672,加密端口5671
-
factory.setConnectionTimeout(300 * 1000); //连接超时时间
-
factory.setHandshakeTimeout(300 * 1000);
-
factory.setShutdownTimeout(0);
-
Connection connection = factory.newConnection(); //创建连接
-
final Channel channel = connection.createChannel(); //创建channel【因为发送方没有使用exchange交换机,所以在rabbitmq提供的文档上创建交换机和队列与交换机的绑定步骤没有,直接声明了channel进行消息发哦送】
-
channel.basicConsume(rabbitMqConfig.getBindingKey(), false, "ConsumerTag", new DefaultConsumer(channel) {
-
@Override
-
public void handleDelivery(String consumerTag, Envelope envelope,
-
AMQP.BasicProperties properties, byte[] body)
-
throws IOException {
-
log.info("receive msg deliveryTag {} messageId {}", envelope.getDeliveryTag(), properties.getMessageId());
-
saveOrUpdate(new String(body, StandardCharsets.UTF_8));
-
channel.basicAck(envelope.getDeliveryTag(), false); //手动ack【个人理解,如果不手动ack的话,业务代码在消费消息失败的时候实际消息没有被消费,导致消息丢失】
-
log.info("consume msg end");
-
}
-
});
-
}
-
-
//业务代码消费消息
-
public void saveOrUpdate(String msg) {
-
if (StringUtil.isBlank(msg)) return;
-
JsonNode readTree = JsonUtil.readTree(msg);
-
JsonNode data = readTree.get("data");
-
JdyReqData jdyReqData = JsonUtil.parse(data.toString(), JdyReqData.class);
-
log.info("start to consume dataId {} ", jdyReqData.getDataId());
-
JsonNode event = readTree.get("op");
-
Long curr = System.currentTimeMillis();
-
JdyRecord record = jdyRecordService.list(Wrappers.lambdaQuery(JdyRecord.class).eq(JdyRecord::getRecordId, jdyReqData.getDataId())).stream().findFirst().orElse(null);
-
JdyRecord jdyRecord = JdyRecord.builder().recordId(jdyReqData.getDataId())
-
.content(msg)
-
.source(Func.equals(CommonAttributeConstant.JDY_REQ, jdyReqData.getFormName()) ? CommonConstant.status_off : CommonConstant.status_on)
-
.id(record == null ? null : record.getId())
-
.createTime(curr).event(event != null ? event.asText() : null).build();
-
if (record == null) jdyRecordService.save(jdyRecord);
-
if (record != null) jdyRecordService.updateById(jdyRecord);
-
if (Func.equals(jdyReqData.getFormName(), CommonAttributeConstant.JDY_SPRINT)) {
-
JdySprintData sprintData = JsonUtil.parse(readTree.get("data").toString(), JdySprintData.class);
-
sprintUpdate(sprintData);
-
return;
-
}
-
JdyInfo jdyInfo = BeanUtil.copy(jdyReqData, JdyInfo.class);
-
jdyInfo.setUserName(jdyReqData.getUserInfo() != null ? jdyReqData.getUserInfo().getName() : "");
-
jdyInfo.setCreator(jdyReqData.getCreator() != null ? jdyReqData.getCreator().getName() : "");
-
jdyInfo.setUpdater(jdyReqData.getUpdater() != null ? jdyReqData.getUpdater().getName() : "");
-
jdyInfo.setDeptName(jdyReqData.getDeptInfo() != null ? jdyReqData.getDeptInfo().getName() : "");
-
jdyInfo.setFlowStatus(jdyReqData.getFlowStatus() != null ? Integer.valueOf(jdyReqData.getFlowStatus()) : null);
-
if (jdyReqData.getFlowStatus() != null) jdyInfo.setFlowStatusName(CommonAttributeConstant.FLOWSTATUSMAP.getOrDefault(Integer.valueOf(jdyReqData.getFlowStatus()), ""));
-
jdyInfo.setCreateTime(jdyReqData.getCreateTime());
-
jdyInfo.setUpdateTime(jdyReqData.getUpdateTime());
-
jdyInfo.setReqAcceptorDept(jdyReqData.getReqAcceptorDept() != null ? jdyReqData.getReqAcceptorDept().getName() : "");
-
jdyInfo.setReqAcceptorEst(jdyReqData.getReqAcceptorEst() != null ? jdyReqData.getReqAcceptorEst().getName() : "");
-
jdyInfo.setReqAcceptorName(jdyReqData.getReqAcceptorName() != null ? jdyReqData.getReqAcceptorName().getName() : "");
-
JsonNode secondaryDept = data.get("_widget_1638943383266");
-
if(secondaryDept != null && Func.equals(secondaryDept.getNodeType(), StringUtils.upperCase("String"))) jdyInfo.setSecondaryDept(secondaryDept.asText());
-
if(secondaryDept != null && Func.equals(secondaryDept.getNodeType(), StringUtils.upperCase("Object")) && secondaryDept.get("name") != null) jdyInfo.setSecondaryDept(secondaryDept.get("name").asText());
-
// if(data.get("_widget_1638943383266"))
-
// jdyInfo.setSecondaryDept(jdyReqData.getSecondaryDept() == null ? "" : jdyReqData.getSecondaryDept().getName());
-
jdyInfoService.remove(Wrappers.lambdaQuery(JdyInfo.class).in(JdyInfo::getRequireNo, jdyReqData.getRequireNo()));
-
jdyInfoService.save(jdyInfo);
-
}
-
-
public void sprintUpdate(JdySprintData sprintData) {
-
JdySprint jdySprint = BeanUtil.copy(sprintData, JdySprint.class);
-
if(sprintData.getReqSprintBudget() != null) jdySprint.setReqSprintBudget(sprintData.getReqSprintBudget().doubleValue());
-
jdySprint.setCreator(sprintData.getCreator() != null ? sprintData.getCreator().getName() : "");
-
jdySprint.setUpdater(sprintData.getUpdater() != null ? sprintData.getUpdater().getName() : "");
-
jdySprint.setFlowStatus(sprintData.getFlowStatus() != null ? Integer.valueOf(sprintData.getFlowStatus()) : null);
-
if (jdySprint.getFlowStatus() != null)
-
jdySprint.setFlowStatusName(CommonAttributeConstant.FLOWSTATUSMAP.getOrDefault(Integer.valueOf(jdySprint.getFlowStatus()), ""));
-
jdySprint.setCreateTime(jdySprint.getCreateTime());
-
jdySprint.setUpdateTime(jdySprint.getUpdateTime());
-
jdySprint.setStartTime(jdySprint.getStartTime());
-
jdySprint.setEndTime(jdySprint.getEndTime());
-
jdySprint.setReqAcceptorNumber(sprintData.getReqAcceptor() != null ?sprintData.getReqAcceptor().getNumber() : "");
-
jdySprint.setReqAcceptorName(sprintData.getReqAcceptor() != null ? sprintData.getReqAcceptor().getName() : "");
-
jdySprint.setReqProposerNumber(sprintData.getReqProposer() != null ?sprintData.getReqProposer().getNumber() : "");
-
jdySprint.setReqProposerName(sprintData.getReqProposer() != null ? sprintData.getReqProposer().getName() : "");;
-
// jdySprint.setEstName(sprintData.getEstGroup() != null ?sprintData.getEstGroup().getName() : "");
-
// jdySprint.setEstNumber(sprintData.getEstGroup() != null ? sprintData.getEstGroup().getNumber() : "");;
-
jdySprintService.remove(Wrappers.lambdaQuery(JdySprint.class).in(JdySprint::getSprintNo, jdySprint.getSprintNo()));
-
jdySprintService.save(jdySprint);
-
}
-
}
rabbitmqConfig
-
package cn.mn.ac.eff.props;
-
-
import lombok.Data;
-
import org.springframework.boot.context.properties.ConfigurationProperties;
-
import org.springframework.stereotype.Component;
-
-
-
-
"aliyun.rabbitmq")(
-
public class RabbitMqConfig {
-
private String host;
-
private int port;
-
private String username;
-
private String password;
-
private String vhost;
-
private String bindingKey;
-
}
配置中心使用的是nacos,nacos配置如下图【vhost以raabbitmq控制台的名称为准,控制台的vhost没有/,在代码配置时也不要加/】
-
aliyun:
-
-
rabbitmq:
-
host: xxx
-
port: 5672
-
username: xxx
-
password: xxx
-
vhost: xxx
-
bindingKey: xxx
rabbitmq控制台
生产者(自己为了测试消息,简单写了个测试的生产者)
-
package cn.mn.ac.eff.client;
-
-
import cn.mn.ac.core.tool.jackson.JsonUtil;
-
import cn.mn.ac.eff.model.*;
-
import com.rabbitmq.client.*;
-
import org.springframework.stereotype.Component;
-
-
import java.io.IOException;
-
import java.nio.charset.StandardCharsets;
-
import java.util.HashMap;
-
import java.util.UUID;
-
import java.util.concurrent.TimeoutException;
-
-
@Component
-
public class ProducerClient {
-
public static void main(String[] args) throws IOException, TimeoutException {
-
ConnectionFactory factory = new ConnectionFactory();
-
// 设置接入点,在消息队列RabbitMQ版控制台实例详情页面查看。
-
factory.setHost("xxx");
-
// 用户名,在消息队列RabbitMQ版控制台静态用户名密码页面查看。
-
// 用户名,在消息队列RabbitMQ版控制台静态用户名密码页面查看。
-
factory.setUsername("xxx");
-
// 密码,在消息队列RabbitMQ版控制台静态用户名密码页面查看。
-
factory.setPassword("xxx");
-
//设置为true,开启Connection自动恢复功能;设置为false,关闭Connection自动恢复功能。
-
factory.setAutomaticRecoveryEnabled(true);
-
factory.setNetworkRecoveryInterval(5000);
-
-
// 设置Vhost名称,请确保已在消息队列RabbitMQ版控制台上创建完成。
-
factory.setVirtualHost("xxx");
-
// 默认端口,非加密端口5672,加密端口5671。
-
factory.setPort(5672);
-
// 基于网络环境合理设置超时时间。
-
factory.setConnectionTimeout(30 * 1000);
-
factory.setHandshakeTimeout(30 * 1000);
-
factory.setShutdownTimeout(0);
-
Connection connection = factory.newConnection();
-
Channel channel = connection.createChannel();
-
JdySprintDataObj build = JdySprintDataObj.builder()._widget_1643105278264("数据看板五期\\n需要配合进行金额扣减验证")
-
._widget_1643106376512(42200d)
-
.formName("迭代交付版本")
-
._widget_1644216673523("ITXQSQ-20220121-01951")
-
._widget_1644307690286(JdyUserInfo.builder().name("董").number("12").build())
-
._widget_1644300502440("ITXQSQ-20220121-01951-V36").build();
-
// JdyDataObj jdyDataObj = JdyDataObj.builder()._widget_1600677792312(JdyUserInfo.builder().name("ceshi")
-
// .build())._id("61dd1a893f35270008903ce3")._widget_1611634839749(15.00)
-
// ._widget_1638943383266(JdyUserInfo.builder().name("测试部门").build())
-
// ._widget_1614667640528("ITXQSQ-20220111-01878")
-
// ._widget_1600677792368("测试")._widget_1600677792597(JdyUserInfo.builder().name("测试部门").build())
-
// ._widget_1611582498034("abc")
-
// ._widget_1600910979169("测试").build();
-
Long curr = System.currentTimeMillis();
-
JdyCallBack2 jdyCallBack = JdyCallBack2.builder().data(build).qName("MN001").timestamp(curr.toString()).op("update").build();
-
-
// channel.exchangeDeclare("amq.fanout", "fanout", true, false, false, null);
-
// channel.queueDeclare("dingtalk_callback_user_add_org", true, false, false, new HashMap<String, Object>());
-
// channel.queueBind("dingtalk_callback_user_add_org", "amq.fanout", "dev_adm");
-
// 开始发送消息。
-
// for (int i = 0; i < 2; i ) {
-
// ${ExchangeName}必须在消息队列RabbitMQ版控制台上已存在,并且Exchange的类型与控制台上的类型一致。
-
// BindingKey根据业务需求填入相应的BindingKey。
-
UserInfo userInfo = UserInfo.builder().username("测试").payedDays(7).payedTime("2022-04-05").build();
-
String json = JsonUtil.toJson(jdyCallBack);
-
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build();
-
// String json ="{\"data\":{\"_id\":\"62382ffb0ce72c0008cccdd1\",\"_widget_1600677792312\":null,\"_widget_1600677792368\":\"0053194\",\"_widget_1600677792597\":null,\"_widget_1600677792709\":\"2022-04-27T16:00:00.000Z\",\"_widget_1600677792797\":\"1.业务员在智网拜访时通过数字化拍照进行图像识别,图匠将识别结果回传到红包项目后台,后台经过红包发放规则引擎去判断是否符合要求,后台展示每一条的明细和对账数据;\\n2.各大区因活动开始结束时间不同和活动门店范围不同需要单独设置,根据不同门店类型区分判断标准和红包发放标准;\\n3.根据竞品强势店活跃方案,产品在进店后需要进行货架陈列然后通过图像识别结果判定该门店是否陈列合格,再通过陈列结果发放业务红包\",\"_widget_1600677793187\":[],\"_widget_1600910979169\":\"\",\"_widget_1600910979205\":{\"_id\":\"5e8580ab4cfe7900061d99af\",\"name\":\"张亚光\",\"status\":1,\"type\":0,\"username\":\"0053194\"},\"_widget_1600910979243\":{\"_id\":\"5e857d598bd93b36bd9bea51\",\"dept_no\":22000496,\"name\":\"渠道数字化管理处\",\"type\":0},\"_widget_1600910979638\":null,\"_widget_1600910979650\":null,\"_widget_1600910980434\":\"\",\"_widget_1600910980492\":[],\"_widget_1600910983435\":null,\"_widget_1610430610089\":\"系统优化\",\"_widget_1610939163841\":\"经济收益上,通过货架陈列铺市增加,单包陈列率增加,来提升铺市质量和增加动销;管理收益上,可统计各大区、各门店类型货架陈列执行结果和红包发放结果,加强总部、大区对市场货架陈列掌控和管理。\",\"_widget_1610939164512\":null,\"_widget_1610950434587\":\"1.由于业务方货架陈列标准有变动和新品以及场景的增加,对于图匠回传的照片识别结果需要建立新的规则判定是否合格,是否发放业务红包;\\n2.当前根据图像识别结果发放业务红包不能灵活配置,竞品强势店活跃的活动需求现有图像识别货架陈列判定规则不能满足,当产品进店后业务方不能拿到进店后货架陈列的结果或是需要人工核查\",\"_widget_1610950434773\":\"\",\"_widget_1611206722187\":[],\"_widget_1611582498034\":\"\",\"_widget_1611634839749\":null,\"_widget_1611650581274\":\"可以提交\",\"_widget_1613960273377\":\"\",\"_widget_1614062405686\":[{\"_id\":\"601b0a1791c07e00076099fa\",\"name\":\"刘硕\",\"status\":1,\"type\":0,\"username\":\"0120082\"},{\"_id\":\"60f0897ad65457000857ef2c\",\"name\":\"张艺夕\",\"status\":1,\"type\":0,\"username\":\"0126673\"},{\"_id\":\"5e8580ab4cfe7900061df2cb\",\"name\":\"刘彩宾\",\"status\":1,\"type\":0,\"username\":\"0085167\"},{\"_id\":\"5e8580ab4cfe7900061d9964\",\"name\":\"贺治\",\"status\":1,\"type\":0,\"username\":\"0002940\"},{\"_id\":\"620096c86088660007573e19\",\"name\":\"邢昊\",\"status\":1,\"type\":0,\"username\":\"0134361\"}],\"_widget_1614062406187\":\"业务红包系统优化\",\"_widget_1614151972568\":{\"_id\":\"5e8580ab4cfe7900061d9260\",\"name\":\"董晓岩\",\"status\":1,\"type\":0,\"username\":\"0006013\"},\"_widget_1614224201555\":[],\"_widget_1614224201759\":[],\"_widget_1614356732731\":null,\"_widget_1614667640528\":\"ITXQSQ-20220321-02234\",\"_widget_1614820307553\":\"\",\"_widget_1614907086133\":{\"_id\":\"602de21536a77a0009fa657c\",\"name\":\"李琦\",\"status\":1,\"type\":0,\"username\":\"0119605\"},\"_widget_1617240667649\":\"605ddc92417c5800448b54ed\",\"_widget_1617240667668\":\"常温事业部\",\"_widget_1617240668051\":\"常温事业部-新建项目\",\"_widget_1617240668324\":\"607ce0add5268e00444281cf\",\"_widget_1617240668971\":\"\",\"_widget_1617240669063\":\"\",\"_widget_1623306810182\":\"\",\"_widget_1623306810198\":\"\",\"_widget_1623306810214\":\"\",\"_widget_1623306810230\":\"\",\"_widget_1623306810301\":\"\",\"_widget_1623307467250\":\"\",\"_widget_1635747812907\":null,\"_widget_1638943383266\":\"常温事业部\",\"_widget_1638943383431\":\"销售管理中心\",\"_widget_1641361074502\":[],\"_widget_1648023699615\":null,\"_widget_1648023699684\":\"\",\"_widget_1648025845140\":null,\"_widget_1650940105179\":\"\",\"_widget_1650951804448\":\"\",\"_widget_1651584421420\":\"\",\"_widget_1651584421494\":null,\"_widget_1651584421563\":null,\"_widget_1651584421682\":null,\"_widget_1651584421748\":null,\"_widget_1651651911041\":\"\",\"_widget_1651660760696\":[],\"_widget_1652329430385\":null,\"_widget_1652342510657\":[],\"_widget_1654596247398\":\"\",\"_widget_1654596247555\":null,\"appId\":\"5f6818e3a3a12300065e3550\",\"createTime\":\"2022-03-21T07:57:47.693Z\",\"creator\":{\"_id\":\"5e8580ab4cfe7900061d99af\",\"name\":\"张亚光\",\"status\":1,\"type\":0,\"username\":\"0053194\"},\"deleteTime\":null,\"deleter\":null,\"entryId\":\"603361d84a0ace0007fe39c8\",\"flowState\":0,\"formName\":\"IT需求申请流程\",\"updateTime\":\"2022-06-11T12:28:55.077Z\",\"updater\":{\"_id\":\"5e8580ab4cfe7900061d9964\",\"name\":\"贺治\",\"status\":1,\"type\":0,\"username\":\"0002940\"}},\"op\":\"data_update\",\"qName\":\"MN001\",\"nonce\":\"b9c215\",\"timestamp\":\"1654950535\"}";
-
channel.basicPublish("", "MN001", true, props,
-
(json).getBytes(StandardCharsets.UTF_8));
-
connection.close();
-
}
-
}
建议:
发送消息时一定要带上messageId,否则因为一些原因需要重新消费消息时会很难找到对应的消息
小白一枚,大神绕行
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanhggkjkc
系列文章
更多
同类精品
更多
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
excel下划线不显示怎么办
PHP中文网 06-23 -
excel打印预览压线压字怎么办
PHP中文网 06-22 -
TikTok加速器哪个好免费的TK加速器推荐
TK小达人 10-01 -
怎样阻止微信小程序自动打开
PHP中文网 06-13