RabbitMQ教程

推荐:

https://www.rabbitmq.com

RabbitMQ安装

https://www.rabbitmq.com/docs

RabbitMQ是基于Erlang语言开发的,所以安装RabbitMQ之前需要安装Erlang语言环境。需要注意下的是RabbitMQ与Erlang语言之间是有版本对应关系的。https://www.rabbitmq.com/docs/which-erlang

在基于 RPM 的 Linux上安装

注意:不同 Centos 版本的 rpm 包不同:

  • 下载对应 centos 版本的 erlang 安装包
  • 下载对应 erlang 版本的 rabbitmq 安装包
1
2
3
4
5
6
7
8
9
10
11
12
13
dnf install esl-erlang_26.2.5_1_centos_8_x86_64.rpm
dnf install rabbitmq-server-3.13.7-1.el8.noarch.rpm

dnf list installed | grep rabbit
dnf list installed | grep erlang
dnf remove rabbitmq-server.noarch
dnf remove esl-erlang.x86_64

# 单元文件位置 /usr/lib/systemd/system/rabbitmq-server.service
systemctl status rabbitmq-server
systemctl start rabbitmq-server
systemctl stop rabbitmq-server
systemctl enable rabbitmq-server

通用二进制构建

源码安装erlang

下载地址:https://www.erlang.org/downloads

所用版本:26.2.5.5

安装指南:https://www.erlang.org/docs/26/installation_guide/install

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# 安装前依赖解决
yum install \
m4 \
vim \
wget \
gcc \
gcc-c++ \
make \
cmake \
automake \
autoconf \
readline \
kernel-devel \
ncurses-devel \
openssl-devel \
readline-devel \
-y

dnf remove --oldinstallonly # 一键删除旧内核
dnf autoremove # 从系统中删除所有最初作为用户安装软件包的依赖项安装的软件包,但此类包不再需要它们。

cd /opt/local/
tar -zxvf otp_src_26.2.5.5.tar.gz
cd /opt/local/otp_src_26.2.5.5/
export ERL_TOP=`pwd` # 设置指向当前路径环境变量,仅当前shell有效
./configure --prefix=/opt/local/erlang --without-javac # --without-javac 具体含义见erlang官方安装指南
make && make install
rm -rf /opt/local/otp_src_26.2.5.5

添加环境变量(也可以创建 erlang 的软链接,见下文 开机自启systemctl ):

1
2
3
echo "export PATH=/opt/local/erlang/bin:$PATH" >> /etc/profile
echo "RABBITMQ_HOME=/opt/local/rabbitmq" >> /etc/profile
source /etc/profile

测试

1
2
$ erl -version
Erlang (SMP,ASYNC_THREADS) (BEAM) emulator version 14.2.5.4

源码安装RabbitMQ

下载地址:https://github.com/rabbitmq/rabbitmq-server/releases/tag/v3.13.7

所用版本:rabbitmq-server-generic-unix-3.13.7.tar.xz

1
2
3
4
5
cd /opt/local/

xz -d rabbitmq-server-generic-unix-3.13.7.tar.xz
tar -xvf rabbitmq-server-generic-unix-3.13.7.tar
mv /opt/local/rabbitmq_server-3.13.7 /opt/local/rabbitmq

添加环境变量

1
2
echo "export PATH=/opt/local/rabbitmq/sbin:$PATH" >> /etc/profile
source /etc/profile

示例配置文件

RabbitMQ 配置文件位于 $RABBITMQ_HOME/etc/rabbitmq/rabbitmq.conf,是配置节点的主要方式。

可以使用环境变量来控制某些设置。建议使用 $RABBITMQ_HOME/etc/rabbitmq/rabbitmq-env.conf 文件。

安装后这两个文件都不存在,因此必须先创建它们。

请参阅 RabbitMQ 配置指南 以了解更多信息。

示例配置文件:https://www.rabbitmq.com/docs/configure#example-config

修改配置以支持guest远程登录:

1
echo "loopback_users = none" >> /opt/local/rabbitmq/etc/rabbitmq/rabbitmq.conf

启动、停止

