RabbitMQ消息队列

2023/5/9 Install基础

# 主流的消息队列和选择问题

当下主流的消息队列主要有四种,其中ActiveMQ因为历史悠久,但其吞吐量不高的特点,让其只存在于老项目中,新开发的项目视情况而定选择另外三种。

  • ActiveMQ:http://activemq.apache.org/
    • Apache出品,历史悠久,支持多种语言的客户端和协议,支持多种语言Java, .NET, C++ 等
    • 基于JMS Provider的实现
    • 缺点:吞吐量不高,多队列的时候性能下降,存在消息丢失的情况,比较少大规模使用
  • Kafka:http://kafka.apache.org/
    • 是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理大规模的网站中的所有动作流数据(网页浏览,搜索和其他用户的行动),副本集机制,实现数据冗余,保障数据尽量不丢失;支持多个生产者和消费者
    • 类似MQ,功能较为简单,主要支持简单的MQ功能
    • 缺点:不支持批量和广播消息,运维难度大,文档比较少, 需要掌握Scala
  • RocketMQ:http://rocketmq.apache.org/
    • 阿里开源的一款的消息中间件, 纯Java开发,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点, 性能强劲(零拷贝技术),支持海量堆积, 支持指定次数和时间间隔的失败消息重发,支持consumer端tag过滤、延迟消息等,在阿里内部进行大规模使用,适合在电商,互联网金融等领域
    • 基于JMS Provider的实现
    • 缺点:社区相对不活跃,更新比较快,纯java支持
  • RabbitMQ:http://www.rabbitmq.com/
    • 是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、C、用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不错
    • 缺点:使用Erlang开发,阅读和修改源码难度大

# 使用Docker安装RabbitMQ

使用源码的方式安装依赖多、且版本和维护相对复杂,需要erlang环境、版本也是有要求,因此不建议在本地或服务器使用源码的方式安装,因为使用RabbitMQ安装erlang环境完全没必要。推荐使用云服务器或虚拟机使用docker安装,学习时可用使用阿里云按量付费减少学习成本。

  • 环境问题说明:Win7、Win8、Win10、Mac、虚拟机等等,可能存在兼容问题。务必使用CentOS 7 以上版本,64位系统!!!!

# 1、安装Docker

依次运行以下命令添加yum源

yum update
yum install epel-release -y
yum clean all
yum list
1
2
3
4

安装并运行Docker。

yum install docker-io -y
systemctl start docker
1
2

检查安装结果。

docker info
1

启动使用Docker

systemctl start docker     #运行Docker守护进程
systemctl stop docker      #停止Docker守护进程
systemctl restart docker   #重启Docker守护进程
1
2
3

帮助文档:https://help.aliyun.com/document_detail/51853.html?spm=a2c4g.11186623.6.820.RaToNY

# 2、安装RabbitMQ

账号:admin 密码:password,部署后访问:ip:15672,注意把防火墙和阿里云/腾讯云安全组开放15672端口

#拉取镜像
docker pull rabbitmq:management

#第一台机子
docker run -d --hostname rabbit_host1 --name xk857_rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=password -p 15672:15672 -p 5672:5672 rabbitmq:management
#第二台机器
docker run -d --hostname rabbit_host2 --name xk857_rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=password -p 15672:15672 -p 5672:5672 rabbitmq:management
#第三台机器
docker run -d --hostname rabbit_host3 --name xk857_rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=password -p 15672:15672 -p 5672:5672 rabbitmq:management


#介绍
-d 以守护进程方式在后台运行
-p 15672:15672 management 界面管理访问端口
-p 5672:5672 amqp 访问端口
--name:指定容器名
--hostname:设定容器的主机名,它会被写到容器内的 /etc/hostname 和 /etc/hosts,作为容器主机IP的别名,并且将显示在容器的bash中

-e 参数
  RABBITMQ_DEFAULT_USER 用户名
  RABBITMQ_DEFAULT_PASS 密码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

主要端口介绍:

