oslo.messaging 中的 kombu

代码结构图如下:

                                                            +----------------------------------------+
                                                            |                                        |
nova.rpc.get_server --> messaging.server.get_rpc_server --> | messaging.server.MessageHandlingServer |
                                                            |                                        |
                                                            |   self.transport                       |
                                                            |         |                              |
                                                            +---------+------------------------------+
                                                                      |                                                                     
                                                                      v                                                                     
+----------------------------------------------+            +----------------------------------------+            +--------------------------+
|                                              |            |                                        |            |                          |
| messaging._drivers.impl_rabbit.RabbitDriver  |            | messaging.transport.Transport          |            |  messaging.rpc.RPCClient |    <---- nova.rpc.get_client
|                                              |            |                                        |            |                          |
|    self._connection_pool                     | <----------+-- self._driver                         |  <---------+---- self.transport       |
|            |                                 |            |                                        |            |                          |
+------------+---------------------------------+            +----------------------------------------+            |     self.call()          |
             |                                                                                                    |     self.cast()          |
             v                                                                                                    |                          | 
+----------------------------------------------+            +-------------------------------------------+         +--------------------------+ 
|                                              |            |                                           |
| messaging._driver.amqp.ConnectionPool        |            | messaging._drivers.impl_rabbit.Connection |
|                                              |            |                                           |
|   self.connection_cls -----------------------+----------> |   self.connection ------------------------+------------> kombu.connection.Connection()
|   self.create()                              |            |                                           |
|   self.get()                                 |            |   self.declare_consumer()        |        |              +------------------------------------------------+
|   self.put()                                 |            |   self.declare_direct_consumer() +--------+------------> |                                                |
+----------------------------------------------+            |   self.declare_fanout_consumer() |        |              | messaging._drivers.impl_rabbit.DirectConsumer  |
                                                            |   self.declare_topic_consumer()  |        |              | messaging._drivers.impl_rabbit.FanoutConsumer  |
                                                            |                                           |              | messaging._drivers.impl_rabbit.TopicConsumer   |
                                                            |   self.consumers                          |              |                                                |           
                                                            |   self.consume()                          |              |    self.exchange  -----------------------------+-------->  kombu.entity.Exchange()
                                                            |                                           |              |                                                |           
                                                            |                                           |              |    self.queue ---------------------------------+-------->  kombu.entity.Queue()
                                                            |   |self.direct_send()                     |              |                                                |           
                                                            |   |self.fanout_send()                     |              |    self.consume()  ----------------------------+-------->  kombu.entity.Queue().consume()
                                                            |   |self.topic_send()                      |              |                                                |           
                                                            |   |self.notify_send()                     |              +------------------------------------------------+           
                                                            |   |self.publisher_send()                  |                                                                         
                                                            |           |                               |
                                                            +-----------+-------------------------------+                                                                         
                                                                        |                                                                                                      
                                                                        |                                                                                                       
                                                                        v                                                                                                       

                                                            +------------------------------------------------+                                                        
                                                            |                                                |
                                                            | messaging._drivers.impl_rabbit.DirectPublisher |                                        
                                                            | messaging._drivers.impl_rabbit.FanoutPublisher |
                                                            | messaging._drivers.impl_rabbit.TopicPublisher  |
                                                            | messaging._drivers.impl_rabbit.NotifyPublisher |
                                                            |                                                |
                                                            |   self.exchange -------------------------------+--------->  kombu.entity.Exchange()
                                                            |                                                |
                                                            |   self.producer -------------------------------+--------->  kombu.messaging.Producer()
                                                            |                                                |
                                                            |   self.send() ---------------------------------+--------->  kombu.messaging.Producer().publish()
                                                            |                                                |
                                                            +------------------------------------------------+

Transport

以 nova 为例,每个 nova-* 进程有一个全局的 Transport 实例,该实例是对消息库如
kombu 的封装。该全局的实例在 nova/rpc.py 中创建。

每个进程的 main() 都会有这一句:

config.parse_args(sys.argv)

parse_args 中调用了rpc.init:

def parse_args(argv, default_config_files=None):

    ... ... 
    rpc.init(CONF)

下面看一下 rpc.init 如何初始化 Transport 实例:

def init(conf):
    global TRANSPORT, NOTIFIER
    exmods = get_allowed_exmods()
    # 调用 oslo.messaging 接口来创建 Transport 实例。
    TRANSPORT = messaging.get_transport(conf,
                                        allowed_remote_exmods=exmods,
                                        aliases=TRANSPORT_ALIASES)

get_transport 根据配置加载相应的消息库 driver,配置与 driver 的对应关系
在 setup.cfg 中定义:

# oslo.messaging/setup.cfg
[entry_points]
oslo.messaging.drivers =
    rabbit = oslo.messaging._drivers.impl_rabbit:RabbitDriver
    kombu = oslo.messaging._drivers.impl_rabbit:RabbitDriver


