VB.net 2010 视频教程 VB.net 2010 视频教程 python基础视频教程
SQL Server 2008 视频教程 c#入门经典教程 Visual Basic从门到精通视频教程
当前位置:
首页 > 编程开发 > Python基础教程 >
  • python基础教程之RabbitMQ(2)

本站最新发布   Python从入门到精通|Python基础教程
试听地址  
https://www.xin3721.com/eschool/pythonxin3721/


创建一个名为"task1"的队列用来投递消息,durable=True队列持久化 12 channel.queue_declare(queue='task1', durable=True) 13 # 注意在rabbitmq中,消息想要发送给队列,必须经过交换(exchange),初学可以使用空字符串交换(exchange=''), 14 # 它允许我们精确的指定发送给哪个队列(routing_key=''),参数body值发送的数据 15 message = ' '.join(sys.argv[1:]) or "很高兴见到你" 16 # 发送消息到指定的路由和路由键 17 channel.basic_publish( 18 exchange='', 19 routing_key='task1', 20 body=message, 21 # 消息持久化 22 properties=pika.BasicProperties(delivery_mode=2) 23 ) 24 print(" [x] Sent %r" % (message,)) 25 # 程序退出前,确保刷新网络缓冲以及消息发送给rabbitmq,需要关闭本次连接 26 connection.close()
复制代码

receiver.py

复制代码
 1 import time
 2 
 3 import pika
 4 # 创建凭证,使用rabbitmq用户密码登录
 5 # 使用前,必须得验证身份
 6 credentials = pika.PlainCredentials("guest","guest")
 7 # 建立一个到RabbitMQ服务器的连接。
 8 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost',credentials=credentials))
 9 channel = connection.channel()
10 # 创建一个名为"task1"的队列用来投递消息
11 channel.queue_declare(queue='task1',durable=True)
12 
13 def callbak(ch,method,properties,body):
14     print("[x] Received %r"%body.decode("utf8"))
15     time.sleep(body.decode("utf8").count('.'))
16     print(" [x] Done")
17     # 消息确认
18     ch.basic_ack(delivery_tag=method.delivery_tag)
19 
20 # 只处理一条数据,作出响应后再处理下一条
21 channel.basic_qos(prefetch_count=1)
22 # 有消息来临,立即执行callbak,没有消息则夯住,等待消息
23 # 消息来了,立即去取,队列名字为task1
24 channel.basic_consume(queue='task1', on_message_callback=callbak)
25 # 开始消费,接收消息
26 channel.start_consuming()
复制代码

注:会发现此时消息是谁有时间谁去处理,而不是轮询分配的方式。

11.exchange交换机

前面的学习只搭建了一个工作队列,每个任务只分发给一个工作者(worker),如果有些任务需要多个工作者都能够拿到,那么就需要使用到exchange。一个消息给到多个消费者,这种模式叫"发布/订阅"

RabbitMQ消息模型的核心理念是:发布者(producer)不会直接发送任何消息给队列。而是将消息发送给交换机,由交换机发送给队列。

exchange的四种模式

fanout Exchange
扇形交换机是最基本的交换机类型,它所能做的事情非常简单———广播消息。扇形交换机会把能接收到的消息全部发送给绑定在自己身上的队列。因为广播不需要“思考”,所以扇形交换机处理消息的速度也是所有的交换机类型里面最快的。

direct Exchange
直连交换机是一种带路由功能的交换机,一个队列会和一个交换机绑定,除此之外再绑定一个routing_key,当消息被发送的时候,需要指定一个binding_key,这个消息被送达交换机的时候,就会被这个交换机送到指定的队列里面去。同样的一个binding_key也是支持应用到多个队列中的。这样当一个交换机绑定多个队列,就会被送到对应的队列去处理。

