封装的一个使用pika进行操作RabbitMQ的方法。需要手动关闭连接。
支持创建queue、exchange、绑定routing_key等操作。
import pika
import json
from getConfig import AppConfig
RabbitMQConfig = AppConfig.get('RabbitMQConfig')
class RabbitMQConnect:
def __init__(self, fun=None):
"""
初始话MQ连接信息。
queue:队列模式
exchange: 交换器模式其中交换器模式种,共有 发布订阅模式(fanout)、路由模式(direct)、主题模式(topic)
使用时,需要手动调用close_mq进行关闭连接否则连接数过多,影响性能。
:param fun: 将回调的方法送出,用于处理从mq得到的数据,只送出一个参数,若为发布模式则无须传。
"""
credentials = pika.PlainCredentials(username=RabbitMQConfig.get('user'),
password=RabbitMQConfig.get('password')) # mq用户名和密码
# 虚拟队列需要指定参数 virtual_host,如果是默认的可以不填。
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(credentials=credentials,
host=RabbitMQConfig.get('host'),
port=RabbitMQConfig.get('port'),
virtual_host=RabbitMQConfig.get('virtual_host')
))
self.channel = self.connection.channel()
self.result = None
self._callback_fun = fun
def create_queue(self, queue, durable=False):
"""
声明消息队列,消息将在这个队列传递,如不存在,则创建
:param queue: 队列名
:param durable: 队列是否持久化True or False
:return:
"""
self.channel.queue_declare(queue=queue, durable=durable)
def create_exchange(self, exchange, exchange_type, routing_key, queue, durable=False):
"""
声明exchange,由exchange指定消息在哪个队列传递,如不存在,则创建。
durable = True 代表exchange持久化存储,False 非持久化存储,且自动绑定队列
:param queue: 绑定的队列,若不传,则自动创建临时队列
:param routing_key:路由键值,配合当exchange_type为direct时候需要指定routing_key
:param exchange: 交换器
:param exchange_type: fanout: 发布订阅模式, fanout纯广播/all。
这种模式下,传递到 exchange 的消息将会转发到所有与其绑定的 queue 上。
direct:路由模式 direct有选择的接受消息
这种工作模式的原理是 消息发送至 exchange,exchange 根据路由键(routing_key)转发到相对应的 queue 上。
topic: 主题模式 按照通配符指定发送到队列 规则播
这种模式和第二种模式差不多,exchange 也是通过 路由键 routing_key 来转发消息到指定的 queue 。
不同点是 routing_key 使用正则表达式支持模糊匹配,但匹配规则又与常规的正则表达式不同,
比如“#”是匹配全部,“*”是匹配一个词。
:param durable: 持久化存储
:return:
"""
_result = self.channel.queue_declare(queue=queue, durable=durable)
self.channel.exchange_declare(exchange=exchange, durable=durable, exchange_type=exchange_type)
self.channel.queue_bind(exchange=exchange, queue=_result.method.queue,
routing_key=routing_key)
def exchange_bind_exchange(self, destination, source, routing_key=''):
"""
交换器绑定
:param destination: 目标交换器
:param source: 源交换器
:param routing_key: 路由规则
:return:
"""
self.channel.exchange_bind(destination=destination, source=source, routing_key=routing_key)
def production(self, message, routing_key='', exchange='', delivery_mode=2, exchange_type=None):
"""
生产队列信息
在topic下指定routing_key无用。
:param message: 进MQ的信息
:param routing_key: 路由键
:param exchange: 此模式下需要选择类型。分别为发布订阅和发布消费模式
:param delivery_mode: 2 为持久化 1为非持久化 默认为2
:param exchange_type: 'fanout': 发布订阅模式。direct:发布消费模式 topic: 发布消费模式,按规则播
:return:
"""
if routing_key == '' and exchange == '':
raise TypeError('Either queue or exchange is assigned')
# if exchange != '':
# raise TypeError('Please select exchange type direct or fanout')
self.channel.basic_publish(exchange=exchange, routing_key=routing_key, body=json.dumps(message),
properties=pika.BasicProperties(delivery_mode=delivery_mode))
def queue_consumptions(self, queue, auto_ack=False):
"""
消费队列信息 这里的模式是生产消费一对一形式
开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理
:param queue: 所需要消费队列的名称
:param auto_ack: 设置成 False,在调用callback函数时,未收到确认标识,消息会重回队列。
True,无论调用callback成功与否,消息都被消费掉
:return:
"""
# 告诉rabbitmq,用callback来接收消息
self.channel.basic_consume(queue, self._callback, auto_ack=auto_ack)
# 开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理
self.channel.start_consuming()
def queue_consumption(self, queue, auto_ack=False):
"""
消费队列信息 这里的模式是生产消费一对一形式
一次只处理一条信息。
:param queue: 所需要消费队列的名称
:param auto_ack: 设置成 False,在调用callback函数时,未收到确认标识,消息会重回队列。
True,无论调用callback成功与否,消息都被消费掉
:return:
"""
return self.channel.basic_get(queue, auto_ack=auto_ack)
def close_mq(self):
"""
关闭连接
:return:
"""
self.connection.close()
def _callback(self, ch, method, properties, body):
"""
队列有信息后就会回调此方法,通过初始化时送的方法 self._callback_fun将队列订阅调用到的信息回调处理。
:param ch:
:param method:
:param properties:
:param body:
:return:
"""
ch.basic_ack(delivery_tag=method.delivery_tag)
# print(body.decode())
self._callback_fun(body.decode())
测试:
# 1 队列 routing_key 为python.# 绑定全部 exchange 【sea_whales_1、sea_whales、sea_whales_foanout】
# 2 队列 routing_key 为#.# 绑定全部 exchange 【sea_whales_1、sea_whales、sea_whales_foanout】
# 2
rmc.production(message="topic_sea_whales_No_Routeing_key_notype", exchange='sea_whales_1')
rmc.production(message="sea_whales_No_Routeing_key_notype_1", exchange='sea_whales')
# 1 2
rmc.production(message="sea_whales_No_Routeing_key_notype_2", exchange='sea_whales_foanout')
# 不带key
# 2
rmc.production(message="topic_sea_whales_No_Routeing_key", exchange='sea_whales_1', exchange_type='topic')
rmc.production(message="direct_sea_whales_No_Routeing_key", exchange='sea_whales', exchange_type='direct')
# 1 2
rmc.production(message="fanout_sea_whales_No_Routeing_key", exchange='sea_whales_foanout', exchange_type='fanout')
# 带key 走统配
# 1 2
rmc.production(message="topic_sea_whales_python.234", routing_key='python.234', exchange='sea_whales_1', exchange_type='topic')
rmc.production(message="direct_sea_whales_python.234", routing_key='python.234', exchange='sea_whales', exchange_type='direct')
# 1 2
rmc.production(message="fanout_sea_whales_12.234", routing_key='python.234', exchange='sea_whales_foanout', exchange_type='fanout')
# 带key 带统配
# 1 2
rmc.production(message="topic_sea_whales_python.#", routing_key='python.#', exchange='sea_whales_1', exchange_type='topic')
# 1
rmc.production(message="direct_sea_whales_python.#", routing_key='python.#', exchange='sea_whales', exchange_type='direct')
# 1 2
rmc.production(message="fanout_sea_whales_12.#", routing_key='python.#', exchange='sea_whales_foanout', exchange_type='fanout')
# 带key 全部统配
# 2
rmc.production(message="topic_sea_whales_12.13", routing_key='12.123', exchange='sea_whales_1', exchange_type='topic')
rmc.production(message="direct_sea_whales_12.123", routing_key='12.123', exchange='sea_whales', exchange_type='direct')
# 1 2
rmc.production(message="fanout_sea_whales_12.123", routing_key='12.123', exchange='sea_whales_foanout', exchange_type='fanout')
最后一次更新于2021-08-20 13:30:29
0 条评论