接上篇的 MessageHandlingServer 继续, 这一篇专注于梳理 oslo.messaging 中一个 rpc server 对消息的处理流程。
openstack 各服务 nova-compute, neutron-server, cinder-volume 等等每一个都对外提供了 rpc 服务,接受其它组件的 rpc 调用。例如,nova-api 接到一个创建虚拟机的请求,就是通过调用 nova-compute 的一个创建虚拟机的 rpc 接口来进行虚拟机创建的。
+------------------------+----------------+
| messaging.server.MessageHandlingServer | +------------------------------------------------+
| | +--------------------------------------------------------+ | messaging/_drivers/amqpdriver.AMQPListener |
| self._executor -----------+-----> | messaging._executors.impl_eventlet.EventletExecutor | | |
| | | | | self.conn |
| self.transport | | self.listener --------------------+---> | |
| | | | | |
| self.start() -----------+--------+-> self.start() -------------------+-------+--> self.poll() |
+-----------------------------------------+ | self._thread | | |
| self._greenpool | | self.__call__() |
| | | |
| self.dispatcher | | self.incoming |
+------------+-------------------------------------------+ +-----------+------------------------------------+
| |
V V
+-------------------------------------------+ +------------------------------------------------------+
| messaging.rpc.dispatcher.RPCDispatcher | | messaging/_drivers/amqpdriver.AMQPIncomingMessage |
| | | |
| self.endpoints | | |
| | | |
| self.__call__() ------------------+--------------------+---> self.acknowledge() |
| | | |
| self._dispatch_and_reply() ----------+--------------------+---> self.reply() |
| self._dispatch() | | |
| self._do_dispatch() | | |
+-------------------------------------------+ +------------------------------------------------------+
-
MessageHandlingServer
每个像 nova-compute 这样对外提供 rpc 接口的服务,都需要创建一个 MessageHandlingServer 对象。这个 MessageHandlingServer 承载着本服务全局唯一的一个
TRANSPORT 实例,一个 TRANSPORT 实例下又对应一个全局唯一的 amqpdriver, amqpdriver 中就是上一篇所说的 rabbitmq 连接池。作为服务的 MessageHandlingServer 从连接池中取出一个连接,在此连接中不停的进行 consume 操作,以不断地拿到外界对自己的请求消息。当启动一个 MessageHandlingServer 之后,就启动了一个 executor
# messaging.server.py def start(self):
if self._executor is not None: # 如果已有一个 executor,说明 start 被重复调用了,此时什么也不做。 return try: # 这个调用绕来绕去,实际上等同于 listener = self.transport.listen() # 在上一篇中,我们知道 transport.listen() 就是从连接池中取出一个连接,创建消息消费者,并新建一个 # AMQPListener 来作为所有消费者的回调。 listener = self.dispatcher._listen(self.transport) except driver_base.TransportDriverError as ex: raise ServerListenError(self.target, ex) # 创建进程全局唯一的一个EventletExecutor,理论上 executor 可以有多种,目前 oslo.messaging # 实现的 executor 只有两类,一个基于 eventlet 的, 一个是单线程阻塞式的,后者只是作为示例。 # 两个实现分别在 messaging/_executors/impl_eventlet.py 和 messaging/_executors/impl_blocking.py self._executor = self._executor_cls(self.conf, listener, self.dispatcher) self._executor.start()
-
EventletExecutor
executor 启动时,会启动一个 executor 协程来不断地对 listener 进行 poll 操作,并创建一个协程池。每次 poll
操作获取到消息,都从连接池中拿一个可用协程,并将消息传递给 dispatcher,由 dispatcher 来分发消息。# messaging/_executors/impl_eventlet.py def start(self): if self._thread is not None: # self._thread 变量承载的是 executor 协程,如果该协程已在,说明 start 方法重复调用了,此时什么也不做。 return
# 这个 forever_retry_uncaught_exceptions 旨在捕获 executor # 协程中的所有异常,在任何情况下都维持协程的持续运行。 # 这样说有点高大上,其实就是捕获所有异常,然后打印日志并重新运行协程。 @excutils.forever_retry_uncaught_exceptions def _executor_thread(): # 这是 executor 协程的工作内容 try: while self._running: # 从 listener 中 poll 到一个新消息放在 incoming 之中。 incoming = self.listener.poll() if incoming is not None: # 将这个消息传递给 dispatcher,spawn_with(xxx, pool=self._greenpool) 是指使用协程池 # self._greenpool 中一个协程来执行 xxx。 spawn_with(ctxt=self.dispatcher(incoming), pool=self._greenpool) except greenlet.GreenletExit: return self._running = True # 启动 executor 协程 self._thread = eventlet.spawn(_executor_thread)
-
AMQPListener
AMQPListener 作为一个回调类,需要提供 __call__ 方法,当有一个新消息来到的时候,
就会把消息作为参数传递给 __call__ 方法。class AMQPListener(base.Listener):
def __init__(self, driver, conn): ... ... # self.conn 为 messaging._drivers.impl_rabbit.Connection 类型。 self.conn = conn # 收到的消息的列表 self.incoming = [] ... ... def __call__(self, message): ... ... # 每收到一个新的消息,都把基于消息生成 AMQPIncomingMessage 对象,并放置到消息列表中。 self.incoming.append(AMQPIncomingMessage(self, ctxt.to_dict(), message, unique_id, ctxt.msg_id, ctxt.reply_q)) def poll(self, timeout=None): while not self._stopped.is_set(): if self.incoming: # 如果消息列表仍有消息未处理,则直接取出一个消息返回。 return self.incoming.pop(0) # 消息列表中没有未处理消息,此时就对 Connection 进行 consume 操作,等待新消息的到来。 try: self.conn.consume(limit=1, timeout=timeout) except rpc_common.Timeout: # 如果长时间没有消息到来,则返回 None,避免一直占用协程 CPU 时间。 # 这样 executor 会再发起下一次 poll。 return None
-
RPCDispatcher
RPCDispatcher 否则消息的分发,在 executor 的 start 方法中, 我们看到代码以 self.dispatch(incoming) 的
方式调用。所以 RPCDispatcher 也需要提供一个 __call__ 方法。之后 dispatcher 所作的事情,概括来说就是从
incoming 中辨别出其目的 rpc 方法名,以及各 rpc 参数。从 endpoints 中取出相应的方法来调用,之后将返回值
编码发回 rpc 方法调用者。class RPCDispatcher(object):
def __init__(self, target, endpoints, serializer): # endpoints 是提供的 rpc 调用的方法集合。 # 以 nova-compute 为例,endpoints 中最主要的部分就是 nova/compute/manager.py # 中 ComputeManager 实例。 提供的 rpc 调用方法即是 ComputeManager 实例的方法。 self.endpoints = endpoints # 对于 rpc 方法返回的内容,需要使用序列化器进行编码之后返回。 self.serializer = serializer or msg_serializer.NoOpSerializer() def _listen(self, transport): # 前面 MessageHandlingServer 处已提及, dispatcher._listen 其实就是 transport._listen。 return transport._listen(self._target) def _do_dispatch(self, endpoint, method, ctxt, args): ... ... # 在这里真正调用了目的方法,把它返回结果进行序列化后返回。 result = getattr(endpoint, method)(ctxt, **new_args) return self.serializer.serialize_entity(ctxt, result) @contextlib.contextmanager def __call__(self, incoming): # oslo.messaging 中的消息都是使用显式的确认。所以如果发出去的一个消息如果收到了确认, # 一定意味着对端的软件已经收到了消息。这一点区别于自动确认,详情回顾[AMQP 协议][2]。 incoming.acknowledge() # 这里之所以是 yield,是由于 executor 中 spawn_with 的实现,其最终实现的结果是,将 # yield 后面这个方法单独放到一个协程之中运行。 yield lambda: self._dispatch_and_reply(incoming) def _dispatch_and_reply(self, incoming): try: # 这里进一步调用 _dispatch,将其返回的内容回复给 rpc 的调用者。 incoming.reply(self._dispatch(incoming.ctxt, incoming.message)) except ExpectedException as e: # 出现异常,将异常信息发送给 rpc 的调用者,以便其进行异常处理。 LOG.debug(u'Expected exception during message handling (%s)', e.exc_info[1]) incoming.reply(failure=e.exc_info, log_failure=False) ... ... def _dispatch(self, ctxt, message): # 这个方法,从消息中辨别去其目的方法名称,以及携带的参数。 method = message.get('method') args = message.get('args', {}) for endpoint in self.endpoints: ... ... # 从 endpoints 中查找到目的方法。 if hasattr(endpoint, method): ... ... try: # 由 _do_dispatch 来负责调用找到的目的方法。 return self._do_dispatch(endpoint, method, ctxt, args) finally: ... ... ... ...
-
AMQPIncomingMessage
AMQPIncomingMessage 是对 rabbitmq 消息的一个封装,提供消息的确认与回复等方法。上文
AMQPListener 收到一个新的消息时,就会使用消息初始化一个 AMQPIncomingMessage 实例。
后续在 RPCDispatcher 中就直接调用 AMQPIncomingMessage.acknowledge() 和 AMQPIncomingMessage.reply()
来确认和回复消息。class AMQPIncomingMessage(base.IncomingMessage):
def __init__(self, listener, ctxt, message, unique_id, msg_id, reply_q): ... ... self.listener = listener self.message = message self.msg_id = msg_id self.reply_q = reply_q # message 是一个底层库的消息实例,提供底层库的 acknowledge 方法来确认 # 消息。 在 rabbitmq 来说,底层库就是指 kombu。 self.acknowledge_callback = message.acknowledge def _send_reply(self, conn, reply=None, failure=None, ending=False, log_failure=True): ... ... if self.reply_q: msg['_msg_id'] = self.msg_id # 使用 direct_send 方法来发送回复。 reply_q 是消息发过来的时候就附带的属性。 # 指示了,如果要回复该消息应该回复到哪个消息队列中。 conn.direct_send(self.reply_q, rpc_common.serialize_msg(msg)) else: conn.direct_send(self.msg_id, rpc_common.serialize_msg(msg)) def reply(self, reply=None, failure=None, log_failure=True): ... ... # self.listener 是 AMQPListener,这里借由它来从连接池中获取一个可用连接。 with self.listener.driver._get_connection( rpc_amqp.PURPOSE_SEND) as conn: # 每个回复由两种消息组成,第一种是回复的内容,第二种是说明消息已发送完毕。类似于警方报话时,说完内容加一句 "over"。 self._send_reply(conn, reply, failure, log_failure=log_failure) self._send_reply(conn, ending=True) def acknowledge(self): ... ... # 通过调用底层库的确认方法来确认消息。 self.acknowledge_callback()