上篇的 MessageHandlingServer 继续, 这一篇专注于梳理 oslo.messaging 中一个 rpc server 对消息的处理流程。

openstack 各服务 nova-compute, neutron-server, cinder-volume 等等每一个都对外提供了 rpc 服务,接受其它组件的 rpc 调用。例如,nova-api 接到一个创建虚拟机的请求,就是通过调用 nova-compute 的一个创建虚拟机的 rpc 接口来进行虚拟机创建的。

 +------------------------+----------------+                                                                                                                                     
 |  messaging.server.MessageHandlingServer |                                                                         +------------------------------------------------+          
 |                                         |        +--------------------------------------------------------+       |   messaging/_drivers/amqpdriver.AMQPListener   |          
 |    self._executor            -----------+----->  |  messaging._executors.impl_eventlet.EventletExecutor   |       |                                                |          
 |                                         |        |                                                        |       |     self.conn                                  |          
 |    self.transport                       |        |    self.listener                   --------------------+--->   |                                                |          
 |                                         |        |                                                        |       |                                                |          
 |    self.start()              -----------+--------+->  self.start()                     -------------------+-------+-->  self.poll()                                |          
 +-----------------------------------------+        |    self._thread                                        |       |                                                |          
                                                    |    self._greenpool                                     |       |     self.__call__()                            |          
                                                    |                                                        |       |                                                |          
                                                    |    self.dispatcher                                     |       |     self.incoming                              |          
                                                    +------------+-------------------------------------------+       +-----------+------------------------------------+          
                                                                 |                                                               |                                               
                                                                 V                                                               V                                               
                                                    +-------------------------------------------+                    +------------------------------------------------------+    
                                                    |   messaging.rpc.dispatcher.RPCDispatcher  |                    |    messaging/_drivers/amqpdriver.AMQPIncomingMessage |    
                                                    |                                           |                    |                                                      |    
                                                    |     self.endpoints                        |                    |                                                      |    
                                                    |                                           |                    |                                                      |    
                                                    |     self.__call__()     ------------------+--------------------+--->  self.acknowledge()                              |    
                                                    |                                           |                    |                                                      |    
                                                    |     self._dispatch_and_reply()  ----------+--------------------+--->  self.reply()                                    |    
                                                    |     self._dispatch()                      |                    |                                                      |    
                                                    |     self._do_dispatch()                   |                    |                                                      |    
                                                    +-------------------------------------------+                    +------------------------------------------------------+    
  • MessageHandlingServer

    每个像 nova-compute 这样对外提供 rpc 接口的服务,都需要创建一个 MessageHandlingServer 对象。这个 MessageHandlingServer 承载着本服务全局唯一的一个
    TRANSPORT 实例,一个 TRANSPORT 实例下又对应一个全局唯一的 amqpdriver, amqpdriver 中就是上一篇所说的 rabbitmq 连接池。作为服务的 MessageHandlingServer 从连接池中取出一个连接,在此连接中不停的进行 consume 操作,以不断地拿到外界对自己的请求消息。

    当启动一个 MessageHandlingServer 之后,就启动了一个 executor

    # messaging.server.py
    def start(self):
    
    
    
    if self._executor is not None:
        # 如果已有一个 executor,说明 start 被重复调用了,此时什么也不做。
        return
    
    
    try:
        # 这个调用绕来绕去,实际上等同于 listener = self.transport.listen()
        # 在上一篇中,我们知道 transport.listen() 就是从连接池中取出一个连接,创建消息消费者,并新建一个
        # AMQPListener 来作为所有消费者的回调。
        listener = self.dispatcher._listen(self.transport)
    except driver_base.TransportDriverError as ex:
        raise ServerListenError(self.target, ex)
    
    
    # 创建进程全局唯一的一个EventletExecutor,理论上 executor 可以有多种,目前 oslo.messaging
    # 实现的 executor 只有两类,一个基于 eventlet 的, 一个是单线程阻塞式的,后者只是作为示例。
    # 两个实现分别在 messaging/_executors/impl_eventlet.py 和 messaging/_executors/impl_blocking.py 
    self._executor = self._executor_cls(self.conf, listener,
                                        self.dispatcher)
    self._executor.start()
    

  • EventletExecutor

    executor 启动时,会启动一个 executor 协程来不断地对 listener 进行 poll 操作,并创建一个协程池。每次 poll
    操作获取到消息,都从连接池中拿一个可用协程,并将消息传递给 dispatcher,由 dispatcher 来分发消息。

    # messaging/_executors/impl_eventlet.py 
    def start(self):
        if self._thread is not None:
            # self._thread 变量承载的是 executor 协程,如果该协程已在,说明 start 方法重复调用了,此时什么也不做。
            return
    
    
    
    # 这个 forever_retry_uncaught_exceptions 旨在捕获 executor
    # 协程中的所有异常,在任何情况下都维持协程的持续运行。
    # 这样说有点高大上,其实就是捕获所有异常,然后打印日志并重新运行协程。
    @excutils.forever_retry_uncaught_exceptions
    def _executor_thread():
        # 这是 executor 协程的工作内容
        try:
            while self._running:
                # 从 listener 中 poll 到一个新消息放在 incoming 之中。
                incoming = self.listener.poll()
                if incoming is not None:
                    # 将这个消息传递给 dispatcher,spawn_with(xxx, pool=self._greenpool) 是指使用协程池
                    # self._greenpool 中一个协程来执行 xxx。
                    spawn_with(ctxt=self.dispatcher(incoming),
                            pool=self._greenpool)
        except greenlet.GreenletExit:
            return
    
    
    self._running = True
    # 启动 executor 协程
    self._thread = eventlet.spawn(_executor_thread)
    

  • AMQPListener

    AMQPListener 作为一个回调类,需要提供 __call__ 方法,当有一个新消息来到的时候,
    就会把消息作为参数传递给 __call__ 方法。

    class AMQPListener(base.Listener):
    
    
    
    def __init__(self, driver, conn):
        ... ...
        # self.conn 为 messaging._drivers.impl_rabbit.Connection 类型。
        self.conn = conn
        # 收到的消息的列表
        self.incoming = []
        ... ...
    
    
    def __call__(self, message):
        ...  ...
        # 每收到一个新的消息,都把基于消息生成 AMQPIncomingMessage 对象,并放置到消息列表中。
        self.incoming.append(AMQPIncomingMessage(self,
                                                ctxt.to_dict(),
                                                message,
                                                unique_id,
                                                ctxt.msg_id,
                                                ctxt.reply_q))
    
    
    def poll(self, timeout=None):
        while not self._stopped.is_set():
            if self.incoming:
                # 如果消息列表仍有消息未处理,则直接取出一个消息返回。
                return self.incoming.pop(0)
    
    
            # 消息列表中没有未处理消息,此时就对 Connection 进行 consume 操作,等待新消息的到来。
            try:
                self.conn.consume(limit=1, timeout=timeout)
            except rpc_common.Timeout:
                # 如果长时间没有消息到来,则返回 None,避免一直占用协程 CPU 时间。
                # 这样 executor 会再发起下一次 poll。
                return None
    

  • RPCDispatcher

    RPCDispatcher 否则消息的分发,在 executor 的 start 方法中, 我们看到代码以 self.dispatch(incoming) 的
    方式调用。所以 RPCDispatcher 也需要提供一个 __call__ 方法。之后 dispatcher 所作的事情,概括来说就是从
    incoming 中辨别出其目的 rpc 方法名,以及各 rpc 参数。从 endpoints 中取出相应的方法来调用,之后将返回值
    编码发回 rpc 方法调用者。

    class RPCDispatcher(object):
    
    
    
    def __init__(self, target, endpoints, serializer):
    
    
        # endpoints 是提供的 rpc 调用的方法集合。
        # 以 nova-compute 为例,endpoints 中最主要的部分就是 nova/compute/manager.py
        # 中 ComputeManager 实例。 提供的 rpc 调用方法即是 ComputeManager 实例的方法。
        self.endpoints = endpoints
    
    
        # 对于 rpc 方法返回的内容,需要使用序列化器进行编码之后返回。
        self.serializer = serializer or msg_serializer.NoOpSerializer()
    
    
    def _listen(self, transport):
        # 前面 MessageHandlingServer 处已提及, dispatcher._listen 其实就是 transport._listen。
        return transport._listen(self._target)
    
    
    def _do_dispatch(self, endpoint, method, ctxt, args):
        ... ...
        # 在这里真正调用了目的方法,把它返回结果进行序列化后返回。
        result = getattr(endpoint, method)(ctxt, **new_args)
        return self.serializer.serialize_entity(ctxt, result)
    
    
    @contextlib.contextmanager
    def __call__(self, incoming):
        # oslo.messaging 中的消息都是使用显式的确认。所以如果发出去的一个消息如果收到了确认,
        # 一定意味着对端的软件已经收到了消息。这一点区别于自动确认,详情回顾[AMQP 协议][2]。
        incoming.acknowledge()
        # 这里之所以是 yield,是由于 executor 中 spawn_with 的实现,其最终实现的结果是,将
        # yield 后面这个方法单独放到一个协程之中运行。
        yield lambda: self._dispatch_and_reply(incoming)
    
    
    def _dispatch_and_reply(self, incoming):
        try:
            # 这里进一步调用 _dispatch,将其返回的内容回复给 rpc 的调用者。
            incoming.reply(self._dispatch(incoming.ctxt,
                                        incoming.message))
        except ExpectedException as e:
            # 出现异常,将异常信息发送给 rpc 的调用者,以便其进行异常处理。
            LOG.debug(u'Expected exception during message handling (%s)',
                    e.exc_info[1])
            incoming.reply(failure=e.exc_info, log_failure=False)
            ... ...
    
    
    def _dispatch(self, ctxt, message):
    
    
        # 这个方法,从消息中辨别去其目的方法名称,以及携带的参数。
        method = message.get('method')
        args = message.get('args', {})
    
    
        for endpoint in self.endpoints:
            ... ...
            # 从 endpoints 中查找到目的方法。
            if hasattr(endpoint, method):
                ... ...
                try:
                    # 由 _do_dispatch 来负责调用找到的目的方法。
                    return self._do_dispatch(endpoint, method, ctxt, args)
                finally:
                    ... ...
            ... ...
    

  • AMQPIncomingMessage

    AMQPIncomingMessage 是对 rabbitmq 消息的一个封装,提供消息的确认与回复等方法。上文
    AMQPListener 收到一个新的消息时,就会使用消息初始化一个 AMQPIncomingMessage 实例。
    后续在 RPCDispatcher 中就直接调用 AMQPIncomingMessage.acknowledge() 和 AMQPIncomingMessage.reply()
    来确认和回复消息。

    class AMQPIncomingMessage(base.IncomingMessage):
    
    
    
    def __init__(self, listener, ctxt, message, unique_id, msg_id, reply_q):
        ... ...
        self.listener = listener
        self.message = message
        self.msg_id = msg_id
        self.reply_q = reply_q
        # message 是一个底层库的消息实例,提供底层库的 acknowledge 方法来确认
        # 消息。 在 rabbitmq 来说,底层库就是指 kombu。
        self.acknowledge_callback = message.acknowledge
    
    
    def _send_reply(self, conn, reply=None, failure=None,
                    ending=False, log_failure=True):
        ... ...
        if self.reply_q:
            msg['_msg_id'] = self.msg_id
            # 使用 direct_send 方法来发送回复。 reply_q 是消息发过来的时候就附带的属性。
            # 指示了,如果要回复该消息应该回复到哪个消息队列中。
            conn.direct_send(self.reply_q, rpc_common.serialize_msg(msg))
        else:
            conn.direct_send(self.msg_id, rpc_common.serialize_msg(msg))
    
    
    def reply(self, reply=None, failure=None, log_failure=True):
        ... ...
        # self.listener 是 AMQPListener,这里借由它来从连接池中获取一个可用连接。
        with self.listener.driver._get_connection(
                rpc_amqp.PURPOSE_SEND) as conn:
            # 每个回复由两种消息组成,第一种是回复的内容,第二种是说明消息已发送完毕。类似于警方报话时,说完内容加一句 "over"。
            self._send_reply(conn, reply, failure, log_failure=log_failure)
            self._send_reply(conn, ending=True)
    
    
    def acknowledge(self):
        ... ...
        # 通过调用底层库的确认方法来确认消息。
        self.acknowledge_callback()