本文共 6046 字,大约阅读时间需要 20 分钟。
Kombu 是一个兼容 AMQP 协议的消息队列抽象。通过本文,我们将深入分析 Kombu 中的 Producer 概念,了解其工作原理和实现细节。
以下代码用于说明Producer 的工作流程。代码来源于 liqiang.io。
def main(arguments): hub = Hub() exchange = Exchange('asynt_exchange') queue = Queue('asynt_queue', exchange, 'asynt_routing_key') def send_message(conn): producer = Producer(conn) producer.publish('hello world', exchange=exchange, routing_key='asynt') print('message sent') def on_message(message): print('received: {0!r}'.format(message.body)) message.ack() # hub.stop() # 退出后只接收一条消息 conn = Connection('redis://localhost:6379') conn.register_with_event_loop(hub) def p_message(): print('kombu') with Consumer(conn, [queue], on_message=on_message): send_message(conn) hub.timer.call_repeatedly(3, p_message) hub.run_forever()if __name__ == '__main__': sys.exit(main(sys.argv[1:])) 在前文中,我们已经分析了 Kombu 的 Consumer 部分。现在我们将重点转向 Producer 部分。
Producer 类的定义如下:
class Producer: """消息生产者。 参数: - channel (kombu.Connection, ChannelT): 连接或 channel。 - exchange (kombu.entity.Exchange, str): 可选默认交换机名。 - routing_key (str): 可选默认路由键。 - serializer (callable): 可选默认序列化方法,默认为 JSON。 - compression (str): 可选压缩方法,默认为 None。 - auto_declare (bool): 若设置了默认交换机名,发布消息时会自动声明。 - on_return (callable): 基本返回回调。 """ exchange = None routing_key = '' serializer = None compression = None auto_declare = True on_return = None __connection__ = None
Producer 的初始化逻辑如下:
def __init__(self, channel, exchange=None, routing_key=None, serializer=None, auto_declare=None, compression=None, on_return=None): self._channel = channel self.exchange = exchange self.routing_key = routing_key or self.routing_key self.serializer = serializer or self.serializer self.compression = compression or self.compression self.on_return = on_return or self.on_return self._channel_promise = None if self.exchange is None: self.exchange = Exchange('') if auto_declare is not None: self.auto_declare = auto_declare if self._channel: self.revive(self._channel) revive 方法将 channel 从连接池复活:
def revive(self, channel): """复活生产者在连接丢失时的状态。""" if is_connection(channel): connection = channel self.__connection__ = connection channel = ChannelPromise(lambda: connection.default_channel) if isinstance(channel, ChannelPromise): self._channel = channel self.exchange = self.exchange(channel) else: # channel 已是 concret Channel self._channel = channel if self.on_return: self._channel.events['basic_return'].add(self.on_return) self.exchange = self.exchange(channel)
通过 send_message 函数发送消息:
def send_message(conn): producer = Producer(conn) producer.publish('hello world', exchange=exchange, routing_key='asynt') print('message sent') publish 方法的实现逻辑如下:
def _publish(self, body, priority, content_type, content_encoding, headers, properties, routing_key, mandatory, immediate, exchange, declare): channel = self.channel message = channel.prepare_message( body, priority, content_type, content_encoding, headers, properties, ) if declare: maybe_declare = self.maybe_declare [maybe_declare(entity) for entity in declare] # 自动生成队列名称以备用于回复 reply_to = properties.get('reply_to') if isinstance(reply_to, Queue): properties['reply_to'] = reply_to.name return channel.basic_publish( message, exchange=exchange, routing_key=routing_key, mandatory=mandatory, immediate=immediate, ) prepare_message 方法负责组装消息:
def prepare_message(self, body, priority=None, content_type=None, content_encoding=None, headers=None, properties=None): """准备消息数据。""" properties = properties or {} properties.setdefault('delivery_info', {}) properties.setdefault('priority', priority or self.default_priority) return { 'body': body, 'content-encoding': content_encoding, 'content-type': content_type, 'headers': headers or {}, 'properties': properties or {}, } basic_publish 方法负责将消息发送到指定的交换机和队列:
def basic_publish(self, message, exchange, routing_key, **kwargs): """发布消息。""" self._inplace_augment_message(message, exchange, routing_key) if exchange: return self.typeof(exchange).deliver( message, exchange, routing_key, **kwargs ) # 匿名交换:路由键即为目标队列 return self._put(routing_key, message, **kwargs)
deliver 方法负责将消息交付给交换机:
class DirectExchange(ExchangeType): """直接交换机。交换机根据路由键直接路由消息。""" type = 'direct' def lookup(self, table, exchange, routing_key, default): return { queue for rkey, _, queue in table if rkey == routing_key } def deliver(self, message, exchange, routing_key, **kwargs): _lookup = self.channel._lookup _put = self.channel._put for queue in _lookup(exchange, routing_key): _put(queue, message, **kwargs) _put 方法负责将消息放入目标队列:
def _put(self, queue, message, **kwargs): """将消息放入目标队列。""" pri = self._get_message_priority(message, reverse=False) with self.conn_or_acquire() as client: client.lpush(self._q_for_pri(queue, pri), dumps(message))
_q_for_pri 方法用于生成队列名称:
def _q_for_pri(self, queue, pri): pri = self.priority(pri) if pri: return f"{queue}{self.sep}{pri}" return queue 通过以上分析可以看出,Kombu 的 Producer 负责将消息发布到指定的交换机和队列。消息的流程大致如下:
publish 方法。prepare_message 组装消息。basic_publish 将消息发送到交换机。_put 方法将消息放入目标队列。整个过程依赖于 Redis 连接池和多渠道轮询器,确保消息的高效传输和处理。
转载地址:http://dhcfz.baihongyu.com/