4369 erlang 发现口
5672 client 端通信口
15672 管理界面 ui 端口
25672 server 间内部通信口
1
2
3
4
  • Linux服务器检查防火墙是否关闭
  • 云服务器检查网络安全组是否开放端口
CentOS 7 以上默认使用的是firewall作为防火墙
查看防火墙状态
firewall-cmd --state

停止firewall
systemctl stop firewalld.service

禁止firewall开机启动
systemctl disable firewalld.service
1
2
3
4
5
6
7
8
9

# SpringBoot整合RabbitMQ

Spring-AMQP是Spring框架的AMQP消息解决方案,提供模板化的发送和接收消息的抽象层,提供基于消息驱动的POJO的消息监听等。

  • 提供不依赖于任何特定的AMQP代理实现或客户端库通用的抽象,最终用户代码将很容易实现更易替换、添加和删除AMQP,因为它可以只针对抽象层来开发
  • 总之就是提高我们的框架整合消息队列的效率,SpringBoot为更方便开发RabbitMQ推出了starter
  • 我们使用 spring-boot-starter-amqp 进行开发

在SpringBoot项目中添加依赖:

<!--引入AMQP-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
1
2
3
4
5

# 1、web管控台添加虚拟主机

我们在同一个项目中,可能会出现开发、测试包括上线用的都是同一个消息队列,如果不进行隔离,很可能会出现开发环境不小心把线上环境的消息进行消费了,因此添加虚拟主机,达到一个消息隔离的效果。

# 2、SpringBoot配置RabbitMQ

在application.yml中进行配置

spring:
  rabbitmq:
    host: 1.5.1.26
    port: 5672
    virtual-host: /dev #这是我上面添加的虚拟主机
    password: password
    username: admin
1
2
3
4
5
6
7

# 3、创建配置类RabbitMQConfig

首先定义交换机和队列的名称,然后使用Bean注入的方式,注入交换机和队列对象,最后再绑定二者关系,注意导包

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    /**
     * 交换机名称
     */
    public static final String EXCHANGE_NAME = "order_exchange";

    /**
     * 队列名称
     */
    public static final String QUEUE = "order_queue";


    @Bean
    public Exchange orderExchange() {
        // 创建交换机,durable代表持久化,使用Bean注入
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
    }

    @Bean
    public Queue orderQueue() {
        // 创建队列,使用Bean注入
        return QueueBuilder.durable(QUEUE).build();
    }

    /**
     * 交换机和队列绑定关系
     * @param queue 上面注入的队列Bean,如果你的项目又多个,记得给Bean取名字
     * @param exchange 上面注入的交换机Bean
     */
    @Bean
    public Binding orderBinding(Queue queue, Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("order.#").noargs();
    }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41

# 发送消息与接收消息

# 1、消息生产者发送消息

@SpringBootTest
class RabbitmqDemoApplicationTests {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    void send() {
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"order.new","新订单来啦!!");
    }
}
1
2
3
4
5
6
7
8
9
10
11

# 2、消息消费者监听消息

消息发送使用SpringBoot测试类进行发送,消息接收我们创建消息监听类,进行消息接收。

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;

/**
 * @description 消息消费者监听消息
 */
@Slf4j
@Component
@RabbitListener(queues = "order_queue") // 监听的队列名称
public class OrderMQListener {

