RabbitMQ-Direct模式

2019-05-17 00:01:04来源:博客园 阅读 ()

新老客户大回馈,云服务器低至5折

简介

 RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息中间件,基于Erlang语言编写。

AMQP是什么

AMQP 0-9-1(高级消息队列协议)是一种消息传递协议,它允许一致的客户端应用程序与一致的消息传递中间件代理进行通信。

消息传递代理接收来自发布者(发布它们的应用程序,也称为生产者)的消息,并将它们路由到消费者(处理它们的应用程序)。

 由于它是一个网络协议,发布者、消费者和代理都可以驻留在不同的机器上。

 AMQP 0-9-1模型简介

AMQP 0-9-1模型具有以下世界视图:消息发布到交换,这通常与邮局或邮箱进行比较。交换然后使用名为绑定的规则将消息副本分发到队列。然后,代理将消息传递给订阅队列的消费者,或者消费者根据需要从队列获取/拉取消息。

 

发布消息时,发布者可以指定各种消息属性(消息元数据)。有些元数据可以由代理使用,但是,其余的元数据对代理是完全不透明的,只能由接收消息的应用程序使用。

 网络不可靠,应用程序可能无法处理消息,因此AMQP 0-9-1模型具有消息确认的概念:当消息传递给消费者时,消费者会自动或在应用程序开发人员选择时立即通知代理。当消息确认正在使用时,代理将仅在收到消息(或消息组)通知时从队列中完全删除消息。

 例如,在某些情况下,当消息无法路由时,消息可能会返回给发布者、丢弃,或者,如果代理实现扩展,则将消息放入所谓的“死信队列”。发布者通过使用某些参数发布消息来选择如何处理这种情况。

 队列、交换和绑定统称为AMQP实体。

 

交换和交换类型

交换机是发送消息的实体,交换机接收消息并将消息路由到零个或者多个队列当中,使用的路由算法取决于绑定的交换类型和规则,因此AMQP 0-9-1提供了以下四种交换类型:

  • Direct exchange    
  • Fanout exchange  
  • Topic exchange      
  • Headers exchang    

除了交换类型之外,还使用许多属性声明交换其中最重要的是:

  • 耐久性(Durability)       :交易所在经纪人重启后仍能存活
  • 自动删除(Auto-delete):当最后一个队列与其解除绑定时,将删除Exchange
  • 参数(arguments)         :可选,由插件和特定于代理的功能使用

交换可以是持久的,也可以是暂时的。持久性交易所能在经纪重启后存活下来,而短暂性交易所则不能(当经纪重新上线时,必须重新申报)。并非所有的场景和用例都需要持久的交换。

 

本文主要记录了Direct模式学习RabbitMQ

 

P:(producling)生产者,生产只意味着发送消息。

 

Q: (queue_name)队列,队列是位于rabbitmq中的post box的名称

 

C: (Consuming)消费者,消费者主要是等待接收消息的程序

 

 

开发准备

  • netCoreTset.core:该工程主要封装了RabbitMQ的公用方法
  • RabbitMQClient    :该工程为生产者
  • RabbitMQServer  :该工程为消费者

 

1.创建netCoreTset.core类库项目

 

1.1 安装项目依赖

 

2.定义接口

using netCoreTest.core.Model;
using System;
using System.Collections.Generic;
using System.Text;

namespace netCoreTest.core.Iserver
{
    public interface IConnectionServer
    {
      
        /// <summary>
        /// 连接服务
        /// </summary>
        void Connection();
        /// <summary>
        /// 创建消息队列
        /// </summary>
        /// <param name="queName">队列名称</param>
        void CreateQueueDir();
        /// <summary>
        /// 关闭连接
        /// </summary>
        void CloseConnection();
        /// <summary>
        /// 关闭通道
        /// </summary>
        void CloseChannel();


    }
}
using System;
using System.Collections.Generic;
using System.Text;

namespace netCoreTest.core.Iserver
{
    public interface IMessageService
    {
        /// <summary>
        /// 发送消息
        /// </summary>
        /// <param name="msg">消息内容</param>
        void SendMsg(string msg);
        /// <summary>
        /// 获取消息
        /// </summary>
        /// <returns></returns>
        string GetMsg();
    }
}
using System;
using System.Collections.Generic;
using System.Text;

namespace netCoreTest.core.Iserver
{
   public interface IRabbitMqService:IMessageService,IConnectionServer
    {
    }
}

 

 3.编写RabbitMQ辅助类

using netCoreTest.core.Iserver;
using netCoreTest.core.Model;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Text;

namespace netCoreTest.core
{
    public class RabbitMQModel : IRabbitMqService
    {

