博客
关于我
[源码分析] 消息队列 Kombu 之 Producer
阅读量:437 次
发布时间:2019-03-06

本文共 20916 字,大约阅读时间需要 69 分钟。

[源码分析] 消息队列 Kombu 之 Producer

目录

0x00 摘要

本系列我们介绍消息队列 Kombu。Kombu 的定位是一个兼容 AMQP 协议的消息队列抽象。通过本文,大家可以了解 Kombu 中的 Producer 概念。

0x01 示例代码

下面使用如下代码来进行说明。

本示例来自https://liqiang.io/post/kombu-source-code-analysis-part-5系列,特此深表感谢。

def main(arguments):    hub = Hub()    exchange = Exchange('asynt_exchange')    queue = Queue('asynt_queue', exchange, 'asynt_routing_key')    def send_message(conn):        producer = Producer(conn)        producer.publish('hello world', exchange=exchange, routing_key='asynt_routing_key')        print('message sent')    def on_message(message):        print('received: {0!r}'.format(message.body))        message.ack()        # hub.stop()  # <-- exit after one message    conn = Connection('redis://localhost:6379')    conn.register_with_event_loop(hub)    def p_message():        print(' kombu ')    with Consumer(conn, [queue], on_message=on_message):        send_message(conn)        hub.timer.call_repeatedly(3, p_message)        hub.run_forever()if __name__ == '__main__':    sys.exit(main(sys.argv[1:]))

0x02 来由

前文已经完成了构建部分,Consumer部分,下面来到了Producer部分,即如下代码:

def send_message(conn):		producer = Producer(conn)    producer.publish('hello world', exchange=exchange, routing_key='asynt')   	print('message sent')

我们知道,Transport需要把Channel与文件信息联系起来,但是此时Transport信息如下,文件信息依然没有,这是我们以后需要留意的

transport = {Transport} 
Channel = {type}
Cycle = {type}
Management = {type}
channel_max = {int} 65535 channels = {list: 2} [
,
] client = {Connection}
cycle = {MultiChannelPoller}
after_read = {set: 0} set() eventflags = {int} 25 fds = {dict: 0} {} poller = {_poll}
default_connection_params = {dict: 2} {'port': 6379, 'hostname': 'localhost'} default_port = {int} 6379 driver_name = {str} 'redis' driver_type = {str} 'redis' implements = {Implements: 3} {'asynchronous': True, 'exchange_type': frozenset({'direct', 'topic', 'fanout'}), 'heartbeats': False} manager = {Management}
polling_interval = {NoneType} None state = {BrokerState}

0x03 建立

3.1 定义

Producer中,主要变量是:

  • _channel :就是channel;
  • exchange :exchange;

但是本文示例没有传入exchange,这就有些奇怪,我们需要继续看看

class Producer:    """Message Producer.    Arguments:        channel (kombu.Connection, ChannelT): Connection or channel.        exchange (kombu.entity.Exchange, str): Optional default exchange.        routing_key (str): Optional default routing key.    """    #: Default exchange    exchange = None    #: Default routing key.    routing_key = ''    #: Default serializer to use. Default is JSON.    serializer = None    #: Default compression method.  Disabled by default.    compression = None    #: By default, if a defualt exchange is set,    #: that exchange will be declare when publishing a message.    auto_declare = True    #: Basic return callback.    on_return = None    #: Set if channel argument was a Connection instance (using    #: default_channel).    __connection__ = None

3.2 init

init代码如下。

def __init__(self, channel, exchange=None, routing_key=None,             serializer=None, auto_declare=None, compression=None,             on_return=None):    self._channel = channel    self.exchange = exchange    self.routing_key = routing_key or self.routing_key    self.serializer = serializer or self.serializer    self.compression = compression or self.compression    self.on_return = on_return or self.on_return    self._channel_promise = None    if self.exchange is None:        self.exchange = Exchange('')    if auto_declare is not None:        self.auto_declare = auto_declare    if self._channel:        self.revive(self._channel)

3.2.1 转换channel

这里有个重要转换。

  • 最开始是把输入参数 Connection 赋值到 self._channel。
  • 然后 revive 方法做了转换为 channel,即 self._channel 最终是 channel 类型。

但是 exchange 依然没有意义,是 direct 类型。

代码如下:

