怎么行使RabbitMQ

网上参考大神们的博客,自个儿做了贰个RabbitMQ即时发新闻的德姆o。

话不多说,直接上代码!

注:那份文书档案是自作者和几个朋友学习后联合实现的。

美高梅开户网址 1

 

壹:搭建三个消除方案框架:RabbitMQ_Demo

目录

  • RabbitMQ 概念
  • exchange交换机机制
    • 怎么样是调换机
    • binding?
    • Direct Exchange交换机
    • Topic Exchange交换机
    • Fanout Exchange交换机
    • Header Exchange交换机
  • RabbitMQ 的 Hello – Demo(springboot实现)
  • RabbitMQ 的 Hello Demo(spring xml实现)
  • RabbitMQ 在生育条件下采纳和产出的题目
    • Spring RabbitMQ 注解
    • 消息的 JSON 传输
    • 消息持久化,断线重连,ACK。

1.引言

RabbitMQ——Rabbit Message
Queue的简写,但无法只是知道其为消息队列,信息代理更适用。RabbitMQ
是七个由 Erlang
语言开发的AMQP(高级音信队列协议)的开源实现,其内部结构如下:

美高梅开户网址 2

RabbitMQ作为1个音信代理,首要和消息顶牛,负责接收并转化音信。RabbitMQ提供了保证的新闻机制、跟踪机制和灵活的新闻路由,帮助信息集群和分布式陈设。适用于排队算法、秒杀活动、信息分发、异步处理、数据同步、处理耗费时间职务、CQLX570S等选用场景。

上边大家就来学学下RabbitMQ。

1.施用VS的NuGet安装包管理工科具安装RabbitMQ.Client:

当中饱含四个部分:

RabbitMQ 概念

RabbitMQ
即3个音信队列,最重倘若用来落到实处应用程序的异步和解耦,同时也能起到消息缓冲,新闻分发的成效。RabbitMQ使用的是AMQP协议,它是一种贰进制协议。暗中认可运维端口
5672。

在 RabbitMQ 中,如下图结构:

美高梅开户网址 3

rabbitmq

  • 左边 P 代表 生产者,也正是往 RabbitMQ 发消息的顺序。
  • 高级中学档就是 RabbitMQ,中间囊括了 交流机 和 队列。
  • 左边 C 代表 消费者,也等于往 RabbitMQ 拿音讯的主次。

那么,在那之中相比较重要的定义有 多少个,分别为:虚拟主机,沟通机,队列,和绑定。

  • 虚拟主机:七个虚拟主机持有1组调换机、队列和绑定。为何需求多少个虚拟主机呢?一点也不细略,RabbitMQ其中,用户只幸亏虚拟主机的粒度进行权力决定。
    因而,尽管须要禁止A组访问B组的调换机/队列/绑定,必须为A和B分别创制2个虚拟主机。每1个RabbitMQ服务器都有2个默许的虚拟主机“/”。
  • 交换机:Exchange 用于转载音信,不过它不会做存款和储蓄 ,即使未有Queue bind 到 Exchange 的话,它会平素放任掉 Producer
    发送过来的音信。

    • 那里有1个相比重要的定义:***路由键 ***
      。信息到调换机的时候,交互机会转载到相应的行列中,那么究竟转载到哪些队列,就要依据该路由键。
  • 绑定:也正是调换机必要和队列相绑定,那之中如上海体育场合所示,是多对多的涉及。

二. 条件搭建

正文首要基于Windows下行使Vs Code 基于.net
core实行demo演示。开首从前大家要求预备好之下条件。

  • 安装Erlang运维环境
    下载安装Erlang。
  • 安装RabbitMQ
    下载安装Windows版本的RabbitMQ。
  • 启动RabbitMQ Server
    点击Windows先导按钮,输入RabbitMQ找到RabbitMQ Comman Prompt,以管理人身份运营。
  • 次第执行以下命令运营RabbitMQ服务

    rabbitmq-service install
    rabbitmq-service enable
    rabbitmq-service start
    
  • 执行rabbitmqlctl status检查RabbitMQ状态

  • 安装管理平台插件
    执行rabbitmq-plugins enable rabbitmq_management即可成功安装,使用暗中认可账号密码(guest/guest)登录即可。

美高梅开户网址 4

1:RabbitMQ 公用类库项目

exchange调换机机制

3. Hello RabbitMQ

在起来在此以前大家先来驾驭下新闻模型:
美高梅开户网址 5
消费者(consumer)订阅某些队列。生产者(producer)创设消息,然后公布到行列(queue)中,队列再将音信发送到监听的主顾。

上面我们大家经过demo来明白RabbitMQ的为主用法。

 

2:1个劳动者控制台项目

怎么是沟通机

rabbitmq的message
model实际上海消防息不直接发送到queue中,中间有多少个exchange是做消息分发,producer甚至不知道音信发送到那一个队列中去。由此,当exchange收到message时,必须精确理解该怎么分发。是append到自然规则的queue,依然append到七个queue中,照旧被抛弃?那几个规则都以通过exchagne的四种type去定义的。

The core idea in the messaging model in RabbitMQ is that the producer
never sends any messages directly to a queue. Actually, quite often
the producer doesn’t even know if a message will be delivered to any
queue at all.

Instead, the producer can only send messages to an exchange. An
exchange is a very simple thing. On one side it receives messages from
producers and the other side it pushes them to queues. The exchange
must know exactly what to do with a message it receives. Should it be
appended to a particular queue? Should it be appended to many queues?
Or should it get discarded. The rules for that are defined by the
exchange type.

exchange是二个消息的agent,每五个虚构的host中都有定义。它的职分是把message路由到不相同的queue中。

三.1.音信的发送和接收

怎么行使RabbitMQ。始建RabbitMQ文件夹,打开命令提醒符,分别创立多少个控制台项目Send、Receive。

dotnet new console --name Send //创建发送端控制台应用
cd Send //进入Send目录
dotnet add package RabbitMQ.Client //添加RabbitMQ.Client包
dotnet restore //恢复包