        private readonly ConnectionFactory factory = null;
        private IModel channel;
        private IConnection connetction;
        readonly string exchangeName;//交换机名称
        readonly string routeKey;//路由名称
        readonly string queueName;///队列名称
        public RabbitMQModel(HostModel model)
        {
            /// <summary>
            /// 创建连接工厂
            /// </summary>
            factory = new ConnectionFactory
            {
                UserName = model.UserName,
                Password = model.PassWord,
                HostName = "localhost",
                Port = model.Port,
            };
            exchangeName = model.ExChangeModel.ExChangeName;
            routeKey = model.ExChangeModel.RouteKey;
            queueName = model.ExChangeModel.QueueName;
        }
        /// <summary>
        /// 创建连接
        /// </summary>
        public void Connection()
        {
            try
            {
                //创建连接
                connetction = factory.CreateConnection();
                //创建信道
                channel = connetction.CreateModel();
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.ToString());
            }
        }

        public void CreateQueueDir()
        {
            //定义一个direct类型的交换机
            channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, false, false, null);
            //定义一个队列
            channel.QueueDeclare(queueName, false, false, false, null);
            //将队列绑定交换机
            channel.QueueBind(queueName, exchangeName, routeKey, null);
        }public void SendMsg(string msg)
        {
            var sendBytes = Encoding.UTF8.GetBytes(msg);
            channel.BasicPublish(exchangeName, routeKey, null, sendBytes);
        }

        public void CloseChannel()
        {
            channel.Close();
        }

        public void CloseConnection()
        {
            connetction.Close();
        }

        public string GetMsg()
        {
            //事件基本消费者
            EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
            string msg = null;
            //接收到消息事件
            consumer.Received += (ch, ea) =>
            {
                var message = Encoding.UTF8.GetString(ea.Body);
                msg = message;
                Console.WriteLine($"收到消息: {message}");
                //确认该消息已被消费
                channel.BasicAck(ea.DeliveryTag, false);
            };
            //启动消费者 设置为手动应答消息
            channel.BasicConsume(queueName, false, consumer);
            Console.WriteLine("消费者已启动");
            Console.ReadKey();
            CloseConnection();
            CloseChannel();
            return msg;
        }


    }
}

4.创建direct模式发送类

using netCoreTest.core.Model;
using System;
using System.Collections.Generic;
using System.Text;

namespace netCoreTest.core.ExchangeTypeModel
{

    /// <summary>
    /// Direct模式发送
    /// </summary>
    public class DirectPost
    {


        RabbitMQModel rabbitMQModel;

        public DirectPost()
        {
            HostModel hostModel = new HostModel();
            hostModel.UserName = "admin";
            hostModel.PassWord = "admin";
            hostModel.Host = "127.0.0.1";
            hostModel.Port = 5672;
            hostModel.ExChangeModel =new ExChangeModel {
                ExChangeName = "ClentName",
                QueueName = "Clent",
                RouteKey = "ClentRoute"
            };
            rabbitMQModel = new RabbitMQModel(hostModel);
            rabbitMQModel.Connection();

        }
        public void CreateQueue()
        {
            rabbitMQModel.CreateQueueDir();
        }
        public void SendMsg(string msg)
        {
            rabbitMQModel.SendMsg(msg);
        }
        public void GetMsg()
        {
            rabbitMQModel.GetMsg();
        }
    }
}

5.创建RabbitMQClient控制台应用程序

 

 

using netCoreTest.core;
using netCoreTest.core.ExchangeTypeModel;
using netCoreTest.core.Model;
using RabbitMQ.Client;
using System;

namespace RabbitMQClient
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("消息生产者开始生产数据!");
            Console.WriteLine("输入exit退出!");
            DirectPost directPost = new DirectPost();
            directPost.CreateQueue();
            string input;
           
            do
            {
                input = Console.ReadLine();
                directPost.SendMsg(input);

            } while (input.Trim().ToLower() != "exit");


        }
    }
}

6.创建RabbitMQService控制台应用程序

using netCoreTest.core;
using netCoreTest.core.ExchangeTypeModel;
using netCoreTest.core.Model;
using System;
using System.Text;

namespace RabbitMQServer
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Hello World!");

            DirectPost directPost = new DirectPost();
            directPost.GetMsg();
        

        }
    }
}

7.执行RabbitMQclient和RabbitMQserver

 


原文链接:https://www.cnblogs.com/zhengyazhao/p/10869982.html
如有疑问请与原作者联系

标签:

版权申明:本站文章部分自网络,如有侵权,请联系:west999com@outlook.com
特别注意:本站所有转载文章言论不代表本站观点,本站所提供的摄影照片,插画,设计作品,如需使用,请与原作者联系,版权归原作者所有

上一篇:【转载】Asp.Net MVC网站提交富文本HTML标签内容抛出异常

下一篇:【转载】IIS一个网站如何绑定多个主机域名