def revive(self, channel):    """Revive the producer after connection loss."""    if is_connection(channel):        connection = channel        self.__connection__ = connection        channel = ChannelPromise(lambda: connection.default_channel)    if isinstance(channel, ChannelPromise):        self._channel = channel        self.exchange = self.exchange(channel)    else:        # Channel already concrete        self._channel = channel        if self.on_return:            self._channel.events['basic_return'].add(self.on_return)        self.exchange = self.exchange(channel)

此时变量为:

producer = {Producer}  auto_declare = {bool} True channel = {Channel} 
compression = {NoneType} None connection = {Connection}
exchange = {Exchange} Exchange ''(direct) on_return = {NoneType} None routing_key = {str} '' serializer = {NoneType} None

逻辑如图:

+----------------------+               +-------------------+| Producer             |               | Channel           ||                      |               |                   |        +-----------------------------------------------------------+|                      |               |    client  +-------------> | Redis
|| channel +------------------> | | +-----------------------------------------------------------+| | | pool || exchange | +---------> | | <------------------------------------------------------------+| | | | | || connection | | +----> | connection +---------------+ || + | | | | | | |+--+-------------------+ | | +-------------------+ | | | | | | v | | | | | +-------------------+ +---+-----------------+ +--------------------+ | | | | | | Connection | | redis.Transport | | MultiChannelPoller | | | +----------------------> | | | | | | | | | | | | | | | _channels +--------+ | | | | | | cycle +------------> | _fd_to_chan | | | | | transport +---------> | | | _chan_to_sock | | +-------->+ | | | | | +------+ poller | | | | +-------------------+ +---------------------+ | | after_read | | | | | | | | | | | +--------------------+ | | | +------------------+ +---------------+ | | | | Hub | | | | | | | v | | | | | +------+------+ | | | | poller +---------------> | _poll | | publish | | | | | | +-------+ +--------------------------------+ | | | _poller+---------> | poll | | | | +------------------+ | | +-------+ | | | +-------------+ +-------------------+ | +-----> +----------------+ | Queue | | | | Exchange | | _channel | +---------+ | | | | | | | exchange +--------------------> | channel | | | | | | | | | +-------------------+ +----------------+

手机如图:

0x04 发送

发送消息是通过producer.publish完成。

def send_message(conn):    producer = Producer(conn)    producer.publish('hello world', exchange=exchange, routing_key='asynt')    print('message sent')

此时传入exchange作为参数。原来如果没有 Exchange,是可以在这里进行补救

producer.publish继续调用到如下,可以看到分为两步:

  • 调用channel的组装消息函数prepare_message
  • 调用channel的发送消息basic_publish

因此,最终发送消息还是通过channel完成。

def _publish(self, body, priority, content_type, content_encoding,             headers, properties, routing_key, mandatory,             immediate, exchange, declare):    channel = self.channel    message = channel.prepare_message(        body, priority, content_type,        content_encoding, headers, properties,    )    if declare:        maybe_declare = self.maybe_declare        [maybe_declare(entity) for entity in declare]    # handle autogenerated queue names for reply_to    reply_to = properties.get('reply_to')    if isinstance(reply_to, Queue):        properties['reply_to'] = reply_to.name    return channel.basic_publish(        message,        exchange=exchange, routing_key=routing_key,        mandatory=mandatory, immediate=immediate,    )

4.1 组装消息 in channel

channel 的组装消息函数prepare_message完成组装功能,基本上是为消息添加各种属性。

def prepare_message(self, body, priority=None, content_type=None,                    content_encoding=None, headers=None, properties=None):    """Prepare message data."""    properties = properties or {}    properties.setdefault('delivery_info', {})    properties.setdefault('priority', priority or self.default_priority)    return {'body': body,            'content-encoding': content_encoding,            'content-type': content_type,            'headers': headers or {},            'properties': properties or {}}

消息如下:

message = {dict: 5}  'body' = {str} 'aGVsbG8gd29ybGQ=' 'content-encoding' = {str} 'utf-8' 'content-type' = {str} 'text/plain' 'headers' = {dict: 0} {}  __len__ = {int} 0 'properties' = {dict: 5}   'delivery_mode' = {int} 2  'delivery_info' = {dict: 2} {'exchange': 'asynt_exchange', 'routing_key': 'asynt_routing_key'}  'priority' = {int} 0  'body_encoding' = {str} 'base64'  'delivery_tag' = {str} '1b03590e-501c-471f-a5f9-f4fdcbe3379a'  __len__ = {int} 5

4.2 发送消息 in channel

channel的发送消息basic_publish完成发送功能。此时使用了传入的参数exchange。

发送消息basic_publish方法是调用_put方法:

