RabbitMQ消息队列
# 主流的消息队列和选择问题
当下主流的消息队列主要有四种,其中ActiveMQ因为历史悠久,但其吞吐量不高的特点,让其只存在于老项目中,新开发的项目视情况而定选择另外三种。
- ActiveMQ:http://activemq.apache.org/
- Apache出品,历史悠久,支持多种语言的客户端和协议,支持多种语言Java, .NET, C++ 等
- 基于JMS Provider的实现
- 缺点:吞吐量不高,多队列的时候性能下降,存在消息丢失的情况,比较少大规模使用
- Kafka:http://kafka.apache.org/
- 是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理大规模的网站中的所有动作流数据(网页浏览,搜索和其他用户的行动),副本集机制,实现数据冗余,保障数据尽量不丢失;支持多个生产者和消费者
- 类似MQ,功能较为简单,主要支持简单的MQ功能
- 缺点:不支持批量和广播消息,运维难度大,文档比较少, 需要掌握Scala
- RocketMQ:http://rocketmq.apache.org/
- 阿里开源的一款的消息中间件, 纯Java开发,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点, 性能强劲(零拷贝技术),支持海量堆积, 支持指定次数和时间间隔的失败消息重发,支持consumer端tag过滤、延迟消息等,在阿里内部进行大规模使用,适合在电商,互联网金融等领域
- 基于JMS Provider的实现
- 缺点:社区相对不活跃,更新比较快,纯java支持
- RabbitMQ:http://www.rabbitmq.com/
- 是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、C、用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不错
- 缺点:使用Erlang开发,阅读和修改源码难度大
# 使用Docker安装RabbitMQ
使用源码的方式安装依赖多、且版本和维护相对复杂,需要erlang环境、版本也是有要求,因此不建议在本地或服务器使用源码的方式安装,因为使用RabbitMQ安装erlang环境完全没必要。推荐使用云服务器或虚拟机使用docker安装,学习时可用使用阿里云按量付费减少学习成本。
- 环境问题说明:Win7、Win8、Win10、Mac、虚拟机等等,可能存在兼容问题。务必使用CentOS 7 以上版本,64位系统!!!!
# 1、安装Docker
依次运行以下命令添加yum源
yum update
yum install epel-release -y
yum clean all
yum list
2
3
4
安装并运行Docker。
yum install docker-io -y
systemctl start docker
2
检查安装结果。
docker info
启动使用Docker
systemctl start docker #运行Docker守护进程
systemctl stop docker #停止Docker守护进程
systemctl restart docker #重启Docker守护进程
2
3
帮助文档:https://help.aliyun.com/document_detail/51853.html?spm=a2c4g.11186623.6.820.RaToNY
# 2、安装RabbitMQ
账号:admin 密码:password,部署后访问:ip:15672,注意把防火墙和阿里云/腾讯云安全组开放15672端口
#拉取镜像
docker pull rabbitmq:management
#第一台机子
docker run -d --hostname rabbit_host1 --name xk857_rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=password -p 15672:15672 -p 5672:5672 rabbitmq:management
#第二台机器
docker run -d --hostname rabbit_host2 --name xk857_rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=password -p 15672:15672 -p 5672:5672 rabbitmq:management
#第三台机器
docker run -d --hostname rabbit_host3 --name xk857_rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=password -p 15672:15672 -p 5672:5672 rabbitmq:management
#介绍
-d 以守护进程方式在后台运行
-p 15672:15672 management 界面管理访问端口
-p 5672:5672 amqp 访问端口
--name:指定容器名
--hostname:设定容器的主机名,它会被写到容器内的 /etc/hostname 和 /etc/hosts,作为容器主机IP的别名,并且将显示在容器的bash中
-e 参数
RABBITMQ_DEFAULT_USER 用户名
RABBITMQ_DEFAULT_PASS 密码
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
主要端口介绍:
4369 erlang 发现口
5672 client 端通信口
15672 管理界面 ui 端口
25672 server 间内部通信口
2
3
4
- Linux服务器检查防火墙是否关闭
- 云服务器检查网络安全组是否开放端口
CentOS 7 以上默认使用的是firewall作为防火墙
查看防火墙状态
firewall-cmd --state
停止firewall
systemctl stop firewalld.service
禁止firewall开机启动
systemctl disable firewalld.service
2
3
4
5
6
7
8
9
# SpringBoot整合RabbitMQ
Spring-AMQP是Spring框架的AMQP消息解决方案,提供模板化的发送和接收消息的抽象层,提供基于消息驱动的POJO的消息监听等。
- 提供不依赖于任何特定的AMQP代理实现或客户端库通用的抽象,最终用户代码将很容易实现更易替换、添加和删除AMQP,因为它可以只针对抽象层来开发
- 总之就是提高我们的框架整合消息队列的效率,SpringBoot为更方便开发RabbitMQ推出了starter
- 我们使用 spring-boot-starter-amqp 进行开发
在SpringBoot项目中添加依赖:
<!--引入AMQP-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2
3
4
5
# 1、web管控台添加虚拟主机
我们在同一个项目中,可能会出现开发、测试包括上线用的都是同一个消息队列,如果不进行隔离,很可能会出现开发环境不小心把线上环境的消息进行消费了,因此添加虚拟主机,达到一个消息隔离的效果。