# oslo/messaging/transport.py
# get_transport 使用 stevedore 来管理消息库模块
from stevedore import driver
def get_transport(conf, url=None, allowed_remote_exmods=None, aliases=None):
    try:
        mgr = driver.DriverManager('oslo.messaging.drivers',
                                url.transport.split('+')[0],
                                invoke_on_load=True,
                                invoke_args=[conf, url],
                                invoke_kwds=kwargs)
    except RuntimeError as ex:
        raise DriverLoadFailure(url.transport, ex)

    return Transport(mgr.driver)

Transport 实例的几个方法,分别去直接调用相应消息库 driver 的方法。

# oslo/messaging/transport.py
class Transport(object):
    def _send(self, target, ctxt, message, wait_for_reply=None, timeout=None,
            retry=None):
        if not target.topic:
            raise exceptions.InvalidTarget('A topic is required to send',
                                        target)
        return self._driver.send(target, ctxt, message,
                                wait_for_reply=wait_for_reply,
                                timeout=timeout, retry=retry)

    def _listen(self, target):
        if not (target.topic and target.server):
            raise exceptions.InvalidTarget('A server\'s target must have '
                                        'topic and server names specified',
                                        target)
        return self._driver.listen(target)

MessageHandlingServer

每个充当 rpcserver 角色的 nova-* 进程,都有自己的 MessageHandlingServer 实例。
创建 MessageHandlingServer 实例时将全局的 Transport 实例传递进去,
MessageHandlingServer 对与消息服务器的交互都通过 Transport 实例的方法来完成,
具体来说主要就是 Transport()._listen()。

#nova/service.py
class Service(service.Service):
    ... ...
    def start(self):
        ... ...
        self.rpcserver = rpc.get_server(target, endpoints, serializer)
        self.rpcserver.start()

# nova/rpc.py
def get_server(target, endpoints, serializer=None):
    assert TRANSPORT is not None
    serializer = RequestContextSerializer(serializer)
    return messaging.get_rpc_server(TRANSPORT,
                                    target,
                                    endpoints,
                                    executor='eventlet',
                                    serializer=serializer)

# oslo/messaging/rpc/server.py
def get_rpc_server(transport, target, endpoints,
                executor='blocking', serializer=None):
    dispatcher = rpc_dispatcher.RPCDispatcher(target, endpoints, serializer)
    return msg_server.MessageHandlingServer(transport, dispatcher, executor)

在服务启动的时候,调用 Transport 实例的 _listen 方法。该方法会通过底层的
消息库在消息服务器上创建消息交换机与相关队列。

# MessageHandlingServer
def start(self):
    try:
        listener = self.dispatcher._listen(self.transport)
    except driver_base.TransportDriverError as ex:
        raise ServerListenError(self.target, ex)

# RPCDispatcher
def _listen(self, transport):
    return transport._listen(self._target)

RPCClient

需要往消息服务器发送消息时,生成 RPCClient 的实例,然后调用实例的 call 与 cast
方法来进行消息的发送。而这两个方法,则直接调用 Transport()._send() 方法。

# oslo/messaging/rpc/client.py
class RPCClient(object):

    def call(self, ctxt, method, **kwargs):
        ... ...
        try:
            result = self.transport._send(self.target, msg_ctxt, msg,
                                        wait_for_reply=True, timeout=timeout,
                                        retry=self.retry)
        except driver_base.TransportDriverError as ex:
            raise ClientSendError(self.target, ex)
        ... ...


    def cast(self, ctxt, method, **kwargs):
        ... ...
        try:
            self.transport._send(self.target, ctxt, msg, retry=self.retry)
        except driver_base.TransportDriverError as ex:
            raise ClientSendError(self.target, ex)
        ... ...

RabbitDriver

RabbitDriver 是对 kombu 消息库封装的一部分,结合连接封装 Connection 和连接池封装
ConnectionPool 对外提供消息的控制接口。 接口交付于 Transport 实例。

