简介

 rabbitmq是实现了高级消息队列协议(amqp)的开源消息中间件,基于erlang语言编写。

 

p:(producling)生产者,生产只意味着发送消息。

 

q: (queue_name)队列,队列是位于rabbitmq中的post box的名称

 

c: (consuming)消费者,消费者主要是等待接收消息的程序

 

 

开发准备

  •  netcoretset.core:该工程主要封装了rabbitmq的公用方法
  • rabbitmqclient    :该工程为生产者
  • rabbitmqserver  :该工程为消费者

 

1.创建netcoretset.core类库项目

 

1.1 安装项目依赖

 

2.定义接口

using netcoretest.core.model;
using system;
using system.collections.generic;
using system.text;

namespace netcoretest.core.iserver
{
    public interface iconnectionserver
    {
      
        /// <summary>
        /// 连接服务
        /// </summary>
        void connection();
        /// <summary>
        /// 创建消息队列
        /// </summary>
        /// <param name="quename">队列名称</param>
        void createqueuedir();
        /// <summary>
        /// 关闭连接
        /// </summary>
        void closeconnection();
        /// <summary>
        /// 关闭通道
        /// </summary>
        void closechannel();


    }
}
using system;
using system.collections.generic;
using system.text;

namespace netcoretest.core.iserver
{
    public interface imessageservice
    {
        /// <summary>
        /// 发送消息
        /// </summary>
        /// <param name="msg">消息内容</param>
        void sendmsg(string msg);
        /// <summary>
        /// 获取消息
        /// </summary>
        /// <returns></returns>
        string getmsg();
    }
}
using system;
using system.collections.generic;
using system.text;

namespace netcoretest.core.iserver
{
   public interface irabbitmqservice:imessageservice,iconnectionserver
    {
    }
}

 

 3.编写rabbitmq辅助类

using netcoretest.core.iserver;
using netcoretest.core.model;
using rabbitmq.client;
using rabbitmq.client.events;
using system;
using system.collections.generic;
using system.text;

namespace netcoretest.core
{
    public class rabbitmqmodel : irabbitmqservice
    {

        private readonly connectionfactory factory = null;
        private imodel channel;
        private iconnection connetction;
        readonly string exchangename;//交换机名称
        readonly string routekey;//路由名称
        readonly string queuename;///队列名称
        public rabbitmqmodel(hostmodel model)
        {
            /// <summary>
            /// 创建连接工厂
            /// </summary>
            factory = new connectionfactory
            {
                username = model.username,
                password = model.password,
                hostname = "localhost",
                port = model.port,
            };
            exchangename = model.exchangemodel.exchangename;
            routekey = model.exchangemodel.routekey;
            queuename = model.exchangemodel.queuename;
        }
        /// <summary>
        /// 创建连接
        /// </summary>
        public void connection()
        {
            try
            {
                //创建连接
                connetction = factory.createconnection();
                //创建信道
                channel = connetction.createmodel();
            }
            catch (exception ex)
            {
                console.writeline(ex.tostring());
            }
        }

        public void createqueuedir()
        {
            //定义一个direct类型的交换机
            channel.exchangedeclare(exchangename, exchangetype.direct, false, false, null);
            //定义一个队列
            channel.queuedeclare(queuename, false, false, false, null);
            //将队列绑定交换机
            channel.queuebind(queuename, exchangename, routekey, null);
        }public void sendmsg(string msg)
        {
            var sendbytes = encoding.utf8.getbytes(msg);
            channel.basicpublish(exchangename, routekey, null, sendbytes);
        }

        public void closechannel()
        {
            channel.close();
        }

        public void closeconnection()
        {
            connetction.close();
        }

        public string getmsg()
        {
            //事件基本消费者
            eventingbasicconsumer consumer = new eventingbasicconsumer(channel);
            string msg = null;
            //接收到消息事件
            consumer.received += (ch, ea) =>
            {
                var message = encoding.utf8.getstring(ea.body);
                msg = message;
                console.writeline($"收到消息: {message}");
                //确认该消息已被消费
                channel.basicack(ea.deliverytag, false);
            };
            //启动消费者 设置为手动应答消息
            channel.basicconsume(queuename, false, consumer);
            console.writeline("消费者已启动");
            console.readkey();
            closeconnection();
            closechannel();
            return msg;
        }


    }
}

4.创建direct模式发送类

using netcoretest.core.model;
using system;
using system.collections.generic;
using system.text;

namespace netcoretest.core.exchangetypemodel
{

    /// <summary>
    /// direct模式发送
    /// </summary>
    public class directpost
    {


        rabbitmqmodel rabbitmqmodel;

        public directpost()
        {
            hostmodel hostmodel = new hostmodel();
            hostmodel.username = "admin";
            hostmodel.password = "admin";
            hostmodel.host = "127.0.0.1";
            hostmodel.port = 5672;
            hostmodel.exchangemodel =new exchangemodel {
                exchangename = "clentname",
                queuename = "clent",
                routekey = "clentroute"
            };
            rabbitmqmodel = new rabbitmqmodel(hostmodel);
            rabbitmqmodel.connection();

        }
        public void createqueue()
        {
            rabbitmqmodel.createqueuedir();
        }
        public void sendmsg(string msg)
        {
            rabbitmqmodel.sendmsg(msg);
        }
        public void getmsg()
        {
            rabbitmqmodel.getmsg();
        }
    }
}

5.创建rabbitmqclient控制台应用程序

 

 

using netcoretest.core;
using netcoretest.core.exchangetypemodel;
using netcoretest.core.model;
using rabbitmq.client;
using system;

namespace rabbitmqclient
{
    class program
    {
        static void main(string[] args)
        {
            console.writeline("消息生产者开始生产数据!");
            console.writeline("输入exit退出!");
            directpost directpost = new directpost();
            directpost.createqueue();
            string input;
           
            do
            {
                input = console.readline();
                directpost.sendmsg(input);

            } while (input.trim().tolower() != "exit");


        }
    }
}

6.创建rabbitmqservice控制台应用程序

using netcoretest.core;
using netcoretest.core.exchangetypemodel;
using netcoretest.core.model;
using system;
using system.text;

namespace rabbitmqserver
{
    class program
    {
        static void main(string[] args)
        {
            console.writeline("hello world!");

            directpost directpost = new directpost();
            directpost.getmsg();
        

        }
    }
}

7.执行rabbitmqclient和rabbitmqserver