def basic_publish(self, message, exchange, routing_key, **kwargs):    """Publish message."""    self._inplace_augment_message(message, exchange, routing_key)    if exchange:        return self.typeof(exchange).deliver(            message, exchange, routing_key, **kwargs        )    # anon exchange: routing_key is the destination queue    return self._put(routing_key, message, **kwargs)

4.3 deliver in exchange

self.typeof(exchange).deliver代码接着来到exchange。本文是DirectExchange。

注意,这里用到了self.channel._put。就是Exchange的成员变量channel。

class DirectExchange(ExchangeType):    """Direct exchange.    The `direct` exchange routes based on exact routing keys.    """    type = 'direct'    def lookup(self, table, exchange, routing_key, default):        return {            queue for rkey, _, queue in table            if rkey == routing_key        }    def deliver(self, message, exchange, routing_key, **kwargs):        _lookup = self.channel._lookup        _put = self.channel._put        for queue in _lookup(exchange, routing_key):            _put(queue, message, **kwargs)

4.4 binding 转换

我们知道,Exchange的作用只是将发送的 routing_key 转化为 queue 的名字。这样发送就知道发到哪个 queue

因此依据_lookup方法得到对应的queue

def _lookup(self, exchange, routing_key, default=None):    """Find all queues matching `routing_key` for the given `exchange`.    Returns:        str: queue name -- must return the string `default`            if no queues matched.    """    if default is None:        default = self.deadletter_queue    if not exchange:  # anon exchange        return [routing_key or default]    try:        R = self.typeof(exchange).lookup(            self.get_table(exchange),            exchange, routing_key, default,        )    except KeyError:        R = []    if not R and default is not None:        warnings.warn(UndeliverableWarning(UNDELIVERABLE_FMT.format(            exchange=exchange, routing_key=routing_key)),        )        self._new_queue(default)        R = [default]    return R

此处具体逻辑为:

第一,调用到channel的方法。这里的 exchange 名字为 asynt_exchange。

def get_table(self, exchange):    key = self.keyprefix_queue % exchange    with self.conn_or_acquire() as client:        values = client.smembers(key)        if not values:            raise InconsistencyError(NO_ROUTE_ERROR.format(exchange, key))        return [tuple(bytes_to_str(val).split(self.sep)) for val in values]

我们看看Redis内容,发现集合内容如下:

127.0.0.1:6379> smembers _kombu.binding.asynt_exchange1) "asynt_routing_key\x06\x16\x06\x16asynt_queue"

第二,因此得到对应binding为:

{b'asynt_routing_key\x06\x16\x06\x16asynt_queue'}

即从 exchange 得到 routing_key ---> queue 的规则,然后再依据 routing_key 得到 queue。就知道 Consumer 和 Producer 需要依据哪个 queue 交换消息。

逻辑如下:

+---------------------------------+                                  |         exchange                |                                  |                                 |                 1 routing_key x  |                                 |+----------+                      |                                 |      +------------+| Producer |  +-----------------> |   routing_key x --->  queue x   |      |  Consumer  |+--------+-+                      |                                 |      +------------+         |                        |   routing_key y --->  queue y   |         |                        |                                 |           ^         |                        |   routing_key z --->  queue z   |           |         |                        |                                 |           |         |                        +---------------------------------+           |         |                                                                      |         |                                                                      |         |                                                                      |         |                                                                      |         |                                                                      |         |                                                                      |         |                                                                      |         |                                                                      |         |                                  +-----------+                       |         |        2 message                 |           |        3 message      |         +------------------------------->  |  queue X  |  +--------------------+                                            |           |                                            +-----------+

4.5 _put in channel

channel的_put 方法被用来继续处理,可以看到其最终调用到了client.lpush。

client为:

Redis
>>

代码为:

def _put(self, queue, message, **kwargs):    """Deliver message."""    pri = self._get_message_priority(message, reverse=False)    with self.conn_or_acquire() as client:        client.lpush(self._q_for_pri(queue, pri), dumps(message))

redis怎么区别不同的queue?

实际是每个 queue 被赋予一个字符串 name,这个 name 就是 redis 对应的 list 的 key。知道应该向哪个 list 放消息,后续就是向此 list 中 lpush 消息。

如下方法完成转换功能。

def _q_for_pri(self, queue, pri):    pri = self.priority(pri)    if pri:        return f"{queue}{self.sep}{pri}"    return queue

现在发消息之后,redis内容如下,我们可以看出来,消息作为list 的item,放入到之中。