    /**
     * RabbitHandler会自动匹配消息类型(消息自动确认)
     * @param msg 我们发送的是String类型,这里用String进行接收,RabbitHandler会自动进行匹配
     * @param message
     * @throws IOException
     */
    @RabbitHandler
    public void releaseCouponRecord(String msg, Message message) throws IOException {
        long msgTag = message.getMessageProperties().getDeliveryTag();
        log.info("监听到消息:消息内容,msg={}",msg); // 监听到消息:消息内容,msg=新订单来啦!!
        log.info("msgTag={}",msgTag); // msgTag=1
        log.info("message={}",message.toString()); // message=(Body:'新订单来啦!!' MessageProperties [headers={}, ……
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

# RabbitMQ消息可靠性投递

什么是消息的可靠性投递?即保证消息百分百发送到消息队列中去,消息发送端需要接受到mq服务端接受到消息的确认应答。除此之外还应有完善的消息补偿机制,发送失败的消息可以再感知并二次处理。 生产者到交换机通过confirmCallback,交换机到队列通过returnCallback

# 1、可靠性投递confirmCallback

confirmCallback是生产者到交换机,可以理解为确认消息是否发送成功。新版依赖可靠性投递默认是关闭的,使用以下方法开启

#旧版,确认消息发送成功,通过实现ConfirmCallBack接口,消息发送到交换器Exchange后触发回调
spring.rabbitmq.publisher-confirms=true
#新版,NONE值是禁用发布确认模式,是默认值,CORRELATED值是发布消息成功到交换器后会触发回调方法
spring.rabbitmq.publisher-confirm-type: correlated
1
2
3
4

此时的配置文件

spring:
  rabbitmq:
    host: 111.5.111.111
    port: 5672
    virtual-host: /dev
    password: password
    username: admin
    # 开启消息二次确认,生产者到broker的交换机
    publisher-confirm-type: correlated
1
2
3
4
5
6
7
8
9

# 编码实实现confirmCallback

@Slf4j
@SpringBootTest
class RabbitmqDemoApplicationTests {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    void testConfirmCallback() {
        /*
          correlationData:配置
          ack:交换机是否收到消息,true是成功,false是失败
          cause:失败的原因
         */
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            log.info("confirm==== ack={}", ack);
            log.info("confirm==== cause={}", cause);
            if (ack) {
                log.info("发送成功,{}", cause);
            } else {
                log.error("发送失败,{}", cause);
            }
        });
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "order.new", "新订单来啦!!");
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

# 2、可靠性投递returnCallback

returnCallback交换机到队列,消息从交换器发送到对应队列失败时触发。

  • 第一步 开启returnCallback配置
spring.rabbitmq.publisher-returns=true #新版
1
  • 第二步 修改交换机投递到队列失败的策略
#为true,则交换机处理消息到路由失败,则会返回给生产者
spring.rabbitmq.template.mandatory=true
1
2

完整配置参考

spring:
  rabbitmq:
    host: 111.5.111.111
    port: 5672
    virtual-host: /dev
    password: password
    username: admin
    # 开启消息二次确认,生产者到broker的交换机
    publisher-confirm-type: correlated
    # 开启消息二次确认,交换机到队列的可靠性投递
    publisher-returns: true
    #为true,则交换机处理消息到路由失败,则会返回给生产者
    template:
      mandatory: true
1
2
3
4
5
6
7
8
9
10
11
12
13
14

# 编码实实现returnCallback

@Test
void testReturnCallback() {
    // 为true,则交换机处理消息到路由失败,则会返回给生产者,开启强制消息投递(mandatory为设置为true),但消息未被路由至任何一个queue,则回退一条消息
    rabbitTemplate.setReturnsCallback(returnedMessage -> {
        int code = returnedMessage.getReplyCode();
        log.info("code={}", code);
        log.info("returned={}", returnedMessage);
    });
    // 这个routingKey是不存在的,它找不到这个路由,所以会出现异常从而触发上面的回调方法
    rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "xk857.order.new", "新订单来啦!!");
1
2
3
4
5
6
7
8
9
10

开启消息确认机制以后,保证了消息的准确送达,但由于频繁的确认交互, rabbitmq 整体效率变低,吞吐量下降严重,不是非常重要的消息真心不建议用消息确认机制

# RabbitMQ消息确认机制ACK

消费者从RabbitMQ收到消息并处理完成后,反馈给RabbitMQ,RabbitMQ收到反馈后才将此消息从队列中删除

