消息队列

RabbitMQ队列

第二大家在讲rabbitMQ在此以前我们要说一下python里的queue:贰者干的事体是相同的,都以队列,用于传递信息

在python的queue中有七个3个是线程queue,一个是进程queue(multiprocessing中的queue)。线程queue无法跨进度,用于五个线程之间进行数量同步交互;进度queue只是用于父进度与子进度,或然同属于同意父进度下的八个子进程举行相互。也等于说借使是七个完全部独用立的次第,尽管是python程序,也照例不可以用那么些历程queue来通信。那就算大家有四个单身的python程序,分属于七个经过,只怕是python和其他语言

安装:windows下

首先必要安装 Erlang环境

官网: 

Windows版下载地址:

Linux版:     使用yum安装

消息队列。 

下一场安装RabbitMQ了 

率先下载RabbitMQ 的Windows版本

下载地址:

安装pika:

从前设置过了pip,直接打开cmd,运营pip install pika

安装完结之后,完成1个最简便的行列通讯:

美高梅开户网址 1

producer:

 1 import pika
 2 
 3 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
 4 #声明一个管道
 5 channel = connection.channel()
 6 
 7 #声明queue
 8 channel.queue_declare(queue = 'hello')
 9 #routing_key是queue的名字
10 channel.basic_publish(exchange='',
11                       routing_key='hello',#queue的名字
12                       body='Hello World!',
13                       )
14 print("[x] Send 'Hello World!'")
15 connection.close()

 

先创立2个大旨的socket,然后建立3个管道,在管道中发新闻,然后声Bellamy个queue,起个类别的名字,之后真正的发音讯(basic_publish)

consumer:

 1 import pika
 2 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
 3 channel = connection.channel()
 4 
 5 channel.queue_declare(queue='hello')
 6 
 7 
 8 def callback(ch, method, properties, body):#回调函数
 9     print("---->",ch,method,properties)
10     print(" [x] Received %r" % body)
11 
12 channel.basic_consume(callback,#如果收到消息,就调用callback来处理消息
13                       queue='hello',
14                       no_ack=True
15                        )
16 
17 print(' [*] Waiting for messages. To exit press CTRL+C')
18 channel.start_consuming()

 

 start_consuming()只要壹运营就直接运维下去,他时时刻刻收一条,永远在此处卡住。

在地点不管是produce照旧consume,里面都声称了多个queue,那么些是怎么吗?因为大家不知情是主顾先早先运维依然生产者先运维,那样只要未有注解的话就会报错。

上面大家来看一下1对多,即多个劳动者对应多少个顾客:

第三大家运营一个顾客,然后不断的用produce去发送数据,大家得以见到顾客是通过壹种轮询的格局实行不断的收受多少,每一个消费者消费二个。

那正是说只要大家顾客收到了新闻,然后处理这一个音讯需求30分钟,在处理的经过中,消费者断电了宕机了,这消费者还没有拍卖完,大家设这么些职分我们务必处理完,那大家应该有一个确认的新闻,说那个职务成功了也许是从未有过到位,所以小编的生产者要肯定消费者是还是不是把这些职分处理完了,消费者处理完之后要给那几个生产者服务器端发送二个确认消息,生产者才会把那几个义务从新闻队列中去除。借使未有处理完,消费者宕机了,未有给劳动者发送确认新闻,那就意味着尚未处理完,那大家看看rabbitMQ是怎么处理的

作者们能够在顾客的callback中添加贰个time.sleep()举办模拟宕机。callback是1个回调函数,只要事件一触发就会调用那个函数。函数执行完了就象征新闻处理完了,假设函数没有拍卖完,那就申明。。。。

大家得以看来在顾客代码中的basic_consume()中有一个参数叫no_ack=True,那一个意思是那条消息是不是被处理完都不会发送确认音信,1般大家不加那个参数,rabbitMQ默许就会给您设置成新闻处理完了就自动发送确认,大家今后把这些参数去掉,并且在callback中添加一句话运维:ch.basic_ack(delivery_tag=method.delivery_tag)(手动处理)