dotnet new console --name Receive //创建接收端控制台应用
cd Receive //进入Receive目录
dotnet add package RabbitMQ.Client //添加RabbitMQ.Client包
dotnet restore //恢复包

作者们先来添加音讯发送端逻辑:

//Send.cs 
public static void Main(string[] args)
{
    //1.1.实例化连接工厂
    var factory = new ConnectionFactory() { HostName = "localhost" };
    //2. 建立连接
    using (var connection = factory.CreateConnection())
    {
        //3. 创建信道
        using (var channel = connection.CreateModel())
        {
            //4. 申明队列
            channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);
            //5. 构建byte消息数据包
            string message = args.Length > 0 ? args[0] : "Hello RabbitMQ!";
            var body = Encoding.UTF8.GetBytes(message);
            //6. 发送数据包
            channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body);
            Console.WriteLine(" [x] Sent {0}", message);
        }
    }
}

再来完善音讯接收端逻辑:

//Receive.cs  省略部分代码
public static void Main()
{
    //1.实例化连接工厂
    var factory = new ConnectionFactory() { HostName = "localhost" };
    //2. 建立连接
    using (var connection = factory.CreateConnection())
    {
        //3. 创建信道
        using (var channel = connection.CreateModel())
        {
            //4. 申明队列
            channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);
            //5. 构造消费者实例
            var consumer = new EventingBasicConsumer(channel);
            //6. 绑定消息接收后的事件委托
            consumer.Received += (model, ea) =>
            {
                var message = Encoding.UTF8.GetString(ea.Body);
                Console.WriteLine(" [x] Received {0}", message);
                Thread.Sleep(6000);//模拟耗时
                Console.WriteLine (" [x] Done");
            };
            //7. 启动消费者
            channel.BasicConsume(queue: "hello", autoAck: true, consumer: consumer);
            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}

先运维消息接收端,再运维新闻发送端,结果如下图。

美高梅开户网址 6

从地点的代码中得以看看,发送端和消费端的代码前四步都以相同的。主要的区别在于发送端调用channel.BasicPublish艺术发送消息;而接收端供给实例化贰个EventingBasicConsumer实例来展开音信处理逻辑。其它一些急需留意的是:音讯接收端和出殡和埋葬端的队列名称(queue)必须保持1致,那里钦定的行列名称叫hello。

二.劳动者端代码:

三:八个顾客控制台项目

binding?

exchange和queue通过routing-key关联,那两者之间的涉及是正是binding。如下图所示,X表示调换机,砖红代表队列,沟通机通过三个routing-key去binding七个queue,routing-key有何效果吗?看Direct
exchange类型交换机。

美高梅开户网址 7

三.2. 循环调度

利用工作行列的便宜便是它亦可互为的处理队列。假设堆积了重重任务,大家只须要加上更加多的劳重力(workers)就足以了。我们先运行三个接收端,等待音讯接收,再起步三个发送端进行音信发送。

美高梅开户网址 8

咱俩扩张运营一个消费端后的运营结果:

美高梅开户网址 9

从图中可见,大家循环境与发展送肆条音信,八个新闻接收端按梯次被循环分配。
暗许情况下,RabbitMQ将按梯次将每条新闻发送给下一个买主。平均每一个顾客将获得1致数量的新闻。那种分发音信的主意叫做循环(round-robin)。

 1 using System;
 2 using System.Collections.Generic;
 3 using System.Linq;
 4 using System.Text;
 5 using System.Threading.Tasks;
 6 using RabbitMQ.Client;
 7 
 8 namespace RabbitMQ.Producter
 9 {
10     class Program
11     {
12         /// <summary>
13         /// 连接配置
14         /// </summary>
15         private static readonly ConnectionFactory rabbitMqFactory = new ConnectionFactory()
16         {
17             HostName="localhost",
18             UserName = "guest",
19             Password = "guest",
20             Port = 5672,
21             //VirtualHost = "JentVirtualHost"
22         };
23         /// <summary>
24         /// 路由名称
25         /// </summary>
26         const string ExchangeName = "Jent.Exchange";
27         /// <summary>
28         /// 队列名称
29         /// </summary>
30         const string QueueName = "Jent.Queue";
31         static void Main(string[] args)
32         {
33             DirectExchangeSendMsg();
34             Console.WriteLine("按任意键退出程序!");
35             Console.ReadKey();
36         }
37         /// <summary>
38         /// 单点精确路由模式
39         /// </summary>
40         private static void DirectExchangeSendMsg()
41         {
42             using (IConnection conn = rabbitMqFactory.CreateConnection())
43             {
44                 using (IModel channel = conn.CreateModel())
45                 {
46                     channel.ExchangeDeclare(ExchangeName, "direct", durable: true, autoDelete: false, arguments: null);
47                     channel.QueueDeclare(QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
48                     channel.QueueBind(QueueName, ExchangeName, routingKey: QueueName);
49 
50                     var props = channel.CreateBasicProperties();
51                     props.Persistent = true;
52                     Console.WriteLine("请输入需要发送的消息:");
53                     string vadata = Console.ReadLine();
54                     while (vadata != "exit")
55                     {
56                         var msgBody = Encoding.UTF8.GetBytes(vadata);
57                         channel.BasicPublish(exchange: ExchangeName, routingKey: QueueName, basicProperties: props, body: msgBody);
58                         Console.WriteLine(string.Format("发送时间:{0},发送完毕,输入exit退出消息发送", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")));
59                         vadata = Console.ReadLine();
60                     }
61                 }
62             }
63         }
64     }
65 }

花色布局如图:

Directed Exchange

路由键exchange,该调换机械收割到消息后会把消息发送到钦赐routing-key的queue中。这消息交流机是怎么了解的呢?其实,producer
deliver音信的时候会把routing-key add到 message
header中。routing-key只是一个messgae的attribute。

A direct exchange delivers messages to queues based on a message routing key. The routing key is a message attribute added into the message header by the producer. The routing key can be seen as an "address" that the exchange use to decide how to route the message. A message goes to the queue(s) whose binding key exactly matches the routing key of the message.

Default Exchange
那种是例外的Direct
Exchange,是rabbitmq内部暗中认可的贰个调换机。该沟通机的name是空字符串,全部queue都暗中认可binding
到该交流机上。全部binding到该交流机上的queue,routing-key都和queue的name一样。

三.三. 音信确认

根据我们地点的demo,一旦RabbitMQ将消息发送到消费端,新闻就会即时从内部存款和储蓄器中移出,无论消费端是还是不是处理到位。在那种情景下,音信就会丢掉。

为了保障2个新闻永远不会丢掉,RabbitMQ扶助音讯确认(message
acknowledgments)
。当消费端接收音讯还要处理完了后,会发送2个ack(消息确认)非信号到RabbitMQ,RabbitMQ接收到那些时域信号后,就足以去除掉那条已经处理的新闻任务。但1旦消费端挂掉了(比如,通道关闭、连接丢失等)没有发送ack实信号。RabbitMQ就会驾驭有个别新闻尚未例行处理,RabbitMQ将会重新将新闻入队,要是有别的三个消费端在线,就会急速的再度发送到别的贰个消费端。

RabbitMQ中向来不音信超时的概念,唯有当消费端关闭或奔溃时,RabbitMQ才会另行分发音讯。

微调下Receive中的代码逻辑:

 //5. 构造消费者实例
 var consumer = new EventingBasicConsumer(channel);
 //6. 绑定消息接收后的事件委托
 consumer.Received += (model, ea) =>
 {
     var message = Encoding.UTF8.GetString(ea.Body);
     Console.WriteLine(" [x] Received {0}", message);
     Thread.Sleep(6000);//模拟耗时
     Console.WriteLine(" [x] Done");
     // 7. 发送消息确认信号(手动消息确认)
     channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
 };
 //8. 启动消费者
 //autoAck:true;自动进行消息确认,当消费端接收到消息后,就自动发送ack信号,不管消息是否正确处理完毕
 //autoAck:false;关闭自动消息确认,通过调用BasicAck方法手动进行消息确认
 channel.BasicConsume(queue: "hello", autoAck: false, consumer: consumer);

重视改动的是将
autoAck:true修改为autoAck:fasle,以及在音讯处理实现后手动调用BasicAck主意举办手动新闻确认。

美高梅开户网址 10

从图中可见,音信发送端连接发送4条新闻,个中消费端一先被分配处理第3条消息,消费端二被循环分配第1条音信,第二条新闻由于尚未空闲消费者如故在队列中。
在开销端贰未处理完第2条新闻从前,手动中断(ctrl+c)。大家得以窥见RabbitMQ在下贰回分发时,会事先将被搁浅的消息分发给消费端1甩卖。

叁.买主端代码:

美高梅开户网址 11

Topic Exchange

通配符调换机,exchange会把消息发送到一个还是多少个满意通配符规则的routing-key的queue。其中*表号相称三个word,#相称多少个word和途径,路径之间通过.隔断。如餍足a.*.c的routing-key有a.hello.c;满足#.hello的routing-key有a.b.c.helo。

三.四. 音信持久化

音讯确认确认保障了就算消费端至极,音信也不会丢掉可以被重新分发处理。但是壹旦RabbitMQ服务端极度,音讯依然会丢掉。除非大家内定durable:true,不然当RabbitMQ退出或奔溃时,音讯将依旧会丢掉。通过点名durable:true,并指定Persistent=true,来报告RabbitMQ将消息持久化。

//send.cs
//4. 申明队列(指定durable:true,告知rabbitmq对消息进行持久化)
channel.QueueDeclare(queue: "hello", durable: true, exclusive: false, autoDelete: false, arguments
//将消息标记为持久性 - 将IBasicProperties.SetPersistent设置为true
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
//5. 构建byte消息数据包
string message = args.Length > 0 ? args[0] : "Hello RabbitMQ!";
var body = Encoding.UTF8.GetBytes(message);
//6. 发送数据包(指定basicProperties)
channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: properties, body: body);

将新闻标记为持久性不能够一心保证消息不会丢掉。即使它告诉RabbitMQ将新闻保存到磁盘,不过当RabbitMQ接受消息还要还未曾保留时​​,依旧有1个极短的时光窗口。RabbitMQ
也许只是将音信保存到了缓存中,并从未将其写入到磁盘上。持久化是不可能肯定保险的,可是对于一个简约职责队列来说已经足足。假诺急需确认保证新闻队列的持久化,能够使用publisher
confirms.

 1 using System;
 2 using System.Collections.Generic;
 3 using System.Linq;
 4 using System.Text;
 5 using System.Threading.Tasks;
 6 using RabbitMQ.Client;
 7 
 8 namespace RabbitMQ.Consumer
 9 {
10     class Program
11     {
12         /// <summary>
13         /// 连接配置
14         /// </summary>
15         private static readonly ConnectionFactory rabbitMqFactory = new ConnectionFactory()
16         {
17             HostName = "127.0.0.1",
18             UserName = "guest",
19             Password = "guest",
20             Port = 5672,
21             //VirtualHost = "JentVirtualHost"
22         };
23         /// <summary>
24         /// 路由名称
25         /// </summary>
26         const string ExchangeName = "Jent.Exchange";
27         /// <summary>
28         /// 队列名称
29         /// </summary>
30         const string QueueName = "Jent.Queue";
31 
32         static void Main(string[] args)
33         {
34             DirectAcceptExchange();
35 
36             Console.WriteLine("输入任意值退出程序!");
37             Console.ReadKey();
38         }
39 
40         private static void DirectAcceptExchange()
41         {
42             using (IConnection conn = rabbitMqFactory.CreateConnection())
43             {
44                 using (IModel channel = conn.CreateModel())
45                 {
46                     channel.ExchangeDeclare(ExchangeName, "direct", durable: true, autoDelete: false, arguments: null);
47                     channel.QueueDeclare(QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
48                     channel.QueueBind(QueueName, ExchangeName, routingKey: QueueName);
49 
50                     while (true)
51                     {
52                         BasicGetResult msgResponse = channel.BasicGet(QueueName, autoAck: false);
53                         if (msgResponse != null)
54                         {
55                             var msgBody = Encoding.UTF8.GetString(msgResponse.Body);
56                             Console.WriteLine(string.Format("接收时间:{0},消息内容:{1}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), msgBody));
57                         }
58                         //System.Threading.Thread.Sleep(TimeSpan.FromSeconds(1));
59                     }
60                 }
61             }
62         }
63     }
64 }

 

Fanout Exchange

扇形沟通机,该交换机会把音信发送到全部binding到该交流机上的queue。那种是publisher/subcribe格局。用来做广播最佳。
有着该exchagne上钦定的routing-key都会被ignore掉。

The fanout copies and routes a received message to all queues that are
bound to it regardless of routing keys or pattern matching as with
direct and topic exchanges. Keys provided will simply be ignored.

3.5. 公正分发

RabbitMQ的音信分发私下认可依照消费端的数据,按梯次循环分发。那样仅是保障了消费端被平均分发音信的数量,但却忽略了消费端的闲忙情状。这就也许出现某些消费端直接处理耗费时间任务处于阻塞状态,有个别消费端直接处理一般任务处于空置状态,而只是它们分配的职分数量一样。

美高梅开户网址 12

但大家得以经过channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
设置prefetchCount : 1来告诉RabbitMQ,在未收打消费端的消息确认时,不再分发音信,也就确定保障了当消费端处于辛苦景观时,不再分配职责。

//Receive.cs
//4. 申明队列
channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);
//设置prefetchCount : 1来告知RabbitMQ,在未收到消费端的消息确认时,不再分发消息,也就确保了当消费端处于忙碌状态时
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

那时候你须求专注的是只要拥有的消费端都处于勤奋景色,你的行列只怕会被塞满。你供给留意这点,要么添加越来越多的消费端,要么选择任何策略。

4.先后结果:

2:开发以前,需求引用RabbitMQ包

Header Exchange

安装header attribute参数类型的沟通机。

4. Exchange

有心人的您或许发现上边的demo,生产者和顾客直接是通过1样队列名称举办相配衔接的。消费者订阅有个别队列,生产者创制消息公布到行列中,队列再将音讯转载到订阅的顾客。那样就会有二个局限性,即消费者三回只可以发送音信到某叁个行列。

那消费者怎么样才能发送音讯到多个消息队列呢?
RabbitMQ提供了Exchange,它相仿于路由器的效果,它用来对消息进行路由,将新闻发送到五个连串上。Exchange一方面从劳动者接收音讯,另一方面将消息推送到行列。但exchange必须精通怎么着处理接收到的音信,是将其附加到特定队列照旧外加到七个种类,仍然一贯忽略。而这个规则由exchange
type定义,exchange的规律如下图所示。
美高梅开户网址 13

周边的exchange type 有以下两种:

  • direct(鲜明的路由规则:消费端绑定的种类名称必须和音讯发表时钦赐的路由名称1致)
  • topic (情势相称的路由规则:扶助通配符)
  • fanout (音信广播,将音讯分发到exchange上绑定的具有队列上)

上边大家就来挨家挨户那介绍它们的用法。

美高梅开户网址 14

设置相应的Nuget包,只怕下载相关dll也能够,可是提议在线安装nuget,更有益于

RabbitMQ 的 Hello Demo

设置就不说了,建议根据合法文书档案上做。先贴代码,稍后解释,代码如下:

铺排 沟通机,队列,交换机与队列的绑定,音讯监视容器:

@Configuration
@Data
public class RabbitMQConfig {

    final static String queueName = "spring-boot";

    @Bean
    Queue queue() {
        return new Queue(queueName, false);
    }

    @Bean
    TopicExchange exchange() {
        return new TopicExchange("spring-boot-exchange");
    }

    @Bean
    Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(queueName);
    }

    @Bean
    SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(queueName);
        container.setMessageListener(listenerAdapter);
        return container;
    }

    @Bean
    Receiver receiver() {
        return new Receiver();
    }
    @Bean
    MessageListenerAdapter listenerAdapter(Receiver receiver) {
        return new MessageListenerAdapter(receiver, "receiveMessage");
    }
}

配备接收消息者(即消费者):

public class Receiver {

    private CountDownLatch latch = new CountDownLatch(1);

    public void receiveMessage(String message) {
        System.out.println("Received <" + message + ">");
        latch.countDown();
    }

    public CountDownLatch getLatch() {
        return latch;
    }
}

计划发送音信者(即生产者):

@RestController
public class Test {
    @Autowired
    RabbitTemplate rabbitTemplate;

    @RequestMapping(value = "/test/{abc}",method = RequestMethod.GET)
    public String test(@PathVariable(value = "abc") String abc){
        rabbitTemplate.convertAndSend("spring-boot", abc + " from RabbitMQ!");
        return  "abc";
    }
}

以上便可达成三个简单易行的 RabbitMQ
德姆o,具体代码在:点这里

这正是说,那里,分为多个部分分析:发音讯,沟通机队列,收音讯。

  • 对此发送音信:大家一般能够动用 RabbitTemplate,这几个是 Spring
    封装给了大家,便于大家发送音讯,大家调用
    rabbitTemplate.convertAndSend("spring-boot", xxx); 即可发送音信。
  • 对此沟通机队列:如上代码,我们需求配置交流机
    TopicExchange,配置队列 Queue,并且配备他们中间的绑定 Binding
  • 对于收受音讯:首先须要创造八个新闻监听容器,然后把我们的接受者注册到该容器中,那样,队列中有音信,那么就会调用接收者的呼应的不二诀要。如上代码
    container.setMessageListener(listenerAdapter);
    在那之中,MessageListenerAdapter 可以看做是
    我们接收者的一个包裹类,new MessageListenerAdapter(receiver, "receiveMessage");
    指明了一旦有消息来,那么调用接收者哪个方法进行处理。

4.1 fanout

本着由表及里的合计,大家先来理解下美高梅开户网址 ,fanout的广播路由体制。fanout的路由机制如下图,即发送到
fanout 类型exchange的新闻都会散发到全数绑定该exchange的队列上去。

美高梅开户网址 15

劳动者示例代码:

// 生成随机队列名称
var queueName = channel.QueueDeclare().QueueName;
//使用fanout exchange type,指定exchange名称
channel.ExchangeDeclare(exchange: "fanoutEC", type: "fanout");
var message = "Hello Rabbit!";
var body = Encoding.UTF8.GetBytes(message);
//发布到指定exchange,fanout类型无需指定routingKey
channel.BasicPublish(exchange: "fanoutEC", routingKey: "", basicProperties: null, body: body);

顾客示例代码:

//申明fanout类型exchange
channel.ExchangeDeclare (exchange: "fanoutEC", type: "fanout");
//申明随机队列名称
var queuename = channel.QueueDeclare ().QueueName;
//绑定队列到指定fanout类型exchange,无需指定路由键
channel.QueueBind (queue : queuename, exchange: "fanoutEC", routingKey: "");

 

摸索:RabbitMQ.Client  
安装新型版即可,不知情怎么设置nuget,请移步 

RabbitMQ 的 Hello Demo(spring xml实现)

spring
xml格局落成RabbitMQ简单,可读性较好,配置简单,配置和落到实处如下所示。

上文已经讲述了rabbitmq的布署,xml形式通过properites文件存放用户配置音信:

mq.host=127.0.0.1
mq.username=guest
mq.password=guest
mq.port=5672

配置application-mq.xml配置文件,注脚连接、沟通机、queue以及consumer监听。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
     http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
    http://www.springframework.org/schema/rabbit
    http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd" >
    <description>rabbitmq 连接服务配置</description>

    <!-- 连接配置 -->
    <context:property-placeholder location="classpath:mq.properties" />
    <rabbit:connection-factory id="connectionFactory" host="${mq.host}" username="${mq.username}" password="${mq.password}" port="${mq.port}"/>
    <rabbit:admin connection-factory="connectionFactory"/>
    <!-- spring template声明-->
    <rabbit:template exchange="amqpExchange" id="amqpTemplate"  connection-factory="connectionFactory" />

    <!--申明queue-->
    <rabbit:queue id="test_queue_key" name="test_queue_key" durable="true" auto-delete="false" exclusive="false" />
    <!--申明exchange交换机并绑定queue-->
    <rabbit:direct-exchange name="amqpExchange" durable="true" auto-delete="false" id="amqpExchange">
        <rabbit:bindings>
            <rabbit:binding queue="test_queue_key" key="test_queue_key"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>


    <!--consumer配置监听-->
    <bean id="reveiver" class="com.demo.mq.receive.Reveiver" />
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto">
        <rabbit:listener queues="test_queue_key" ref="reveiver" method="receiveMessage"/>
    </rabbit:listener-container>
</beans>

上述代码中,引进properties文件就不多说了。

<rabbit:connection-factory>标签表明成立connection的factory工厂。

<rabbit-template>声明spring
template,和上文spring中使用template一样。template可声明exchange。

<rabbit:queue>声称3个queue并安装queue的安排项,直接看标签属性就能够明白queue的配置项。

<rabbit:direct-exchange>表明沟通机并绑定queue。

<rabbit:listener-container>表明监听container并布署consumer和监听routing-key。

结余就总结了,application-context.xml中把rabbitmq配置import进去。

<?xml version="1.0" encoding="UTF-8"?>
<beans
        xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:task="http://www.springframework.org/schema/task"
        xmlns:context="http://www.springframework.org/schema/context"
        xmlns:aop="http://www.springframework.org/schema/aop"
        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
       http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.0.xsd
       http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd">

    <context:component-scan base-package="com.demo.**" />
    <import resource="application-mq.xml" />
</beans>

Producer实现,发送新闻依然选拔template的convertAndSend() deliver音讯。

@Service
public class Producer {

    @Autowired
    private AmqpTemplate amqpTemplate;

    private final static Logger logger = LoggerFactory.getLogger(Producer.class);

    public void sendDataToQueue(String queueKey, Object object) {
        try {
            amqpTemplate.convertAndSend(queueKey, object);
        } catch (Exception e) {
            e.printStackTrace();
            logger.error("exeception={}",e);
        }

    }
}

配置consumer

package com.demo.mq.receive;

import org.springframework.stereotype.Service;
import java.util.concurrent.CountDownLatch;

@Service
public class Reveiver {
    private CountDownLatch latch = new CountDownLatch(1);

    public void receiveMessage(String message) {
        System.out.println("reveice msg=" + message.toString());
        latch.countDown();
    }
}

测试deliver消息

Controller
@RequestMapping("/demo/")
public class TestController {
    private final static Logger logger = LoggerFactory.getLogger(TestController.class);
    @Resource
    private Producer producer;


    @RequestMapping("/test/{msg}")
    public String send(@PathVariable("msg") String msg){
        logger.info("#TestController.send#abc={msg}", msg);
        System.out.println("msg="+msg);
        producer.sendDataToQueue("test_queue_key",msg);
        return "index";
    }
}

4.2. direct

direct相对于fanout就属于完全合营、单播的方式,路由体制如下图,即队列名称和音讯发送时内定的路由完全合营时,新闻才会发送到钦定队列上。
美高梅开户网址 16

劳动者示例代码:

// 生成随机队列名称
var queueName = channel.QueueDeclare().QueueName;
//使用direct exchange type,指定exchange名称
channel.ExchangeDeclare(exchange: "directEC", type: "direct");
var message = "Hello Rabbit!";
var body = Encoding.UTF8.GetBytes(message);
//发布到direct类型exchange,必须指定routingKey
channel.BasicPublish(exchange: "directEC", routingKey: "green", basicProperties: null, body: body);

消费者示例代码:

//申明direct类型exchange
channel.ExchangeDeclare (exchange: "directEC", type: "direct");
//绑定队列到direct类型exchange,需指定路由键routingKey
channel.QueueBind (queue : green, exchange: "directEC", routingKey: "green");

注:在第二步事先,你须要安装RabbitMQ客户端,可从

安装好之后,早先激动的写代码啦!

RabbitMQ 在生产环境下利用和产出的难点

在生产条件中,由于 Spring 对 RabbitMQ
提供了有的有益于的注释,所以率先能够利用那么些申明。例如:

  • @EnableRabbit:@EnableRabbit 和 @Configuration
    评释在1个类中组成使用,借使此类能够回到一个RabbitListenerContainerFactory 类型的
    bean,那么就约等于能够把该终端(消费端)和 RabbitMQ
    实行延续。Ps:(生成端不是透过 RabbitListenerContainerFactory 来和
    RabbitMQ 连接,而是经过 RabbitTemplate )
  • @RabbitListener:当对应的连串中有消息的时候,该阐明修饰下的法门会被执行。
  • @RabbitHandler:接收者能够监听多少个系列,分歧的系列音信的项目恐怕两样,该表明能够使得不一致的消息让不相同措施来响应。

具体那些表明的使用,可以参考那里的代码:点这里

首先,生产环境下的 RabbitMQ
恐怕不会在劳动者可能消费者本机上,所以必要再行定义
ConnectionFactory,即:

@Bean
ConnectionFactory connectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);
    connectionFactory.setUsername(userName);
    connectionFactory.setPassword(password);
    connectionFactory.setVirtualHost(vhost);
    return connectionFactory;
}

此处,能够另行设置须要一而再的 RabbitMQ 的
ip,端口,虚拟主机,用户名,密码。

下一场,能够先从生产端思考,生产端必要三番五次 RabbitMQ,那么能够透过
RabbitTemplate 进行连接。 Ps:(RabbitTemplate
用于生产端发送音信到沟通机中),如下代码:

@Bean(name="myTemplate")
RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
    RabbitTemplate template = new RabbitTemplate(connectionFactory);
    template.setMessageConverter(integrationEventMessageConverter());
    template.setExchange(exchangeName);
    return template;
}

在该代码中,new RabbitTemplate(connectionFactory);
设置了生产端连接到RabbitMQ,template.setMessageConverter(integrationEventMessageConverter());
设置了 生产端发送给调换机的音讯是以如何格式的,在
integrationEventMessageConverter() 代码中:

public MessageConverter integrationEventMessageConverter() {
    Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter();
    return messageConverter;
}

如上 Jackson2JsonMessageConverter 指明了 JSON。上述代码的末段
template.setExchange(exchangeName); 指明了
要把劳动者要把音信发送到哪个交流机上。

有了上述,那么,大家即可使用
rabbitTemplate.convertAndSend("spring-boot", xxx); 发送音讯,xxx
表示任意档次,因为上述的装置会帮大家把这么些品种转化成 JSON 传输。

随之,生产端发送我们说过了,那么以往能够看看消费端:

对此消费端,大家得以只创建
SimpleRabbitListenerContainerFactory,它能够帮我们转移
RabbitListenerContainer,然后大家再利用 @RabbitListener
钦点接收者收到音讯时处理的主意。

@Bean(name="myListenContainer")
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setMessageConverter(integrationEventMessageConverter());
    factory.setConnectionFactory(connectionFactory());
    return factory;
}