  • 消费者在处理消息出现了网络不稳定、服务器异常等现象,那么就不会有ACK反馈,RabbitMQ会认为这个消息没有正常消费,会将消息重新放入队列中
  • 只有当消费者正确发送ACK反馈,RabbitMQ确认收到后,消息才会从RabbitMQ服务器的数据中删除。
  • 消息的ACK确认机制默认是打开的,消息如未被进行ACK的消息确认机制,这条消息被锁定Unacked

我们也可以将其改成手工确认模式

spring:
  rabbitmq:
    #开启手动确认消息,如果消息重新入队,进行重试
    listener:
      simple:
        acknowledge-mode: manual
1
2
3
4
5
6

重写之前的Handler

@RabbitHandler
public void releaseCouponRecord(String body, Message message, Channel channel) throws IOException {
    long msgTag = message.getMessageProperties().getDeliveryTag();
    System.out.println("msgTag="+msgTag);
    System.out.println("message="+message.toString());
    System.out.println("body="+body);

    //成功确认,使用此回执方法后,消息会被 rabbitmq broker 删除
    //channel.basicAck(msgTag,false); // 正常返回ACK确认信息
    //channel.basicNack(msgTag,false,true); // 告诉broker,消息拒绝确认,最后一个true代表返回队列,为False代表丢弃
}
1
2
3
4
5
6
7
8
9
10
11

# 当前配置下处理业务逻辑的思考

我们在RabbitMQConfig中将交换机名称以及队列名称都定死了,那么我们此时如果有多个业务都需要用到MQ,例如用户下单时需要超时关单,用户退款时需要回退优惠券,这些业务该怎样处理呢?

核心就是我们需要让监听器能够判断出当前是什么消息,已知交换机名称以及队列名称在当前配置下是无法更改的,那么我们需要一个“动态参数”来标明传递过来的是什么参数,我想到的有两种方案。

  1. 消息体携带信息,判断当前是超时关单的消息还是回退优惠券的消息,然后再来处理业务逻辑
  2. 交换机名称以及队列名称是不变的,但是“路由键”是变化的,我们可以根据路由键来判断消息类型
  3. 投递到新的队列中去,把不同的消息投递到不同的队列

# 1、消息体携带信息判断当前消息类型

创建消息类,用于发送和接收消息,注意要实现Serializable接口,不实现会报错哦。

@Data
public class CommonMQMsg implements Serializable {

    private String type;

    private String msg;
}
1
2
3
4
5
6
7

先来看发送消息,发送CommonMQMsg对象构成的数据。

@SpringBootTest
class RabbitmqDemoApplicationTests {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    void send() {
        CommonMQMsg msg = new CommonMQMsg();
        msg.setType("order.refund");
        msg.setMsg("新订单来啦!!");

        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "order.new", msg);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

再来看接收消息,使用 CommonMQMsg 接收消息,框架会自动帮我们转换消息格式。

@Component
@RabbitListener(queues = "order_queue") // 监听的队列名称
public class OrderMQListener {