1
2
/opt/local/rabbitmq/sbin/rabbitmq-server -detached # 后台运行
/opt/local/rabbitmq/sbin/rabbitmqctl shutdown

开启 web 管理界面

1
2
3
rabbitmq-plugins enable rabbitmq_management
rabbitmqctl change_password guest 123456789 # 更改guest用户密码
# RabbitMQ默认只有一个guest帐号,guest帐号只能在RabbitMQ安装服务器上登录,解决办法是添加一个新的帐号,或修改配置

访问地址:http://192.168.0.7:15672

开机自启service–似乎不支持

vim /etc/init.d/rabbitmq

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#!/bin/bash

source /etc/profile

case "$1" in
'start' )
echo "Starting RabbitMQ ..."
rabbitmq-server -detached
;;
'stop' )
echo "Stopping RabbitMQ ..."
rabbitmqctl shutdown
;;
'status')
echo "Status RabbitMQ ..."
rabbitmqctl status
;;
*)
echo "Usage: $0 {start|stop|status}" >&2
;;
esac

echo "Usage: $0 {start|stop}" >&2 # $0:shell脚本的名字;$1:向shell脚本传的第一个参数;>&2:将“错误”输出到STDERR(标准错误)

启动与停止

1
2
3
4
5
6
7
8
9
10
11
chmod 777 /etc/init.d/rabbitmq

service rabbitmq start # 启动失败,为什么???????????????
service rabbitmq status
service rabbitmq stop
# 等价于
/etc/init.d/rabbitmq start # 启动成功
/etc/init.d/rabbitmq status
/etc/init.d/rabbitmq stop

chkconfig --level 2345 rabbitmq on # 自启动设置失败,提示 【服务 rabbitmq 不支持 chkconfig】

开机自启systemctl–使用中

vim /usr/lib/systemd/system/rabbitmq.service

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# rabbitmq.service
[Unit]
Description=rabbitmq
Documentation= https://zh.wikipedia.org/wiki/Systemd
After=syslog.target network.target network-online.target

[Service]
#Type= notify
User=root
Group=root
#UMask=0027
#NotifyAccess=all
TimeoutStartSec=600

# To override LimitNOFILE, create the following file:
#
# /etc/systemd/system/rabbitmq-server.service.d/limits.conf
#
# with the following content:
#
# [Service]
# LimitNOFILE=65536
LimitNOFILE=65536

# Note: systemd on CentOS 7 complains about in-line comments,
# so only append them here
#
# Restart:
# The following setting will automatically restart RabbitMQ
# in the event of a failure. systemd service restarts are not a
# replacement for service monitoring. Please see
# https://www.rabbitmq.com/monitoring.html
Restart=on-failure
RestartSec=10s
WorkingDirectory=/opt/local/rabbitmq
# ExecStart 中不能使用 "rabbitmq-server -detached" 方式启动
ExecStart=/opt/local/rabbitmq/sbin/rabbitmq-server
ExecStop=/opt/local/rabbitmq/sbin/rabbitmqctl shutdown
SuccessExitStatus=69
#PrivateTmp=true

[Install]
WantedBy=multi-user.target

重新加载配置文件:systemctl daemon-reload

创建 erlang 的软链接(因为 /etc/profile 中配置的环境变量不会生效),如果使用rpm包安装的 erlang 请跳过此步骤

1
2
ln -s /opt/local/erlang/bin/erl /usr/local/bin/erl
ln -s /opt/local/erlang/bin/escript /usr/local/bin/escript

/usr/local/bin 和 /usr/bin 区别

首先注意 usr 指 Unix System Resource,而不是User。

/usr/bin 下面的都是系统预装的可执行程序,会随着系统升级而改变。

/usr/local/bin 目录是给用户放置自己的可执行程序的地方,推荐放在这里,不会被系统升级而覆盖同名文件。

如果两个目录下有相同的可执行程序,由PATH环境变量决定优先级,比如一台服务器的PATH变量为:

1
2
echo $PATH
/root/.local/bin:/root/bin:/usr/local/bin:/usr/local/sbin:/usr/bin:/usr/sbin

