封装的一个使用pika进行操作RabbitMQ的方法。需要手动关闭连接。
支持创建queue、exchange、绑定routing_key等操作。

import pika
import json
from getConfig import AppConfig

RabbitMQConfig = AppConfig.get('RabbitMQConfig')


class RabbitMQConnect:
    def __init__(self, fun=None):
        """
        初始话MQ连接信息。
        queue:队列模式
        exchange: 交换器模式其中交换器模式种,共有 发布订阅模式(fanout)、路由模式(direct)、主题模式(topic)
        使用时,需要手动调用close_mq进行关闭连接否则连接数过多,影响性能。
        :param fun: 将回调的方法送出,用于处理从mq得到的数据,只送出一个参数,若为发布模式则无须传。
        """
        credentials = pika.PlainCredentials(username=RabbitMQConfig.get('user'),
                                            password=RabbitMQConfig.get('password'))  # mq用户名和密码
        # 虚拟队列需要指定参数 virtual_host,如果是默认的可以不填。
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(credentials=credentials,
                                      host=RabbitMQConfig.get('host'),
                                      port=RabbitMQConfig.get('port'),
                                      virtual_host=RabbitMQConfig.get('virtual_host')
                                      ))
        self.channel = self.connection.channel()
        self.result = None
        self._callback_fun = fun

    def create_queue(self, queue, durable=False):
        """
        声明消息队列,消息将在这个队列传递,如不存在,则创建
        :param queue: 队列名
        :param durable: 队列是否持久化True or False
        :return:
        """
        self.channel.queue_declare(queue=queue, durable=durable)

    def create_exchange(self, exchange, exchange_type, routing_key, queue, durable=False):
        """
        声明exchange,由exchange指定消息在哪个队列传递,如不存在,则创建。
        durable = True 代表exchange持久化存储,False 非持久化存储,且自动绑定队列
        :param queue: 绑定的队列,若不传,则自动创建临时队列
        :param routing_key:路由键值,配合当exchange_type为direct时候需要指定routing_key
        :param exchange: 交换器
        :param exchange_type: fanout: 发布订阅模式, fanout纯广播/all。
                                这种模式下,传递到 exchange 的消息将会转发到所有与其绑定的 queue 上。
                              direct:路由模式 direct有选择的接受消息
                                这种工作模式的原理是 消息发送至 exchange,exchange 根据路由键(routing_key)转发到相对应的 queue 上。
                              topic: 主题模式 按照通配符指定发送到队列   规则播
                                这种模式和第二种模式差不多,exchange 也是通过 路由键 routing_key 来转发消息到指定的 queue 。
                                不同点是 routing_key 使用正则表达式支持模糊匹配,但匹配规则又与常规的正则表达式不同,
                                比如“#”是匹配全部,“*”是匹配一个词。
        :param durable: 持久化存储
        :return:
        """
        _result = self.channel.queue_declare(queue=queue, durable=durable)
        self.channel.exchange_declare(exchange=exchange, durable=durable, exchange_type=exchange_type)
        self.channel.queue_bind(exchange=exchange, queue=_result.method.queue,
                                routing_key=routing_key)

    def exchange_bind_exchange(self, destination, source, routing_key=''):
        """
        交换器绑定
        :param destination: 目标交换器
        :param source: 源交换器
        :param routing_key: 路由规则
        :return:
        """
        self.channel.exchange_bind(destination=destination, source=source, routing_key=routing_key)

    def production(self, message, routing_key='', exchange='', delivery_mode=2, exchange_type=None):
        """
        生产队列信息
        在topic下指定routing_key无用。
        :param message: 进MQ的信息
        :param routing_key: 路由键
        :param exchange: 此模式下需要选择类型。分别为发布订阅和发布消费模式
        :param delivery_mode: 2 为持久化 1为非持久化 默认为2
        :param exchange_type: 'fanout': 发布订阅模式。direct:发布消费模式  topic: 发布消费模式,按规则播
        :return:
        """
        if routing_key == '' and exchange == '':
            raise TypeError('Either queue or exchange is assigned')
        # if exchange != '':
        #     raise TypeError('Please select exchange type direct or fanout')
        self.channel.basic_publish(exchange=exchange, routing_key=routing_key, body=json.dumps(message),
                                   properties=pika.BasicProperties(delivery_mode=delivery_mode))

    def queue_consumptions(self, queue, auto_ack=False):
        """
        消费队列信息 这里的模式是生产消费一对一形式
        开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理
        :param queue: 所需要消费队列的名称
        :param auto_ack: 设置成 False,在调用callback函数时,未收到确认标识,消息会重回队列。
                               True,无论调用callback成功与否,消息都被消费掉
        :return:
        """
        # 告诉rabbitmq,用callback来接收消息
        self.channel.basic_consume(queue, self._callback, auto_ack=auto_ack)
        # 开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理
        self.channel.start_consuming()

    def queue_consumption(self, queue, auto_ack=False):
        """
        消费队列信息 这里的模式是生产消费一对一形式
        一次只处理一条信息。
        :param queue: 所需要消费队列的名称
        :param auto_ack: 设置成 False,在调用callback函数时,未收到确认标识,消息会重回队列。
                               True,无论调用callback成功与否,消息都被消费掉
        :return:
        """
        return self.channel.basic_get(queue, auto_ack=auto_ack)

    def close_mq(self):
        """
        关闭连接
        :return:
        """
        self.connection.close()

    def _callback(self, ch, method, properties, body):
        """
        队列有信息后就会回调此方法,通过初始化时送的方法 self._callback_fun将队列订阅调用到的信息回调处理。
        :param ch:
        :param method:
        :param properties:
        :param body:
        :return:
        """
        ch.basic_ack(delivery_tag=method.delivery_tag)
        # print(body.decode())
        self._callback_fun(body.decode())