    @RabbitHandler
    public void releaseCouponRecord(CommonMQMsg commonMQMsg, Message message) throws IOException {
        if ("order.new".equals(commonMQMsg.getType())) {
            log.info("处理新订单逻辑");
        } else if ("order.refund".equals(commonMQMsg.getType())){
            log.info("处理退款逻辑");
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13

# 2、根据routingKey判断当前消息类型

先来看消息的发送,order.new就是我们这里的路由Key

@Test
void send() {
    rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "order.new", "新订单来啦!!");
}
1
2
3
4

再来看消息消费者的监听,是如何判断的

@Component
@RabbitListener(queues = "order_queue") // 监听的队列名称
public class OrderMQListener {

    @RabbitHandler
    public void releaseCouponRecord(String msg, Message message) throws IOException {
        // 获取路由Key
        final String routingKey = message.getMessageProperties().getReceivedRoutingKey();
        if ("order.new".equals(routingKey)) {
            // TODO 处理新订单逻辑
        } else if ("order.refund".equals(routingKey)){
            // TODO 处理退款逻辑
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

说明:这个路由键例如order.new,这些数据建议再项目中使用常量去提前定义,我这里为了讲解清晰就没写了。

# RabbitMQ消息多队列投递配置

当我们需要再SpringBoot项目中,需要投递消息到不同的队列中去,此时的MQ配置文件该如何编写呢?只改变队列名称和交换机名称可行吗?

# 1、项目中有多个队列/交换机时的配置

直接复制只改变交换机和队列名称会报错,因为交换机和队列我们之前已经注入过了,因此创建多个需要指定Bean的名称,先修改RabbitMQConfig

@Configuration
public class RabbitMQConfig {

    /**
     * 交换机名称
     */
    public static final String EXCHANGE_NAME = "order_exchange";

    /**
     * 队列名称
     */
    public static final String QUEUE = "order_queue";


    @Bean(name = "orderExchange")
    public Exchange orderExchange() {
        // 创建交换机,durable代表持久化,使用Bean注入
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
    }

    @Bean(name = "orderQueue")
    public Queue orderQueue() {
        // 创建队列,使用Bean注入
        return QueueBuilder.durable(QUEUE).build();
    }

    /**
     * 交换机和队列绑定关系
     * @param queue 上面注入的队列Bean,如果你的项目又多个,记得给Bean取名字
     * @param exchange 上面注入的交换机Bean
     */
    @Bean
    public Binding orderBinding(@Qualifier("orderQueue") Queue queue, @Qualifier("orderExchange") Exchange exchange) {
        // with是绑定的路由键,
        return BindingBuilder.bind(queue).to(exchange).with("order.#").noargs();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37

然后创建RabbitMQ2Config

@Configuration
public class RabbitMQ2Config {

    /**
     * 交换机名称
     */
    public static final String EXCHANGE_NAME = "order_refund_exchange";

    /**
     * 队列名称
     */
    public static final String QUEUE = "order_refund_queue";


    @Bean("orderExchange2")
    public Exchange orderExchange2() {
        // 创建交换机,durable代表持久化,使用Bean注入
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
    }

    @Bean("orderQueue2")
    public Queue orderQueue2() {
        return QueueBuilder.durable(QUEUE).build();
    }

    @Bean
    public Binding orderBinding2(@Qualifier("orderQueue2") Queue queue,@Qualifier("orderExchange2") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("order.#").noargs();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

# 2、发送消息到不同队列

发送消息是如何发送的呢?我们需要将消息发送到指定的交换机即可。

@SpringBootTest
class RabbitmqDemoApplicationTests {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    void send() {
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "order.new", "我是订单队列的消息");
        // 注意看不同的地方,RabbitMQ2Config.EXCHANGE_NAME
        rabbitTemplate.convertAndSend(RabbitMQ2Config.EXCHANGE_NAME, "order.new", "我是退款的消息");
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13

# 3、消息消费者分别监听不同队列的消息

监听 order_queue 队列中的消息

@Component
@RabbitListener(queues = "order_queue") // 监听的队列名称
public class OrderMQListener {

    @RabbitHandler
    public void releaseCouponRecord(String msg, Message message) throws IOException {
        log.info("order_queue监听到消息:消息内容,msg={}", msg);
    }
}
1
2
3
4
5
6
7
8
9

监听 order_refund_queue 队列的消息

@Component
@RabbitListener(queues = "order_refund_queue") // 监听的队列名称
public class OrderMQ2Listener {

    @RabbitHandler
    public void releaseCouponRecord(String msg, Message message) throws IOException {
        log.info("order_refund_queue监听到消息:消息内容,msg={}", msg);
    }
}
1
2
3
4
5
6
7
8
9

# RabbitMQ实现延时队列

什么是延迟队列?Producer将消息发送到消息队列服务端,但并不期望这条消息立马投递,而是推迟到在当前时间点之后的某一个时间投递到Consumer进行消费,该消息即定时消息。例如超时关单、优惠券回收等场景会用到。

RabbitMQ本身是不支持延迟队列的,我们可以通过死信队列的特性来实现延时队列。

# 1、创建延时队列配置类

说明:创建死信队列和创建延时队列没什么不同,当成正常的队列创建就行了。

@Configuration
public class RabbitMQTTLConfig {

    /**
     * 死信队列
     */
    public static final String LOCK_MERCHANT_DEAD_QUEUE = "lock_merchant_dead_queue";

    /**
     * 死信交换机
     */
    public static final String LOCK_MERCHANT_DEAD_EXCHANGE = "lock_merchant_dead_exchange";

    /**
     * 进入死信队列的路由key
     */
    public static final String LOCK_MERCHANT_ROUTING_KEY = "lock_merchant_routing_key";


    /**
     * 创建死信交换机
     */
    @Bean("lockMerchantDeadExchange")
    public Exchange lockMerchantDeadExchange() {
        return new TopicExchange(LOCK_MERCHANT_DEAD_EXCHANGE, true, false);
    }

    /**
     * 创建死信队列
     */
    @Bean("lockMerchantDeadQueue")
    public Queue lockMerchantDeadQueue() {
        return QueueBuilder.durable(LOCK_MERCHANT_DEAD_QUEUE).build();
    }

    /**
     * 绑定死信交换机和死信队列,和创建普通队列没什么区别
     */
    @Bean
    public Binding lockMerchantBinding(@Qualifier("lockMerchantDeadQueue") Queue queue, @Qualifier("lockMerchantDeadExchange") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(LOCK_MERCHANT_ROUTING_KEY).noargs();
    }
    
    
    

    /**
     * 普通队列,绑定的个死信交换机
     */
    public static final String NEW_MERCHANT_QUEUE = "new_merchant_queue";

    /**
     * 普通的topic交换机
     */
    public static final String NEW_MERCHANT_EXCHANGE = "new_merchant_exchange";

    /**
     * 路由key
     */
    public static final String NEW_MERCHANT_ROUTTING_KEY = "new_merchant_routing_key";


    /**
     * 创建普通交换机
     */
    @Bean("newMerchantExchange")
    public Exchange newMerchantExchange() {
        return new TopicExchange(NEW_MERCHANT_EXCHANGE, true, false);
    }

    /**
     * 创建普通队列
     */
    @Bean("newMerchantQueue")
    public Queue newMerchantQueue() {
        Map<String, Object> args = new HashMap<>(3);
        //消息过期后,进入到死信交换机
        args.put("x-dead-letter-exchange", LOCK_MERCHANT_DEAD_EXCHANGE);
        //消息过期后,进入到死信交换机的路由key
        args.put("x-dead-letter-routing-key", LOCK_MERCHANT_ROUTING_KEY);
        //过期时间,单位毫秒,10秒
        args.put("x-message-ttl", 10000);
        return QueueBuilder.durable(NEW_MERCHANT_QUEUE).withArguments(args).build();
    }

    /**
     * 绑定交换机和队列
     */
    @Bean
    public Binding newMerchantBinding(@Qualifier("newMerchantQueue") Queue queue, @Qualifier("newMerchantExchange") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(NEW_MERCHANT_ROUTTING_KEY).noargs();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93

# 2、延时队列-发送消息

思考一下,发送消息是发给谁的?发送消息是发给普通队列的,普通队列的消息过期会进入死信队列,然后我们监听死信队列的消息,但是发送消息是发给普通队列的。这次发消息不使用测试类发送了,换成发送

@RestController("/")
public class TestController {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/send/ttl/{msg}")
    public boolean sendTTLMsg(@PathVariable String msg) {
        rabbitTemplate.convertAndSend(RabbitMQTTLConfig.NEW_MERCHANT_EXCHANGE, RabbitMQTTLConfig.NEW_MERCHANT_ROUTTING_KEY, 
                                      "超时关单信息,10秒后接收订单信息," + msg);
        return true;
    }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

# 3、延时队列-接收消息

消息消费者写法上没有不同,注意监听的是死信队列即可

@Component
@RabbitListener(queues = "lock_merchant_dead_queue") // 监听的队列名称是死信队列的名称
public class TTLMQListener {
    
    @RabbitHandler
    public void releaseCouponRecord(String msg, Message message) throws IOException {
        log.info("监听到延迟消息:消息内容,msg={}", msg);
    }
}
1
2
3
4
5
6
7
8
9