这其中
factory.setMessageConverter(integrationEventMessageConverter());
钦点了大家接受音讯的时候,以 JSON
传输的消息能够转换到对应的档次传入到方法中。例如:

@Slf4j
@Component
@RabbitListener(containerFactory = "helloRabbitListenerContainer",queues = "spring-boot")
public class Receiver {
    @RabbitHandler
    public void receiveTeacher(Teacher teacher) {
        log.info("##### = {}",teacher);
    }
}

或是出现的题材:

4.3. topic

topic是direct的晋升版,是壹种模式匹配的路由机制。它协助使用二种通配符来开始展览形式相称:符号#和符号*。其中*相称四个单词,
#则象征相配0个或多个单词,单词之间用.细分。如下图所示。
美高梅开户网址 17

生产者示例代码:

// 生成随机队列名称
var queueName = channel.QueueDeclare().QueueName;
//使用topic exchange type,指定exchange名称
channel.ExchangeDeclare(exchange: "topicEC", type: "topic");
var message = "Hello Rabbit!";
var body = Encoding.UTF8.GetBytes(message);
//发布到topic类型exchange,必须指定routingKey
channel.BasicPublish(exchange: "topicEC", routingKey: "first.green.fast", basicProperties: null, body: body);

顾客示例代码:

//申明topic类型exchange
channel.ExchangeDeclare (exchange: "topicEC", type: "topic");
//申明随机队列名称
var queuename = channel.QueueDeclare ().QueueName;
//绑定队列到topic类型exchange,需指定路由键routingKey
channel.QueueBind (queue : queuename, exchange: "topicEC", routingKey: "#.*.fast");

        不过RabbitMQ又是借助于Erlang
