# !/usr/bin/env python3
# -*- coding: utf-8 -*-
import pika
from retrying import retry
from getConfig import getConfigData
RabbitMQConfig = getConfigData('queueConfig.json')
def Singleton(cls):
_instance = {}
def _singleton(*args, **kwargs):
if cls not in _instance:
_instance[cls] = cls(*args, **kwargs)
return _instance[cls]
return _singleton
# @Singleton
def MQConnect():
credentials = pika.PlainCredentials(username=RabbitMQConfig.get('user'),
password=RabbitMQConfig.get('password')) # mq用户名和密码
# 虚拟队列需要指定参数 virtual_host,如果是默认的可以不填。
connection = pika.BlockingConnection(
pika.ConnectionParameters(credentials=credentials,
host=RabbitMQConfig.get('host'),
port=RabbitMQConfig.get('port'),
virtual_host=RabbitMQConfig.get('virtual_host')
))
return connection.channel()
# retry 报错(异常)重试机制
@retry()
def MQGetQueueMessages(queue, callback, auto_ack=False):
"""
消费队列信息 这里的模式是生产消费一对一形式
开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理
:param callback: 消费者回调函数
:param queue: 所需要消费队列的名称
:param auto_ack: 设置成 False,在调用callback函数时,未收到确认标识,消息会重回队列。
True,无论调用callback成功与否,消息都被消费掉
比如我从消息里拿一条消息,如果全处理完,你就不要帮我记着了。
如果没处理完,突然断开了,再连接上的时候,消息队列就会重新发消息。
:return:
"""
channel = MQConnect()
def _callback(ch, method, properties, body):
ch.basic_ack(delivery_tag=method.delivery_tag)
callback(body.decode())
channel.basic_consume(queue, _callback, auto_ack=auto_ack)
channel.start_consuming()
@retry()
def MQSetQueueMessages(interface: str, message: str):
"""
生产者,给MQ发送消息。只需要按照配置信息内填写过的参数,然后将消息内容字符串形式送入,后关闭连接。
:param interface: 根据配置信息中对应接口名称
:param message: 发送信息
:return:
"""
channel = MQConnect()
queue_info = RabbitMQConfig.get(interface)
if queue_info:
try:
channel.basic_publish(body=str.encode(message),
exchange=queue_info.get('exchange'), # 交换器
routing_key=queue_info.get('routing_key'), # 路由键
properties=pika.BasicProperties(
delivery_mode=queue_info.get('delivery_mode'))) # 持久化
except TypeError as e:
raise ValueError('Message must be a string!')
else:
raise IOError('Queue information error!')
"""
使用pika连接rabbitmq,每次发送消息之后都需要关闭吗?
==> 不需要,如果是生产者,会自动关闭,如果是消费者,有心跳机制,保证不关闭
以上代码单例模式装饰器与自动重连机制冲突,则若需要单例则注释重连机制
(建议不使用,可以直接使用pika会自动关闭)
"""
import time
def cab(x):
print(x)
def setMessage():
i = 0
while i < 150:
MQSetQueueMessages(interface='test', message=f"{i}")
i += 1
time.sleep(2)
# if i % 5 == 0:
# raise ValueError('哎呀,手动抛出错误~')
def getMessage():
MQGetQueueMessages(queue='test', callback=cab)
if __name__ == '__main__':
import _thread
try:
_thread.start_new_thread(getMessage, ())
# _thread.start_new_thread(setMessage, ())
# time.sleep(4)
setMessage()
except Exception as e:
print(str(e))
最后一次更新于2021-11-16 10:47:55
0 条评论