测试:

# 1 队列 routing_key 为python.# 绑定全部 exchange 【sea_whales_1、sea_whales、sea_whales_foanout】
# 2 队列 routing_key 为#.# 绑定全部 exchange 【sea_whales_1、sea_whales、sea_whales_foanout】
# 2
rmc.production(message="topic_sea_whales_No_Routeing_key_notype", exchange='sea_whales_1')

rmc.production(message="sea_whales_No_Routeing_key_notype_1", exchange='sea_whales')
# 1 2
rmc.production(message="sea_whales_No_Routeing_key_notype_2", exchange='sea_whales_foanout')
# 不带key
# 2
rmc.production(message="topic_sea_whales_No_Routeing_key", exchange='sea_whales_1', exchange_type='topic')

rmc.production(message="direct_sea_whales_No_Routeing_key", exchange='sea_whales', exchange_type='direct')
# 1 2
rmc.production(message="fanout_sea_whales_No_Routeing_key", exchange='sea_whales_foanout', exchange_type='fanout')

# 带key 走统配
# 1 2
rmc.production(message="topic_sea_whales_python.234", routing_key='python.234', exchange='sea_whales_1', exchange_type='topic')

rmc.production(message="direct_sea_whales_python.234", routing_key='python.234', exchange='sea_whales', exchange_type='direct')
# 1 2
rmc.production(message="fanout_sea_whales_12.234", routing_key='python.234', exchange='sea_whales_foanout', exchange_type='fanout')

# 带key 带统配
# 1 2
rmc.production(message="topic_sea_whales_python.#", routing_key='python.#', exchange='sea_whales_1', exchange_type='topic')
# 1
rmc.production(message="direct_sea_whales_python.#", routing_key='python.#', exchange='sea_whales', exchange_type='direct')
# 1 2
rmc.production(message="fanout_sea_whales_12.#", routing_key='python.#', exchange='sea_whales_foanout', exchange_type='fanout')

# 带key 全部统配
# 2
rmc.production(message="topic_sea_whales_12.13", routing_key='12.123', exchange='sea_whales_1', exchange_type='topic')

rmc.production(message="direct_sea_whales_12.123", routing_key='12.123', exchange='sea_whales', exchange_type='direct')
# 1 2
rmc.production(message="fanout_sea_whales_12.123", routing_key='12.123', exchange='sea_whales_foanout', exchange_type='fanout')