OTP平台,所以,安装RabbitMQ在此以前,须要先从

 

新闻持久化

在生产条件中,大家需求考虑万一劳动者挂了,消费者挂了,恐怕 rabbitmq
挂了怎么。一般的话,借使劳动者挂了只怕消费者挂了,其实是尚未影响,因为新闻就在队列之中。那么万壹rabbitmq
挂了,此前在队列之中的新闻如何做,其实能够做新闻持久化,RabbitMQ
会把音信保存在磁盘上。

做法是足以先从 Connection 对象中获得2个 Channel
信道对象,然后再能够因此该指标设置 音讯持久化。

5. RPC

QashqaiPC——Remote Procedure Call,远程进程调用。
这RabbitMQ怎么样实行远程调用呢?示意图如下:
美高梅开户网址 18
首先步,首借使开始展览长途调用的客户端须求钦赐接收远程回调的体系,并注脚消费者监听此行列。
第三步,远程调用的服务端除了要注脚消费端接收远程调用请求外,还要将结果发送到客户端用来监听的结果的行列中去。

远程调用客户端:

 //申明唯一guid用来标识此次发送的远程调用请求
 var correlationId = Guid.NewGuid().ToString();
 //申明需要监听的回调队列
 var replyQueue = channel.QueueDeclare().QueueName;
 var properties = channel.CreateBasicProperties();
 properties.ReplyTo = replyQueue;//指定回调队列
 properties.CorrelationId = correlationId;//指定消息唯一标识
 string number = args.Length > 0 ? args[0] : "30";
 var body = Encoding.UTF8.GetBytes(number);
 //发布消息
 channel.BasicPublish(exchange: "", routingKey: "rpc_queue", basicProperties: properties, body: body);
 Console.WriteLine($"[*] Request fib({number})");
 // //创建消费者用于处理消息回调(远程调用返回结果)
 var callbackConsumer = new EventingBasicConsumer(channel);
 channel.BasicConsume(queue: replyQueue, autoAck: true, consumer: callbackConsumer);
 callbackConsumer.Received += (model, ea) =>
 {
      //仅当消息回调的ID与发送的ID一致时,说明远程调用结果正确返回。
     if (ea.BasicProperties.CorrelationId == correlationId)
     {
         var responseMsg = $"Get Response: {Encoding.UTF8.GetString(ea.Body)}";
         Console.WriteLine($"[x]: {responseMsg}");
     }
 };