这里 /usr/local/bin 优先于 /usr/bin,一般都是如此。

启动与停止

1
2
3
4
5
6
7
8
9
10
11
12
13
14
systemctl start rabbitmq
systemctl stop rabbitmq

systemctl enable rabbitmq
systemctl disable rabbitmq

systemctl status rabbitmq
journalctl -xeu rabbitmq.service # journalctl

# 报以下错误,是因为 /etc/init.d/ 目录下存在相同的服务名,删除即可 rm -rf /etc/init.d/rabbitmq
$ systemctl disable rabbitmq
Synchronizing state of rabbitmq.service with SysV service script with /usr/lib/systemd/systemd-sysv-install.
Executing: /usr/lib/systemd/systemd-sysv-install disable rabbitmq
服务 rabbitmq 不支持 chkconfig

添加用户

可以在管理界面添加或使用如下命令:

1
2
3
4
5
6
7
[root@localhost /]# rabbitmqctl add_user admin 123456789
Adding user "admin" ...
Done. Don't forget to grant the user permissions to some virtual hosts! See 'rabbitmqctl help set_permissions' to learn more.
[root@localhost /]# rabbitmqctl set_permissions -p / admin "." "." ".*"
Setting permissions for user "admin" in vhost "/" ...
[root@localhost /]# rabbitmqctl set_user_tags admin administrator
Setting tags for user "admin" to [administrator] ...

RabbitMQ基础使用

登录控制台后上方就能看到 RabbitMQ 的主要功能。

Overview:概述,主要展示 RabbitMQ 服务的一些整体运行情况。

Conections、Channels、Exchanges、Queues and Streams:RabbitMQ 的核心功能。

Admin:一些管理功能,例如:用户及访问权限管理、虚拟机管理(virtual host)等。

image-20241206180703465

在RabbitMQ中,不同虚拟机之间的资源是完全隔离的。在资源充足的情况下,每个虚拟机可以当成一个独立的RabbitMQ服务来使用。

image-20241206181353060

接下来我们来上手使用一下RabbitMQ的核心功能。

RabbitMQ高级教程

RabbitMQ的消息流转模型

image-20241206190446895

四大核心概念

生产者

产生数据发送消息的程序是生产者。

交换机

交换机是 RabbitMQ 非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推送到多个队列,亦或者是把消息丢弃,这个得由交换机类型决定 。

队列

队列是RabbitMQ 内部使用的一种数据结构,尽管消息流经 RabbitMQ 和应用程序,但它们只能存储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式 。

消费者

消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序。请注意生产者,消费者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费者。

各个名词介绍

**Broker:**接收和分发消息的应用,RabbitMQ Server 就是 Message Broker。

**Virtual host:**出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个 vhost,每个用户在自己的 vhost 创建 exchange/queue 等。

**Connection:**publisher/consumer 和 broker 之间的 TCP 连接。

**Channel:**如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection 的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个 thread 创建单独的 channel 进行通讯,AMQP method 包含了 channel id 帮助客户端和 message broker 识别channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销。

**Exchange:**message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到 queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast) 。

**Queue:**消息最终被送到这里等待 consumer 取走。

**Binding:**exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key,Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据。

交换机和交换机类型

服务器发送消息不会直接发送到队列中(Queue),只能将消息发送给交换机(Exchange),然后根据确定的规则,RabbitMQ将会决定消息该投递到哪个队列。这些规则称为路由键(routing key),队列通过路由键绑定到交换机上。消息发送到服务器端(broker),消息也有自己的路由键(也可以是空),RabbitMQ也会将消息和消息指定发送的交换机的绑定(binding,就是队列和交互机的根据路由键映射的关系)的路由键进行匹配。

如果匹配的话,就会将消息投递到相应的队列。交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息,把消息放到特定队列?把消息放到许多队列中?丢弃消息?这就的由交换机的类型来决定。

交换机是用来发送消息的AMQP实体。交换机拿到一个消息之后将它路由给一个或零个队列。它使用哪种路由算法是由交换机类型和被称作绑定(bindings)的规则所决定的。AMQP 0-9-1的代理提供了四种交换机。