# oslo/messaging/_drivers/impl_rabbit.py 和 oslo/messaging/_drivers/amqpdriver.py
class RabbitDriver(amqpdriver.AMQPDriverBase):

    def __init__(self, conf, url,
                default_exchange=None,
                allowed_remote_exmods=None):
        # RabbitDriver 实例在初始化时,创建一个连接池,连接池 ConnectionPool
        # 的实现后续介绍
        connection_pool = rpc_amqp.get_connection_pool(conf, url, Connection)

            ... ...
        self._connection_pool = connection_pool

    # 发送消息的接口, 上文已提到 Transport 实例的 _send 方法调用该方法。
    def send(self, target, ctxt, message, wait_for_reply=None, timeout=None,
            retry=None):
        return self._send(target, ctxt, message, wait_for_reply, timeout,
                        retry=retry)

    def _send(self, target, ctxt, message,
            wait_for_reply=None, timeout=None,
            envelope=True, notify=False, retry=None):

        try:
            # get_connection 从 connection_pool 中获取一个可用连接 Connection 实例。
            with self._get_connection() as conn:
                # 对应不同的消息发送方式,分别调用 Connection 实例的发送方法。
                if notify:
                    conn.notify_send(self._get_exchange(target),
                                    target.topic, msg, retry=retry)
                elif target.fanout:
                    conn.fanout_send(target.topic, msg, retry=retry)
                else:
                    topic = target.topic
                    if target.server:
                        topic = '%s.%s' % (target.topic, target.server)
                    conn.topic_send(exchange_name=self._get_exchange(target),
                                    topic=topic, msg=msg, timeout=timeout,
                                    retry=retry)
            ... ...

    # 建立对消息的监听,上文已提到 Transport 实例的 _listen 方法会调用该方法。
    def listen(self, target):
        # 首先从连接池中取到一个可用连接。
        conn = self._get_connection(pooled=False)

        # 创建一个工厂类来做为后续所有 Consumer 的回调函数,负责处理收到的消息。
        # AMQPListener 定义了 __call__ 方法,可以直接调用。
        listener = AMQPListener(self, conn)

        # 通过 Connection 实例的 declare_* 方法创建消息消费者
        conn.declare_topic_consumer(exchange_name=self._get_exchange(target),
                                    topic=target.topic,
                                    callback=listener)
        conn.declare_topic_consumer(exchange_name=self._get_exchange(target),
                                    topic='%s.%s' % (target.topic,
                                                    target.server),
                                    callback=listener)
        conn.declare_fanout_consumer(target.topic, listener)

        return listener

Connection 和 ConnectionPool

前方已提到 Oslo.messaging 实现了自己的连接池机制,而没有使用 kombu 所提供的。
Oslo.messaging 的连接池是 collections.deque() 的数据结构。

ConnectionPool 的实现如下:

# oslo/messaging/_drivers/pool.py 和 oslo/messaging/_drivers/amqp.py
class ConnectionPool(pool.Pool):
    def __init__(self, max_size=4):
        ...  ...
        # 定义连接池连接数最大值,默认为 4
        self._max_size = max_size
        # 定前连接数
        self._current_size = 0
        self._cond = threading.Condition()
        # 连接池数据结构,放置所有的 Connection 实例。
        self._items = collections.deque()

    # 从连接池中获取一个连接的接口,注意该接口同时也是往连接池中新建连接的接口
    # 接口中没有明显的将连接添加进 self._items 数据结构,实际上一个连接在此方法
    # 中新建,当连接使用完毕进行释放时,会由 put 接口加到 self._items 中。
    def get(self):
        with self._cond:
            while True:
                try:
                    # 首先尝试从连接池数据结构中获取新的可用连接。
                    return self._items.popleft()
                except IndexError:
                    pass

                # 如果没有可用连接,且连接数未达上限,则允许跳出循环下文调用
                # create 方法新建连接。
                if self._current_size < self._max_size:
                    self._current_size += 1
                    break

                # 如果没有可用连接,且连接数已达上限,则只能等待连接放回连接池
                # 的信号,注意有超时限值。
                self._cond.wait(timeout=1)

        try:
            # 新建一个连接 Connection 实例
            return self.create()
        except Exception:
            with self._cond:
                # 建立失败,回滚连接池当前数量
                self._current_size -= 1
            raise

    # 新使用完的连接放回 self._items 数据结构。
    def put(self, item):
        """Return an item to the pool."""
        with self._cond:
            self._items.appendleft(item)
            self._cond.notify()

    # 新建连接实例
    def create(self):
        # connection_cls 对于 Rabbitmq 来说就是 messaging._drivers.impl_rabbit.Connection
        return self.connection_cls(self.conf, self.url)

ConnectionPool 与 Connection 实例之间有一个类型用于衔接,
上文略过未提的 get_connection 方法,可以看到它拿到的是 ConnectionContext 实例。

# oslo/messaging/_drivers/amqpdriver.py
def _get_connection(self, pooled=True):
        return rpc_amqp.ConnectionContext(self.conf,
                                        self._url,
                                        self._connection_pool,
                                        pooled=pooled)

ConnectionContext 实例所起的最主要作用即是协同维护 ConntionPool 的数据结构。

