Skip to content

1,集群结构

1,概述

通过 Erlang 的分布式特性(通过 magic cookie 认证节点)进行 RabbitMQ 集群,各 RabbitMQ 服务为对等节点,即每个节点都提供服务给客户端连接,进行消息发送与接收。

这些节点通过 RabbitMQ HA 队列(镜像队列)进行消息队列结构复制。本方案中搭建 3 个节点,并且都是磁盘节点(所有节点状态保持一致,节点完全对等),只要有任何一个节点能够工作,RabbitMQ 集群对外就能提供服务。

2,元数据

RabbitMQ 内部有各种基础构件,包括队列、交换器、绑定、虚拟主机等,他们组成了 AMQP 协议消息通信的基础,而这些构件以元数据的形式存在,它始终记录在 RabbitMQ 内部,它们分别是:

  • 队列元数据:队列名称和它们的属性
  • 交换器元数据:交换器名称、类型和属性
  • 绑定元数据:一张简单的表格展示了如何将消息路由到队列
  • vhost 元数据:为 vhost 内的队列、交换器和绑定提供命名空间和安全属性

在单一节点上,RabbitMQ 会将上述元数据存储到内存上,如果是磁盘节点(下面会讲),还会存储到磁盘上。

3,集群中的队列

这里有个问题需要思考,RabbitMQ 默认会将消息冗余到所有节点上吗?这样听起来正符合高可用的特性,只要集群上还有一个节点存活,那么就可以继续进行消息通信,但这也随之为 RabbitMQ 带来了致命的缺点:

  1. 每次发布消息,都要把它扩散到所有节点上,而且对于磁盘节点来说,每一条消息都会触发磁盘活动,这会导致整个集群内性能负载急剧拉升。
  2. 如果每个节点都有所有队列的完整内容,那么添加节点不会给你带来额外的存储空间,也会带来木桶效应,举个例子,如果集群内有个节点存储了 3G 队列内容,那么在另外一个只有 1G 存储空间的节点上,就会造成内存空间不足的情况,也就是无法通过集群节点的扩容提高消息积压能力。

解决这个问题就是通过集群中唯一节点来负责任何特定队列,只有该节点才会受队列大小的影响,其它节点如果接收到该队列消息,那么就要根据元数据信息,传递给队列所有者节点(也就是说其它节点上只存储了特定队列所有者节点的指针)。这样一来,就可以通过在集群内增加节点,存储更多的队列数据。

4,分布交换器

交换器其实是我们想象出来的,它本质是一张查询表,里面包括了交换器名称和一个队列的绑定列表,当你将消息发布到交换器中,实际上是你所在的信道将消息上的路由键与交换器的绑定列表进行匹配,然后将消息路由出去。有了这个机制,那么在所有节点上传递交换器消息将简单很多,而 RabbitMQ 所做的事情就是把交换器拷贝到所有节点上,因此每个节点上的每条信道都可以访问完整的交换器了。

5,内存节点与磁盘节点

关于上面队列所说的问题与解决办法,又有了一个伴随而来的问题出现:如果特定队列的所有者节点发生了故障,那么该节点上的队列和关联的绑定都会消失吗?

  1. 如果是内存节点,那么附加在该节点上的队列和其关联的绑定都会丢失,并且消费者可以重新连接集群并重新创建队列;
  2. 如果是磁盘节点,重新恢复故障后,该队列又可以进行传输数据了,并且在恢复故障磁盘节点之前,不能在其它节点上让消费者重新连到集群并重新创建队列,如果消费者继续在其它节点上声明该队列,会得到一个 404 NOT_FOUND 错误,这样确保了当故障节点恢复后加入集群,该节点上的队列消息不回丢失,也避免了队列会在一个节点以上出现冗余的问题。