Name(交换机类型) Default pre-declared names(预声明的默认名称)
Direct exchange(直连交换机) (Empty string) and amq.direct
Fanout exchange(扇型交换机) amq.fanout
Topic exchange(主题交换机) amq.topic
Headers exchange(头交换机) amq.match (and amq.headers in RabbitMQ)

除交换机类型外,在声明交换机时还可以附带许多其他的属性,其中最重要的几个分别是:

  • Name
  • Durability (消息代理重启后,交换机是否还存在)
  • Auto-delete (当所有与之绑定的消息队列都完成了对此交换机的使用后,删掉它)
  • Arguments(依赖代理本身)

交换机可以有两个状态:持久(durable)、暂存(transient)。持久化的交换机会在消息代理(broker)重启后依旧存在,而暂存的交换机则不会(它们需要在代理再次上线后重新被声明)。然而并不是所有的应用场景都需要持久化的交换机。

Direct Exchange

将消息中的Routing key与该Exchange关联的所有Binding中的Routing key进行比较,如果相等,则发送到该Binding对应的Queue中。

image-20211230172801166

Topic Exchange

将消息中的Routing key与该Exchange关联的所有Binding中的Routing key进行对比,如果匹配上了,则发送到该Binding对应的Queue中。

image-20211230172811483

topic要求

发送到类型是 topic 交换机的消息的 routing_key 不能随意写,必须满足一定的要求,**它必须是一个单词列表,以点号分隔开。**这些单词可以是任意单词,比如说:”stock.usd.nyse”, “nyse.vmw”, “quick.orange.rabbit”.这种类型的。当然这个单词列表最多不能超过 255 个字节。

规则

在这个规则列表中,其中有两个替换符是大家需要注意的。

*(星号)可以代替一个单词

#(井号)可以替代零个或多个单词

注意

  • 当一个队列绑定键是#,那么这个队列将接收所有数据,就有点像 fanout 了
  • 如果队列绑定键当中没有#和*出现,那么该队列绑定类型就是 direct 了

Fanout Exchange

直接将消息转发到所有binding的对应queue中,这种exchange在路由转发的时候,忽略Routing key

image-20211230172824713

Headers Exchange

将消息中的headers与该Exchange相关联的所有Binging中的参数进行匹配,如果匹配上了,则发送到该Binding对应的Queue中。

六大模式

简单模式

image-20211230172833658

工作模式

image-20211230172845711

发布订阅模式

image-20211230172957861

路由模式

image-20211230173014717

主题模式

image-20211230173025614

RPC模式

image-20211230173036138


绑定

binding 其实是 exchange 和 queue 之间的桥梁,它告诉我们 exchange 和哪个 queue 进行了绑定关系。如果要指示交换机“E”将消息路由给队列“Q”,那么“Q”就需要与“E”进行绑定。绑定操作需要定义一个可选的路由键(routing key)属性给某些类型的交换机。路由键的意义在于从发送给交换机的众多消息中选择出某些消息,将其路由给绑定的队列。

image-20211230173046913

如果AMQP的消息无法路由到队列(例如,发送到的交换机没有绑定队列),消息会被就地销毁或者返还给发布者。如何处理取决于发布者设置的消息属性。

消息分发

轮训分发

RabbitMQ 默认分发消息采用的轮训分发的,如果同一队列有多个消费节点,则会按照消息顺序进行轮训消费。

不公平分发

RabbitMQ 默认分发消息采用的轮训分发,但是在某种场景下这种策略并不是很好,比方说有两个消费者在处理务,其中有个消费者 1 处理任务的速度非常快,而另外一个消费者 2 处理速度却很慢,这个时候我们还是采用轮训分发的化就会到这处理速度快的这个消费者很大一部分时间处于空闲状态,而处理慢的那个消费者一直在干活,这种分配方式在这种情况下其实就不太好,但是RabbitMQ 并不知道这种情况它依然很公平的进行分发。