# 2、SpringBoot配置RabbitMQ
在application.yml中进行配置
spring:
rabbitmq:
host: 1.5.1.26
port: 5672
virtual-host: /dev #这是我上面添加的虚拟主机
password: password
username: admin
2
3
4
5
6
7
# 3、创建配置类RabbitMQConfig
首先定义交换机和队列的名称,然后使用Bean注入的方式,注入交换机和队列对象,最后再绑定二者关系,注意导包
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
/**
* 交换机名称
*/
public static final String EXCHANGE_NAME = "order_exchange";
/**
* 队列名称
*/
public static final String QUEUE = "order_queue";
@Bean
public Exchange orderExchange() {
// 创建交换机,durable代表持久化,使用Bean注入
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
}
@Bean
public Queue orderQueue() {
// 创建队列,使用Bean注入
return QueueBuilder.durable(QUEUE).build();
}
/**
* 交换机和队列绑定关系
* @param queue 上面注入的队列Bean,如果你的项目又多个,记得给Bean取名字
* @param exchange 上面注入的交换机Bean
*/
@Bean
public Binding orderBinding(Queue queue, Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("order.#").noargs();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# 发送消息与接收消息
# 1、消息生产者发送消息
@SpringBootTest
class RabbitmqDemoApplicationTests {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void send() {
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"order.new","新订单来啦!!");
}
}
2
3
4
5
6
7
8
9
10
11
# 2、消息消费者监听消息
消息发送使用SpringBoot测试类进行发送,消息接收我们创建消息监听类,进行消息接收。
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @description 消息消费者监听消息
*/
@Slf4j
@Component
@RabbitListener(queues = "order_queue") // 监听的队列名称
public class OrderMQListener {
/**
* RabbitHandler会自动匹配消息类型(消息自动确认)
* @param msg 我们发送的是String类型,这里用String进行接收,RabbitHandler会自动进行匹配
* @param message
* @throws IOException
*/
@RabbitHandler
public void releaseCouponRecord(String msg, Message message) throws IOException {
long msgTag = message.getMessageProperties().getDeliveryTag();
log.info("监听到消息:消息内容,msg={}",msg); // 监听到消息:消息内容,msg=新订单来啦!!
log.info("msgTag={}",msgTag); // msgTag=1
log.info("message={}",message.toString()); // message=(Body:'新订单来啦!!' MessageProperties [headers={}, ……
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# RabbitMQ消息可靠性投递
什么是消息的可靠性投递?即保证消息百分百发送到消息队列中去,消息发送端需要接受到mq服务端接受到消息的确认应答。除此之外还应有完善的消息补偿机制,发送失败的消息可以再感知并二次处理。 生产者到交换机通过confirmCallback,交换机到队列通过returnCallback
# 1、可靠性投递confirmCallback
confirmCallback是生产者到交换机,可以理解为确认消息是否发送成功。新版依赖可靠性投递默认是关闭的,使用以下方法开启
#旧版,确认消息发送成功,通过实现ConfirmCallBack接口,消息发送到交换器Exchange后触发回调
spring.rabbitmq.publisher-confirms=true
#新版,NONE值是禁用发布确认模式,是默认值,CORRELATED值是发布消息成功到交换器后会触发回调方法
spring.rabbitmq.publisher-confirm-type: correlated
2
3
4
此时的配置文件
spring:
rabbitmq:
host: 111.5.111.111
port: 5672
virtual-host: /dev
password: password
username: admin
# 开启消息二次确认,生产者到broker的交换机
publisher-confirm-type: correlated
2
3
4
5
6
7
8
9
# 编码实实现confirmCallback
@Slf4j
@SpringBootTest
class RabbitmqDemoApplicationTests {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void testConfirmCallback() {
/*
correlationData:配置
ack:交换机是否收到消息,true是成功,false是失败
cause:失败的原因
*/
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
log.info("confirm==== ack={}", ack);
log.info("confirm==== cause={}", cause);
if (ack) {
log.info("发送成功,{}", cause);
} else {
log.error("发送失败,{}", cause);
}
});
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "order.new", "新订单来啦!!");
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 2、可靠性投递returnCallback
returnCallback交换机到队列,消息从交换器发送到对应队列失败时触发。
- 第一步 开启returnCallback配置
spring.rabbitmq.publisher-returns=true #新版
- 第二步 修改交换机投递到队列失败的策略
#为true,则交换机处理消息到路由失败,则会返回给生产者
spring.rabbitmq.template.mandatory=true
2
完整配置参考
spring:
rabbitmq:
host: 111.5.111.111
port: 5672
virtual-host: /dev
password: password
username: admin
# 开启消息二次确认,生产者到broker的交换机
publisher-confirm-type: correlated
# 开启消息二次确认,交换机到队列的可靠性投递
publisher-returns: true
#为true,则交换机处理消息到路由失败,则会返回给生产者
template:
mandatory: true
2
3
4
5
6
7
8
9
10
11
12
13
14
# 编码实实现returnCallback
@Test
void testReturnCallback() {
// 为true,则交换机处理消息到路由失败,则会返回给生产者,开启强制消息投递(mandatory为设置为true),但消息未被路由至任何一个queue,则回退一条消息
rabbitTemplate.setReturnsCallback(returnedMessage -> {
int code = returnedMessage.getReplyCode();
log.info("code={}", code);
log.info("returned={}", returnedMessage);
});
// 这个routingKey是不存在的,它找不到这个路由,所以会出现异常从而触发上面的回调方法
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "xk857.order.new", "新订单来啦!!");
2
3
4
5
6
7
8
9
10
开启消息确认机制以后,保证了消息的准确送达,但由于频繁的确认交互, rabbitmq 整体效率变低,吞吐量下降严重,不是非常重要的消息真心不建议用消息确认机制
# RabbitMQ消息确认机制ACK
消费者从RabbitMQ收到消息并处理完成后,反馈给RabbitMQ,RabbitMQ收到反馈后才将此消息从队列中删除
- 消费者在处理消息出现了网络不稳定、服务器异常等现象,那么就不会有ACK反馈,RabbitMQ会认为这个消息没有正常消费,会将消息重新放入队列中
- 只有当消费者正确发送ACK反馈,RabbitMQ确认收到后,消息才会从RabbitMQ服务器的数据中删除。
- 消息的ACK确认机制默认是打开的,消息如未被进行ACK的消息确认机制,这条消息被锁定Unacked
我们也可以将其改成手工确认模式
spring:
rabbitmq:
#开启手动确认消息,如果消息重新入队,进行重试
listener:
simple:
acknowledge-mode: manual
2
3
4
5
6
重写之前的Handler
@RabbitHandler
public void releaseCouponRecord(String body, Message message, Channel channel) throws IOException {
long msgTag = message.getMessageProperties().getDeliveryTag();
System.out.println("msgTag="+msgTag);
System.out.println("message="+message.toString());
System.out.println("body="+body);
//成功确认,使用此回执方法后,消息会被 rabbitmq broker 删除
//channel.basicAck(msgTag,false); // 正常返回ACK确认信息
//channel.basicNack(msgTag,false,true); // 告诉broker,消息拒绝确认,最后一个true代表返回队列,为False代表丢弃
}
2
3
4
5
6
7
8
9
10
11
# 当前配置下处理业务逻辑的思考
我们在RabbitMQConfig中将交换机名称以及队列名称都定死了,那么我们此时如果有多个业务都需要用到MQ,例如用户下单时需要超时关单,用户退款时需要回退优惠券,这些业务该怎样处理呢?
核心就是我们需要让监听器能够判断出当前是什么消息,已知交换机名称以及队列名称在当前配置下是无法更改的,那么我们需要一个“动态参数”来标明传递过来的是什么参数,我想到的有两种方案。
- 消息体携带信息,判断当前是超时关单的消息还是回退优惠券的消息,然后再来处理业务逻辑
- 交换机名称以及队列名称是不变的,但是“路由键”是变化的,我们可以根据路由键来判断消息类型
- 投递到新的队列中去,把不同的消息投递到不同的队列
# 1、消息体携带信息判断当前消息类型
创建消息类,用于发送和接收消息,注意要实现Serializable接口,不实现会报错哦。
@Data
public class CommonMQMsg implements Serializable {
private String type;
private String msg;
}
2
3
4
5
6
7
先来看发送消息,发送CommonMQMsg对象构成的数据。
@SpringBootTest
class RabbitmqDemoApplicationTests {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void send() {
CommonMQMsg msg = new CommonMQMsg();
msg.setType("order.refund");
msg.setMsg("新订单来啦!!");
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "order.new", msg);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
再来看接收消息,使用 CommonMQMsg 接收消息,框架会自动帮我们转换消息格式。
@Component
@RabbitListener(queues = "order_queue") // 监听的队列名称
public class OrderMQListener {
@RabbitHandler
public void releaseCouponRecord(CommonMQMsg commonMQMsg, Message message) throws IOException {
if ("order.new".equals(commonMQMsg.getType())) {
log.info("处理新订单逻辑");
} else if ("order.refund".equals(commonMQMsg.getType())){
log.info("处理退款逻辑");
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
# 2、根据routingKey判断当前消息类型
先来看消息的发送,order.new就是我们这里的路由Key
@Test
void send() {
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "order.new", "新订单来啦!!");
}
2
3
4
再来看消息消费者的监听,是如何判断的
@Component
@RabbitListener(queues = "order_queue") // 监听的队列名称
public class OrderMQListener {
@RabbitHandler
public void releaseCouponRecord(String msg, Message message) throws IOException {
// 获取路由Key
final String routingKey = message.getMessageProperties().getReceivedRoutingKey();
if ("order.new".equals(routingKey)) {
// TODO 处理新订单逻辑
} else if ("order.refund".equals(routingKey)){
// TODO 处理退款逻辑
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
说明:这个路由键例如order.new,这些数据建议再项目中使用常量去提前定义,我这里为了讲解清晰就没写了。
# RabbitMQ消息多队列投递配置
当我们需要再SpringBoot项目中,需要投递消息到不同的队列中去,此时的MQ配置文件该如何编写呢?只改变队列名称和交换机名称可行吗?
# 1、项目中有多个队列/交换机时的配置
直接复制只改变交换机和队列名称会报错,因为交换机和队列我们之前已经注入过了,因此创建多个需要指定Bean的名称,先修改RabbitMQConfig
@Configuration
public class RabbitMQConfig {
/**
* 交换机名称
*/
public static final String EXCHANGE_NAME = "order_exchange";
/**
* 队列名称
*/
public static final String QUEUE = "order_queue";
@Bean(name = "orderExchange")
public Exchange orderExchange() {
// 创建交换机,durable代表持久化,使用Bean注入
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
}
@Bean(name = "orderQueue")
public Queue orderQueue() {
// 创建队列,使用Bean注入
return QueueBuilder.durable(QUEUE).build();
}
/**
* 交换机和队列绑定关系
* @param queue 上面注入的队列Bean,如果你的项目又多个,记得给Bean取名字
* @param exchange 上面注入的交换机Bean
*/
@Bean
public Binding orderBinding(@Qualifier("orderQueue") Queue queue, @Qualifier("orderExchange") Exchange exchange) {
// with是绑定的路由键,
return BindingBuilder.bind(queue).to(exchange).with("order.#").noargs();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
然后创建RabbitMQ2Config
@Configuration
public class RabbitMQ2Config {
/**
* 交换机名称
*/
public static final String EXCHANGE_NAME = "order_refund_exchange";
/**
* 队列名称
*/
public static final String QUEUE = "order_refund_queue";
@Bean("orderExchange2")
public Exchange orderExchange2() {
// 创建交换机,durable代表持久化,使用Bean注入
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
}
@Bean("orderQueue2")
public Queue orderQueue2() {
return QueueBuilder.durable(QUEUE).build();
}
@Bean
public Binding orderBinding2(@Qualifier("orderQueue2") Queue queue,@Qualifier("orderExchange2") Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("order.#").noargs();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# 2、发送消息到不同队列
发送消息是如何发送的呢?我们需要将消息发送到指定的交换机即可。
@SpringBootTest
class RabbitmqDemoApplicationTests {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void send() {
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "order.new", "我是订单队列的消息");
// 注意看不同的地方,RabbitMQ2Config.EXCHANGE_NAME
rabbitTemplate.convertAndSend(RabbitMQ2Config.EXCHANGE_NAME, "order.new", "我是退款的消息");
}
}
2
3
4
5
6
7
8
9
10
11
12
13
# 3、消息消费者分别监听不同队列的消息
监听 order_queue 队列中的消息
@Component
@RabbitListener(queues = "order_queue") // 监听的队列名称
public class OrderMQListener {
@RabbitHandler
public void releaseCouponRecord(String msg, Message message) throws IOException {
log.info("order_queue监听到消息:消息内容,msg={}", msg);
}
}
2
3
4
5
6
7
8
9
监听 order_refund_queue 队列的消息
@Component
@RabbitListener(queues = "order_refund_queue") // 监听的队列名称
public class OrderMQ2Listener {
@RabbitHandler
public void releaseCouponRecord(String msg, Message message) throws IOException {
log.info("order_refund_queue监听到消息:消息内容,msg={}", msg);
}
}
2
3
4
5
6
7
8
9
# RabbitMQ实现延时队列
什么是延迟队列?Producer将消息发送到消息队列服务端,但并不期望这条消息立马投递,而是推迟到在当前时间点之后的某一个时间投递到Consumer进行消费,该消息即定时消息。例如超时关单、优惠券回收等场景会用到。
RabbitMQ本身是不支持延迟队列的,我们可以通过死信队列的特性来实现延时队列。

# 1、创建延时队列配置类
说明:创建死信队列和创建延时队列没什么不同,当成正常的队列创建就行了。
@Configuration
public class RabbitMQTTLConfig {
/**
* 死信队列
*/
public static final String LOCK_MERCHANT_DEAD_QUEUE = "lock_merchant_dead_queue";
/**
* 死信交换机
*/
public static final String LOCK_MERCHANT_DEAD_EXCHANGE = "lock_merchant_dead_exchange";
/**
* 进入死信队列的路由key
*/
public static final String LOCK_MERCHANT_ROUTING_KEY = "lock_merchant_routing_key";
/**
* 创建死信交换机
*/
@Bean("lockMerchantDeadExchange")
public Exchange lockMerchantDeadExchange() {
return new TopicExchange(LOCK_MERCHANT_DEAD_EXCHANGE, true, false);
}
/**
* 创建死信队列
*/
@Bean("lockMerchantDeadQueue")
public Queue lockMerchantDeadQueue() {
return QueueBuilder.durable(LOCK_MERCHANT_DEAD_QUEUE).build();
}
/**
* 绑定死信交换机和死信队列,和创建普通队列没什么区别
*/
@Bean
public Binding lockMerchantBinding(@Qualifier("lockMerchantDeadQueue") Queue queue, @Qualifier("lockMerchantDeadExchange") Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(LOCK_MERCHANT_ROUTING_KEY).noargs();
}
/**
* 普通队列,绑定的个死信交换机
*/
public static final String NEW_MERCHANT_QUEUE = "new_merchant_queue";
/**
* 普通的topic交换机
*/
public static final String NEW_MERCHANT_EXCHANGE = "new_merchant_exchange";
/**
* 路由key
*/
public static final String NEW_MERCHANT_ROUTTING_KEY = "new_merchant_routing_key";
/**
* 创建普通交换机
*/
@Bean("newMerchantExchange")
public Exchange newMerchantExchange() {
return new TopicExchange(NEW_MERCHANT_EXCHANGE, true, false);
}
/**
* 创建普通队列
*/
@Bean("newMerchantQueue")
public Queue newMerchantQueue() {
Map<String, Object> args = new HashMap<>(3);
//消息过期后,进入到死信交换机
args.put("x-dead-letter-exchange", LOCK_MERCHANT_DEAD_EXCHANGE);
//消息过期后,进入到死信交换机的路由key
args.put("x-dead-letter-routing-key", LOCK_MERCHANT_ROUTING_KEY);
//过期时间,单位毫秒,10秒
args.put("x-message-ttl", 10000);
return QueueBuilder.durable(NEW_MERCHANT_QUEUE).withArguments(args).build();
}
/**
* 绑定交换机和队列
*/
@Bean
public Binding newMerchantBinding(@Qualifier("newMerchantQueue") Queue queue, @Qualifier("newMerchantExchange") Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(NEW_MERCHANT_ROUTTING_KEY).noargs();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# 2、延时队列-发送消息
思考一下,发送消息是发给谁的?发送消息是发给普通队列的,普通队列的消息过期会进入死信队列,然后我们监听死信队列的消息,但是发送消息是发给普通队列的。这次发消息不使用测试类发送了,换成发送
@RestController("/")
public class TestController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/send/ttl/{msg}")
public boolean sendTTLMsg(@PathVariable String msg) {
rabbitTemplate.convertAndSend(RabbitMQTTLConfig.NEW_MERCHANT_EXCHANGE, RabbitMQTTLConfig.NEW_MERCHANT_ROUTTING_KEY,
"超时关单信息,10秒后接收订单信息," + msg);
return true;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
# 3、延时队列-接收消息
消息消费者写法上没有不同,注意监听的是死信队列即可
@Component
@RabbitListener(queues = "lock_merchant_dead_queue") // 监听的队列名称是死信队列的名称
public class TTLMQListener {
@RabbitHandler
public void releaseCouponRecord(String msg, Message message) throws IOException {
log.info("监听到延迟消息:消息内容,msg={}", msg);
}
}
2
3
4
5
6
7
8
9