接下来说说内存节点与磁盘节点在集群中的作用,在集群中的每个节点,要么是内存节点,要么是磁盘节点,如果是内存节点,会将所有的元数据信息仅存储到内存中,而磁盘节点则不仅会将所有元数据存储到内存上, 还会将其持久化到磁盘。

在单节点 RabbitMQ 上,仅允许该节点是磁盘节点,这样确保了节点发生故障或重启节点之后,所有关于系统的配置与元数据信息都会重磁盘上恢复;而在 RabbitMQ 集群上,允许节点上至少有一个磁盘节点,在内存节点上,意味着队列和交换器声明之类的操作会更加快速。原因是这些操作会将其元数据同步到所有节点上,对于内存节点,将需要同步的元数据写进内存就行了,但对于磁盘节点,意味着还需要及其消耗性能的磁盘写入操作。

RabbitMQ 集群只要求至少有一个磁盘节点,这是有道理的,当其它内存节点发生故障或离开集群,只需要通知至少一个磁盘节点进行元数据的更新,如果是碰巧唯一的磁盘节点也发生故障了,集群可以继续路由消息,但是不可以做以下操作了:

  • 创建队列
  • 创建交换器
  • 创建绑定
  • 添加用户
  • 更改权限
  • 添加或删除集群节点

这是因为上述操作都需要持久化到磁盘节点上,以便内存节点恢复故障可以从磁盘节点上恢复元数据,解决办法是在集群添加 2 台以上的磁盘节点,这样其中一台发生故障了,集群仍然可以保持运行,且能够在任何时候保存元数据变更。

2,安装

1,环境准备。

  • CentOS 版本:CentOS Linux release 7.5.1804 (Core)
  • RabbitMQ 版本:RabbitMQ 3.6
  • 三台主机:192.168.106.7,192.168.106.8,192.168.106.9

防火墙之类的全部关掉。

2,安装 Erlang

RabbitMQ 安装需要依赖 Erlang 环境

sh
cd /opt
wget http://www.rabbitmq.com/releases/erlang/erlang-19.0.4-1.el7.centos.x86_64.rpm
yum -y install erlang-19.0.4-1.el7.centos.x86_64.rpm
cd /opt
wget http://www.rabbitmq.com/releases/erlang/erlang-19.0.4-1.el7.centos.x86_64.rpm
yum -y install erlang-19.0.4-1.el7.centos.x86_64.rpm

3,安装 RabbitMQ

sh
wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.10/rabbitmq-server-3.6.10-1.el7.noarch.rpm
yum -y install rabbitmq-server-3.6.10-1.el7.noarch.rpm
wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.10/rabbitmq-server-3.6.10-1.el7.noarch.rpm
yum -y install rabbitmq-server-3.6.10-1.el7.noarch.rpm

4,启动服务

启动前先添加一下配置信息。

sh
vim /etc/rabbitmq/rabbitmq.config
[{rabbit, [{loopback_users, []}]}].
vim /etc/rabbitmq/rabbitmq.config
[{rabbit, [{loopback_users, []}]}].
  • 这里的意思是开放使用,rabbitmq 默认创建的用户 guest,密码也是 guest,这个用户默认只能是本机访问,localhost 或者 127.0.0.1,从外部访问需要添加上面的配置。

保存配置之后启动服务。

sh
systemctl start rabbitmq-server
systemctl status rabbitmq-server
systemctl enable rabbitmq-server
systemctl start rabbitmq-server
systemctl status rabbitmq-server
systemctl enable rabbitmq-server

5,启用 web 管理

sh
/sbin/rabbitmq-plugins enable rabbitmq_management
/sbin/rabbitmq-plugins enable rabbitmq_management
  • 访问:http://192.168.106.7:15672
  • 用户名:guest
  • 密码:guest

3,集群配置

1,修改 hostname。

sh
hostnamectl set-hostname node1
hostnamectl set-hostname node2
hostnamectl set-hostname node3
hostnamectl set-hostname node1
hostnamectl set-hostname node2
hostnamectl set-hostname node3

