博客
关于我
[源码分析] 消息队列 Kombu 之 Producer
阅读量:450 次
发布时间:2019-03-06

本文共 6046 字,大约阅读时间需要 20 分钟。

[源码分析] 消息队列 Kombu 之 Producer

摘要

Kombu 是一个兼容 AMQP 协议的消息队列抽象。通过本文,我们将深入分析 Kombu 中的 Producer 概念,了解其工作原理和实现细节。


示例代码

以下代码用于说明Producer 的工作流程。代码来源于 liqiang.io

def main(arguments):
hub = Hub()
exchange = Exchange('asynt_exchange')
queue = Queue('asynt_queue', exchange, 'asynt_routing_key')
def send_message(conn):
producer = Producer(conn)
producer.publish('hello world', exchange=exchange, routing_key='asynt')
print('message sent')
def on_message(message):
print('received: {0!r}'.format(message.body))
message.ack()
# hub.stop() # 退出后只接收一条消息
conn = Connection('redis://localhost:6379')
conn.register_with_event_loop(hub)
def p_message():
print('kombu')
with Consumer(conn, [queue], on_message=on_message):
send_message(conn)
hub.timer.call_repeatedly(3, p_message)
hub.run_forever()
if __name__ == '__main__':
sys.exit(main(sys.argv[1:]))

来由

在前文中,我们已经分析了 Kombu 的 Consumer 部分。现在我们将重点转向 Producer 部分。


建立

3.1 定义

Producer 类的定义如下:

class Producer:
"""消息生产者。
参数:
- channel (kombu.Connection, ChannelT): 连接或 channel。
- exchange (kombu.entity.Exchange, str): 可选默认交换机名。
- routing_key (str): 可选默认路由键。
- serializer (callable): 可选默认序列化方法,默认为 JSON。
- compression (str): 可选压缩方法,默认为 None。
- auto_declare (bool): 若设置了默认交换机名,发布消息时会自动声明。
- on_return (callable): 基本返回回调。
"""
exchange = None
routing_key = ''
serializer = None
compression = None
auto_declare = True
on_return = None
__connection__ = None

3.2 初始化

Producer 的初始化逻辑如下:

def __init__(self, channel, exchange=None, routing_key=None, 
serializer=None, auto_declare=None, compression=None,
on_return=None):
self._channel = channel
self.exchange = exchange
self.routing_key = routing_key or self.routing_key
self.serializer = serializer or self.serializer
self.compression = compression or self.compression
self.on_return = on_return or self.on_return
self._channel_promise = None
if self.exchange is None:
self.exchange = Exchange('')
if auto_declare is not None:
self.auto_declare = auto_declare
if self._channel:
self.revive(self._channel)

3.2.1 复活 channel

revive 方法将 channel 从连接池复活:

def revive(self, channel):  
"""复活生产者在连接丢失时的状态。"""
if is_connection(channel):
connection = channel
self.__connection__ = connection
channel = ChannelPromise(lambda: connection.default_channel)
if isinstance(channel, ChannelPromise):
self._channel = channel
self.exchange = self.exchange(channel)
else:
# channel 已是 concret Channel
self._channel = channel
if self.on_return:
self._channel.events['basic_return'].add(self.on_return)
self.exchange = self.exchange(channel)

发送

4.1 发送消息

通过 send_message 函数发送消息:

def send_message(conn):  
producer = Producer(conn)
producer.publish('hello world', exchange=exchange, routing_key='asynt')
print('message sent')

4.2 内部实现

publish 方法的实现逻辑如下:

def _publish(self, body, priority, content_type, content_encoding,  
headers, properties, routing_key, mandatory,
immediate, exchange, declare):
channel = self.channel
message = channel.prepare_message(
body, priority, content_type,
content_encoding, headers, properties,
)
if declare:
maybe_declare = self.maybe_declare
[maybe_declare(entity) for entity in declare]
# 自动生成队列名称以备用于回复
reply_to = properties.get('reply_to')
if isinstance(reply_to, Queue):
properties['reply_to'] = reply_to.name
return channel.basic_publish(
message,
exchange=exchange, routing_key=routing_key,
mandatory=mandatory, immediate=immediate,
)