长途调用服务端:

//申明队列接收远程调用请求
channel.QueueDeclare(queue: "rpc_queue", durable: false,
    exclusive: false, autoDelete: false, arguments: null);
var consumer = new EventingBasicConsumer(channel);
Console.WriteLine("[*] Waiting for message.");
//请求处理逻辑
consumer.Received += (model, ea) =>
{
    var message = Encoding.UTF8.GetString(ea.Body);
    int n = int.Parse(message);
    Console.WriteLine($"Receive request of Fib({n})");
    int result = Fib(n);
    //从请求的参数中获取请求的唯一标识,在消息回传时同样绑定
    var properties = ea.BasicProperties;
    var replyProerties = channel.CreateBasicProperties();
    replyProerties.CorrelationId = properties.CorrelationId;
    //将远程调用结果发送到客户端监听的队列上
    channel.BasicPublish(exchange: "", routingKey: properties.ReplyTo,
        basicProperties: replyProerties, body: Encoding.UTF8.GetBytes(result.ToString()));
    //手动发回消息确认
    channel.BasicAck(ea.DeliveryTag, false);
    Console.WriteLine($"Return result: Fib({n})= {result}");
};
channel.BasicConsume(queue: "rpc_queue", autoAck: false, consumer: consumer);

       
关于那部分的情节,推荐阅读:

三:达成RabbitMQ基本收发信息的效应

生产者大概消费者断线重连

此处 Spring 有自动重连机制。

6. 总结

基于上边的demo和对两种区别exchange路由体制的就学,大家发现RabbitMQ主纵然涉嫌到以下多少个为主概念:

  1. Publisher:生产者,音讯的发送方。
  2. Connection:互联网连接。
  3. Channel:信道,多路复用连接中的一条独立的双向数据流通道。
  4. Exchange:沟通器(路由器),负责消息的路由到相应队列。
  5. Binding:队列与沟通器间的涉嫌绑定。消费者将关怀的类别绑定到钦赐调换器上,以便Exchange能准确分发新闻到内定队列。
  6. Queue:队列,音讯的缓冲存款和储蓄区。
  7. Virtual
    Host:虚拟主机,虚拟主机提供财富的逻辑分组和分手。包罗连接,沟通,队列,绑定,用户权限,策略等。
  8. Broker:新闻队列的服务器实体。
  9. Consumer:消费者,消息的接收方。

这一次作为入门就讲到那里,下次我们来教学下EventBus +
RabbitMQ
什么样促成事件的分发。

参考资料:
RabbitMQ Tutorials
Demo路径——RabbitMQ

 

1:在RabbitMQ_Lib类库中新建类:MyRabbitMQ.cs