2,修改 hosts。

编辑/etc/hosts文件,添加到在三台机器的/etc/hosts中以下内容

sh
cat >> /etc/hosts << EOF
192.168.106.7 node1
192.168.106.8 node2
192.168.106.9 node3
EOF
cat >> /etc/hosts << EOF
192.168.106.7 node1
192.168.106.8 node2
192.168.106.9 node3
EOF

注意:三台主机上安装的 RabbitMQ 都保证都可以正常启动, 才可以进行以下操作

3,停止 RabbitMQ 服务

sh
$ systemctl stop rabbitmq-server
$ systemctl stop rabbitmq-server

进入下边操作之前,需要全部停止,否则在后边执行 rabbitmqctl stop_app将会报如下错误:

sh
[root@node3 ~]$rabbitmqctl stop_app
Stopping rabbit application on node rabbit@node3
Error: unable to connect to node rabbit@node3: nodedown
DIAGNOSTICS
===========
attempted to contact: [rabbit@node3]
rabbit@node3:
  * connected to epmd (port 4369) on node3
  * epmd reports node 'rabbit' running on port 25672
  * TCP connection succeeded but Erlang distribution failed
  * Hostname mismatch: node "rabbit@localhost" believes its host is different. Please ensure that hostnames resolve the same way locally and on "rabbit@localhost"
current node details:
- node name: 'rabbitmq-cli-27@node3'
- home dir: /var/lib/rabbitmq
- cookie hash: hodtTGJwleXyVXlosQGJcg==
[root@node3 ~]$rabbitmqctl stop_app
Stopping rabbit application on node rabbit@node3
Error: unable to connect to node rabbit@node3: nodedown
DIAGNOSTICS
===========
attempted to contact: [rabbit@node3]
rabbit@node3:
  * connected to epmd (port 4369) on node3
  * epmd reports node 'rabbit' running on port 25672
  * TCP connection succeeded but Erlang distribution failed
  * Hostname mismatch: node "rabbit@localhost" believes its host is different. Please ensure that hostnames resolve the same way locally and on "rabbit@localhost"
current node details:
- node name: 'rabbitmq-cli-27@node3'
- home dir: /var/lib/rabbitmq
- cookie hash: hodtTGJwleXyVXlosQGJcg==

设置不同节点间同一认证的 Erlang Cookie,这里将 node1 的 Cookie 传给另外两台。

sh
[root@node1 ~]$ scp /var/lib/rabbitmq/.erlang.cookie root@node2:/var/lib/rabbitmq/
[root@node1 ~]$ scp /var/lib/rabbitmq/.erlang.cookie root@node3:/var/lib/rabbitmq/
[root@node1 ~]$ scp /var/lib/rabbitmq/.erlang.cookie root@node2:/var/lib/rabbitmq/
[root@node1 ~]$ scp /var/lib/rabbitmq/.erlang.cookie root@node3:/var/lib/rabbitmq/

5,注意事项

cookie 在所有节点上必须完全一样,同步时一定要注意。 erlang 是通过主机名来连接服务,必须保证各个主机名之间可以 ping 通。可以通过编辑 / etc/hosts 来手工添加主机名和 IP 对应关系。如果主机名 ping 不通,rabbitmq 服务启动会失败。

6,运行各节点

  • 设置好 cookie 后先将三个节点的 rabbitmq 重启。
sh
[root@node1 ~]$ rabbitmq-server -detached
[root@node2 ~]$ rabbitmq-server -detached
[root@node3 ~]$ rabbitmq-server -detached
[root@node1 ~]$ rabbitmq-server -detached
[root@node2 ~]$ rabbitmq-server -detached
[root@node3 ~]$ rabbitmq-server -detached
  • 查看单节点的集群状态。