topic Exchange
主题交换机是一种支持正则匹配的exchange,发送到topic exchange上的消息需要携带指定规则的routing_key,主题交换机会根据这个规则将数据发送到对应的(多个)队列上。该exchange的routing_key需要有一定的规则,交换机和队列的binding_key需要采用.#......的格式,每个部分用.分开,其中:*表示一个单词,#表示任意数量(零个或多个)单词。

headers Exchange
头交换机是忽略routing_key的一种路由方式。路由器和交换机路由的规则是通过Headers信息来交换的,这个有点像HTTP的Headers。将一个exchange声明成Headers exchange,绑定一个队列的时候,定义一个Hash的数据结构,消息发送的时候,会携带一组hash数据结构的信息,当Hash的内容匹配上的时候,消息就会被写入队列。绑定exchange和队列的时候,Hash结构中要求携带一个键“x-match”,这个键的Value可以是any或者all,这代表消息携带的Hash是需要全部匹配(all),还是仅匹配一个键(any)就可以了。相比direct exchange,首部交换机的优势是匹配的规则不被限定为字符串(string)。

channel.exchange_declare(exchange='logs',
                         exchange_type='fanout')

匿名的交换器

前面我们对交换机一无所知,但仍然能够发送消息到队列中。因为我们使用了命名为空字符串("")默认的交换机。

回想我们之前是如何发布一则消息:

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body=message)

exchange参数就是交换机的名称。空字符串代表默认或者匿名交换机:消息将会根据指定的routing_key分发到指定的队列。

现在,我们就可以发送消息到一个具名交换机了:

channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)

临时队列

临时队列手动创建一个随机的队列名,或者让服务器为我们选择一个随机的队列名(推荐)。只需要在调用queue_declare方法的时候,不提供queue参数的值就可以了:

result = channel.queue_declare(queue="")

我们可以通过result.method.queue获得已经生成的随机队列名。它可能是这样子的:amq.gen-U0srCoW8TsaXjNh73pnVAw==。

result.method.queue

当与消费者(consumer)断开连接的时候,这个队列应当被立即删除。exclusive标识符即可达到此目的。

result = channel.queue_declare(queue="",exclusive=True)  # 断开连接,队列立即删除

绑定(bindings)

已经创建了一个扇型交换机(fanout)和一个队列。现在我们需要告诉交换机如何发送消息给我们的队列。交换器和队列之间的联系我们称之为绑定(binding)。

channel.queue_bind(exchange='logs',
                   queue=result.method.queue)

你可以使用rabbitmqctl list_bindings 列出所有现存的绑定。

发布/订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在所有相关队列中。

扇形交换机(fanout)

如:日志文件。

sender.py

复制代码
import pika
import sys
# 创建凭证,使用rabbitmq用户密码登录
credentials = pika.PlainCredentials("guest", "guest")
# 创建一个rabbitmq连接
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', credentials=credentials))
channel = connection.channel()
# 指定交换机类型
channel.exchange_declare(exchange='m1',exchange_type='fanout')

message = ' '.join(sys.argv[1:]) or "很高兴见到你"
# 指定发送给哪个交换机(exchange='')路由键(routing_key=''),参数body值发送的数据
channel.basic_publish(
    exchange='m1',
    routing_key='',
    body=message,
)
print(" [x] Sent %r" % (message,))
connection.close()
复制代码

receiver.py

复制代码
import time
import pika

credentials = pika.PlainCredentials("guest", "guest")
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', credentials=credentials))
channel = connection.channel()
channel.exchange_declare(exchange='m1', exchange_type='fanout')
# 随机创建一个队列,扇形交换机要将消息给所有人,所以不需要特意指定一个队列的名称。
result = channel.queue_declare(queue="", exclusive=True)
# 获取队列的名字
queue_name = result.method.queue
# 创建绑定,交换机和队列
channel.queue_bind(exchange='m1',
                   queue=queue_name)

def callbak(ch, method, properties, body):
    print("[x] Received %r" % body.decode("utf8"))
    time.sleep(body.decode("utf8").count('.'))
    print(" [x] Done")