class ConnectionContext(rpc_common.Connection):

    def __init__(self, conf, url, connection_pool, pooled=True):
        ... ...
        self.connection_pool = connection_pool
        if pooled:
        # 从连接池获取连接
            self.connection = connection_pool.get()
        else:
        # ConnectionContext 的另一部分功能,处理不使用连接池的情况。
        # 直接创建真正的连接实例。
            self.connection = connection_pool.connection_cls(conf, url)
        # 标识是否使用了连接池,默认为 True。
        self.pooled = pooled

    def _done(self):
    # 还记得前文所说,连接池中的连接都是由 put 方法放回连接池结构中的吗。
    # 在各种场景下,销毁一个 ConnectionContext 都会走到这里,实际将 Connection
    # 实例通过 connection_pool.put() 方法将实例放回连接池。
        if self.connection:
            if self.pooled:
                # Reset the connection so it's ready for the next caller
                # to grab from the pool
                self.connection.reset()
                self.connection_pool.put(self.connection)
            else:
            # 如果不使用连接池,则可以真正去关闭连接。
                try:
                    self.connection.close()
                except Exception:
                    pass
            self.connection = None

    # 以下三个方法,保证各种销毁 ConnectionContext 时,都不真得销毁,而是
    # 调用 _done 方法。
    def __exit__(self, exc_type, exc_value, tb):
    # 对于与 with 语句搭配使用的情况时调用,释放 Connection
        self._done()

    def __del__(self):
    # 直接清理 ConnectionContext 实例时调用,释放 Connection
        self._done()

    def close(self):
    # 显式关闭连接时调用
        self._done()

通过以上代码的分析,可以理清连接池 ConnectionPool 的维护逻辑。下面可以看一下
Connection 实例的结构如何。

下面以 publisher_send 发送消息,与 declare_consumer 创建消息消费者为例
分别分析一下 Connection 实例。 direct_send 与 declare_direct_consumer
等都是调用这两个方法来工作的。

class Connection(object):

    def __init__(self, conf, url):
        ... ...
        # 真正的 kombu Connection 是本 Connection 实例封装的对象。
        self.connection = kombu.connection.Connection(
            self._url, ssl=self._ssl_params, login_method=self._login_method,
            failover_strategy="shuffle")
        ... ...

    def publisher_send(self, cls, topic, msg, timeout=None, retry=None,
                    **kwargs):
        ... ...
        def _publish():
        # 每次发送消息时,都会生成一个 Publisher 实例,并执行消息的发送动作
        # Publisher 实例实现见下文。
            publisher = cls(self.conf, self.channel, topic=topic, **kwargs)
            publisher.send(msg, timeout)

        # self.ensure 处理发送消息的重试与错误处理。
        self.ensure(_error_callback, _publish, retry=retry)

    def declare_consumer(self, consumer_cls, topic, callback):

        def _declare_consumer():
            # 创建 Consumer 实例,具体实例为 DirectConsumer 或 TopicConsumer 等。
            # DirectConsumer 见下文。
            consumer = consumer_cls(self.conf, self.channel, topic, callback,
                                    six.next(self.consumer_num))
            # 维护了 consumers 列表,包含本连接所有 Consumer
            self.consumers.append(consumer)
            return consumer

        # self.ensure 处理发送消息的重试与错误处理。
        return self.ensure(_connect_error, _declare_consumer)


class DirectPublisher(Publisher):

    def __init__(self, channel, exchange_name, routing_key, **kwargs):
        ... ...
        # 初始化时调用重连逻辑。
        self.reconnect(channel)

    def reconnect(self, channel):
        ... ...
        # 将创建 Exchange 与 Producer 的逻辑直接写在重连方法中,故初次连接与故障
        # 重连可以使用相同的一条逻辑。
        self.exchange = kombu.entity.Exchange(name=self.exchange_name,
                                            **self.kwargs)
        self.producer = kombu.messaging.Producer(exchange=self.exchange,
                                                channel=channel,
                                                routing_key=self.routing_key)
        ... ...

    def send(self, msg, timeout=None):
        ... ...
        # 调用底层的 Producer 来发送消息。
        self.producer.publish(msg, headers={'ttl': (timeout * 1000)})

class DirectConsumer(ConsumerBase):

    def __init__(self, conf, channel, msg_id, callback, tag, **kwargs):

            ... ...
        # 创建消息交换机。
        self.exchange = kombu.entity.Exchange(name=msg_id,
                                            type='direct',
                                            durable=options['durable'],
                                            auto_delete=options['auto_delete'])
            ... ...

    def reconnect(self, channel):
        # 将创建 Queue 的逻辑直接写在重连方法中,故初次连接与故障
        # 重连可以使用相同的一条逻辑。
        self.channel = channel
        self.kwargs['channel'] = channel
        self.queue = kombu.entity.Queue(**self.kwargs)
        try:
            # 主动建立将要订阅的队列。
            self.queue.declare()
        except Exception as e:
            ... ...

    def consume(self, *args, **kwargs):
        ... ...
        # 获取一个队列的消息,注意这里省略了 kombu.Consumer,而是直接使用 Queue 的底层接口。
        self.queue.consume(*args, callback=_callback, **options)
        ... ...