sh
[root@node1 ~]$ rabbitmqctl cluster_status
Cluster status of node rabbit@node1
[{nodes,[{disc,[rabbit@node1]}]},
 {running_nodes,[rabbit@node1]},
 {cluster_name,<<"rabbit@node1">>},
 {partitions,[]},
 {alarms,[{rabbit@node1,[]}]}]
[root@node2 ~]$ rabbitmqctl cluster_status
Cluster status of node rabbit@node2
[{nodes,[{disc,[rabbit@node2]}]},
 {running_nodes,[rabbit@node2]},
 {cluster_name,<<"rabbit@node2">>},
 {partitions,[]},
 {alarms,[{rabbit@node2,[]}]}]
[root@node3 ~]$ rabbitmqctl cluster_status
Cluster status of node rabbit@node3
[{nodes,[{disc,[rabbit@node3]}]},
 {running_nodes,[rabbit@node3]},
 {cluster_name,<<"rabbit@node3">>},
 {partitions,[]},
 {alarms,[{rabbit@node3,[]}]}]
[root@node1 ~]$ rabbitmqctl cluster_status
Cluster status of node rabbit@node1
[{nodes,[{disc,[rabbit@node1]}]},
 {running_nodes,[rabbit@node1]},
 {cluster_name,<<"rabbit@node1">>},
 {partitions,[]},
 {alarms,[{rabbit@node1,[]}]}]
[root@node2 ~]$ rabbitmqctl cluster_status
Cluster status of node rabbit@node2
[{nodes,[{disc,[rabbit@node2]}]},
 {running_nodes,[rabbit@node2]},
 {cluster_name,<<"rabbit@node2">>},
 {partitions,[]},
 {alarms,[{rabbit@node2,[]}]}]
[root@node3 ~]$ rabbitmqctl cluster_status
Cluster status of node rabbit@node3
[{nodes,[{disc,[rabbit@node3]}]},
 {running_nodes,[rabbit@node3]},
 {cluster_name,<<"rabbit@node3">>},
 {partitions,[]},
 {alarms,[{rabbit@node3,[]}]}]

7,配置集群

1,加入集群

这里将 node2 和 node3 加入到 node1 当中,按照顺序执行 先node2node3

sh
node2 $ rabbitmqctl stop_app            # 停止rabbitmq服务
node2 $ rabbitmqctl join_cluster rabbit@node1    # node2和node1构成集群, node2必须能通过node1的主机名ping通
node2 $ rabbitmqctl start_app            # 开启rabbitmq服务
node3 $ rabbitmqctl stop_app            # 停止rabbitmq服务
node3 $ rabbitmqctl join_cluster rabbit@node1    # node3和node1构成集群, node2必须能通过node1的主机名ping通
node3 $ rabbitmqctl start_app            # 开启rabbitmq服务
node2 $ rabbitmqctl stop_app            # 停止rabbitmq服务
node2 $ rabbitmqctl join_cluster rabbit@node1    # node2和node1构成集群, node2必须能通过node1的主机名ping通
node2 $ rabbitmqctl start_app            # 开启rabbitmq服务
node3 $ rabbitmqctl stop_app            # 停止rabbitmq服务
node3 $ rabbitmqctl join_cluster rabbit@node1    # node3和node1构成集群, node2必须能通过node1的主机名ping通
node3 $ rabbitmqctl start_app            # 开启rabbitmq服务

此时 node2 与 node3 也会自动建立连接,查看集群成员,可以看到都加进来了。

sh
[root@node1 ~]$rabbitmqctl cluster_status
Cluster status of node rabbit@node1
[{nodes,[{disc,[rabbit@node1,rabbit@node2,rabbit@node3]}]},
 {running_nodes,[rabbit@node3,rabbit@node2,rabbit@node1]},
 {cluster_name,<<"rabbit@node1">>},
 {partitions,[]},
 {alarms,[{rabbit@node3,[]},{rabbit@node2,[]},{rabbit@node1,[]}]}]