channel.basic_consume(queue=queue_name, on_message_callback=callbak, auto_ack=True)
# 开始消费,接收消息
channel.start_consuming()
复制代码

这样我们就完成了。如果你想把结果保存到日志文件中,只需要打开控制台输入:

python receiver.py > logs_from_rabbit.log  # 使用该方式启动receiver.py

直连交换机

绑定(binding)是指交换机(exchange)和队列(queue)的关系。可以简单理解为:这个队列(queue)对这个交换机(exchange)的消息感兴趣。

绑定的时候可以带上一个额外的routing_key参数。为了避免与basic_publish的参数混淆,我们把它叫做绑定键(binding key)。以下是如何创建一个带绑定键的绑定。

channel.queue_bind(exchange=exchange_name,
                   queue=queue_name,
                   routing_key='black')

直连交换机(direct exchange)将会对绑定键(binding key)和路由键(routing key)进行精确匹配,从而确定消息该分发到哪个队列。绑定的都是同一个路由,通过路由键来区分是谁处理任务。

比如:有三个任务,分别分配给三个人,任务1-->p1、任务2-->p2、任务3-->p3,你是一个任务发布者,说我想要任务1的结果了,对应的就是p1给你答案,想要任务2的结果了,对应的就是p2给你答案。

如上图所示:直连交换机绑定了Q1和Q2两个队列,第一个队列使用orange作为绑定键,第二个队列使用black和green作为绑定键,当路由键为orange时会被路由到Q1这个队列。为black或green时会被路由到Q2这个队列。

多个绑定(Multiple bindings)

多个队列使用相同的绑定键是合法的。这个例子中,我们可以添加一个X和Q1之间的绑定,使用black绑定键。这样一来,直连交换机就和扇型交换机的行为一样,会将消息广播到所有匹配的队列。带有black路由键的消息会同时发送到Q1和Q2。

sender.py

复制代码
import pika

routing_key=input("请输入routing_key:")
credentials = pika.PlainCredentials("guest", "guest")
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', credentials=credentials))
channel = connection.channel()
# 指定直连交换机
channel.exchange_declare(exchange='task1',exchange_type='direct')
message = f"很高兴见到你{routing_key}"
channel.basic_publish(
    exchange='task1',
    routing_key=routing_key,
    body=message,
)
print(f"给{routing_key}发送消息成功")
connection.close()
复制代码

receiver1.py

复制代码
import time
import pika

credentials = pika.PlainCredentials("guest", "guest")
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', credentials=credentials))
channel = connection.channel()
channel.exchange_declare(exchange='task1', exchange_type='direct')
# 随机创建一个队列
result = channel.queue_declare(queue="", exclusive=True)
# 获取队列的名字
queue_name = result.method.queue
# 绑定交换机和队列
channel.queue_bind(exchange='task1',
                   queue=queue_name,
                   routing_key="xiaoqi")

def callbak(ch, method, properties, body):
    print(body.decode("utf8"))

channel.basic_consume(queue=queue_name, on_message_callback=callbak, auto_ack=True)
# 开始消费,接收消息
channel.start_consuming()
复制代码

receiver2.py

复制代码
import time
import pika

credentials = pika.PlainCredentials("guest", "guest")
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', credentials=credentials))
channel = connection.channel()
channel.exchange_declare(exchange='task1', exchange_type='direct')
# 随机创建一个队列
result = channel.queue_declare(queue="", exclusive=True)
# 获取队列的名字
queue_name = result.method.queue
# 绑定交换机和队列
channel.queue_bind(exchange='task1',
                   queue=queue_name,
                   routing_key="dada")

def callbak(ch, method, properties, body):
    print(body.decode("utf8"))

channel.basic_consume(queue=queue_name, on_message_callback=callbak, auto_ack=True)
# 开始消费,接收消息
channel.start_consuming()
复制代码

应用场景:可以是错误日志,不同日志保存到不同的文件。