ACK 确认机制

各类Consumer也许必要壹段时间才能处理完收到的数据。假若在那几个进度中,Consumer出错了,极度退出了,而数据还并未有处理到位,那么
非凡不幸,那段数据就丢掉了。因为我们应用no-ack的不二秘籍开展确认,也正是说,每一遍Consumer接到数据后,而任由是不是处理落成,RabbitMQ Server会马上把那一个Message标记为形成,然后从queue中删去了。

若果三个Consumer分外退出了,它处理的数量可见被此外的Consumer处理,那样数据在那种情景下就不会丢掉了(注意是那种地方下)。
为了保障数据不被遗失,RabbitMQ支持音讯确认机制,即acknowledgments。为了保障数据能被正确处理而不仅是被Consumer收到,那么大家无法运用no-ack。而应该是在拍卖完数据后发送ack。

在处理数量后发送的ack,正是告诉RabbitMQ数据现已被吸收,处理完了,RabbitMQ能够去安全的删减它了。
倘若Consumer退出驾驭则并未有发送ack,那么RabbitMQ就会把那么些Message发送到下七个Consumer。这样就确定保证了在Consumer极度退出的图景下数据也不会丢掉。

  此德姆o只是‘direct’格局的音信发送接收方式。

美高梅开户网址 19美高梅开户网址 20

村办对 RabbitMQ ACK 的1对疑云,求助:点这里