[root@node1 ~]$rabbitmqctl cluster_status
Cluster status of node rabbit@node1
[{nodes,[{disc,[rabbit@node1,rabbit@node2,rabbit@node3]}]},
 {running_nodes,[rabbit@node3,rabbit@node2,rabbit@node1]},
 {cluster_name,<<"rabbit@node1">>},
 {partitions,[]},
 {alarms,[{rabbit@node3,[]},{rabbit@node2,[]},{rabbit@node1,[]}]}]
  • 第一行是集群中的节点成员
  • disc 表示这些都是磁盘节点,默认创建之后就是 disc。
  • 第二行是正在运行的节点成员。

如果要为集群增加新节点时,我们可以按照上面的步骤将新节点添加到集群。

2,停止一个节点

在集群中如果要停止一个节点执行命令 rabbitmqctl stop_apprabbitmqctl stop 或者可以 kill 掉节点的进程不影响其他节点运行(注意: 如果只有一个磁盘节点,如果干掉磁盘节点后消息数据会丢失)。

  • 停止 node3。
sh
[root@node3 ~]$rabbitmqctl stop
Stopping and halting node rabbit@node3
[root@node3 ~]$rabbitmqctl stop
Stopping and halting node rabbit@node3
  • 查看集群状态。
sh
[root@node1 ~]$rabbitmqctl cluster_status
Cluster status of node rabbit@node1
[{nodes,[{disc,[rabbit@node1,rabbit@node2,rabbit@node3]}]},
 {running_nodes,[rabbit@node2,rabbit@node1]},   #可以看到node3已经是非running状态
 {cluster_name,<<"rabbit@node1">>},
 {partitions,[]},
 {alarms,[{rabbit@node2,[]},{rabbit@node1,[]}]}]
[root@node1 ~]$rabbitmqctl cluster_status
Cluster status of node rabbit@node1
[{nodes,[{disc,[rabbit@node1,rabbit@node2,rabbit@node3]}]},
 {running_nodes,[rabbit@node2,rabbit@node1]},   #可以看到node3已经是非running状态
 {cluster_name,<<"rabbit@node1">>},
 {partitions,[]},
 {alarms,[{rabbit@node2,[]},{rabbit@node1,[]}]}]

3,重置一个节点。

刚刚将 node3 给停止了,如果再次启动node3,那么还会加入到节点中来。

sh
[root@node3 ~]$systemctl start rabbitmq-server
[root@node3 ~]$rabbitmqctl cluster_status
Cluster status of node rabbit@node3
[{nodes,[{disc,[rabbit@node1,rabbit@node2,rabbit@node3]}]},
 {running_nodes,[rabbit@node1,rabbit@node2,rabbit@node3]},  #可以看到node3又加入到集群中了
 {cluster_name,<<"rabbit@node1">>},
 {partitions,[]},
 {alarms,[{rabbit@node1,[]},{rabbit@node2,[]},{rabbit@node3,[]}]}]
[root@node3 ~]$systemctl start rabbitmq-server
[root@node3 ~]$rabbitmqctl cluster_status
Cluster status of node rabbit@node3
[{nodes,[{disc,[rabbit@node1,rabbit@node2,rabbit@node3]}]},
 {running_nodes,[rabbit@node1,rabbit@node2,rabbit@node3]},  #可以看到node3又加入到集群中了
 {cluster_name,<<"rabbit@node1">>},
 {partitions,[]},
 {alarms,[{rabbit@node1,[]},{rabbit@node2,[]},{rabbit@node3,[]}]}]

那么,如果才能将一个节点重置,与集群彻底脱离呢。

