RocketMQ消息队列

2023/5/17 Install基础

# 常见概念和编程模型

汇总消息队列的常见概念、基础编程模型以及RocketMQ的特点和概念等,在学习前可以有个简要了解,学习后可以加深理解。

  • 常见概念
    • JMS提供者:连接面向消息中间件的,JMS接口的一个实现,RocketMQ,ActiveMQ,Kafka等等
    • JMS生产者(Message Producer):生产消息的服务
    • JMS消费者(Message Consumer):消费消息的服务
    • JMS消息:数据对象
    • JMS队列:存储待消费消息的区域
    • JMS主题:一种支持发送消息给多个订阅者的机制
    • JMS消息通常有两种类型:点对点(Point-to-Point)、发布/订阅(Publish/Subscribe)
  • 基础编程模型
    • MQ中需要用的一些类
    • ConnectionFactory :连接工厂,JMS 用它创建连接
    • Connection :JMS 客户端到JMS Provider 的连接
    • Session: 一个发送或接收消息的线程
    • Destination :消息的目的地;消息发送给谁.
    • MessageConsumer / MessageProducer: 消息消费者,消息生产者

# RocketMQ概述

  • 特点
    • 支持Broker和Consumer端消息过滤
    • 支持发布订阅模型,和点对点,
    • 支持拉pull和推push两种消息模式
    • 单一队列百万消息、亿级消息堆积
    • 支持单master节点,多master节点,多master多slave节点
    • 任意一点都是高可用,水平拓展,Producer、Consumer、队列都可以分布式
    • 消息失败重试机制、支持特定level的定时消息
    • 新版本底层采用Netty
    • 4.3.x支持分布式事务
    • 适合金融类业务,高可用性跟踪和审计功能。
  • 概念
    • Producer:消息生产者
    • Producer Group:消息生产者组,发送同类消息的一个消息生产组
    • Consumer:消费者
    • Consumer Group:消费同类消息的多个实例
    • Tag:标签,子主题(二级分类)对topic的进一步细化,用于区分同一个主题下的不同业务的消息
    • Topic:主题, 如订单类消息,queue是消息的物理管理单位,而topic是逻辑管理单位。一个topic下可以有多个queue, 默认自动创建是4个,手动创建是8个
    • Message:消息,每个message必须指定一个topic
    • Broker:MQ程序,接收生产的消息,提供给消费者消费的程序
    • Name Server:给生产和消费者提供路由信息,提供轻量级的服务发现、路由、元数据信息,可以多个部署,互相独立(比zookeeper更轻量)
    • Offset: 偏移量,可以理解为消息进度
    • commit log: 消息存储会写在Commit log文件里面

# RocketMQ源码方式安装

RocketMQ通过源码的方式安装,在centos7中依次安装JDK8、Maven、RocketMQ4以及控制台。

# 1、centos7安装JDK8

官网下载linux.tar.gz版本,上传至云服务器 /usr/local/software/ 目录下

点击下载 https://xk857.com/t/jdk-8u201-linux-x64.tar.gz

解压:tar -zxvf jdk-8u201-linux-x64.tar.gz
重命名:mv jdk1.8.0_201 jdk8
vim /etc/profile
加入
export JAVA_HOME=/usr/local/software/jdk8
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export JAVA_HOME PATH CLASSPATH
使用 source /etc/profile   让配置立刻生效
1
2
3
4
5
6
7
8
9

# 2、Linux服务器下安装Maven

点击下载:https://xk857.com/t/apache-maven-3.6.0-bin.tar.gz,上传至 /usr/local/software/ 目录下

解压:tar -zxvf apache-maven-3.6.0-bin.tar.gz
重命名: mv apache-maven-3.6.0 maven
vim /etc/profile
export PATH=/usr/local/software/maven/bin:$PATH

立刻生效:source /etc/profile
查看版本: mvn -v
1
2
3
4
5
6
7

# 3、Linux服务器下源码部署RocketMQ4.X

进入官网,点击进行下载,Quick Start - Apache RocketMQ (opens new window)

第二步,点击下载

直接下载:https://xk857.com/t/rocketmq-all-4.8.0-source-release.zip

上传至 /usr/local/software/

# Liunx 解压安装
yum install unzip 
# 解压
unzip rocketmq-all-4.8.0-source-release.zip
# 重命名
mv rocketmq-all-4.8.0-source-release rocketmq