4.3 组装消息

prepare_message 方法负责组装消息:

def prepare_message(self, body, priority=None, content_type=None,  
content_encoding=None, headers=None, properties=None):
"""准备消息数据。"""
properties = properties or {}
properties.setdefault('delivery_info', {})
properties.setdefault('priority', priority or self.default_priority)
return {
'body': body,
'content-encoding': content_encoding,
'content-type': content_type,
'headers': headers or {},
'properties': properties or {},
}

4.4 发送消息

basic_publish 方法负责将消息发送到指定的交换机和队列:

def basic_publish(self, message, exchange, routing_key, **kwargs):  
"""发布消息。"""
self._inplace_augment_message(message, exchange, routing_key)
if exchange:
return self.typeof(exchange).deliver(
message, exchange, routing_key, **kwargs
)
# 匿名交换:路由键即为目标队列
return self._put(routing_key, message, **kwargs)

4.5 交换机交付

deliver 方法负责将消息交付给交换机:

class DirectExchange(ExchangeType):  
"""直接交换机。交换机根据路由键直接路由消息。"""
type = 'direct'
def lookup(self, table, exchange, routing_key, default):
return {
queue for rkey, _, queue in table
if rkey == routing_key
}
def deliver(self, message, exchange, routing_key, **kwargs):
_lookup = self.channel._lookup
_put = self.channel._put
for queue in _lookup(exchange, routing_key):
_put(queue, message, **kwargs)

4.6 将消息放入队列

_put 方法负责将消息放入目标队列:

def _put(self, queue, message, **kwargs):  
"""将消息放入目标队列。"""
pri = self._get_message_priority(message, reverse=False)
with self.conn_or_acquire() as client:
client.lpush(self._q_for_pri(queue, pri), dumps(message))

4.7 生成队列名称

_q_for_pri 方法用于生成队列名称:

def _q_for_pri(self, queue, pri):  
pri = self.priority(pri)
if pri:
return f"{queue}{self.sep}{pri}"
return queue

总结

通过以上分析可以看出,Kombu 的 Producer 负责将消息发布到指定的交换机和队列。消息的流程大致如下:

  • Producer 调用 publish 方法。
  • prepare_message 组装消息。
  • basic_publish 将消息发送到交换机。
  • 交换机根据路由键找到目标队列。
  • _put 方法将消息放入目标队列。
  • 消费者从队列中获取消息进行处理。
  • 整个过程依赖于 Redis 连接池和多渠道轮询器,确保消息的高效传输和处理。

    转载地址:http://dhcfz.baihongyu.com/

    你可能感兴趣的文章
    NI笔试——大数加法
    查看>>
    NLog 自定义字段 写入 oracle
    查看>>
    NLog类库使用探索——详解配置
    查看>>
    NLP 基于kashgari和BERT实现中文命名实体识别(NER)
    查看>>
    NLP 项目:维基百科文章爬虫和分类【01】 - 语料库阅读器
    查看>>
    NLP_什么是统计语言模型_条件概率的链式法则_n元统计语言模型_马尔科夫链_数据稀疏(出现了词库中没有的词)_统计语言模型的平滑策略---人工智能工作笔记0035
    查看>>
    NLP学习笔记:使用 Python 进行NLTK
    查看>>
    NLP的神经网络训练的新模式
    查看>>
    NLP采用Bert进行简单文本情感分类
    查看>>
    NLP问答系统:使用 Deepset SQUAD 和 SQuAD v2 度量评估
    查看>>
    NLP:使用 SciKit Learn 的文本矢量化方法
    查看>>
    Nmap扫描教程之Nmap基础知识
    查看>>
    Nmap端口扫描工具Windows安装和命令大全(非常详细)零基础入门到精通,收藏这篇就够了
    查看>>
    NMAP网络扫描工具的安装与使用
    查看>>
    NMF(非负矩阵分解)
    查看>>
    nmon_x86_64_centos7工具如何使用
    查看>>
    NN&DL4.1 Deep L-layer neural network简介
    查看>>
    NN&DL4.3 Getting your matrix dimensions right
    查看>>
    NN&DL4.8 What does this have to do with the brain?
    查看>>
    nnU-Net 终极指南
    查看>>