sh
[root@node3 ~]$rabbitmqctl stop_app
Stopping rabbit application on node rabbit@node3
[root@node3 ~]$rabbitmqctl reset
Resetting node rabbit@node3
[root@node3 ~]$rabbitmqctl start_app
Starting node rabbit@node3
[root@node3 ~]$rabbitmqctl stop_app
Stopping rabbit application on node rabbit@node3
[root@node3 ~]$rabbitmqctl reset
Resetting node rabbit@node3
[root@node3 ~]$rabbitmqctl start_app
Starting node rabbit@node3

然后再查看一下分离之后的集群状态。

sh
[root@node3 ~]$rabbitmqctl cluster_status
Cluster status of node rabbit@node3
[{nodes,[{disc,[rabbit@node3]}]},   #node3又变成了单节点状态了。
 {running_nodes,[rabbit@node3]},
 {cluster_name,<<"rabbit@node3">>},
 {partitions,[]},
 {alarms,[{rabbit@node3,[]}]}]
[root@node1 ~]$rabbitmqctl cluster_status
Cluster status of node rabbit@node1
[{nodes,[{disc,[rabbit@node1,rabbit@node2]}]},  #集群内也看不到node3了
 {running_nodes,[rabbit@node2,rabbit@node1]},
 {cluster_name,<<"rabbit@node1">>},
 {partitions,[]},
 {alarms,[{rabbit@node2,[]},{rabbit@node1,[]}]}]
[root@node3 ~]$rabbitmqctl cluster_status
Cluster status of node rabbit@node3
[{nodes,[{disc,[rabbit@node3]}]},   #node3又变成了单节点状态了。
 {running_nodes,[rabbit@node3]},
 {cluster_name,<<"rabbit@node3">>},
 {partitions,[]},
 {alarms,[{rabbit@node3,[]}]}]
[root@node1 ~]$rabbitmqctl cluster_status
Cluster status of node rabbit@node1
[{nodes,[{disc,[rabbit@node1,rabbit@node2]}]},  #集群内也看不到node3了
 {running_nodes,[rabbit@node2,rabbit@node1]},
 {cluster_name,<<"rabbit@node1">>},
 {partitions,[]},
 {alarms,[{rabbit@node2,[]},{rabbit@node1,[]}]}]

8,设置内存节点

上边两个节点加入集群的时候,默认的是作为磁盘节点,如果想要创建内存节点,只需在创建的时候多加一个--ram参数即可。

这里同样拿刚才的 node3 做测试:

sh
[root@node3 ~]$rabbitmqctl stop_app
Stopping rabbit application on node rabbit@node3
[root@node3 ~]$rabbitmqctl join_cluster --ram rabbit@node1
Clustering node rabbit@node3 with rabbit@node1
[root@node3 ~]$rabbitmqctl start_app
Starting node rabbit@node3
[root@node3 ~]$rabbitmqctl stop_app
Stopping rabbit application on node rabbit@node3
[root@node3 ~]$rabbitmqctl join_cluster --ram rabbit@node1
Clustering node rabbit@node3 with rabbit@node1
[root@node3 ~]$rabbitmqctl start_app
Starting node rabbit@node3

然后查看一下集群状态:

sh
[root@node3 ~]$rabbitmqctl cluster_status
Cluster status of node rabbit@node3
[{nodes,[{disc,[rabbit@node2,rabbit@node1]},{ram,[rabbit@node3]}]}, #可以看到node3有一个ram的标识
 {running_nodes,[rabbit@node1,rabbit@node2,rabbit@node3]},
 {cluster_name,<<"rabbit@node1">>},
 {partitions,[]},
 {alarms,[{rabbit@node1,[]},{rabbit@node2,[]},{rabbit@node3,[]}]}]
[root@node3 ~]$rabbitmqctl cluster_status
Cluster status of node rabbit@node3
[{nodes,[{disc,[rabbit@node2,rabbit@node1]},{ram,[rabbit@node3]}]}, #可以看到node3有一个ram的标识
 {running_nodes,[rabbit@node1,rabbit@node2,rabbit@node3]},
 {cluster_name,<<"rabbit@node1">>},
 {partitions,[]},
 {alarms,[{rabbit@node1,[]},{rabbit@node2,[]},{rabbit@node3,[]}]}]