cd rocketmq
mvn -Prelease-all -DskipTests clean install -U
cd distribution/target/rocketmq-4.8.0/rocketmq-4.8.0
1
2
3
4
5
6
7
8
9
10
11

启动

cd /usr/local/software/rocketmq/distribution/target/rocketmq-4.8.0/rocketmq-4.8.0/

启动 mqnamesrv
nohup sh bin/mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.lo

启动 broker
1
2
3
4
5
6
7

可能会出现报错,下面列出解决方案。

内存不够怎么处理? 找到 runserver.sh 修改 JAVA_OPT

cd /usr/local/software/rocketmq/distribution/target/rocketmq-4.8.0/rocketmq-4.8.0/
vim bin/runserver.hs

进入后修改:
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
1
2
3
4
5

broker内存不足解决办法

cd /usr/local/software/rocketmq/distribution/target/rocketmq-4.8.0/rocketmq-4.8.0/
vim bin/runbroker.sh

修改配置
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m"
1
2
3
4
5

验证

 > export NAMESRV_ADDR=localhost:9876
 > sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer SendResult [sendStatus=SEND_OK, msgId= ...
 > sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer ConsumeMessageThread_%d Receive New Messages: [MessageExt...
1
2
3

# 4、RocketMQ源码方式安装控制台

下载地址:https://github.com/apache/rocketmq-externals

码云同步访问仓库:https://gitee.com/monkeyz6/rocketmq-externals.git

将下载后的zip文件,上传到 /usr/local/software/ ,下面的命令文件名可能有所差异,注意辨别

cd /usr/local/software/
解压
unzip monkeyz6-rocketmq-externals-master.zip
cd rocketmq-externals
cd rocketmq-console
 
# 修改 applicatiuon.properties
cd /usr/local/software/rocketmq-externals/rocketmq-console/src/main/resources
vim application.properties
# 设置地址
rocketmq.config.namesrvAddr=127.0.0.1:9876

cd /usr/local/software/rocketmq-externals/rocketmq-console/
编译打包
mvn clean package -Dmaven.test.skip=true
cd target/
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
  • 进入target目录 ,启动 java -jar rocketmq-console-ng-2.0.0.jar
  • 守护进程方式启动 nohup java -jar rocketmq-console-ng-2.0.0.jar &

默认8080端口,访问:ip:8080

# 入门实战之发送消息

在SpringBoot框架中,使用RockerMQ发送消息,模拟真实业务开发场景,而不是官方demo,可以直接按照此文档整合到项目中去。

先开启防火墙端口,10909、8080、10911、9876 ,搭建 SpringBoot 项目,并加入依赖

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.8.0</version>
</dependency>
1
2
3
4
5

创建 com.example.rocketmq.jms.PayProducer

@Component
public class PayProducer {

    private String producerGroup = "pay_group";

    private String nameServerAddr = "1.15.143.246:9876";

    private DefaultMQProducer producer;

    public PayProducer() {
        producer = new DefaultMQProducer(producerGroup);
        producer.setNamesrvAddr(nameServerAddr);
        start();
    }

    public DefaultMQProducer getProducer() {
        return this.producer;
    }

    /**
     * 对象在使用前必须调用一次,只能初始化一次
     */
    public void start() {
        try {
            this.producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }

    /**
     * 一般在应用上下文,使用上下文监听器,进行关闭
     */
    public void shutdown() {
        this.producer.shutdown();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37

创建接口 com.example.rocketmq.controller.PayController

import com.example.rocketmq.jms.PayProducer;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;

@RestController
@RequestMapping("/api/pay")
public class PayController {

    @Autowired
    private PayProducer payProducer;

    // 主题
    public static final String topic = "pay_test_topic";

    @RequestMapping("/pay_cb")
    public Object callback(String text) {

        Message message = new Message(topic, "tag1", ("Hello rocketmq = " + text).getBytes());
        try {
            // 发送消息
            SendResult sendResult = payProducer.getProducer().send(message,10000);
            System.out.println(sendResult);
        } catch (MQClientException e) {
            e.printStackTrace();
        } catch (RemotingException e) {
            e.printStackTrace();
        } catch (MQBrokerException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return new HashMap<>();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42

# 发送消息常见错误

# 1、云服务器部署网卡问题

常见错误一 :org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException:sendDefaultImpl call timeout

原因:阿里云/腾讯云存在多网卡,rocketmq都会根据当前网卡选择一个IP使用,当你的机器有多块网卡时,很有可能会有问题。

比如,我遇到的问题是我机器上有两个IP,一个公网IP,一个私网IP, 因此需要配置broker.conf 指定当前的公网ip, 然后重新启动broker

新增配置:conf/broker.conf (属性名称brokerIP1=broker所在的公网ip地址 )

新增这个配置:brokerIP1=120.76.62.13

启动命令:nohup sh bin/mqbroker -n localhost:9876 -c ./conf/broker.conf &

具体解决: org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout

cd /usr/local/software/rocketmq/distribution/target/rocketmq-4.8.0/rocketmq-4.8.0/conf
vim broker.conf

增加配置
brokerIP1=公网IP

保存后关闭进程
[root@VM-4-16-centos conf]# jps
8578 jar
20692 Jps
26296 NamesrvStartup
26346 BrokerStartup
[root@VM-4-16-centos conf]# kill -9 26346
[root@VM-4-16-centos conf]# cd ..


[root@VM-4-16-centos rocketmq-4.8.0]# sh bin/mqbroker -n localhost:9876  -c ./conf/broker.conf
The broker[broker-a, 1.15.143.246:10911] boot success. serializeType=JSON and name server is localhost:9876
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

# 2、Broker禁止自动创建Topic

常见错误二:MQClientException: No route info of this topic, TopicTest1

原因:Broker 禁止自动创建 Topic,且用户没有通过手工方式创建 此Topic, 或者broker和Nameserver网络不通

解决: 通过 sh bin/mqbroker -m 查看配置

autoCreateTopicEnable=true 则自动创建topic

Centos7关闭防火墙 systemctl stop firewalld

# 3、控制台查看不了数据,提示连接 10909错误

原因:Rocket默认开启了VIP通道,VIP通道端口为10911-2=10909

解决:阿里云安全组需要增加一个端口 10909

# 4、其他错误:

https://blog.csdn.net/qq_14853889/article/details/81053145

https://blog.csdn.net/wangmx1993328/article/details/81588217#%E5%BC%82%E5%B8%B8%E8%AF%B4%E6%98%8E

https://www.jianshu.com/p/bfd6d849f156

https://blog.csdn.net/wangmx1993328/article/details/81588217

# 入门实战之接收消息

在SpringBoot框架中,使用RockerMQ发送消息后我们需要接收消息,同样模拟真实业务开发场景,可以直接在项目中使用。

将配置简单提取一下:com.example.rocketmq.jms.JmsConfig

public class JmsConfig {
    public static final String NAME_SERVER = "1.15.143.246:9876";
    public static final String TOPIC = "pay_test_topic";
}
1
2
3
4

消费消息:

@Component
public class PayConsumer {
    /**
     * 接收消息对象
     */
    private DefaultMQPushConsumer consumer;

    private String consumerGroup = "pay_consumer_group";

    public PayConsumer() throws MQClientException {

        consumer = new DefaultMQPushConsumer(consumerGroup);
        consumer.setNamesrvAddr(JmsConfig.NAME_SERVER);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

        consumer.subscribe(JmsConfig.TOPIC, "*");

        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            try {
                Message msg = msgs.get(0);
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msgs.get(0).getBody()));
                String topic = msg.getTopic();
                String body = new String(msg.getBody(), "utf-8");
                String tags = msg.getTags();
                String keys = msg.getKeys();
                System.out.println("topic=" + topic + ", tags=" + tags + ", keys=" + keys + ", msg=" + body);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        });
        consumer.start();
        System.out.println("consumer start ...");
    }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37

# 核心配置及发送状态

在经过RocketMQ入门实战后,我们学会了如何进行最基本的发送和接收消息,在RocketMQ中对消息生产者有着大量的配置,这里对常见配置进行了汇总,可在项目中按需选择使用。

生产者常见核心配置

  • compressMsgBodyOverHowmuch :消息超过默认字节4096后进行压缩
  • retryTimesWhenSendFailed : 失败重发次数
  • maxMessageSize : 最大消息配置,默认128k
  • topicQueueNums : 主题下面的队列数量,默认是4
  • autoCreateTopicEnable : 是否自动创建主题Topic, 开发建议为true,生产要为false
  • defaultTopicQueueNums : 自动创建服务器不存在的topic,默认创建的队列数
  • autoCreateSubscriptionGroup: 是否允许 Broker 自动创建订阅组,建议线下开发开启,线上关闭
  • brokerClusterName : 集群名称
  • brokerId : 0表示Master主节点 大于0表示从节点
  • brokerIP1 : Broker服务地址
  • brokerRole : broker角色 ASYNC_MASTER/ SYNC_MASTER/ SLAVE
  • deleteWhen : 每天执行删除过期文件的时间,默认每天凌晨4点
  • flushDiskType :刷盘策略, 默认为 ASYNC_FLUSH(异步刷盘), 另外是SYNC_FLUSH(同步刷盘)
  • listenPort : Broker监听的端口号
  • mapedFileSizeCommitLog : 单个conmmitlog文件大小,默认是1GB
  • mapedFileSizeConsumeQueue:ConsumeQueue每个文件默认存30W条,可以根据项目调整
  • storePathRootDir : 存储消息以及一些配置信息的根目录 默认为用户的 ${HOME}/store
  • storePathCommitLog:commitlog存储目录默认为${storePathRootDir}/commitlog
  • storePathIndex: 消息索引存储路径
  • syncFlushTimeout : 同步刷盘超时时间
  • diskMaxUsedSpaceRatio : 检测可用的磁盘空间大小,超过后会写入报错

# 1、消息常见发送状态

消息发送有同步和异步

Broker消息投递状态

  • FLUSH_DISK_TIMEOUT:没有在规定时间内完成刷盘 (刷盘策略需要为SYNC_FLUSH 才会出这个错误)
  • FLUSH_SLAVE_TIMEOUT:主从模式下,broker是SYNC_MASTER, 没有在规定时间内完成主从同步
  • SLAVE_NOT_AVAILABLE:从模式下,broker是SYNC_MASTER, 但是没有找到被配置成Slave的Broker
  • SEND_OK:发送成功,没有发生上面的三种问题

# 生产和消费消息重试及处理

由于MQ经常处于复杂的分布式系统中,考虑⽹络波动,服务宕机,程序异常因素,很有可能出现消息发送或者消费失败的问题。因此,消息的重试就是所有MQ中间件必须考虑到的⼀个关键点。如果没有消息重试,就可能产⽣消息丢失的问题,可能对系统产⽣很⼤的影响。所以,秉承宁可多发消息,也不可丢失消息的原则。 MQ消费者的消费逻辑失败时,可以通过设置返回状态达到消息重试的结果。

生产者Producer重试(异步和SendOneWay下配置无效)

  • 消息重投(保证数据的高可靠性),本身内部支持重试,默认次数是2,
  • 如果网络情况比较差,或者跨集群则建改多几次
// com.example.rocketmq.jms.PayProducer
@Component
public class PayProducer {

    private String producerGroup = "pay_group";

    private DefaultMQProducer producer;

    public PayProducer() {
        producer = new DefaultMQProducer(producerGroup);
        // 生产者投递消息重试次数
        producer.setRetryTimesWhenSendFailed(3);

        producer.setNamesrvAddr(JmsConfig.NAME_SERVER);
        start();

    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

com.example.rocketmq.controller.PayController

@RestController
@RequestMapping("/api/pay")
public class PayController {

    @Autowired
    private PayProducer payProducer;

    @RequestMapping("/pay_cb")
    public Object callback(String text) throws MQBrokerException, RemotingException, InterruptedException, MQClientException {
        // key是唯一的,一般是订单号等,这里仅做测试,生产者根据key进行消息重投,默认次数为2
        Message message = new Message(JmsConfig.TOPIC, "tag1","1234", ("Hello rocketmq = " + text).getBytes());
        // 发送消息
        SendResult sendResult = payProducer.getProducer().send(message, 10000);
        System.out.println(sendResult);
        return new HashMap<>();
    }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

消费端重试原因:消息处理异常、broker端到consumer端各种问题,如网络原因闪断,消费处理失败,ACK返回失败等问题。

注意: 重试间隔时间配置 ,默认每条消息最多重试 16 次 messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

  • 超过重试次数人工补偿
  • 消费端去重
  • 一条消息无论重试多少次,这些重试消息的 Message ID,key 不会改变。
  • 消费重试只针对集群消费方式生效;广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息,

com.example.rocketmq.jms.PayConsumer

# 异步发送消息和回调

同步消息需要发送消息成功后才可以继续往下执行,然而在一些对响应时间敏感的业务场景,发送端不能容忍长时间地等待Broker的响应,那么此时就需要异步消息了。

核心代码

producer.send(message, new SendCallback(){
	onSuccess(){}
	onException(){}
})
1
2
3
4

发送异步消息

@RequestMapping("/async")
public String asyncMsg(String text) throws MQBrokerException, RemotingException, InterruptedException, MQClientException {
    // key是唯一的,一般是订单号等,这里仅做测试,生产者根据key进行消息重投,默认次数为2
    Message message = new Message(JmsConfig.TOPIC, "tag1","1234", ("Hello rocketmq = " + text).getBytes());
    payProducer.getProducer().send(message, new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            System.out.printf("发送结果 %s, msg=%s", sendResult.getSendStatus(), sendResult.toString());
        }

        @Override
        public void onException(Throwable throwable) {
            throwable.printStackTrace();
            //补偿机制,根据业务情况查看是否需要进行重试
        }
    });
    return "";
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

# OneWay消息及多种场景对比

消息队列的发送方式一般有三种,同步发送、异步发送、无需要等待响应。下面详细列举了不同发送方式的主要特点及应用场景。

  • SYNC:同步发送,重要通知邮件、报名短信通知、营销短信系统等

  • ASYNC异步发送,对RT时间敏感,可以支持更高的并发,回调成功触发相对应的业务,比如注册成功后通知积分系统发放优惠券

  • ONEWAY无需要等待响应,主要是日志收集,适用于某些耗时非常短,但对可靠性要求并不高的场景, 也就是LogServer, 只负责发送消息,不等待服务器 (opens new window) (opens new window)回应且没有回调函数触发,即只发送请求不等待应答

发送方式 发送 TPS 发送结果反馈 可靠性
同步发送 不丢失
异步发送 不丢失
单向发送 最快 可能丢失

# 1、发送OneWay消息

@RequestMapping("/async")
public String sendOnWayMsg(String text) throws MQBrokerException, RemotingException, InterruptedException, MQClientException {
    Message message = new Message(JmsConfig.TOPIC, "tag1","1234", ("Hello rocketmq = " + text).getBytes());
    payProducer.getProducer().sendOneway(message);
    return "";
}
1
2
3
4
5
6

# RocketMQ延迟消息

什么是延迟消息?Producer将消息发送到消息队列 RocketMQ 服务端,但并不期望这条消息立马投递,而是推迟到在当前时间点之后的某一个时间投递到 Consumer 进行消费,该消息即定时消息,目前支持固定精度的消息。

  • 源码:rocketmq-store > MessageStoreConfig.java 属性 messageDelayLevel "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

  • 使用message.setDelayTimeLevel(xxx) //xxx是级别,1表示配置里面的第一个级别,2表示第二个级别

# 1、使用场景

  1. 通过消息触发一些定时任务,比如在某一固定时间点向用户发送提醒消息
  2. 消息生产和消费有时间窗口要求:比如在天猫电商交易中超时未支付关闭订单的场景,在订单创建时会发送一条 延时消息。这条消息将会在 30 分钟以后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成支付。 如支付未完成,则关闭订单。如已完成支付则忽略
@RequestMapping("/delay")
public String delayMsg(String text) throws RemotingException, InterruptedException, MQClientException {
    // key是唯一的,一般是订单号等,这里仅做测试,生产者根据key进行消息重投,默认次数为2
    Message message = new Message(JmsConfig.TOPIC, "tag1","1234", ("Hello rocketmq = " + text).getBytes());
    // 5s后被消费
    message.setDelayTimeLevel(2);

    payProducer.getProducer().send(message, new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            System.out.printf("发送结果 %s, msg=%s", sendResult.getSendStatus(), sendResult.toString());
        }

        @Override
        public void onException(Throwable throwable) {
            throwable.printStackTrace();
            //补偿机制,根据业务情况查看是否需要进行重试
        }
    });
    return "";
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21