public class MyRabbitMQ
    {
        //连接工厂
        private ConnectionFactory factory { get; set; } = new ConnectionFactory
        {
            HostName = "localhost",
            Port = 5672,
            UserName = "guest",
            Password = "guest"
        };

        private IConnection connection { get; set; }

        private IModel channel { get; set; }

        //生产方 构造函数
        public MyRabbitMQ(string exchangeName = "",string exchangeType = ExchangeType.Direct)
        {
            //创建一个连接
            connection = factory.CreateConnection();
            channel = connection.CreateModel();

            //创建一个转发器
            channel.ExchangeDeclare(exchangeName, exchangeType);
        }

        //消费方 构造函数
        public MyRabbitMQ(string exchangeName = "", string queueName = "", string routingKey = "")
        {
            //创建一个连接
            connection = factory.CreateConnection();
            channel = connection.CreateModel();

            //创建一个队列
            channel.QueueDeclare(queueName, true, false, false);

            //队列绑定
            channel.QueueBind(queueName, exchangeName, routingKey);
        }

        public void SendMessage(string message="", string exchangeName = "", string routingKey = "")
        {
            channel.BasicPublish(exchangeName, routingKey, null, Encoding.UTF8.GetBytes(message));
        }

        public QueueingBasicConsumer ReceiveMessage(string queueName = "")
        {
            //EventingBasicConsumer
            var consumer = new QueueingBasicConsumer(channel);
            channel.BasicConsume(queueName, true, consumer);
            return consumer;
        }
    }

总结

  1. RabbitMQ 功能:异步,解耦,缓冲,新闻分发。
  2. RabbitMQ 首要分为贰个部分,生产者,交流机和队列,消费者。
  3. 亟待留意新闻持久化,目标为了幸免 RabbitMQ 宕机;考虑 ACK
    机制,目标为了假如买主对新闻的处理战败了,那么继续要怎么处理。

View Code

写在终极

  1. 写出来,说出来才精晓对不对,知道不对才能校订,校订了才能成长。
  2. 在技能上面,希望大家眼里都容不得沙子。假若有畸形的地点依然须求创新的地点希望能够提出,非常谢谢。

能够见见,首先有八个构造函数,三个是给劳动者接纳,一个是给买主应用,注意参数有所差别,能够见到生产者与顾客关切的点是不一样的。

不管生产恐怕开销,都是几个客户端,都供给成立一个RabbitMQ连接并创立三个channel,才得以展开连锁的操作,那些操作都以由channel发起的,那样说应该比较白话了。

组织生产者的时候,主倘使成立一个转载器,转载器的名字及项目须求定义,

转载器常用类型包罗两种:direct、fanout、topic,

那二种档次那里说的更明白:

本例子中是以topic为例子的

 

2:MQ_Producter项目中发送新闻(生产者中发送音讯)

美高梅开户网址 21美高梅开户网址 22

class Program
    {
        static void Main(string[] args)
        {
            string exchangeName = "07281616_exchange_topic";
            string routingkeya = "0728.a.c.routingkey";
            string routingkeyb = "0728.b.routingkey";
            MyRabbitMQ myMQ = new MyRabbitMQ(exchangeName, ExchangeType.Topic);
            for (int i = 0; i < 3600; i++)
            {
                System.Threading.Thread.Sleep(1000);
                if (i % 2 == 0)
                {
                    var message = $"{routingkeyb} -- {DateTime.Now.ToLongTimeString()}";
                    Console.WriteLine($"auto send: {message}  for {routingkeyb}");
                    myMQ.SendMessage(message, exchangeName, routingkeyb);
                }
                else
                {
                    var message = $"{routingkeya} -- {DateTime.Now.ToLongTimeString()}";
                    Console.WriteLine($"auto send: {message}  for {routingkeya}");
                    myMQ.SendMessage(message, exchangeName, routingkeya);
                }

            }
        }
    }

View Code

此处发送了3600次,每秒发三遍音信,奇数和偶数发送的路由规则不一致,会有多个分化的客户端来接受,那样便于大家测试新闻是还是不是被分发到了分歧的连串上

 

3:多少个买主项目进展消息的接收

消费者一:

美高梅开户网址 23美高梅开户网址 24

class Program
    {
        static void Main(string[] args)
        {
            string queueName = "07281616_queue";
            string exchangeName = "07281616_exchange_topic";
            var routingRule = "0728.*.routingkey";
            MyRabbitMQ myMQ = new MyRabbitMQ(exchangeName, queueName, routingRule);
            var consumer = myMQ.ReceiveMessage(queueName);
            while (true)
            {
                //BasicConsume 方法是可阻塞的,比较好
                var msgResponse = consumer.Queue.Dequeue();
                //这种方法不好,没有阻塞等待
                //var msgResponse = channel.BasicGet("zzs_queue", true);
                var msgBody = Encoding.UTF8.GetString(msgResponse.Body);
                Console.WriteLine($"Received: {msgBody}  (only for {routingRule})");
            }
        }
    }

View Code

 

买主2:

美高梅开户网址 25美高梅开户网址 26

class Program
    {
        static void Main(string[] args)
        {
            string queueName = "07281626_queue";
            string exchangeName = "07281616_exchange_topic";
            var routingRule = "0728.a.*.routingkey";
            MyRabbitMQ myMQ = new MyRabbitMQ(exchangeName, queueName, routingRule);
            var consumer = myMQ.ReceiveMessage(queueName);
            while (true)
            {
                //BasicConsume 方法是可阻塞的,比较好
                var msgResponse = consumer.Queue.Dequeue();
                //这种方法不好,没有阻塞等待
                //var msgResponse = channel.BasicGet("zzs_queue", true);
                var msgBody = Encoding.UTF8.GetString(msgResponse.Body);
                Console.WriteLine($"Received: {msgBody} (only for {routingRule})");
            }
        }
    }

View Code

 

四个买主分别接收分化队列上的音讯

 

4:运行!

先编写翻译一下,到bin目录下先运营生产者,在运作八个顾客

美高梅开户网址 27

 

也足以先关掉消费端,过7秒再关闭生产端,在web
管理界面能够见到明日有三个系列里有新闻,3个三条一个肆条

美高梅开户网址 28

 

 四:总结

壹体化例子的持有代码都在这里了,代码里相关心释也很精晓,是自家自个儿完结的率先个RabbitMQ收发功能,实际运用中必将能够有千千万万恢弘,新手们有疑忌大概自身晓得的有狼狈的地方,烦请评论处提议哈,大家共同提升!

发表评论

电子邮件地址不会被公开。 必填项已用*标注

网站地图xml地图