在 RabbitMQ 集群里,必须至少有一个磁盘节点存在

9,更改节点属性

还能通过一些命令,更改节点的属性。

先将刚刚的 node3 改回到磁盘节点:

sh
[root@node3 ~]$rabbitmqctl stop_app
Stopping rabbit application on node rabbit@node3
[root@node3 ~]$rabbitmqctl change_cluster_node_type disc
Turning rabbit@node3 into a disc node
[root@node3 ~]$rabbitmqctl start_app
Starting node rabbit@node3
[root@node3 ~]$rabbitmqctl cluster_status
Cluster status of node rabbit@node3
[{nodes,[{disc,[rabbit@node1,rabbit@node2,rabbit@node3]}]},
 {running_nodes,[rabbit@node1,rabbit@node2,rabbit@node3]},
 {cluster_name,<<"rabbit@node1">>},
 {partitions,[]},
 {alarms,[{rabbit@node1,[]},{rabbit@node2,[]},{rabbit@node3,[]}]}]
[root@node3 ~]$rabbitmqctl stop_app
Stopping rabbit application on node rabbit@node3
[root@node3 ~]$rabbitmqctl change_cluster_node_type disc
Turning rabbit@node3 into a disc node
[root@node3 ~]$rabbitmqctl start_app
Starting node rabbit@node3
[root@node3 ~]$rabbitmqctl cluster_status
Cluster status of node rabbit@node3
[{nodes,[{disc,[rabbit@node1,rabbit@node2,rabbit@node3]}]},
 {running_nodes,[rabbit@node1,rabbit@node2,rabbit@node3]},
 {cluster_name,<<"rabbit@node1">>},
 {partitions,[]},
 {alarms,[{rabbit@node1,[]},{rabbit@node2,[]},{rabbit@node3,[]}]}]

然后再把 node2 变更为内存节点:

sh
[root@node2 ~]$rabbitmqctl stop_app
Stopping rabbit application on node rabbit@node2
[root@node2 ~]$rabbitmqctl change_cluster_node_type ram
Turning rabbit@node2 into a ram node
[root@node2 ~]$rabbitmqctl start_app
Starting node rabbit@node2
[root@node2 ~]$rabbitmqctl cluster_status
Cluster status of node rabbit@node2
[{nodes,[{disc,[rabbit@node3,rabbit@node1]},{ram,[rabbit@node2]}]},
 {running_nodes,[rabbit@node1,rabbit@node3,rabbit@node2]},
 {cluster_name,<<"rabbit@node1">>},
 {partitions,[]},
 {alarms,[{rabbit@node1,[]},{rabbit@node3,[]},{rabbit@node2,[]}]}]
[root@node2 ~]$rabbitmqctl stop_app
Stopping rabbit application on node rabbit@node2
[root@node2 ~]$rabbitmqctl change_cluster_node_type ram
Turning rabbit@node2 into a ram node
[root@node2 ~]$rabbitmqctl start_app
Starting node rabbit@node2
[root@node2 ~]$rabbitmqctl cluster_status
Cluster status of node rabbit@node2
[{nodes,[{disc,[rabbit@node3,rabbit@node1]},{ram,[rabbit@node2]}]},
 {running_nodes,[rabbit@node1,rabbit@node3,rabbit@node2]},
 {cluster_name,<<"rabbit@node1">>},
 {partitions,[]},
 {alarms,[{rabbit@node1,[]},{rabbit@node3,[]},{rabbit@node2,[]}]}]

10,登录后台

img

上面配置 RabbitMQ 默认集群模式,但并不保证队列的高可用性,尽管交换机、绑定这些可以复制到集群里的任何一个节点,但是队列内容不会复制,虽然该模式解决一部分节点压力,但队列节点宕机直接导致该队列无法使用,只能等待重启,所以要想在队列节点宕机或故障也能正常使用,就要复制队列内容到集群里的每个节点,也就引入了镜像队列的概念。