def callback(ch, method, properties, body):#回调函数
    print("---->",ch,method,properties)
    #time.sleep(30)
    print(" [x] Received %r" % body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

 

美高梅开户网址 2美高梅开户网址 3美高梅开户网址 4

运作的结果就是,小编先运营二遍生产者,数据被消费者一收到到了,不过本身把顾客一宕机,甘休运转,那么消费者2就收下了新闻,即只要消费者未有发送确认消息,生产者就不会把新闻删除。

RabbitMQ音讯持久化:

我们能够转变好多的音信队列,那大家怎么查看音信队列的图景呢:rabbitmqctl.bat
list_queues

美高梅开户网址 5

近期的情事是,消息队列中还有音信,不过服务器宕机了,那那些新闻就丢了,那小编就要求以此音讯强制的持久化:

channel.queue_declare(queue='hello2',durable=True)

 

在每回证明队列的时候添加三个durable参数(客户端和劳务器端都要抬高那一个参数),

美高梅开户网址 6

在那些状态下,大家把rabbitMQ服务器重启,发现只有队列名留下了,不过队列中的音讯并未有了,那样大家还亟需在劳动者basic_publish中添加八个参数:properties

producer:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
#声明一个管道
channel = connection.channel()

#声明queue
channel.queue_declare(queue = 'hello2',durable=True)
#routing_key是queue的名字
channel.basic_publish(exchange='',
                      routing_key='hello2',
                      body='Hello World!',
                      properties=pika.BasicProperties(
                          delivery_mode=2,#make message persistent
                      )
                      )
print("[x] Send 'Hello World!'")
connection.close()

 

如此那般就能够使得新闻持久化

当今是2个劳动者对应七个顾客,很公正的收发收发,可是其实的情况是,大家机器的布局是差异等的,有的配置是单核的一对配置是多核的,大概i7处理器处理4条音讯的时候和其余的电脑处理一条信息的时间基本上,那差的微型计算机那里就会积聚新闻,而好的微处理器那里就会形成闲置,在实际中做运营的,大家会在负载均衡中装置权重,什么人的布置高权重高,职分就多一点,不过在rabbitMQ中,我们只做了一个简约的拍卖就能够完成公道的音信分发,你有多大的力量就处理多少新闻

即:server端给客户端发送信息的时候,先检查今后还有稍稍新闻,固然当前音信尚未处理完结,就不会发送给那些消费者新闻。固然当前的顾客未有音信就发送

本条只需求在消费者端举行修改加代码:

import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello2',durable=True)


def callback(ch, method, properties, body):#回调函数
    print("---->",ch,method,properties)
    #time.sleep(30)
    print(" [x] Received %r" % body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,#如果收到消息,就调用callback来处理消息
                      queue='hello2',
                      #no_ack=False
                       )

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

 

 大家在变更3个consume二,在callback中sleep20秒来模拟

美高梅开户网址 7美高梅开户网址 8美高梅开户网址 9

本身先运维四个produce,被consume接受,然后在起步二个,就被consumer贰接受,可是因为consumer第22中学sleep20秒,处理慢,所以那时在开发银行produce,就又给了consume实行拍卖

 

python学习之RabbitMQ—–音讯队列,

RabbitMQ队列

前言:此番整治写一篇有关rabbitMQ的博客,比较上壹篇redis,感觉rabbitMQ难度是增强不少。那篇博客会插入一些英文讲解,但是简单通晓的。rabbitMQ的下载与安装,请参见redis&rabbitMQ安装。

Publish\Subscrible(新闻公布\订阅)

眼下都以一对1的发送接收数据,这自个儿想一对多,想广播1样,生产者发送叁个音讯,全部顾客都收下音讯。那我们如何做呢?今年大家即将用到exchange了

exchange在一端收音信,在另一端就把音讯放进queue,exchange必须准确的掌握收到的音信要干什么,是还是不是应当发到二个一定的queue依旧发给许多queue,可能说把她丢掉,这么些都被exchange的品类所定义

exchange在概念的时候是有项目标,以控制到底是这几个queue符合条件,能够承受音信:

fanout:全数bind到此exchange的queue都得以接受音信

direct:通过rounroutingKey和exchange决定的1贰分唯壹的queue能够吸收音讯

topic:全部符合routingKey的routingKey所bind的queue还行信息

headers:通过headers来决定把消息发给哪些queue

消息publisher:

 1 import pika
 2 import sys
 3 
 4 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 5 
 6 channel = connection.channel()
 7 
 8 channel.exchange_declare(exchange='log',type = 'fanout')
 9 
10 message = ' '.join(sys.argv[1:]) or 'info:Hello World!'
11 channel.basic_publish(exchange='logs',routing_key='',body=message)
12 print("[x] Send %r " % message)
13 connection.close()

 

此间的exchange在此以前是空的,未来赋值log;在此处也未尝评释queue,广播不须要写queue

 消息subscriber:

 1 import pika
 2 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 3 channel = connection.channel()
 4 channel.exchange_declare(exchange='logs',exchange_type='fanout')
 5 
 6 #exclusive唯一的,不指定queue名字,rabbit会随机分配一个名字
 7 #exclusive=True会在使用此queue的消费者断开后,自动将queue删除
 8 result = channel.queue_declare(exclusive=True)
 9 queue_name = result.method.queue
10 
11 channel.queue_bind(exchange='logs',queue=queue_name)
12 
13 print('[*] Waiting for logs,To exit press CTRL+C')
14 
15 def callback(ch,method,properties,body):
16     print("[X] %r" % body)
17 channel.basic_consume(callback,queue = queue_name,no_ack=True)
18 channel.start_consuming()

 

在顾客那里大家有定义了几个queue,注意一投注明中的内容。然而大家在发送端未有注解queue,为何发送端不须求接收端须求呢?在consume里有一个channel.queue_bind()函数,里面绑定了exchange转换器上,当然里面还必要叁个queue_name

运作结果:

美高梅开户网址 10美高梅开户网址 11美高梅开户网址 12美高梅开户网址 13

就也正是收音机1样,实时播报,打开五个买主,生产者发送一条数据,然后2个顾客同时吸收到

RabbitMQ队列

先是大家在讲rabbitMQ此前大家要说一下python里的queue:二者干的工作是千篇一律的,都以队列,用于传递新闻

在python的queue中有四个三个是线程queue,3个是进度queue(multiprocessing中的queue)。线程queue无法跨进度,用于八个线程之间开展多少同步交互;进度queue只是用于父进度与子进程,或许同属于同意父进程下的多少个子进程实行相互。也便是说要是是四个完全部独用立的先后,纵然是python程序,也如故不可见用那个进度queue来通讯。那若是我们有三个单身的python程序,分属于多个经过,或许是python和任何语言

安装:windows下

率先须要设置 Erlang环境 官网: 
Windows版下载地址:
Linux版:     使用yum安装   然后安装RabbitMQ了  首先下载RabbitMQ
的Windows版本 下载地址:

安装pika:

此前设置过了pip,直接打开cmd,运维pip install pika

安装实现之后,实现八个最简便的类别通讯:

美高梅开户网址 14

producer:

 1 import pika
 2 
 3 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
 4 #声明一个管道
 5 channel = connection.channel()
 6 
 7 #声明queue
 8 channel.queue_declare(queue = 'hello')
 9 #routing_key是queue的名字
10 channel.basic_publish(exchange='',
11                       routing_key='hello',#queue的名字
12                       body='Hello World!',
13                       )
14 print("[x] Send 'Hello World!'")
15 connection.close()

 

先创建3个为主的socket,然后建立三个管道,在管道中发新闻,然后表明1(Wissu)个queue,起个体系的名字,之后真正的发新闻(basic_publish)

consumer:

 1 import pika
 2 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
 3 channel = connection.channel()
 4 
 5 channel.queue_declare(queue='hello')
 6 
 7 
 8 def callback(ch, method, properties, body):#回调函数
 9     print("---->",ch,method,properties)
10     print(" [x] Received %r" % body)
11 
12 channel.basic_consume(callback,#如果收到消息,就调用callback来处理消息
13                       queue='hello',
14                       no_ack=True
15                        )
16 
17 print(' [*] Waiting for messages. To exit press CTRL+C')
18 channel.start_consuming()

 

 start_consuming()只要一运行就一贯运维下去,他持续收一条,永远在此处卡住。

在上头不管是produce依旧consume,里面都宣示了三个queue,这几个是怎么吧?因为我们不知情是顾客先起初运维仍然生产者先运转,这样壹旦未有注解的话就会报错。

下边大家来看一下一对多,即一个劳动者对应四个买主:

第二我们运转贰个顾客,然后不断的用produce去发送数据,大家得以看出顾客是通过一种轮询的章程展开连发的接受多少,每种消费者消费二个。

这便是说只要大家顾客收到了音信,然后处理那一个音信要求30分钟,在处理的经过中,消费者断电了宕机了,那消费者还尚未拍卖完,我们设这一个职务大家务必处理完,这咱们应该有一个承认的新闻,说这一个义务到位了依然是绝非到位,所以本人的劳动者要承认消费者是不是把这么些职务处理完了,消费者处理完之后要给这些生产者服务器端发送多少个认可音讯,生产者才会把那个职分从音信队列中剔除。借使未有拍卖完,消费者宕机了,没有给劳动者发送确认新闻,那就代表向来不拍卖完,那大家看看rabbitMQ是怎么处理的

我们能够在消费者的callback中添加3个time.sleep()举行效仿宕机。callback是二个回调函数,只要事件一触发就会调用这些函数。函数执行完了就意味着新闻处理完了,倘诺函数未有拍卖完,那就认证。。。。

咱俩得以看到在顾客代码中的basic_consume()中有三个参数叫no_ack=True,那个意思是那条音信是不是被处理完都不会发送确认音讯,一般大家不加这几个参数,rabbitMQ暗许就会给您设置成音讯处理完了就机关发送确认,大家今后把这一个参数去掉,并且在callback中添加一句话运营:ch.basic_ack(delivery_tag=method.delivery_tag)(手动处理)

def callback(ch, method, properties, body):#回调函数
    print("---->",ch,method,properties)
    #time.sleep(30)
    print(" [x] Received %r" % body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

 

美高梅开户网址 15美高梅开户网址 16美高梅开户网址 17

运作的结果便是,作者先运维3遍生产者,数据被消费者1接到到了,不过自身把顾客1宕机,截止运营,那么消费者贰就收到了消息,即只要消费者未有发送确认消息,生产者就不会把新闻删除。

RabbitMQ消息持久化:

咱俩能够转变好多的新闻队列,那大家怎么查看音讯队列的境况呢:rabbitmqctl.bat
list_queues

美高梅开户网址 18

如今的情景是,新闻队列中还有音讯,可是服务器宕机了,那这一个新闻就丢了,那本身就须求以此消息强制的持久化:

channel.queue_declare(queue='hello2',durable=True)

 

在每一次注解队列的时候拉长三个durable参数(客户端和劳务器端都要丰裕这些参数),

美高梅开户网址 19

在那一个状态下,我们把rabbitMQ服务珍视启,发现唯有队列名留下了,不过队列中的消息并没有了,那样大家还亟需在劳动者basic_publish中添加一个参数:properties

producer:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
#声明一个管道
channel = connection.channel()

#声明queue
channel.queue_declare(queue = 'hello2',durable=True)
#routing_key是queue的名字
channel.basic_publish(exchange='',
                      routing_key='hello2',
                      body='Hello World!',
                      properties=pika.BasicProperties(
                          delivery_mode=2,#make message persistent
                      )
                      )
print("[x] Send 'Hello World!'")
connection.close()

 

如此那般就足以使得新闻持久化

今昔是二个劳动者对应多个买主,很公正的收发收发,但是实际上的状态是,大家机器的布局是不均等的,有的配置是单核的部分配置是多核的,可能i7处理器处理四条消息的时候和其余的微处理器处理一条新闻的时光基本上,那差的总括机那里就会积聚新闻,而好的计算机那里就会形成闲置,在实际中做运行的,大家会在负载均衡中装置权重,哪个人的安排高权重高,职责就多一点,可是在rabbitMQ中,大家只做了二个简单易行的处理就能够完毕公道的信息分发,你有多大的力量就处理多少新闻

即:server端给客户端发送新闻的时候,先检查今后还有稍稍音讯,要是当前音信尚未处理达成,就不会发送给那一个消费者音讯。假使当前的买主未有音讯就发送

其3头必要在消费者端进行改动加代码:

import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello2',durable=True)


def callback(ch, method, properties, body):#回调函数
    print("---->",ch,method,properties)
    #time.sleep(30)
    print(" [x] Received %r" % body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,#如果收到消息,就调用callback来处理消息
                      queue='hello2',
                      #no_ack=False
                       )

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

 

 大家在变更一个consume2,在callback中sleep20秒来模拟

美高梅开户网址 20美高梅开户网址 21美高梅开户网址 22

自家先运转五个produce,被consume接受,然后在起步二个,就被consumer贰接受,但是因为consumer第22中学sleep20秒,处理慢,所以此时在开行produce,就又给了consume进行处理

 

rabbitMQ是新闻队列;想想此前的大家学过队列queue:threading
queue(线程queue,多少个线程之间开始展览多少交互)、进程queue(父进度与子进度展开互动只怕同属于同1父进度下的七个子进程展开相互);固然多个单身的次第,那么之间是不能够经过queue进行互动的,那时候我们就必要一在这之中路代理即rabbitMQ

rabbitMQ是消息队列;想想在此以前的大家学过队列queue:threading
queue(线程queue,多个线程之间开始展览多少交互)、进度Queue(父进程与子进度展开互动可能同属于同1父进度下的三个子进度展开彼此);假若五个单身的次第,那么之间是不可能透过queue实行互动的,那时候大家就须要3个中路代理即rabbitMQ.

有选择的吸收接纳音讯(exchange_type = direct)

RabbitMQ还扶助依照重大字发送,即:队列绑定关键字,发送者将数据依照重点字发送到音信exchange,exchange依据重点字判定应该将数据发送到钦赐的行列

美高梅开户网址 23

publisher:

 1 import pika
 2 import sys
 3 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 4 channel = connection.channel()
 5 
 6 channel.exchange_declare(exchange='direct_logs',exchange_type='direct')
 7 
 8 severity = sys.argv[1] if len(sys.argv)>1 else 'info'
 9 message = ' '.join(sys.argv[2:]) or 'Hello World!'
10 channel.basic_publish(exchange='direct_logs',routing_key=severity,body=message)
11 
12 print("[X] Send %r:%r" %(severity,message))
13 connection.close()

 

subscriber:

import pika
import sys
connect = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connect.channel()

channel.exchange_declare(exchange='direct_logs',exchange_type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]#
if not severities:
    sys.stderr.write("Usage:%s [info] [warning] [error]\n" %sys.argv[0])
    sys.exit(1)

for severity in severities:
    channel.queue_bind(exchange='direct_logs',queue=queue_name,routing_key=severity)

print('[*]Waiting for logs.To exit press CTRL+c')

def callback(ch,method,properties,body):
    print("[x] %r:%r"%(method.routing_key,body))

channel.basic_consume(callback,queue = queue_name,no_ack=True)
channel.start_consuming()

 

特别密切的过滤(exchange_type=topic)

美高梅开户网址 24

 

publish:

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         exchange_type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()

 

subscriber:

 1 import pika
 2 import sys
 3 
 4 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 5 channel = connection.channel()
 6 
 7 channel.exchange_declare(exchange='topic_logs',
 8                          exchange_type='topic')
 9 
10 result = channel.queue_declare(exclusive=True)
11 queue_name = result.method.queue
12 
13 binding_keys = sys.argv[1:]
14 if not binding_keys:
15     sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
16     sys.exit(1)
17 
18 for binding_key in binding_keys:
19     channel.queue_bind(exchange='topic_logs',
20                        queue=queue_name,
21                        routing_key=binding_key)
22 
23 print(' [*] Waiting for logs. To exit press CTRL+C')
24 
25 
26 def callback(ch, method, properties, body):
27     print(" [x] %r:%r" % (method.routing_key, body))
28 
29 
30 channel.basic_consume(callback,
31                       queue=queue_name,
32                       no_ack=True)
33 
34 channel.start_consuming()

 

 

以上都以服务器端发消息,客户端收音讯,新闻流是单向的,那如若大家想要发一条命令给长途的客户端去履行,然后想让客户端执行的结果回到,则那种情势叫做rpc

RabbitMQ RPC

美高梅开户网址 25

rpc server:

 1 import pika
 2 import time
 3 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 4 channel = connection.channel()
 5 
 6 channel.queue_declare(queue='rpc_queue')
 7 def fib(n):
 8     if n==0:
 9         return 0
10     elif n==1:
11         return 1
12     else:
13         return fib(n-1)+fib(n-2)
14 
15 def on_request(ch,method,props,body):
16     n = int(body)
17     print("[.] fib(%s)" %n)
18     response = fib(n)
19 
20     ch.basic_publish(exchange='',routing_key=props.reply_to,
21                      properties=pika.BasicProperties(correlation_id=props.correlation_id),
22                      body = str(response))
23     ch.basic_ack(delivery_tag=method.delivery_tag)25 channel.basic_consume(on_request,queue='rpc_queue')
26 
27 print("[x] Awaiting rpc requests")
28 channel.start_consuming()

 

 

rpc client:

 1 import pika
 2 import uuid,time
 3 class FibonacciRpcClient(object):
 4     def __init__(self):
 5         self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 6 
 7         self.channel = self.connection.channel()
 8 
 9         result = self.channel.queue_declare(exclusive=True)
10         self.callback_queue =  result.method.queue
11 
12         self.channel.basic_consume(self.on_response,#回调函数,只要一收到消息就调用
13                                    no_ack=True,queue=self.callback_queue)
14 
15     def on_response(self,ch,method,props,body):
16         if self.corr_id == props.correlation_id:
17             self.response = body
18 
19     def call(self,n):
20         self.response = None
21         self.corr_id = str(uuid.uuid4())
22         self.channel.basic_publish(exchange='',routing_key='rpc_queue',
23                                    properties=pika.BasicProperties(
24                                        reply_to=self.callback_queue,
25                                        correlation_id=self.corr_id
26                                    ),
27                                    body=str(n),#传的消息,必须是字符串
28                                    )
29         while self.response is None:
30             self.connection.process_data_events()#非阻塞版的start_consuming
31             print("no message....")
32             time.sleep(0.5)
33         return int(self.response)
34 fibonacci_rpc = FibonacciRpcClient()
35 print("[x] Requesting fib(30)")
36 response = fibonacci_rpc.call(30)
37 print("[.] Got %r"%response)

 

之前的start_consuming是进入一个不通格局,没有音讯就等候新闻,有新闻就收过来

self.connection.process_data_events()是一个非阻塞版的start_consuming,正是说发了3个东西给客户端,每过一点时光去检查有未有音信,假如未有新闻,能够去干其他业务

reply_to = self.callback_queue是用来接收反应队列的名字

corr_id =
str(uuid.uuid4()),correlation_id第二在客户端会通过uuid④生成,第2在劳动器端再次来到执行结果的时候也会传过来三个,所以说只要服务器端发过来的correlation_id与协调的id相同
,那么服务器端发出来的结果就必将是本人正好客户端发过去的授命的施行结果。以往就3个劳务器端二个客户端,无所谓缺人不承认。以后客户端是非阻塞版的,大家得以不让它打字与印刷未有音讯,而是实行新的命令,那样就两条音信,不必然按顺序落成,那大家就须求去肯定各类重临的结果是哪些命令的施行结果。

完整的方式是如此的:生产者发了多少个指令给顾客,不知底客户端曾几何时回来,还是要去收结果的,然而它又不想进去阻塞形式,想每过壹段时间看那一个新闻收回来没有,假如音讯收回来了,就意味着收完了。 

运维结果:

美高梅开户网址 26美高梅开户网址 27

劳动器端开启,然后在开发银行客户端,客户端先是等待音信的出殡和埋葬,然后做出反应,直到算出斐波那契

 

 

 

 

 

 

 

 

 

 

Publish\Subscrible(消息发表\订阅)

眼下都以一对一的出殡和埋葬接收数据,那本人想壹对多,想广播1样,生产者发送1个新闻,全部消费者都收到音讯。那大家怎么做吧?这一年大家将要用到exchange了

exchange在一端收音讯,在另一端就把新闻放进queue,exchange必须准确的驾驭收到的音讯要怎么,是或不是应当发到二个特定的queue依旧发给许多queue,恐怕说把她扬弃,这一个都被exchange的体系所定义

exchange在概念的时候是有档次的,以控制到底是这么些queue符合条件,还行音讯:

fanout:全体bind到此exchange的queue都足以接受音信

direct:通过rounroutingKey和exchange决定的丰裕唯一的queue能够吸收接纳信息

topic:全体符合routingKey的routingKey所bind的queue能够承受消息

headers:通过headers来控制把消息发给哪些queue

消息publisher:

 1 import pika
 2 import sys
 3 
 4 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 5 
 6 channel = connection.channel()
 7 
 8 channel.exchange_declare(exchange='log',type = 'fanout')
 9 
10 message = ' '.join(sys.argv[1:]) or 'info:Hello World!'
11 channel.basic_publish(exchange='logs',routing_key='',body=message)
12 print("[x] Send %r " % message)
13 connection.close()

 

此处的exchange在此以前是空的,现在赋值log;在那里也从未表明queue,广播不必要写queue

 消息subscriber:

 1 import pika
 2 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 3 channel = connection.channel()
 4 channel.exchange_declare(exchange='logs',exchange_type='fanout')
 5 
 6 #exclusive唯一的,不指定queue名字,rabbit会随机分配一个名字
 7 #exclusive=True会在使用此queue的消费者断开后,自动将queue删除
 8 result = channel.queue_declare(exclusive=True)
 9 queue_name = result.method.queue
10 
11 channel.queue_bind(exchange='logs',queue=queue_name)
12 
13 print('[*] Waiting for logs,To exit press CTRL+C')
14 
15 def callback(ch,method,properties,body):
16     print("[X] %r" % body)
17 channel.basic_consume(callback,queue = queue_name,no_ack=True)
18 channel.start_consuming()

 

在消费者那里大家有定义了二个queue,注意一下诠释中的内容。不过大家在发送端没有评释queue,为啥发送端不须要接收端须要吗?在consume里有二个channel.queue_bind()函数,里面绑定了exchange转换器上,当然里面还亟需2个queue_name

运维结果:

美高梅开户网址 28美高梅开户网址 29美高梅开户网址 30美高梅开户网址 31

就相当于收音机壹样,实时播报,打开四个买主,生产者发送一条数据,然后一个顾客同时收纳到

音信队列:

 

有选取的吸收接纳信息(exchange_type = direct)

RabbitMQ还帮助依据主要字发送,即:队列绑定关键字,发送者将数据依据重点字发送到新闻exchange,exchange依照重点字判定应该将数据发送到钦点的行列

美高梅开户网址 32

publisher:

 1 import pika
 2 import sys
 3 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 4 channel = connection.channel()
 5 
 6 channel.exchange_declare(exchange='direct_logs',exchange_type='direct')
 7 
 8 severity = sys.argv[1] if len(sys.argv)>1 else 'info'
 9 message = ' '.join(sys.argv[2:]) or 'Hello World!'
10 channel.basic_publish(exchange='direct_logs',routing_key=severity,body=message)
11 
12 print("[X] Send %r:%r" %(severity,message))
13 connection.close()

 

subscriber:

import pika
import sys
connect = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connect.channel()

channel.exchange_declare(exchange='direct_logs',exchange_type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]#
if not severities:
    sys.stderr.write("Usage:%s [info] [warning] [error]\n" %sys.argv[0])
    sys.exit(1)

for severity in severities:
    channel.queue_bind(exchange='direct_logs',queue=queue_name,routing_key=severity)

print('[*]Waiting for logs.To exit press CTRL+c')

def callback(ch,method,properties,body):
    print("[x] %r:%r"%(method.routing_key,body))

channel.basic_consume(callback,queue = queue_name,no_ack=True)
channel.start_consuming()

 

更为细心的过滤(exchange_type=topic)

美高梅开户网址 33

 

publish:

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         exchange_type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()

 

subscriber:

 1 import pika
 2 import sys
 3 
 4 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 5 channel = connection.channel()
 6 
 7 channel.exchange_declare(exchange='topic_logs',
 8                          exchange_type='topic')
 9 
10 result = channel.queue_declare(exclusive=True)
11 queue_name = result.method.queue
12 
13 binding_keys = sys.argv[1:]
14 if not binding_keys:
15     sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
16     sys.exit(1)
17 
18 for binding_key in binding_keys:
19     channel.queue_bind(exchange='topic_logs',
20                        queue=queue_name,
21                        routing_key=binding_key)
22 
23 print(' [*] Waiting for logs. To exit press CTRL+C')
24 
25 
26 def callback(ch, method, properties, body):
27     print(" [x] %r:%r" % (method.routing_key, body))
28 
29 
30 channel.basic_consume(callback,
31                       queue=queue_name,
32                       no_ack=True)
33 
34 channel.start_consuming()

 

 

上述都以服务器端发音信,客户端收新闻,音讯流是单向的,那如果我们想要发一条命令给长途的客户端去实施,然后想让客户端执行的结果回到,则那种形式叫做rpc

RabbitMQ RPC

美高梅开户网址 34

rpc server:

 1 import pika
 2 import time
 3 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 4 channel = connection.channel()
 5 
 6 channel.queue_declare(queue='rpc_queue')
 7 def fib(n):
 8     if n==0:
 9         return 0
10     elif n==1:
11         return 1
12     else:
13         return fib(n-1)+fib(n-2)
14 
15 def on_request(ch,method,props,body):
16     n = int(body)
17     print("[.] fib(%s)" %n)
18     response = fib(n)
19 
20     ch.basic_publish(exchange='',routing_key=props.reply_to,
21                      properties=pika.BasicProperties(correlation_id=props.correlation_id),
22                      body = str(response))
23     ch.basic_ack(delivery_tag=method.delivery_tag)25 channel.basic_consume(on_request,queue='rpc_queue')
26 
27 print("[x] Awaiting rpc requests")
28 channel.start_consuming()

 

 

rpc client:

 1 import pika
 2 import uuid,time
 3 class FibonacciRpcClient(object):
 4     def __init__(self):
 5         self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 6 
 7         self.channel = self.connection.channel()
 8 
 9         result = self.channel.queue_declare(exclusive=True)
10         self.callback_queue =  result.method.queue
11 
12         self.channel.basic_consume(self.on_response,#回调函数,只要一收到消息就调用
13                                    no_ack=True,queue=self.callback_queue)
14 
15     def on_response(self,ch,method,props,body):
16         if self.corr_id == props.correlation_id:
17             self.response = body
18 
19     def call(self,n):
20         self.response = None
21         self.corr_id = str(uuid.uuid4())
22         self.channel.basic_publish(exchange='',routing_key='rpc_queue',
23                                    properties=pika.BasicProperties(
24                                        reply_to=self.callback_queue,
25                                        correlation_id=self.corr_id
26                                    ),
27                                    body=str(n),#传的消息,必须是字符串
28                                    )
29         while self.response is None:
30             self.connection.process_data_events()#非阻塞版的start_consuming
31             print("no message....")
32             time.sleep(0.5)
33         return int(self.response)
34 fibonacci_rpc = FibonacciRpcClient()
35 print("[x] Requesting fib(30)")
36 response = fibonacci_rpc.call(30)
37 print("[.] Got %r"%response)

 

之前的start_consuming是进入一个堵塞情势,未有音信就等候新闻,有信息就收过来

self.connection.process_data_events()是3个非阻塞版的start_consuming,就是说发了贰个东西给客户端,每过一点光阴去检查有未有音信,如若未有音信,能够去干别的业务

reply_to = self.callback_queue是用来接收反应队列的名字

corr_id =
str(uuid.uuid4()),correlation_id第2在客户端会通过uuid四生成,第2在劳动器端重回执行结果的时候也会传过来2个,所以说只要服务器端发过来的correlation_id与和谐的id相同
,那么服务器端发出来的结果就决然是本身刚刚客户端发过去的通令的执行结果。今后就1个服务器端多少个客户端,无所谓缺人不认同。今后客户端是非阻塞版的,大家得以不让它打字与印刷未有新闻,而是实行新的下令,那样就两条消息,不自然按梯次完毕,那大家就供给去肯定各样重返的结果是哪个命令的履行结果。

完整的形式是如此的:生产者发了八个命令给买主,不知道客户端何时回来,依然要去收结果的,可是它又不想进去阻塞格局,想每过一段时间看这么些音讯收回来未有,假设信息收回来了,就意味着收完了。 

运转结果:

美高梅开户网址 35美高梅开户网址 36

劳动器端开启,然后在起步客户端,客户端先是等待音信的发送,然后做出反应,直到算出斐波那契

 

 

 

 

 

 

 

 

 

 

RabbitMQ队列
首先大家在讲rabbitMQ在此以前大家要说一下python里的queue:二者干的事务是如出壹辙的,都以队列,用于…

  • RabbitMQ
  • ZeroMQ
  • ActiveMQ
  • ………..

壹、简单的rabbitMQ队列通信

美高梅开户网址 37

由上航海用教室能够,数据是首发给exchange调换器,exchage再发放相应队列。pika模块是python对rabbitMQ的API接口。接收端有三个回调函数,一接收到数量就调用该函数。一条消息被3个消费者收到后,该音讯就从队列删除。OK,理解上边的学问后,先来看望三个简约的rabbitMQ列队通讯。

send端:

 1 import pika
 2 #连上rabbitMQ
 3 connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
 4 channel=connection.channel()       #生成管道,在管道里跑不同的队列
 5 
 6 #声明queue
 7 channel.queue_declare(queue='hello1')
 8 
 9 #n RabbitMQ a message can never be sent directly to the queue,it always needs to go through an exchange.
10 #向队列里发数据
11 channel.basic_publish(exchange='',      #先把数据发给exchange交换器,exchage再发给相应队列
12                       routing_key='hello1', #向"hello'队列发数据
13                       body='HelloWorld!!')  #发的消息
14 print("[x]Sent'HelloWorld!'")
15 connection.close()

receive端:

 1 import pika
 2 
 3 connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
 4 channel=connection.channel()
 5 
 6 # You may ask why we declare the queue again ‒ we have already declared it in our previous code.
 7 # We could avoid that if we were sure that the queue already exists. For example if send.py program
 8 # was run before. But we're not yet sure which program to run first. In such cases it's a good
 9 # practice to repeat declaring the queue in both programs.
10 channel.queue_declare(queue='hello1')#声明队列,保证程序不出错
11 
12 
13 def callback(ch,method,properties,body):
14     print("-->ch",ch)
15     print("-->method",method)
16     print("-->properties",properties)
17     print("[x] Received %r" % body)         #一条消息被一个消费者接收后,该消息就从队列删除
18 
19 
20 channel.basic_consume(callback,              #回调函数,一接收到消息就调用回调函数
21                       queue='hello1',
22                       no_ack=False)    #消费完毕后向服务端发送一个确认,默认为False
23 
24 print('[*] Waiting for messages.To exit press CTRL+C')
25 channel.start_consuming()

运作结果:(下面的代码对应自笔者写的诠释相信是看得懂的~)

美高梅开户网址 38美高梅开户网址 39

rabbitMQ_1_send.py
 [x] Sent 'Hello World!'


rabbitMQ_2_receive.py
 [*] Waiting for messages. To exit press CTRL+C
-->ch <pika.adapters.blocking_connection.BlockingChannel object at 0x000000000250AEB8>
-->method <Basic.Deliver(['consumer_tag=ctag1.f9533f4c8c59473c8096817670ad69d6', 'delivery_tag=1', 'exchange=', 'redelivered=False', 'routing_key=hello1'])>
-->properties <BasicProperties>
 [x] Received b'Hello World!!'

View Code

因此深远的测试,有以下八个意识:

  1. 先运行rabbitMQ_1_send.py发送数据,rabbitMQ_2_receive.py未运维。发现当receive运营时还是可以接收数据。
  2. 运作多少个(eg:贰个)接收数据的客户端,再运维发送端,客户端一收受数额,再运行发送端,客户端二收到多少,再运维发送端,客户端3吸收数量。

RabbitMQ会暗中认可把p发的新闻依次分发给各种消费者(c),跟负载均衡差不离。

 

原理:

二、全英文ack

在看上边的事例,你会发觉有一句代码no_ack=False(消费完结后向服务端发送三个承认,暗中同意为False),以自己西班牙语肆级飘过的品位,看完上边关于ack的任课感觉写得很牛啊!!于是分享一下:

Doing a task can take a few seconds. You
may wonder what happens if one of the consumers starts a long task and
dies with it only partly done. With our current code once RabbitMQ
delivers message to the customer it immediately removes it from memory.
In this case, if you kill a worker we will lose the message it was just
processing. We’ll also lose all the messages that were dispatched to
this particular worker but were not yet handled.

美高梅开户网址 ,But we don’t want to lose any tasks. If a
worker dies, we’d like the task to be delivered to another
worker.

In order to make sure a message is never
lost, RabbitMQ supports message acknowledgments. An ack(nowledgement) is
sent back from the consumer to tell RabbitMQ that a particular message
had been received, processed and that RabbitMQ is free to delete
it.

If a consumer dies (its channel is
closed, connection is closed, or TCP connection is lost) without sending
an ack, RabbitMQ will understand that a message wasn’t processed fully
and will re-queue it. If there are other consumers online at the same
time, it will then quickly redeliver it to another consumer. That way
you can be sure that no message is lost, even if the workers
occasionally die.

There aren’t any message timeouts;
RabbitMQ will redeliver the message when the consumer dies. It’s fine
even if processing a message takes a very, very long time.

Message
acknowledgments are turned on by default. In previous examples we
explicitly turned them off via the no_ack=True flag. It’s time to
remove this flag and send a proper acknowledgment from the worker, once
we’re done with a task.

Using this code we can be sure that even
if you kill a worker using CTRL+C while it was processing a message,
nothing will be lost. Soon after the worker dies all unacknowledged
messages will be redelivered.

自个儿把发送端和接收端分别比作生产者与消费者。生产者发送任务A,消费者接受任务A并处理,处理完后生产者将音讯队列中的职责A删除。今后我们相遇了1个题材:若是买主接受职务A,但在处理的长河中突然宕机了。而那时生产者将音信队列中的职务A删除。实际上任务A并未能如愿拍卖完,也就是丢失了义务/新闻。为消除那一个标题,应使消费者收到职分并打响拍卖完后发送2个ack到生产者!生产者收到ack后就知晓任务A已被成功拍卖,那时才从信息队列旅长职责A删除,假使未有收取ack,就须求把职务A发送给下2个买主,直到职务A被成功拍卖。

 

美高梅开户网址 40

三、音信持久化

前边已经知道,生产者生产数据,消费者再起步是能够接收数据的。

可是,生产者生产数据,然后重启rabbitMQ,消费者是无力回天接收数据。

eg:新闻在传输进程中rabbitMQ服务器宕机了,会发现后边的音信队列就不设有了,那时大家即将用到音信持久化,音讯持久化会让队列不趁着服务器宕机而熄灭,会永远的保留下去。上面看下关于音讯持久化的英文讲解:

We have learned how to make sure that
even if the consumer dies, the task isn’t lost(by default, if wanna
disable  use no_ack=True). But our tasks will still be lost if RabbitMQ
server stops.

When RabbitMQ quits or crashes it will forget the
queues and messages unless you tell it not to. Two things are
required to make sure that messages aren’t lost: we need to mark both
the queue and messages as durable.

First, we
need to make sure that RabbitMQ will never lose our queue. In order to
do so, we need to declare it as durable:

      1 channel.queue_declare(queue=’hello’,
durable=True)

Although this command is correct by
itself, it won’t work in our setup. That’s because we’ve already defined
a queue called hello which is not durable. RabbitMQ doesn’t allow you to redefine an
existing queue with different parameters and will return an
error(会曝错) to any program that tries to do that. But there is
a quick workaround – let’s declare a queue with different name, for
exampletask_queue:

      1
channel.queue_declare(queue=’task_queue’, durable=True)

This queue_declare change needs to be
applied to both the producer and consumer code.

At that point we’re sure that
the task_queue queue won’t be lost even if RabbitMQ restarts. Now we
need to mark our messages as persistent –
by supplying a delivery_mode property with a value 2.

      1
channel.basic_publish(exchange=”,
      2
                      routing_key=”task_queue”,
      3
                      body=message,
      4
                      properties=pika.BasicProperties(
      5
                         delivery_mode = 2,      # make message
persistent
      6
                      ))

地点的英文对信息持久化讲得很好。音信持久化分为两步:

  • 持久化队列。通过代码完结持久化hello队列:channel.queue_declare(queue=’hello’,
    durable=True)
  • 持久化队列中的新闻。通过代码完结:properties=pika.BasicProperties( delivery_mode = 2, )

此间有个点要留心下:

设若你在代码中已落到实处持久化hello队列与队列中的消息。那么您重启rabbitMQ后重国民党的新生活运动行代码也许会爆错!

因为: RabbitMQ doesn’t allow you to
redefine an existing queue with different parameters and will return an
error.

为了化解这么些标题,能够声Bellamy个与重启rabbitMQ在此以前区别的种类名(queue_name).

 

一、安装和主导使用

四、新闻公平分发

如若Rabbit只管按顺序把新闻发到各种消费者身上,不考虑消费者负载的话,很也许出现,3个机器配置不高的买主那里堆积了过多音信处理不完,同时陈设高的消费者却直接很轻松。为解决此题材,能够在依次消费者端,配置perfetch=一,意思便是报告RabbitMQ在自身这一个消费者当前音讯还没处理完的时候就不要再给笔者发新消息了。

美高梅开户网址 41

 

带音信持久化+公平分发的1体化代码

生产者端:

美高梅开户网址 42美高梅开户网址 43

 1 import pika
 2 import sys
 3  
 4 connection =pika.BlockingConnection(pika.ConnectionParameters(
 5         host='localhost'))
 6 channel = connection.channel()
 7  
 8 channel.queue_declare(queue='task_queue', durable=True)  #队列持久化
 9  
10 message = ' '.join(sys.argv[1:]) or"Hello World!"
11 channel.basic_publish(exchange='',
12                       routing_key='task_queue',
13                       body=message,
14                       properties=pika.BasicProperties(
15                          delivery_mode = 2, # make message persistent消息持久化
16                       ))
17 print(" [x] Sent %r" % message)
18 connection.close()

View Code

消费者端:

美高梅开户网址 44美高梅开户网址 45

 1 #!/usr/bin/env python
 2 import pika
 3 import time
 4  
 5 connection =pika.BlockingConnection(pika.ConnectionParameters(
 6         host='localhost'))
 7 channel = connection.channel()
 8  
 9 channel.queue_declare(queue='task_queue', durable=True)
10 print(' [*] Waiting for messages. To exit press CTRL+C')
11  
12 def callback(ch, method, properties, body):
13     print(" [x] Received %r" % body)
14     time.sleep(body.count(b'.'))
15     print(" [x] Done")
16     ch.basic_ack(delivery_tag =method.delivery_tag)   
17  
18 channel.basic_qos(prefetch_count=1)
19 channel.basic_consume(callback,
20                       queue='task_queue')
21  
22 channel.start_consuming()

View Code

自小编在运转方面程序时对消费者端里回调函数的一句代码(ch.basic_ack(delivery_tag
=method.delivery_tag))10分吸引。那句代码去掉消费者端也能一如既往收到音信啊。那句代码有毛线用处??

生产者端音信持久后,要求在消费者端加上(ch.basic_ack(delivery_tag
=method.delivery_tag)): 保险消息被消费后,消费端发送三个ack,然后服务端从队列删除该音讯.

 

安装RabbitMQ服务
 

5、音信揭露与订阅

事先的事例都基本都以1对一的消息发送和吸收接纳,即消息只可以发送到钦命的queue里,但多少时候你想让您的消息被全数的queue收到,类似广播的遵守,那时候就要用到exchange了。PS:有趣味的询问redis的公布与订阅,能够看看本人写的博客python之redis。

An exchange is a very simple thing. On
one side it receives messages from producers and the other side it
pushes them to queues. The exchange must know exactly what to do with a
message it receives. Should it be appended to a particular queue? Should
it be appended to many queues? Or should it get discarded(丢弃). The
rules for that are defined by the exchange type.

Exchange在概念的时候是有项目标,以控制到底是哪些Queue符合条件,能够接到音讯

 

fanout: 全体bind到此exchange的queue都得以接受音信

direct: 通过routingKey和exchange决定的可怜唯1的queue能够吸收新闻

topic:全体符合routingKey(此时得以是1个表明式)的routingKey所bind的queue能够接过新闻

 

表明式符号表明: #表示三个或三个字符,*意味着任何字符
     
    例:#.a会匹配a.a,aa.a,aaa.a等
               
*.a会匹配a.a,b.a,c.a等
          
 注:使用RoutingKey为#,Exchange
Type为topic的时候一定于选用fanout

 

上边作者分别讲下fanout,direct,topic:

1、fanout

fanout: 全体bind到此exchange的queue都能够吸收接纳音讯

美高梅开户网址 46

send端:

美高梅开户网址 47美高梅开户网址 48

 1 import pika
 2 import sys
 3 
 4 connection=pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 5 channel=connection.channel()
 6 
 7 channel.exchange_declare(exchange='logs',
 8                       type='fanout')
 9 
10 message=''.join(sys.argv[1:])or"info:HelloWorld!"
11 channel.basic_publish(exchange='logs',
12                       routing_key='',  #fanout的话为空(默认)
13                       body=message)
14 print("[x]Sent%r"%message)
15 connection.close()

View Code

receive端:

美高梅开户网址 49美高梅开户网址 50

 1 import pika
 2 
 3 connection=pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 4 channel=connection.channel()
 5 
 6 channel.exchange_declare(exchange='logs',type='fanout')
 7 
 8 #不指定queue名字(为了收广播),rabbit会随机分配一个queue名字,
 9 #exclusive=True会在使用此queue的消费者断开后,自动将queue删除
10 result=channel.queue_declare(exclusive=True)
11 queue_name=result.method.queue
12 
13 #把声明的queue绑定到交换器exchange上
14 channel.queue_bind(exchange='logs',
15                 queue=queue_name)
16 
17 print('[*]Waitingforlogs.ToexitpressCTRL+C')
18 
19 def callback(ch,method,properties,body):
20     print("[x]%r"%body)
21 
22 
23 channel.basic_consume(callback,
24                       queue=queue_name,
25                       no_ack=True)
26 
27 channel.start_consuming()

View Code

有八个点要留心下:

  • fanout-广播,send端的routing_key=”, #fanout的话为空(暗许)

  • receive端有一句代码:result=channel.queue_declare(exclusive=True),作用:不点名queue名字(为了收广播),rabbitMQ会随机分配二个queue名字,exclusive=True会在接纳此queue的顾客断开后,自动将queue删除。

 

二、有取舍的收到音讯(exchange
type=direct)

RabbitMQ还援救依据重点字发送,即:队列绑定关键字,发送者将数据遵照重大字发送到音讯exchange,exchange依据 关键字
判定应该将数据发送至钦赐队列。

美高梅开户网址 51

send端:

美高梅开户网址 52美高梅开户网址 53

 1 import pika
 2 import sys
 3  
 4 connection =pika.BlockingConnection(pika.ConnectionParameters(
 5         host='localh'))ost
 6 channel = connection.channel()
 7  
 8 channel.exchange_declare(exchange='direct_logs',
 9                          type='direct')
10  
11 severity = sys.argv[1] iflen(sys.argv) > 1 else 'info'
12 message = ' '.join(sys.argv[2:]) or'Hello World!'
13 channel.basic_publish(exchange='direct_logs',
14                       routing_key=severity, #关键字不为空,告知消息发送到哪里(info,error~)
15                       body=message)
16 print(" [x] Sent %r:%r" % (severity, message))
17 connection.close()

View Code

receive端:

美高梅开户网址 54美高梅开户网址 55

 1 import pika
 2 import sys
 3  
 4 connection =pika.BlockingConnection(pika.ConnectionParameters(
 5         host='localhost'))
 6 channel = connection.channel()
 7  
 8 channel.exchange_declare(exchange='direct_logs',
 9                          type='direct')
10  
11 result =channel.queue_declare(exclusive=True)
12 queue_name = result.method.queue
13  
14 severities = sys.argv[1:]
15 if not severities:
16     sys.stderr.write("Usage: %s [info] [warning] [error]\n" %sys.argv[0])
17     sys.exit(1)
18  
19 for severity in severities:
20     channel.queue_bind(exchange='direct_logs',
21                        queue=queue_name,
22                        routing_key=severity)
23  
24 print(' [*] Waiting for logs. To exit press CTRL+C')
25  
26 def callback(ch, method, properties, body):
27     print(" [x] %r:%r" %(method.routing_key, body))
28  
29 channel.basic_consume(callback,
30                       queue=queue_name,
31                       no_ack=True)
32  
33 channel.start_consuming()

View Code

实际最开端笔者看代码是1脸懵逼的~
上面是小编在cmd进行测试的截图(合营着截图看会不难掌握些),贰个send端,四个receive端(先起receive端,再起receive端):

send端:

美高梅开户网址 56

receive端-1:

美高梅开户网址 57

receive端-2:

美高梅开户网址 58

 

叁、更仔细的音讯过滤topic(供参考)

Although using the direct exchange
improved our system, it still has limitations – it can’t do routing
based on multiple criteria.

In our logging system we might want to
subscribe to not only logs based on severity, but also based on the
source which emitted the log. You might know this concept from
the syslog unix tool, which routes logs based on both severity
(info/warn/crit…) and facility (auth/cron/kern…).

That would give us a lot of flexibility –
we may want to listen to just critical errors coming from ‘cron’ but
also all logs from ‘kern’.

备感自身英文水准不高啊~,笔者相比着垃圾有道翻译,加上自个儿的驾驭,大概知道地点在讲什么样。

举例来说:
假诺是系统的失实,就把音信发送到A,假若是MySQL的不当,就把消息发送到B。不过对B来说,想完毕接收MySQL的错误音信,可以用有选拔的收纳音信(exchange type=direct),让机要字为error就落到实处了哟!现在B有个须要:不是独具的错误音信都接受,只接受钦命的荒唐。在某种消息再开始展览过滤,那便是更周到的音讯过滤topic。

 

send端:

美高梅开户网址 59美高梅开户网址 60

 1 import pika
 2 import sys
 3  
 4 connection = pika.BlockingConnection(pika.ConnectionParameters(
 5         host='localhost'))
 6 channel = connection.channel()
 7  
 8 channel.exchange_declare(exchange='topic_logs',
 9                          type='topic')  #类型为topic
10  
11 routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
12 message = ' '.join(sys.argv[2:]) or 'Hello World!'
13 channel.basic_publish(exchange='topic_logs',
14                       routing_key=routing_key,
15                       body=message)
16 print(" [x] Sent %r:%r" % (routing_key, message))
17 connection.close()

View Code

receive端:

美高梅开户网址 61美高梅开户网址 62

 1 import pika
 2 import sys
 3  
 4 connection = pika.BlockingConnection(pika.ConnectionParameters(
 5         host='localhost'))
 6 channel = connection.channel()
 7  
 8 channel.exchange_declare(exchange='topic_logs',
 9                          type='topic')
10  
11 result = channel.queue_declare(exclusive=True)
12 queue_name = result.method.queue
13  
14 binding_keys = sys.argv[1:]
15 if not binding_keys:
16     sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
17     sys.exit(1)
18  
19 for binding_key in binding_keys:
20     channel.queue_bind(exchange='topic_logs',
21                        queue=queue_name,
22                        routing_key=binding_key)
23  
24 print(' [*] Waiting for logs. To exit press CTRL+C')
25  
26 def callback(ch, method, properties, body):
27     print(" [x] %r:%r" % (method.routing_key, body))
28  
29 channel.basic_consume(callback,
30                       queue=queue_name,
31                       no_ack=True)
32  
33 channel.start_consuming()

View Code

 

 

python安装RabbitMQ模块

六、RPC(Remote Procedure Call)

牧马人PC的定义可看作者百度的(其实就象是作者从前做的FTP,笔者从客户端发1个下令,服务端重临相关音信):

美高梅开户网址 63美高梅开户网址 64

RPC采用客户机/服务器模式。请求程序就是一个客户机,而服务提供程序就是一个服务器。首先,客户机调用进程发送一个有进程参数的调用信息到服务进程,然后等待应答信息。在服务器端,进程保持睡眠状态直到调用信息的到达为止。当一个调用信息到达,服务器获得进程参数,计算结果,发送答复信息,然后等待下一个调用信息,最后,客户端调用进程接收答复信息,获得进程结果,然后调用执行继续进行。

View Code

上面重点讲下CR-VPC通讯,作者刚伊始学挺难的,学完未来觉得大切诺基PC通信的想想很有启发性,代码的例证写得也很牛!!

美高梅开户网址 65

client端发的新闻被server端接收后,server端会调用callback函数,执行任务后,还索要把相应的音讯发送到client,可是server如何将音讯发还给client?假设有三个client连接server,server又怎么通晓是要发放哪个client??

安德拉PC-server默许监听rpc_queue.肯定无法把要发放client端的新闻发到rpc_queue吧(rpc_queue是监听client端发到server端的数额)。

理所当然的方案是server端另起二个queue,通过queue将音信重临给对应client。但难题又来了,queue是server端起的,故client端肯定不清楚queue_name,连queue_name都不知情,client端接收毛线的多寡??

消除措施:

客户端在发送指令的同时告诉服务端:任务履行完后,数据经过某队列再次来到结果。客户端监听该队列就OK了。

client端:

 1 import pika
 2 import uuid
 3 
 4 
 5 class FibonacciRpcClient(object):
 6     def __init__(self):
 7         self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 8 
 9         self.channel = self.connection.channel()
10         #随机建立一个queue,为了监听返回的结果
11         result = self.channel.queue_declare(exclusive=True)
12         self.callback_queue = result.method.queue   ##队列名
13 
14         self.channel.basic_consume(self.on_response,  #一接收客户端发来的指令就调用回调函数on_response
15                                    no_ack=True,
16                                    queue=self.callback_queue)
17 
18     def on_response(self, ch, method, props, body):  #回调
19         #每条指令执行的速度可能不一样,指令1比指令2先发送,但可能指令2的执行结果比指令1先返回到客户端,
20         #此时如果没有下面的判断,客户端就会把指令2的结果误认为指令1执行的结果
21         if self.corr_id == props.correlation_id:
22             self.response = body
23 
24     def call(self, n):
25         self.response = None    ##指令执行后返回的消息
26         self.corr_id = str(uuid.uuid4())   ##可用来标识指令(顺序)
27         self.channel.basic_publish(exchange='',
28                                    routing_key='rpc_queue', #client发送指令,发到rpc_queue
29                                    properties=pika.BasicProperties(
30                                        reply_to=self.callback_queue, #将指令执行结果返回到reply_to队列
31                                        correlation_id=self.corr_id,
32                                    ),
33                                    body=str(n))
34         while self.response is None:
35             self.connection.process_data_events() #去queue接收数据(不阻塞)
36         return int(self.response)
37 
38 
39 fibonacci_rpc = FibonacciRpcClient()
40 
41 print(" [x] Requesting fib(30)")
42 response = fibonacci_rpc.call(30)
43 print(" [.] Got %r" % response)

server端:

 1 import pika
 2 import time
 3 
 4 connection = pika.BlockingConnection(pika.ConnectionParameters(
 5     host='localhost'))
 6 
 7 channel = connection.channel()
 8 
 9 channel.queue_declare(queue='rpc_queue')
10 
11 
12 def fib(n):
13     if n == 0:
14         return 0
15     elif n == 1:
16         return 1
17     else:
18         return fib(n - 1) + fib(n - 2)
19 
20 
21 def on_request(ch, method, props, body):
22     n = int(body)
23 
24     print(" [.] fib(%s)" % n)
25     response = fib(n)  #从客户端收到的消息
26 
27     ch.basic_publish(exchange='',   ##服务端发送返回的数据到props.reply_to队列(客户端发送指令时声明)
28                      routing_key=props.reply_to,  #correlation_id (随机数)每条指令都有随机独立的标识符
29                      properties=pika.BasicProperties(correlation_id= \
30                                                          props.correlation_id),
31                      body=str(response))
32     ch.basic_ack(delivery_tag=method.delivery_tag)  #客户端持久化
33 
34 
35 channel.basic_qos(prefetch_count=1)  #公平分发
36 channel.basic_consume(on_request,    #一接收到消息就调用on_request
37                       queue='rpc_queue')
38 
39 print(" [x] Awaiting RPC requests")
40 channel.start_consuming()

 

中间转播申明出处: 

pip install pika
or
easy_install pika
or
源码

https://pypi.python.org/pypi/pika

2、完结最简易的行列通讯

发送端:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()      #声明一个管道(管道内发消息)

channel.queue_declare(queue='lzl')    #声明queue队列

channel.basic_publish(exchange='',
                      routing_key='lzl',  #routing_key 就是queue名
                      body='Hello World!'
)
print("Sent 'Hello,World!'")
connection.close()      #关闭

接收端:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()

channel.queue_declare(queue='lzl')

def callback(ch,method,properties,body):
    print(ch,method,properties)
    #ch:<pika.adapters.blocking_connection.BlockingChannel object at 0x002E6C90>    管道内存对象地址
    #methon:<Basic.Deliver(['consumer_tag=ctag1.03d155a851b146f19cee393ff1a7ae38',   #具体信息
            # 'delivery_tag=1', 'exchange=', 'redelivered=False', 'routing_key=lzl'])>
    #properties:<BasicProperties>
    print("Received %r"%body)

channel.basic_consume(callback,     #如果收到消息,就调用callback函数处理消息
                      queue="lzl",
                      no_ack=True)   #接受到消息后不返回ack,无论本地是否处理完消息都会在队列中消失
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()   #开始收消息

注:windows连linux上的rabbitMQ会产出报错,要求提供用户名密码

三、RabbitMQ新闻分发轮询

先运转音信生产者,然后再分别运行3个买主,通过生产者多发送几条消息,你会发现,这几条音信会被逐壹分配到种种消费者身上

美高梅开户网址 66

 

在那种形式下,RabbitMQ会私下认可把p发的消息公平的逐条分发给各样消费者(c),跟负载均衡差不离

美高梅开户网址 67美高梅开户网址 68

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()      #声明一个管道(管道内发消息)

channel.queue_declare(queue='lzl')    #声明queue队列

channel.basic_publish(exchange='',
                      routing_key='lzl',  #routing_key 就是queue名
                      body='Hello World!'
)
print("Sent 'Hello,World!'")
connection.close()      #关闭

pubulish.py

美高梅开户网址 69美高梅开户网址 70

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()

channel.queue_declare(queue='lzl')

def callback(ch,method,properties,body):
    print(ch,method,properties)
    #ch:<pika.adapters.blocking_connection.BlockingChannel object at 0x002E6C90>    管道内存对象地址
    #methon:<Basic.Deliver(['consumer_tag=ctag1.03d155a851b146f19cee393ff1a7ae38',   #具体信息
            # 'delivery_tag=1', 'exchange=', 'redelivered=False', 'routing_key=lzl'])>
    #properties:<BasicProperties>
    print("Received %r"%body)

channel.basic_consume(callback,     #如果收到消息,就调用callback函数处理消息
                      queue="lzl",
                      no_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()   #开始收消息

consume.py

因此推行pubulish.py和consume.py能够完结地点的音讯公平分发,那假诺c1收到新闻随后宕机了,会冒出哪些情状呢?rabbitMQ是哪些处理的?以往大家模拟一下

美高梅开户网址 71美高梅开户网址 72

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()      #声明一个管道(管道内发消息)

channel.queue_declare(queue='lzl')    #声明queue队列

channel.basic_publish(exchange='',
                      routing_key='lzl',  #routing_key 就是queue名
                      body='Hello World!'
)
print("Sent 'Hello,World!'")
connection.close()      #关闭

publish.py

美高梅开户网址 73美高梅开户网址 74

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika,time

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()

channel.queue_declare(queue='lzl')

def callback(ch,method,properties,body):
    print("->>",ch,method,properties)
    time.sleep(15)              # 模拟处理时间
    print("Received %r"%body)

channel.basic_consume(callback,     #如果收到消息,就调用callback函数处理消息
                      queue="lzl",
                      no_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()   #开始收消息

consume.py

在consume.py的callback函数里扩充了time.sleep模拟函数处理,通过上面程序举行效仿发现,c一收取到新闻后不曾处理完突然宕机,音讯就从队列上海消防灭了,rabbitMQ把音讯删除掉了;假若程序需要音信必须求拍卖完才能从队列里删除,那大家就必要对先后开始展览处理一下

美高梅开户网址 75美高梅开户网址 76

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()      #声明一个管道(管道内发消息)

channel.queue_declare(queue='lzl')    #声明queue队列

channel.basic_publish(exchange='',
                      routing_key='lzl',  #routing_key 就是queue名
                      body='Hello World!'
)
print("Sent 'Hello,World!'")
connection.close()      #关闭

publish.py

美高梅开户网址 77美高梅开户网址 78

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika,time

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()

channel.queue_declare(queue='lzl')

def callback(ch,method,properties,body):
    print("->>",ch,method,properties)
    #time.sleep(15)              # 模拟处理时间
    print("Received %r"%body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(callback,     #如果收到消息,就调用callback函数处理消息
                      queue="lzl",
                      )
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()   #开始收消息

consume.py

通过把consume.py接收端里的no_ack``=``True去掉之后并在callback函数里面添加ch.basic_ack(delivery_tag ``= method.delivery_tag,就能够兑现新闻不被处理完不可能在队列里清除

查看消息队列数:

美高梅开户网址 79

4、消息持久化

假若消息在传输进程中rabbitMQ服务器宕机了,会发觉前面包车型地铁音信队列就不存在了,那时大家就要用到新闻持久化,音讯持久化会让队列不随着服务器宕机而泯没,会永远的保存下去

发送端:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()      #声明一个管道(管道内发消息)

channel.queue_declare(queue='lzl',durable=True)    #队列持久化

channel.basic_publish(exchange='',
                      routing_key='lzl',  #routing_key 就是queue名
                      body='Hello World!',
                      properties=pika.BasicProperties(
                          delivery_mode = 2     #消息持久化
                      )
)
print("Sent 'Hello,World!'")
connection.close()      #关闭

接收端:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika,time

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()

channel.queue_declare(queue='lzl',durable=True)

def callback(ch,method,properties,body):
    print("->>",ch,method,properties)
    time.sleep(15)              # 模拟处理时间
    print("Received %r"%body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(callback,     #如果收到消息,就调用callback函数处理消息
                      queue="lzl",
                      )
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()   #开始收消息

5、信息公平分发

万1Rabbit只管按顺序把音信发到种种消费者身上,不思索消费者负载的话,十分的大概出现,多少个机器配置不高的顾客那里堆积了多如牛毛消息处理不完,同时陈设高的买主却直接很轻松。为化解此难点,能够在挨家挨户消费者端,配置perfetch=1,意思便是告诉RabbitMQ在本身这些消费者当前音信还没处理完的时候就不要再给笔者发新音讯了

美高梅开户网址 80

channel.basic_qos(prefetch_count=1)

带音讯持久化+公平分发

美高梅开户网址 81美高梅开户网址 82

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()      #声明一个管道(管道内发消息)

channel.queue_declare(queue='lzl',durable=True)    #队列持久化

channel.basic_publish(exchange='',
                      routing_key='lzl',  #routing_key 就是queue名
                      body='Hello World!',
                      properties=pika.BasicProperties(
                          delivery_mode = 2     #消息持久化
                      )
)
print("Sent 'Hello,World!'")
connection.close()      #关闭

pubulish.py

美高梅开户网址 83美高梅开户网址 84

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika,time

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()

channel.queue_declare(queue='lzl',durable=True)

def callback(ch,method,properties,body):
    print("->>",ch,method,properties)
    time.sleep(15)              # 模拟处理时间
    print("Received %r"%body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,     #如果收到消息,就调用callback函数处理消息
                      queue="lzl",
                      )
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()   #开始收消息

consume.py

6、Publish\Subscribe(音信发表\订阅) 

事先的事例都基本都是一对1的消息发送和接到,即新闻只可以发送到钦命的queue里,但有点时候你想让您的新闻被全部的Queue收到,类似广播的作用,那时候就要用到exchange了,

An exchange is a very simple thing. On one
side it receives messages from producers and the other side it pushes
them to queues. The exchange must know exactly what to do with a message
it receives. Should it be appended to a particular queue? Should it be
appended to many queues? Or should it get discarded. The rules for that
are defined by the exchange type.

Exchange在概念的时候是有品种的,以决定到底是什么样Queue符合条件,能够接过消息

fanout: 全部bind到此exchange的queue都足以选择新闻
direct: 通过routingKey和exchange决定的要命唯壹的queue能够收起音信
topic:全数符合routingKey(此时能够是三个表明式)的routingKey所bind的queue尚可音讯

headers: 通过headers
来决定把音信发给哪些queue

表明式符号表明:#代表二个或七个字符,*表示任何字符

     
 例:#.a会匹配a.a,aa.a,aaa.a等
           
*.a会匹配a.a,b.a,c.a等
注:使用RoutingKey为#,Exchange
Type为topic的时候一定于接纳fanout 

壹fanout收下全部广播:广播表示近来音讯是实时的,假若未有一个消费者在接受音讯,音讯就会扬弃,在此地消费者的no_ack已经无用,因为fanout不会管你处理音信截至未有,发过的音信不会重发,记住广播是实时的

美高梅开户网址 85

 

美高梅开户网址 86美高梅开户网址 87

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         type='fanout')

message = "info: Hello World!"
channel.basic_publish(exchange='logs',
                      routing_key='',   #广播不用声明queue
                      body=message)
print(" [x] Sent %r" % message)
connection.close()

publish.py

美高梅开户网址 88美高梅开户网址 89

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         type='fanout')

result = channel.queue_declare(exclusive=True)  # 不指定queue名字,rabbit会随机分配一个名字,
                                                # exclusive=True会在使用此queue的消费者断开后,自动将queue删除
queue_name = result.method.queue

channel.queue_bind(exchange='logs',         # 绑定转发器,收转发器上面的数据
                   queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
channel.start_consuming()

consume.py

②有选择的吸收新闻 direct:
 同fanout壹样,
no_ack在此要安装为True,不然队列里多少不会清空(纵然也不会重发)**

RabbitMQ还援助依据重点字发送,即:队列绑定关键字,发送者将数据根据重大字发送到音讯exchange,exchange依据关键字 判定应该将数据发送至钦定队列

美高梅开户网址 90

 

美高梅开户网址 91美高梅开户网址 92

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()

publish.py

美高梅开户网址 93美高梅开户网址 94

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

consume.py

③更加细致的信息过滤 topic:

Although using the direct exchange improved our system, it still has
limitations – it can’t do routing based on multiple
criteria.

In our logging system we might want to
subscribe to not only logs based on severity, but also based on the
source which emitted the log. You might know this concept from
the syslog unix
tool, which routes logs based on both severity (info/warn/crit…) and
facility (auth/cron/kern…).

That would give us a lot of flexibility –
we may want to listen to just critical errors coming from ‘cron’ but
also all logs from ‘kern’

美高梅开户网址 95

美高梅开户网址 96美高梅开户网址 97

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()

publish.py

美高梅开户网址 98美高梅开户网址 99

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
                       queue=queue_name,
                       routing_key=binding_key)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

consume.py

 

纳瓦拉PC(Remote procedure
call )双向通信

To illustrate how an RPC service could be
used we’re going to create a simple client class. It’s going to expose a
method named call which sends an RPC request and
blocks until the answer is received:

美高梅开户网址 100

rpc client:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika
import uuid,time


class FibonacciRpcClient(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))

        self.channel = self.connection.channel()

        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(self.on_response, #只要收到消息就执行on_response
                                   no_ack=True,     #不用ack确认
                                   queue=self.callback_queue)

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:    #验证码核对
            self.response = body


    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        print(self.corr_id)
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   properties=pika.BasicProperties(
                                       reply_to=self.callback_queue,    #发送返回信息的队列name
                                       correlation_id=self.corr_id,     #发送uuid 相当于验证码
                                   ),
                                   body=str(n))
        while self.response is None:
            self.connection.process_data_events()   #非阻塞版的start_consuming
            print("no messages")
            time.sleep(0.5)     #测试
        return int(self.response)


fibonacci_rpc = FibonacciRpcClient()    #实例化
print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)       #执行call方法
print(" [.] Got %r" % response)

rpc server:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='rpc_queue')


def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)


def on_request(ch, method, props, body):
    n = int(body)

    print(" [.] fib(%s)" % n)
    response = fib(n)

    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,    #回信息队列名
                     properties=pika.BasicProperties(correlation_id=
                                                         props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)


#channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request,
                      queue='rpc_queue')

print(" [x] Awaiting RPC requests")
channel.start_consuming()

 

发表评论

电子邮件地址不会被公开。 必填项已用*标注

网站地图xml地图