为了避免这种情况,我们可以设置参数 channel.basicQos(1); 意思就是如果这个任务我还没有处理完或者我还没有应答你,你先别分配给我,我目前只能处理一个任务,然后 rabbitmq 就会把该任务分配给没有那么忙的那个空闲消费者,当然如果所有的消费者都没有完成手上任务,队列还在不停的添加新任务,队列有可能就会遇到队列被撑满的情况,这个时候就只能添加新的 worker 或者改变其他存储任务的策略。

限流

通过使用 basic.qos 方法设置“预取计数”值来完成的。该值定义通道上允许的未确认消息的最大数量。100 到 300 范围内的值通常可提供最佳的吞吐量,并且不会给消费者带来太大的风险。预取值为 1 是最保守的。


消息确认

发布确认

原理

生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID(从 1 开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号,此外 broker 也可以设置 basic.ack 的 multiple 域,表示到这个序列号之前的所有消息都已经得到了处理。

开启发布确认

发布确认默认是没有开启的,如果要开启需要调用方法 confirmSelect,每当你要想使用发布确认,都需要在 channel 上调用该方法。

1
2
3
4
5
//创建 channel实例
channel = connection.createChannel();

// 开启发布确认
channel.confirmSelect();

单个确认发布

这是一种简单的确认方式,它是一种同步确认发布的方式,也就是发布一个消息之后只有它被确认发布,后续的消息才能继续发布**,waitForConfirmsOrDie(long)这个方法只有在消息被确认的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常。一个最大的缺点就是:发布速度特别的慢,因为如果没有确认发布的消息就会阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条**发布消息的吞吐量。

1
2
3
4
// 默认0L
channel.waitForConfirmsOrDie();
// 时间内
channel.waitForConfirmsOrDie(1000L);

批量确认发布

与单个等待确认消息相比,先发布一批消息然后一起确认waitForConfirms()可以极大地提高吞吐量,当然这种方式的缺点就是:当发生故障导致发布出现问题时,不知道是哪个消息出现问题了,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。当然这种方案仍然是同步的,也一样阻塞消息的发布。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("", queueName, null, message.getBytes());
outstandingMessageCount++;
if (outstandingMessageCount == batchSize) {
channel.waitForConfirms();
outstandingMessageCount = 0;
}
}

//为了确保还有剩余没有确认消息 再次确认
if (outstandingMessageCount > 0) {
channel.waitForConfirms();
}

异步确认发布

利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功。把未确认的消息放到一个基于内存的能被发布线程访问的队列,比如说用 ConcurrentLinkedQueue 这个队列在 confirm callbacks 与发布线程之间进行消息的传递。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
//开启发布确认
channel.confirmSelect();
/**
* 线程安全有序的一个跳表,适用于高并发的情况
* 1.轻松的将序号与消息进行关联
* 2.轻松批量删除条目,只要给到序列号
* 3.支持并发访问
*/
ConcurrentSkipListMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();

/**
* 确认监听器
* 1. 消息序列号
* 2. true 可以确认小于等于当前序列号的消息
* 3. false 确认当前序列号消息
*/
ConfirmListener confirmListener = new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
// 是否批量
if (multiple) {
//返回的是小于等于当前序列号的未确认消息 是一个 map
ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(deliveryTag, true);
//清除该部分未确认消息
confirmed.clear();
}else{
//只清除当前序列号的消息
outstandingConfirms.remove(deliveryTag);
}
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
String message = outstandingConfirms.get(deliveryTag);
logger.error("发布的消息" + message + "未被确认,序列号" + sequenceNumber);
}
};


/**
* 添加一个异步确认的监听器
* 1.确认收到消息的回调
* 2.未收到消息的回调
*/
channel.addConfirmListener(confirmListener);

for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = "消息" + i;
/**
* channel.getNextPublishSeqNo()获取下一个消息的序列号
* 通过序列号与消息体进行一个关联
* 全部都是未确认的消息体
*/
outstandingConfirms.put(channel.getNextPublishSeqNo(), message);
channel.basicPublish("", queueName, null, message.getBytes());
}

回退消息

在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的。

通过设置 mandatory 参数可以在当消息传递过程中不可达目的地时将消息返回给生产者。