11,镜像队列

1,概念

镜像队列可以同步 queue 和 message,当主 queue 挂掉,从 queue 中会有一个变为主 queue 来接替工作。

镜像队列是基于普通的集群模式的, 所以你还是得先配置普通集群, 然后才能设置镜像队列。

镜像队列设置后,会分一个主节点和多个从节点,如果主节点宕机,从节点会有一个选为主节点,原先的主节点起来后会变为从节点。

queue 和 message 虽然会存在所有镜像队列中,但客户端读取时不论物理面连接的主节点还是从节点,都是从主节点读取数据,然后主节点再将 queue 和 message 的状态同步给从节点,因此多个客户端连接不同的镜像队列不会产生同一 message 被多次接受的情况。

2,配置

在普通集群的中任意节点启用策略,策略会自动同步到集群节点

  • 命令格式
sh
set_policy [-p vhostpath] {name} {pattern} {definition} [priority]
set_policy [-p vhostpath] {name} {pattern} {definition} [priority]
  • 在任意一个节点上执行
sh
[root@node1 ~]$rabbitmqctl set_policy -p / ha-allqueue "^isj" '{"ha-mode":"all"}'
Setting policy "ha-allqueue" for pattern "^isj" to "{\"ha-mode\":\"all\"}" with priority "0"
[root@node1 ~]$rabbitmqctl set_policy -p / ha-allqueue "^isj" '{"ha-mode":"all"}'
Setting policy "ha-allqueue" for pattern "^isj" to "{\"ha-mode\":\"all\"}" with priority "0"

注意"^isj" 这个规则要根据自己实际情况修改,这个是指同步”isj” 开头的队列名称,配置时使用的应用于所有队列,所以表达式为”^”。

12,集群重启

集群重启时,最后一个挂掉的节点应该第一个重启,如果因特殊原因(比如同时断电),而不知道哪个节点最后一个挂掉。可用以下方法重启:

先在一个节点上执行:

sh
$ rabbitmqctl force_boot
$ service rabbitmq-server start
$ rabbitmqctl force_boot
$ service rabbitmq-server start

在其他节点上执行

sh
$ service rabbitmq-server start
$ service rabbitmq-server start

查看 cluster 状态是否正常(要在所有节点上查询)。

sh
$ rabbitmqctl cluster_status
$ rabbitmqctl cluster_status

13,前端代理

网上有不少使用 haproxy 做前端代理的,这里使用 nginx 作为前端代理,配置如下:

nginx
upstream rabbitmq{
    server 192.168.106.7:5672 max_fails=2 fail_timeout=3s weight=2; ##最大失败2次 超时3秒
    server 192.168.106.8:5672 max_fails=2 fail_timeout=3s weight=2;
    server 192.168.106.9:5672 max_fails=2 fail_timeout=3s weight=2;
}
server {
    listen 5678;
    server_name 192.168.106.16;
    proxy_connect_timeout 1s;
    proxy_timeout 3s;
    proxy_pass rabbitmq;
    }
upstream rabbitmq{
    server 192.168.106.7:5672 max_fails=2 fail_timeout=3s weight=2; ##最大失败2次 超时3秒
    server 192.168.106.8:5672 max_fails=2 fail_timeout=3s weight=2;
    server 192.168.106.9:5672 max_fails=2 fail_timeout=3s weight=2;
}
server {
    listen 5678;
    server_name 192.168.106.16;
    proxy_connect_timeout 1s;
    proxy_timeout 3s;
    proxy_pass rabbitmq;
    }

然后连接的时候直接使用 nginx 地址即可,如果还有更大的量,或者更加严苛的需求,那么可以再添加 keepalived 实现 nginx 的高可用。

4,参考