如果你希望只是保存warning和error级别的日志到磁盘,只需要打开控制台并输入:

python receive_logs_direct.py warning error > logs_from_rabbit.log  # 此方式开启receiver

主题交换机

发送到主题交换机(topic exchange)的消息不可以携带随意什么样子的路由键(routing_key),它的路由键必须是一个由.分隔开的词语列表。词语的个数可以随意,但是不要超过255字节。

* (星号) 用来表示一个单词.

# (井号) 用来表示任意数量(零个或多个)单词。

主题交换机是很强大的,它可以表现出跟其他交换机类似的行为

sender.py

复制代码
import pika

routing_key=input("请输入routing_key:")
credentials = pika.PlainCredentials("guest", "guest")
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', credentials=credentials))
channel = connection.channel()
# 指定直连交换机
channel.exchange_declare(exchange='task2',exchange_type='topic')
message = f"很高兴见到你{routing_key}"
channel.basic_publish(
    exchange='task2',
    routing_key=routing_key,
    body=message,
)
print(f"给{routing_key}发送消息成功")
connection.close()
复制代码

receiver.py

复制代码
import time
import pika

credentials = pika.PlainCredentials("guest", "guest")
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', credentials=credentials))
channel = connection.channel()
channel.exchange_declare(exchange='task2', exchange_type='topic')
# 随机创建一个队列
result = channel.queue_declare(queue="", exclusive=True)
# 获取队列的名字
queue_name = result.method.queue
# 绑定交换机和队列
channel.queue_bind(exchange='task2',
                   queue=queue_name,
                   routing_key="*.md")

def callbak(ch, method, properties, body):
    print(body.decode("utf8"))
# 有消息来临,立即执行callbak,从队列q1中取数据;没有消息则夯住,等待消息
channel.basic_consume(queue=queue_name, on_message_callback=callbak, auto_ack=True)
# 开始消费,接收消息
channel.start_consuming()
复制代码

官方文档:http://rabbitmq.mr-ping.com/tutorials_with_python/[5]Topics.html

12.RPC远程过程调用

RPC远程过程调用,在计算机A上的进程,调用另外一台计算机B的进程,A上的进程被挂起,B上的被调用进程开始执行后,产生返回值给A,A继续执行。它是一个计算机通信协议。

RPC的关键不在于它是什么通信协议,重点在于可以通过RPC进行解耦服务。

应用:

一个电商的下单过程,涉及物流、支付、库存、红包等多个系统,多个系统又在多个服务器上,由不同的技术团队负责,整个下单过程,需要所有团队进行远程调用。

下单  { 库存>减少库存    
        支付>扣款    
        红包>减免红包    
        物流>生成订单}

python实现RPC

利用RabbitMQ构建一个RPC系统,包含了客户端和RPC服务器,依旧使用pika模块

Callback queue 回调队列

一个客户端向服务器发送请求,服务器端处理请求后,将其处理结果保存在一个存储体中。而客户端为了获得处理结果,那么客户在向服务器发送请求时,同时发送一个回调队列地址reply_to

复制代码
result = channel.queue_declare(queue="",exclusive=True)
callback_queue = result.method.queue

channel.basic_publish(exchange='',
                      routing_key='rpc_queue',
                      properties=pika.BasicProperties(
                            reply_to = callback_queue,
                            ),
                      body=request)
复制代码

Correlation id 关联标识

一个客户端可能会发送多个请求给服务器,当服务器处理完后,客户端无法辨别在回调队列中的响应具体和那个请求时对应的。为了处理这种情况,客户端在发送每个请求时,同时会附带一个独有correlation_id属性,这样客户端在回调队列中根据correlation_id字段的值就可以分辨此响应属于哪个请求。

客户端发送请求:某个应用将请求信息交给客户端,然后客户端发送RPC请求,在发送RPC请求到RPC请求队列时,客户端至少发送带有reply_to以及correlation_id两个属性的信息。

