目录
  • 一、集群架构简介
  • 二、普通集群搭建
    • 2.1 各个节点分别安装rabbitmq
    • 2.2 把节点加入集群
    • 2.3 代码演示普通集群的问题
  • 三、镜像集群
    • 四、haproxy环境搭建。
      • 五、keepalived 环境搭建

        一、集群架构简介

        当单台 rabbitmq 服务器的处理消息的能力达到瓶颈时,此时可以通过 rabbitmq 集群来进行扩展,从而达到提升吞吐量的目的。rabbitmq 集群是一个或多个节点的逻辑分组,集群中的每个节点都是对等的,每个节点共享所有的用户,虚拟主机,队列,交换器,绑定关系,运行时参数和其他分布式状态等信息。一个高可用,负载均衡的 rabbitmq 集群架构应类似下图:

        解析说明:

        最下面层是rabbitmq的集群,没有ha镜像时是普通集群,普通集群的缺点是挂了一个机器,以这个机器为根的队列就无法使用了(一个队列的数据只会存在一个节点),无法实现高可用

        所以把队列变成镜像队列,这样每个节点都会有一份完整的数据,有节点挂了也不影响使用,实现了高可用,但rabbitmq集群本身没有实现负载均衡,也就是说对于一个三节点的集群,

        每个节点的负载可能都是不相同的。

        haproxy层的作用就是为了实现rabbitmq集群的负载均衡,但一个节点的话显然也不能高可用,所以需要两个haproxy实现haproxy的高可用,但没法实现自动的故障转移,就是haproxy1挂了,

        需要手动把ip地址改成haproxy2的。

        所以需要用到keepalived,它通常由一主一备两个节点组成,同一时间内只有主节点会提供对外服务,并同时提供一个虚拟的 ip 地址 (virtual internet protocol address ,简称 vip),可以避免暴露真实ip 。 如果主节点故障,那么备份节点会自动接管 vip 并成为新的主节点 ,直到原有的主节点恢复。

        生产环境架构应该为:

        机器1:rabbitmq1,机器2:rabbitmq2,机器3:rabbitmq3,机器4:haproxy+keeplived(主),机器5(haproxy+keeplived(备)。

        这里资源原因只有3台机器,所以搭建架构为:

        172.16.2.84(rabbit1) rabbitmq1,haproxy+keeplived(主)
        172.16.2.85(rabbit2) rabbitmq2,haproxy+keeplived(备)
        172.16.2.86(rabbit3) rabbitmq3

        二、普通集群搭建

        2.1 各个节点分别安装rabbitmq

        这里前面的章节功能使用是用的docker安装,这里集群不用docker,因为docker安装会有很多的映射,很容易扰乱,装有docker的rabbitmq的,需要先把docker停掉。

        这里准备了3台机器,172.16.2.84(rabbit1),172.16.2.85(rabbit2),172.16.2.86(rabbit3),为了避免混淆,下面都会用rabbit1,rabbit2,rabbit3称呼。

        rabbit1,rabbit2,rabbit3都执行一遍下面的安装。

        1)安装前,先修改hostname,因为rabbitmq集群通讯需要用hostname

        rabbit1机器执行

        hostnamectl set-hostname rabbit1 –static

        rabbit2机器执行

        hostnamectl set-hostname rabbit2 –static

        rabbit3机器执行

         hostnamectl set-hostname rabbit3 –static

        rabbit1,rabbit2,rabbit3执行同样操作

        #修改hosts,因为rabbitmq通讯要通过hostname

        vi /etc/hosts

        2)把ip对应hostname添加到后面

        172.16.2.84 rabbit1
        172.16.2.85 rabbit2
        172.16.2.86 rabbit3

        # 重启网络
        systemctl restart network
        # 重启机器
        init 6

        3)安装erlang

        #第一步 运行package cloud提供的erlang安装脚本
        curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh | sudo bash

        #第二步 安装erlang
        yum install erlang

        #第三步 查看erlang版本号,在命令行直接输入erl
        erl

        4)安装rabbitmq

        #第一步 先导入两个key
        rpm –import https://packagecloud.io/rabbitmq/rabbitmq-server/gpgkey
        rpm –import https://packagecloud.io/gpg.key

        #第二步 运行package cloud提供的rabbitmq安装脚本
        curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh | sudo bash

        #第三步 下载rabbit安装文件
        wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.9.5/rabbitmq-server-3.9.5-1.el8.noarch.rpm

        #第四步
        rpm –import https://www.rabbitmq.com/rabbitmq-release-signing-key.asc

        #第五步 rabbitmq依赖
        yum -y install epel-release
        yum -y install socat

        #第六步 安装
        rpm -ivh rabbitmq-server-3.9.5-1.el8.noarch.rpm

        #第七步 启用管理平台插件,启用插件后,可以可视化管理rabbitmq
        rabbitmq-plugins enable rabbitmq_management

        #第八步 启动应用
        systemctl start rabbitmq-server

        上面rabbitmq的版本号安装文件地址:https://github.com/rabbitmq/rabbitmq-server/releases/

        5)设置访问权限

        #创建管理员账户
        rabbitmqctl add_user admin 123456
        #设置注册的账户为管理员
        rabbitmqctl set_user_tags admin administrator
        #授权远程访问
        rabbitmqctl set_permissions -p / admin "." "." ".*"
        #重启服务
        systemctl restart rabbitmq-server

        注意这里关了防火墙,如果开启防火墙的话,需要把15672端口开放出来

        #查看防火墙状态
        systemctl status firewalld
        #关闭防火墙
        systemctl stop firewalld

        到这里,3台机器都安装好了rabbitmq。

        2.2 把节点加入集群

        1)停止服务rabbit2,rabbit3

        #停止全部服务
        systemctl stop rabbitmq-server

        2)拷贝cookie

        将rabbit1上的.erlang.cookie文件拷贝到其他两台主机上。该 cookie 文件相当于密钥令牌,集群中的 rabbitmq 节点需要通过交换密钥令牌以获得相互认证,因此处于同一集群的所有节点需要具有相同的密钥令牌,否则在搭建过程中会出现 authentication fail 错误,只要保证这3台机器中的.erlang.cookie内的密钥字符串一致即可,这里把rabbit1的密钥复制到rabbit2,rabbit3。

        3个机器都给.erlang.cookie 400权限。

        #给600权限
        chmod 600 /var/lib/rabbitmq/.erlang.cookie

        rabbit1机器

        #编辑文件
        vi /var/lib/rabbitmq/.erlang.cookie

        把内容复制出来,修改rabbit2,rabbit3这个文件的值为这个。

        启动服务

        #开启全部服务
        systemctl start rabbitmq-server

        3)集群搭建

        rabbitmq 集群的搭建需要选择其中任意一个节点为基准,将其它节点逐步加入。这里我们以 rabbit1为基准节点,将 rabbit2 和 rabbit3 加入集群。在 rabbit2和 rabbit3上执行以下命令:

        # 1.停止服务
        rabbitmqctl stop_app
        # 2.重置状态(需要更改节点类型的时候执行,首次不需要执行,除非你节点是以disk加入集群的)
        rabbitmqctl reset
        # 3.节点加入
        #rabbitmqctl join_cluster --ram rabbit@rabbit1
        rabbitmqctl join_cluster rabbit@rabbit1
        # 4.启动服务
        rabbitmqctl start_app

        join_cluster命令有一个可选的参数 —ram,该参数代表新加入的节点是内存节点,默认是磁盘节点。如果是内存节点,则所有的队列、交换器、绑定关系、用户、访问权限和 vhost 的元数据都将存储在内存中,如果是磁盘节点,则存储在磁盘中。内存节点可以有更高的性能,但其重启后所有配置信息都会丢失,因此rabbitmq 要求在集群中至少有一个磁盘节点,其他节点可以是内存节点,大多数情况下rabbitmq 的性能都是够用的,可以采用默认的磁盘节点的形式。

        另外,如果节点以磁盘节点的形式加入,则需要先使用reset命令进行重置,然后才能加入现有群集,重置节点会删除该节点上存在的所有的历史资源和数据。采用内存节点的形式加入时可以略过reset这一步,因为内存上的数据本身就不是持久化的。

        操作上面的,一个普通集群就搭建成功了,打开rabbit管理界面(随便打开一个都是一样的)。

        2.3 代码演示普通集群的问题

        普通集群中, 第一次创建队列时,会随机选一个节点作为根节点,这个节点会存储队列的信息(交互机,路由,队列名等)和队列的数据,其它两个节点只会同步根节点的元信息(交换机,路由,队列名)等,

        但不会存储队列的数据,他们是通过元信息找到根节点读写消息。

        例如集群选择了rabbit2作为根节点,那么数据存储在rabbit2,rabbit1和rabbit3是没有数据的,那么如果rabbit2宕机了,队列里面的数据就取不到了。

        代码演示,.netcore5.0读取集群连接代码。

        /// <summary>
                /// 获取集群连接对象
                /// </summary>
                /// <returns></returns>
                public static iconnection getclusterconnection()
                {
                    var factory = new connectionfactory
                    {
                        username = "admin",//账户
                        password = "123456",//密码
                        virtualhost = "/" //虚拟机
                    };
                    list<amqptcpendpoint> list = new list<amqptcpendpoint>()
                    {
                        new amqptcpendpoint(){hostname="172.16.2.84",port=5672},
                         new amqptcpendpoint(){hostname="172.16.2.85",port=5672},
                          new amqptcpendpoint(){hostname="172.16.2.86",port=5672}
                    };
                    return factory.createconnection(list);
                }

        生产者代码:

        /// <summary>
                /// 工作队列模式
                /// </summary>
                public static void workersendmsg()
                {
                    string queuename = "worker_order";//队列名
                    //创建连接
                    using (var connection = rabbitmqhelper.getclusterconnection())
                    {
                        //创建信道
                        using (var channel = connection.createmodel())
                        {
                            //创建队列
                            channel.queuedeclare(queuename, durable: true, exclusive: false, autodelete: false, arguments: null);
                            ibasicproperties properties = channel.createbasicproperties();
                            properties.persistent = true; //消息持久化
                            for ( var i=0;i<10;i++)
                            {
                                string message = $"hello rabbitmq messagehello,{i+1}";
                                var body = encoding.utf8.getbytes(message);
                                //发送消息到rabbitmq
                                channel.basicpublish(exchange: "", routingkey: queuename, mandatory: false, basicproperties: properties, body);
                                console.writeline($"发送消息到队列:{queuename},内容:{message}");
                            }
                        }
                    }
                }

        执行:

        查看rabbitmq管理界面

        rabbitmq选择了rabbit3作为根节点,现在试一下停止rabbit3节点

        #停止服务
        rabbitmqctl stop_app

        发现队列不可用了,启动rabbit3,再试一下停止rabbit2

        发现队列正常,试下消费数据:

        public static void workerconsumer()
                {
                    string queuename = "worker_order";
                    var connection = rabbitmqhelper.getclusterconnection();
                    {
                        //创建信道
                        var channel = connection.createmodel();
                        {
                            //创建队列
                            channel.queuedeclare(queuename, durable: true, exclusive: false, autodelete: false, arguments: null);
                            var consumer = new eventingbasicconsumer(channel);
                            ///prefetchcount:1来告知rabbitmq,不要同时给一个消费者推送多于 n 个消息,也确保了消费速度和性能
                            ///global:是否设为全局的
                            ///prefetchsize:单条消息大小,通常设0,表示不做限制
                            //是autoack=false才会有效
                            channel.basicqos(prefetchsize: 0, prefetchcount: 1, global: true);
                            int i = 1;
                            int index = new random().next(10);
                            consumer.received += (model, ea) =>
                            {
                                //处理业务
                                var message = encoding.utf8.getstring(ea.body.toarray());
                                console.writeline($"{i},消费者:{index},队列{queuename}消费消息长度:{message.length}");
                                thread.sleep(1000);
                                channel.basicack(ea.deliverytag, false); //消息ack确认,告诉mq这条队列处理完,可以从mq删除了
                                i++;
                            };
                            channel.basicconsume(queuename, autoack: false, consumer);
                        }
                    }
                }

        如果根节点挂了,再往集群发送数据,rabbitmq又会从其余的选一个作为根节点,就可能的情况是,多个节点都存有不同队列的数据。

        到这里,可以看到普通集群并不可以高可用, 根节点挂了,在这个节点上的队列也不可用了。但三个节点都没问题的时候,可以提高并发和队列的负载。

        要实现高可用,就要用到了镜像集群

        三、镜像集群

        镜像集群,在每个节点上都同步一份镜像数据,相当于每个节点都有一份完整的数据,这样有节点宕机了,还能正常提供rabbitmq服务。

        变镜像集群很简单,在上面普通集群的基础上,在任意一个节点下执行

        #把集群变成镜像集群
        rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'

        执行完,看回队列,多了ha标识。

        四、haproxy环境搭建。

        1)下载

        haproxy 官方下载地址为:https://www.haproxy.org/,如果这个网站无法访问,也可以从https://src.fedoraproject.org/repo/pkgs/haproxy/上进行下载。

        #下载haproxy
        wget https://www.haproxy.org/download/2.4/src/haproxy-2.4.3.tar.gz

        解压

        #解压
        tar xf haproxy-2.4.3.tar.gz

        2)编译

        安装gcc

        yum install gcc

        进入解压后根目录,执行下面的编译命令:

        #进入haproxy-2.4.3目录后执行
        make target=linux-glibc  prefix=/usr/app/haproxy-2.4.3
        make install prefix=/usr/app/haproxy-2.4.3

        3)配置环境变量

        #编辑文件
        vi /etc/profile
        
        #把这两项加上
        export haproxy_home=/usr/app/haproxy-2.4.3
        export path=$path:$haproxy_home/sbin

        使得配置的环境变量立即生效:

        source /etc/profile

        4)负载均衡配置

        新建配置文件haproxy.cfg,这里我新建的位置为:/etc/haproxy/haproxy.cfg,文件内容如下:

        global
        # 日志输出配置、所有日志都记录在本机,通过 local0 进行输出
        log 127.0.0.1 local0 info
        # 最大连接数
        maxconn 4096
        # 改变当前的工作目录
        chroot /usr/app/haproxy-2.4.3
        # 以指定的 uid 运行 haproxy 进程
        uid 99
        # 以指定的 gid 运行 haproxy 进程
        gid 99
        # 以守护进行的方式运行
        daemon
        # 当前进程的 pid 文件存放位置
        pidfile /usr/app/haproxy-2.4.3/haproxy.pid

        # 默认配置
        defaults
        # 应用全局的日志配置
        log global
        # 使用4层代理模式,7层代理模式则为”http”
        mode tcp
        # 日志类别
        option tcplog
        # 不记录健康检查的日志信息
        option dontlognull
        # 3次失败则认为服务不可用
        retries 3
        # 每个进程可用的最大连接数
        maxconn 2000
        # 连接超时
        timeout connect 5s
        # 客户端超时
        timeout client 120s
        # 服务端超时
        timeout server 120s

        # 绑定配置
        listen rabbitmq_cluster
        bind :5671
        # 配置tcp模式
        mode tcp
        # 采用加权轮询的机制进行负载均衡
        balance roundrobin
        # rabbitmq 集群节点配置
        server rabbit1 rabbit1:5672 check inter 5000 rise 2 fall 3 weight 1
        server rabbit2 rabbit2:5672 check inter 5000 rise 2 fall 3 weight 1
        server rabbit3 rabbit3:5672 check inter 5000 rise 2 fall 3 weight 1

        # 配置监控页面
        listen monitor
        bind :8100
        mode http
        option httplog
        stats enable
        stats uri /stats
        stats refresh 5s

        上传上去的,记得打开看一下换行符有没有出行这些符号,如果有要删掉,不然会报错。

        负载均衡的主要配置在listen rabbitmq_cluster下,这里指定负载均衡的方式为加权轮询,同时定义好健康检查机制:

        server rabbit1 rabbit1:5672 check inter 5000 rise 2 fall 3 weight 1

        以上配置代表对地址为 rabbit1:5672 的 rabbit1 节点每隔 5 秒进行一次健康检查,如果连续两次的检查结果都是正常,则认为该节点可用,此时可以将客户端的请求轮询到该节点上;如果连续 3 次的检查结果都不正常,则认为该节点不可用。weight 用于指定节点在轮询过程中的权重。

        5)启动服务

        haproxy -f /etc/haproxy/haproxy.cfg

        启动后可以在监控页面进行查看,端口为设置的 8100,完整地址为:http://172.16.2.84:8100/stats,页面情况如下:

        所有节点都为绿色,代表节点健康。此时证明 haproxy 搭建成功,并已经对 rabbitmq 集群进行监控。

        这里已经实现了rabbitmq的负载均衡了,代码怎么通过haproxy连接rabbit集群呢,因为上面配置haproxy暴露的端口是5671,所以ip是haproxy的ip:5671。

        连接代码,可以通过haproxy1 或haproxy2都可以:

        public static iconnection getconnection()
                {
        
                    connectionfactory factory = new connectionfactory
                    {
                        hostname = "172.16.2.84",//haproxy ip
                        port = 5671,//haproxy 端口
                        username = "admin",//账号
                        password = "123456",//密码
                        virtualhost = "/" //虚拟主机
                    };
        
                    return factory.createconnection();
                }

        五、keepalived 环境搭建

        接着就可以搭建 keepalived 来解决 haproxy 故障转移的问题。这里我在 rabbit1 和 rabbit2 上安装 keepalived ,两台主机上的搭建的步骤完全相同,只是部分配置略有不同,具体如下:

        官网:https://www.keepalived.org

        1)安装

        yum install -y keepalived

        2)修改配置文件

        安装了keepalived后,配置文件生成在/etc/keepalived/keepalived.conf

        这里先对keepalived1上keepalived.conf配置文件进行修改,完整内容如下:

        global_defs {
           # 路由id,主备节点不能相同
           router_id node1
        }
        ​
        # 自定义监控脚本
        vrrp_script chk_haproxy {
            # 脚本位置
            script "/etc/keepalived/haproxy_check.sh" 
            # 脚本执行的时间间隔
            interval 5 
            weight 10
        }
        ​
        vrrp_instance vi_1 {
            # keepalived的角色,master 表示主节点,backup 表示备份节点
            state master  
            # 指定监测的网卡,可以使用 ip addr 进行查看
            interface ens33
            # 虚拟路由的id,主备节点需要设置为相同
            virtual_router_id 1
            # 优先级,主节点的优先级需要设置比备份节点高
            priority 100 
            # 设置主备之间的检查时间,单位为秒 
            advert_int 1 
            # 定义验证类型和密码
            authentication { 
                auth_type pass
                auth_pass 123456
            }
        ​
            # 调用上面自定义的监控脚本
            track_script {
                chk_haproxy
            }
        ​
            virtual_ipaddress {
                # 虚拟ip地址,可以设置多个
                172.16.2.200  
            }
        }

        以上配置定义了 keepalived1上的 keepalived 节点为 master 节点,并设置对外提供服务的虚拟 ip 为 172.16.2.200。此外最主要的是定义了通过haproxy_check.sh来对 haproxy 进行监控,这个脚本需要我们自行创建,内容如下:

        #!/bin/bash
        # 判断haproxy是否已经启动
        if [ `ps -c haproxy --no-header | wc -l` -eq 0 ] ; then
            #如果没有启动,则启动
            haproxy -f /etc/haproxy/haproxy.cfg
        fi
        ​
        #睡眠3秒以便haproxy完全启动
        sleep 3
        ​
        #如果haproxy还是没有启动,此时需要将本机的keepalived服务停掉,以便让vip自动漂移到另外一台haproxy
        if [ `ps -c haproxy --no-header | wc -l` -eq 0 ]; then
            systemctl stop keepalived
        fi

        创建后为其赋予执行权限:

        chmod +x /etc/keepalived/haproxy_check.sh

        这个脚本主要用于判断 haproxy 服务是否正常,如果不正常且无法启动,此时就需要将本机 keepalived 关闭,从而让虚拟 ip 漂移到备份节点。

        备份节点(keepalived2)的配置与主节点基本相同,但是需要修改其 state 为 backup;同时其优先级 priority 需要比主节点低。完整配置如下:

        global_defs {
           # 路由id,主备节点不能相同    
           router_id node2  
        ​
        }
        ​
        vrrp_script chk_haproxy {
            script "/etc/keepalived/haproxy_check.sh" 
            interval 5 
            weight 10
        }
        ​
        vrrp_instance vi_1 {
            # backup 表示备份节点
            state backup 
            interface ens33
            virtual_router_id 1
            # 优先级,备份节点要比主节点低
            priority 50 
            advert_int 1 
            authentication { 
                auth_type pass
                auth_pass 123456
            }
            
            track_script {
                chk_haproxy
            }
        ​
            virtual_ipaddress {
                172.16.2.200  
            }
        }

        haproxy_check.sh文件和keepalived1相同

        3)启动服务

        分别在keepalived1和keepalived2上启动keepalived服务,命令如下:

        systemctl start  keepalived

        启动后此时 keepalived1 为主节点,可以在keepalived1 上使用ip a命令查看到虚拟 ip 的情况:

        此时只有 keepalived1上是存在虚拟 ip 的,而keepalived2 上是没有的。

        4)验证故障转移

        这里我们验证一下故障转移,因为按照我们上面的检测脚本,如果 haproxy 已经停止且无法重启时 keepalived 服务就会停止,这里我们直接使用以下命令停止 keepalived1 服务:

        systemctl stop keepalived

        此时再次使用ip a分别查看,可以发现 keepalived1上的 vip 已经漂移到 keepalived2上,情况如下:

        此时对外服务的 vip 依然可用,代表已经成功地进行了故障转移。至此集群已经搭建成功,任何需要发送或者接受消息的客户端服务只需要连接到该 vip 即可,示例如下:

        public static iconnection getconnection()
                {
                    connectionfactory factory = new connectionfactory()
                    {
                        hostname = "172.16.2.200",//vip
                        port = 5671,//haproxy 端口
                        username = "admin",//账号
                        password = "123456",//密码
                        virtualhost = "/" //虚拟主机
                    };
        
                    return factory.createconnection();
                }

        到此这篇关于运用.net core中实例讲解rabbitmq高可用集群构建的文章就介绍到这了,更多相关.net core rabbitmq集群构建内容请搜索www.887551.com以前的文章或继续浏览下面的相关文章希望大家以后多多支持www.887551.com!