1
2
3
4
5
6
7
8
9
10
11
// mandatory true 强制推送到一个队列中 
public abstract void publish(String topic,boolean mandatory, boolean immediate,boolean durable,Object data) throws Exception;

ReturnListener returnListener = new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, BasicProperties properties, byte[] body) throws IOException {

}
};
// 添加回退监听器
channel.addReturnListener(returnListener);

消息应答

为了保证消息在发送过程中不丢失,rabbitmq 引入消息应答机制,消息应答就是:消费者在接收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了。

自动应答

消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢失了,当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制,当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用。

手动应答

  • Channel.basicAck(用于肯定确认)RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了
  • Channel.basicNack(用于否定确认)
  • Channel.basicReject(用于否定确认) 与 Channel.basicNack 相比少一个参数不处理该消息了直接拒绝,可以将其丢弃了

批量应答

在手动应答时,**指定 multiple 为 true,**可以进行批量应答,从而减少网络拥堵。

1
channel.basicAck(envelope.getDeliveryTag(), true);

重新入队

如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息。在手动应答时,指定 requeue 为 true,可以进行重新入队。

1
2
// requeue ? 重新入队 : 丢弃
channel.basicReject(envelope.getDeliveryTag(), true);

持久化

我们需要将队列和消息都标记为持久化。来保障当 RabbitMQ 服务停掉或奔溃以后消息生产者发送过来的消息不丢失。

队列持久化

默认我们创建的队列都是非持久化的。rabbitmq 如果重启的话,该队列就会被删除掉,如果要队列实现持久化 需要在声明队列的时候把 durable 参数设置为持久化。

1
2
boolean queue_durable = true;
channel.queueDeclare(queue_name, queue_durable, false, false, getQueueArgs(queueLength,queueByteLength));

注意:如果之前声明的队列不是持久化的,需要把原先队列先删除,或者重新创建一个持久化的队列,不然就会出现错误

消息持久化

要想让消息实现持久化,需要在生产者发布消息时 指定 deliveryMode 为 2。

1
2
3
4
5
6
public abstract void publish(String topic,boolean mandatory, boolean immediate,boolean durable,Object data) throws Exception;

BasicProperties.Builder propsBuilder = new BasicProperties.Builder();

//是否持久化
propsBuilder.deliveryMode(durable ? 2 : 1);

注意:将消息标记为持久化并不能完全保证不会丢失消息。尽管它告诉 RabbitMQ 将消息保存到磁盘,但是这里依然存在当消息刚准备存储在磁盘的时候 但是还没有存储完,消息还在缓存的一个间隔点。此时并没

有真正写入磁盘。持久性保证并不强,。如果需要更强有力的持久化策略,可以增加发布确认


队列

死信队列

死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。

死信来源

  • 消息 TTL 过期
  • 队列达到最大长度(队列满了,无法再添加数据到 mq 中)
  • 消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false

设置死信队列

image-20211230173628371

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
//声明死信和普通交换机 类型为 direct
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);

//声明死信队列
String deadQueue = "dead-queue";
channel.queueDeclare(deadQueue, false, false, false, null);

// 死信队列绑定死信交换机与 routingkey
channel.queueBind(deadQueue, DEAD_EXCHANGE, "dead_routingkey");

// 正常队列绑定死信队列信息
Map<String, Object> params = new HashMap<>();
// 正常队列设置死信交换机 参数 key 是固定值
params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
// 正常队列设置死信 routing-key 参数 key 是固定值
params.put("x-dead-letter-routing-key", "dead_routingkey");
String normalQueue = "normal-queue";
// 声明正常队列
channel.queueDeclare(normalQueue, false, false, false, params);
// 正常队列绑定
channel.queueBind(normalQueue, NORMAL_EXCHANGE, "normal_routingkey");

延迟队列

延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列

实现方式

  • 基于消息TTL和队列TTL转换入死信队列,消费死信队列中的消息
  • 基于延迟插件,该类型消息支持延迟投递机制 消息传递后并不会立即投递到目标队列中,而是存储在 mnesia(一个分布式数据系统)表中,当达到投递时间时,才投递到目标队列中
  • 用 Java 的 DelayQueue
  • 利用 Redis 的 zset