服务器端工作流: 等待接受客户端发来RPC请求,当请求出现的时候,服务器从RPC请求队列中取出请求,然后处理后,将响应发送到reply_to指定的回调队列中。

客户端接受处理结果: 客户端等待回调队列中出现响应,当响应出现时,它会根据响应中correlation_id字段的值,将其返回给对应的应用。

client.py

复制代码
import pika
import uuid

class FibonacciRpcClient(object):
    def __init__(self):
        credentials = pika.PlainCredentials("guest", "guest")
        # 客户端启动时,创建回调队列,会开启会话用于发送RPC请求以及接受响应
        # 建立连接,指定服务器的ip地址
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host='localhost', credentials=credentials))
        # 建立一个会话,每个channel代表一个会话任务
        self.channel = self.connection.channel()

        # 声明回调队列,再次声明的原因是,服务器和客户端可能先后开启,该声明是幂等的,多次声明,但只生效一次
        # exclusive=True 参数是指只对首次声明它的连接可见
        # exclusive=True 会在连接断开的时候,自动删除
        result = self.channel.queue_declare(queue="", exclusive=True)
        # 将次队列指定为当前客户端的回调队列,用户接收服务端给客户端发送的信息
        self.callback_queue = result.method.queue
        # 客户端订阅回调队列,当回调队列中有响应时,调用`on_response`方法对响应进行处理;
        # 有消息来临,立即执行callbak,从队列on_response中取数据;没有消息则夯住,等待消息
        self.channel.basic_consume(queue=self.callback_queue, on_message_callback=self.on_response, auto_ack=True)

    # 对回调队列中的响应进行处理的函数
    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    # 发出RPC请求
    # 例如这里服务端就是一场马拉松,跑完一段将棒交给下一个人,这个过程是发送rpc请求
    def call(self, n):
        # 初始化 response
        self.response = None
        # 生成correlation_id 关联标识,通过python的uuid库,生成全局唯一标识ID,保证时间空间唯一性
        self.corr_id = str(uuid.uuid4())
        # 发送RPC请求内容到RPC请求队列`task2`,同时发送的还有`reply_to`和`correlation_id`
        self.channel.basic_publish(exchange='',
                                   routing_key='main_queue',
                                   properties=pika.BasicProperties(
                                       reply_to=self.callback_queue,
                                       correlation_id=self.corr_id,
                                   ),
                                   body=str(n))
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)

# 建立客户端
fibonacci_rpc = FibonacciRpcClient()

# 发送RPC请求,丢进rpc队列,等待客户端处理完毕,给与响应
print("发送了请求sum(100)")
response = fibonacci_rpc.call(100)

print("得到远程结果响应%r" % response)
复制代码

server.py

复制代码
import pika

credentials = pika.PlainCredentials("guest", "guest")
# 建立连接,服务器地址为localhost,也可指定ip地址
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost',credentials=credentials))
# 建立会话
channel = connection.channel()
# 模拟一个进程,例如接力比赛
def sum(n):
    n+=100
    return n
# 对RPC请求队列中的请求进行处理

def on_request(ch, method, props, body):
    print(body,type(body))
    n = int(body)
    print(" 正在处理sum(%s)..." % n)
    # 调用数据处理方法
    response = sum(n)
    # 将处理结果(响应)发送到回调队列
    ch.basic_publish(exchange='',
                     # reply_to代表回复目标
                     routing_key=props.reply_to,
                     # correlation_id(关联标识):用来将RPC的响应和请求关联起来。
                     properties=pika.BasicProperties(correlation_id= props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)
    print(" 处理完成sum(%s)" % n)

# 负载均衡,同一时刻发送给该服务器的请求不超过一个
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='main_queue',on_message_callback=on_request)
print("等待接收rpc请求")

#开始消费
channel.start_consuming()
复制代码

相关教程
        
关于我们--广告服务--免责声明--本站帮助-友情链接--版权声明--联系我们       黑ICP备07002182号