127.0.0.1:6379> lrange asynt_queue 0 -11) "{\"body\": \"aGVsbG8gd29ybGQ=\", \"content-encoding\": \"utf-8\", \"content-type\": \"text/plain\", \"headers\": {}, \"properties\": {\"delivery_mode\": 2, \"delivery_info\": {\"exchange\": \"asynt_exchange\", \"routing_key\": \"asynt_routing_key\"}, \"priority\": 0, \"body_encoding\": \"base64\", \"delivery_tag\": \"df7af424-e1ab-4c08-84b5-1cd5c97ed25d\"}}"127.0.0.1:6379>

0x05 总结

现在我们总结如下:

  • Producers: 发送消息的抽象类;
  • Consumers:接受消息的抽象类,consumer需要声明一个queue,并将queue与指定的exchange绑定,然后从queue里面接收消息;
  • Exchange:MQ 路由,消息发送者将消息发至Exchange,Exchange负责将消息分发至队列;
  • Queue:对应的 queue 抽象,存储着即将被应用消费掉的消息,Exchange负责将消息分发Queue,消费者从Queue接收消息;
  • Channel:与AMQP中概念类似,可以理解成共享一个Connection的多个轻量化连,就是真实redis连接;

于是逻辑链已经形成,大约是这样的:

  • Producer的publish方法接受参数Exchange,于是就发送消息到此Exchange;
  • Producer调用channel的组装消息函数prepare_message为消息添加各种属性;
  • Producer调用channel的发送消息basic_publish发送消息,此时使用了传入的参数exchange。
  • basic_publish方法调用exchange.deliver(exchange, routing_key)来发送消息;
  • Exchange中有成员变量Channel,也有成员变量Queues,每个queue对应一个routing_key;
  • deliver使用_lookup方法依据key得到对应的queue;
  • deliver使用Exchange成员变量Channel的_put方法来向queue中投放消息;
  • Channel拿到自己的redis连接池,即client为Redis<ConnectionPool<Connection<host=localhost,port=6379,db=0>>>;于是可以基于此进行redis操作;
  • redis怎么区别不同的queue,实际是每个queue被赋予一个字符串name,这就是redis对应的list的key;
  • 既然得到了名字为queue的list,则向此list中lpush消息。
  • Consumer去Queue取消息;

动态逻辑如下:

+------------+                        +------------+               +------------+      +-----------------------+       |  producer  |                        |  channel   |               |  exchange  |      | Redis
| +---+--------+ +----+-------+ +-------+----+ +----------+------------+ | | | | | | | |publish('', exchange, routing_key) | | | | | | | | prepare_message | | | | | | | | +----------------------------------> | | | | | | | | basic_publish (exchange, routing_key)| | | | | | | | +----------------------------------> | | | | | | | | | deliver(exchange, routing_key)| | | | | | | +-----------------------------> | | | | | | | | | | | | _lookup(exchange, routing_key) | | | | | | | | | | | _put(queue, message) | | | v | | | | <---------------------------+ | | | | | | | _q_for_pri(queue, pri) | | | + | | v | | | | | client.lpush | | | | +--------------------------------------------------> | | | | | v v v v

手机如下:

0xFF 参考

转载地址:http://dhcfz.baihongyu.com/

你可能感兴趣的文章
中国剩余定理证明过程
查看>>
java中Object.equals()简单用法
查看>>
poj 2187 Beauty Contest(凸包求解多节点的之间的最大距离)
查看>>
程序员的开发文档
查看>>
mybatis generator修改默认生成的sql模板
查看>>
算法 - 如何从股票买卖中,获得最大收益
查看>>
算法 - 链表操作思想 && case
查看>>
C#之反射、元数据详解
查看>>
通俗易懂设计模式解析——单例模式
查看>>
通俗易懂设计模式解析——抽象工厂模式
查看>>
前端数据渲染及mustache模板引擎的简单实现
查看>>
设计模式系列之工厂模式三兄弟(Factory Pattern)
查看>>
OAuth2.0认证详解
查看>>
在滴滴和头条干了 2 年后端开发,太真实…
查看>>
你还在用命令看日志?快用 Kibana 吧,一张图片胜过千万行日志!
查看>>
Linux中对用户操作
查看>>
Linux查看CUDA和cuDNN版本
查看>>
C#获取Excel中所有的Sheet名称
查看>>
[最全整理]关于决策树的一切
查看>>
100天搞定机器学习|Day9-12 支持向量机
查看>>