优先队列

要让队列实现优先级需要做的事情有如下事情:队列需要设置为优先级队列消息需要设置消息的优先级,消费者需要等待消息已经发送到队列中才去消费,因为这样才有机会对消息进行排序。

1
2
3
4
5
6
7
8
// 队列
Map args = MapUtils.toMap(new Object[][]{{"x-max-priority", 10}});
channel.queueDeclare(queue_name, queue_durable, false, false, args);

// 消息
BasicProperties.Builder propsBuilder = new BasicProperties.Builder();
propsBuilder.priority(5);
channel.basicPublish(exchange_name, topic, null, str_msg.getBytes("UTF-8"));

惰性队列

RabbitMQ 从 3.6.0 版本开始引入了惰性队列的概念。惰性队列会尽可能的将消息存入磁盘中,而在消费者消费到相应的消息时才会被加载到内存中,它的一个重要的设计目标是能够支持更长的队列,即支持更多的消息存储。当消费者由于各种各样的原因(比如消费者下线、宕机亦或者是由于维护而关闭等)而致使长时间内不能消费消息造成堆积时,惰性队列就很有必要了。

默认情况下,当生产者将消息发送到 RabbitMQ 的时候,队列中的消息会尽可能的存储在内存之中,这样可以更加快速的将消息发送给消费者。即使是持久化的消息,在被写入磁盘的同时也会在内存中驻留一份备份。当 RabbitMQ 需要释放内存的时候,会将内存中的消息换页至磁盘中,这个操作会耗费较长的时间,也会阻塞队列的操作,进而无法接收新的消息。

队列声明

1
2
3
// 队列
Map args = MapUtils.toMap(new Object[][]{{"x-queue-mode", "lazy"}});
channel.queueDeclare(queue_name, queue_durable, false, false, args);

内存开销对比

image-20211230173648687

在发送 1 百万条消息,每条消息大概占 1KB 的情况下,普通队列占用内存是 1.2GB,而惰性队列仅仅占用 1.5MB。

TTL(Time to Live)

TTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间,单位是毫秒。换句话说,如果一条消息设置了 TTL 属性或者进入了设置 TTL 属性的队列,那么这条消息如果在 TTL 设置的时间内没有被消费,则会成为”死信”。如果同时配置了队列的 TTL 和消息的TTL,那么较小的那个值将会被使用

队列TTL

1
2
Map args = MapUtils.toMap(new Object[][]{{"x-message-ttl", 2000}});
channel.queueDeclare(queue_name, queue_durable, false, false, args);

消息TTL

1
2
BasicProperties.Builder basicProperties = new BasicProperties.Builder();
basicProperties.expiration("1000");

幂等性

消息重复消费

消费者在消费 MQ 中的消息时,MQ 已把消息发送给消费者,消费者在给 MQ 返回 ack 时网络中断,故MQ 未收到确认信息,该条消息会重新发给其他的消费者,或者在网络重连后再次发送给该消费者,但实际上该消费者已成功消费了该条消息,造成消费者消费了重复的消息。

解决思路

MQ 消费者的幂等性的解决一般使用全局 ID 或者写个唯一标识比如时间戳 或者 UUID 或者可按自己的规则生成一个全局唯一 id,每次消费消息时用该 id 先判断该消息是否已消费过。

幂等性保障

唯一 ID+指纹码机制

指纹码:我们的一些规则或者时间戳加别的服务给到的唯一信息码,它并不一定是我们系统生成的,基本都是由我们的业务规则拼接而来,但是一定要保证唯一性,然后就利用查询语句进行判断这个 id 是否存在数据库中,优势就是实现简单就一个拼接,然后查询判断是否重复;劣势就是在高并发时,如果是单个数据库就会有写入性能瓶颈当然也可以采用分库分表提升性能,但也不是我们最推荐的方式。

Redis 原子性

利用 redis 执行 setnx 命令,天然具有幂等性。从而实现不重复消费。