oslo.messaging 0x03: rpc 客户端

oslo.messaging 0x03: rpc 客户端

前一篇 中已经提到,oslo.messaging 中的客户端实例 RPCClient 有两个方法
来完成消息的发送,它们分别是 call 和 cast。且这两个方法最终都是调用的
Transport 的 _send 方法。这一篇梳理一下这两个方法的内部实现细节。

  • RPCClient

    回到 RPCClient 的实现。RPCClient 与 transport 之间其实还隔着一个 _CallContext
    实例,但这里我们略过这一层。直接看 call 与 cast 方法的实现。
    首先看一下 oslo.messaging 开发者文档所说:

    A cast() invocation just sends the request and returns immediately.
    A call() invocation waits for the server to send a return value.
    就是说, call 与 cast 在使用上的差异就是 call 是需要等待 rpc 服务端回复的,而
    cast 则不需要。

    # oslo/messaging/rpc/client.py
    class RPCClient(object):
        def call(self, ctxt, method, **kwargs):
            ... ...
            try:
                # 注意看这里的 call 方法,在使用 transport._send() 的时候,指定
                # 了 wait_for_reply = True。
                result = self.transport._send(self.target, msg_ctxt, msg,
                                            wait_for_reply=True, timeout=timeout,
                                            retry=self.retry)
            except driver_base.TransportDriverError as ex:
                raise ClientSendError(self.target, ex)
            ... ...
    
    
    
    def cast(self, ctxt, method, **kwargs):
        ... ...
        try:
            # cast 方法则没有 wait_for_reply 参数。 _send() 方法的 wait_for_reply
            # 参数默认为 False。所以 cast 不需要等待服务端的返回消息。
            self.transport._send(self.target, ctxt, msg, retry=self.retry)
        except driver_base.TransportDriverError as ex:
            raise ClientSendError(self.target, ex)
        ... ...
    

  • RabbitDriver

    无论是 RPCClient 的 call 方法还是 cast 方法,都是通过 transport._send() 方法来达成消息
    的发送的。区别只是调用时传递的参数 wait_for_reply。在前一篇中已提到过 transport
    的方法都是直接转换为 RabbitDriver 的方法调用的。直接来看一下 RabbitDriver 的 _send() 方
    法。

    # RabbitDriver 继承自 amqpdriver.AMQPDriverBase,_send() 方法就是来自 AMQPDriverBase
    class RabbitDriver(amqpdriver.AMQPDriverBase):
        def _get_reply_q(self):
            # 前已述及,对于一个 openstack 服务,例如 nova-compute,
            # neutron-server 等,都有一个全局唯一的 Transport 实例,
            # 对应的,就有一个全局唯一的 RabbitDriver 实例和
            # reply_queue。 而 kombu 是不能在多线程环境中使用的,
            # 所以这里要为这个 self._reply_q 加锁。
            with self._reply_q_lock:
                if self._reply_q is not None:
                    # 如果全局唯一的那个 reply_queue self._reply_q 已创建,
                    # 则直接将其返回。
                    return self._reply_q
                # 如果 reply_queue 还未创建,则下面创建一下。
                reply_q = 'reply_' + uuid.uuid4().hex
                # 从连接池中获取一个连接,以下用这个连接来创建一个 reply_queue
                conn = self._get_connection(rpc_amqp.PURPOSE_LISTEN)
                # ReplyWaiter 实例负责 reply_queue 的创建与消息的监听。
                self._waiter = ReplyWaiter(reply_q, conn,
                                            self._allowed_remote_exmods)
                self._reply_q = reply_q
                self._reply_q_conn = conn
            return self._reply_q
        def _send(self, target, ctxt, message,
                wait_for_reply=None, timeout=None,
                envelope=True, notify=False, retry=None):
            # 以下将关于 wait_for_reply 的地方摘出进行分析
            msg = message
            if wait_for_reply:
                # 从这里可以看出,对于 call 方法,代码往消息体中
                # 加入了 msg_id 与 _reply_q (reply_queue)。
                # 这些信息,都是服务端作回复时所必须的信息。
                msg_id = uuid.uuid4().hex
                msg.update({'_msg_id': msg_id})
                LOG.debug('MSG_ID is %s', msg_id)
                # _get_reply_q() 方法会获取一个 reply_queue。
                # 这里将 reply_queue 的名称填写进消息体中,后续
                # 服务就只要将回复发送至这个 reply_queue 即可。
                msg.update({'_reply_q': self._get_reply_q()})
    
    
    
        if wait_for_reply:
            # 在将 _msg_id 为 msg_id 的消息发送出去之前,
            # 这里先手将 reply_queue 建立,并且开始监听
            # 回复给 msg_id 消息。
            self._waiter.listen(msg_id)
    
    
        try:
            with self._get_connection(rpc_amqp.PURPOSE_SEND) as conn:
                # 下面是具体的发送消息,不同的消息发送方式,采用不同的
                # 方法。不过对于需要 wait_for_reply 的消息,显然不会是
                # notify 和 fanout。
                if notify:
                    conn.notify_send(self._get_exchange(target),
                                    target.topic, msg, retry=retry)
                elif target.fanout:
                    conn.fanout_send(target.topic, msg, retry=retry)
                else:
                    topic = target.topic
                    if target.server:
                        topic = '%s.%s' % (target.topic, target.server)
                    conn.topic_send(exchange_name=self._get_exchange(target),
                                    topic=topic, msg=msg, timeout=timeout,
                                    retry=retry)
    
    
            if wait_for_reply:
                # 这里是具体等待回复的方法。如果该方法没有返回,那么 call
                # 方法也不会返回的。
                result = self._waiter.wait(msg_id, timeout)
                if isinstance(result, Exception):
                    raise result
                return result
        finally:
            if wait_for_reply:
                # 最后,要取消对 msg_id 回复消息的监听。
                # 无论前面出没出错,回复消息收没收到,这里实际上一个清理工作。
                self._waiter.unlisten(msg_id)
    

  • ReplyWaiter

    ReplyWaiter 负责监听消息的回复。它其实是一个简化版的 rpc 服务端。
    如同一个 rpc 服务端一样,它也在 rabbitmq 上建立 Consumer ,并且开启一个
    独立线程去监听消息。

    class ReplyWaiter(object):
    
    
    
    def __init__(self, reply_q, conn, allowed_remote_exmods):
        self.conn = conn
        # self.waiters 是 监听到的消息的一个中转站。
        # 每到有新消息来到,首先放置到中转站中,当
        # 有 wait(msg_id) 方法调用的时候,去中转站中
        # 取消息。
        self.waiters = ReplyWaiters()
        # 这里建立 Consumer,对于收取回复消息使用 DirectConsumer
        # 就足够了。回调方法为 self,也就是 ReplyWaiter 自身。
        # 所以,ReplyWaiter 提供了 __call__ 方法,见下文。
        self.conn.declare_direct_consumer(reply_q, self)
        # 下面几行是在处理监听消息的协程。
        # 协程运行的目标为 self.poll,就是不停的 consume 操作。
        self._thread_exit_event = threading.Event()
        self._thread = threading.Thread(target=self.poll)
        self._thread.daemon = True
        self._thread.start()
    
    
    def poll(self):
        # self._thread_exit_event 是停止 poll 协程的标志位。
        # 在 ReplyWaiter.stop() 时会设定。
        while not self._thread_exit_event.is_set():
            try:
                # limit=1 是说每次只作一个 consume 动作。
                # 具体可以查看 consume 的实现,其中允许一次 consume
                # 调用作多次 consume 动作。
                self.conn.consume(limit=1)
            except Exception:
                LOG.exception("Failed to process incoming message, "
                            "retrying...")
    
    
    def __call__(self, message):
        # 回调方法,如同 AMQPListener 一样,收到消息先确认
        message.acknowledge()
        incoming_msg_id = message.pop('_msg_id', None)
        # 取出 msg_id,将其与对应消息放到消息中转站之中。
        # 由于 msg_id 与消息是对存在,便于获取某一个 msg_id
        # 的回复消息。
        self.waiters.put(incoming_msg_id, message)
    
    
    def listen(self, msg_id):
        # listen 时相当于在消息中转站 self.waiters 中开设了一个新
        # 仓库,专门放置 msg_id 的回复消息。
        self.waiters.add(msg_id)
    
    
    def unlisten(self, msg_id):
        self.waiters.remove(msg_id)
    
    
    def wait(self, msg_id, timeout):
        # 设定时器
        timer = rpc_common.DecayingTimer(duration=timeout)
        timer.start()
        final_reply = None
        # 每个回复消息由两个部分组成,一部分是消息内容,一部分是
        # 标识消息发送完毕。如同警方对话机,先说内容,再报 "over"
        # 在这里,"over" 就是 ending。
        ending = False
        while not ending:
            # 检查定时器,是否已到超时时间。
            timeout = timer.check_return(self._raise_timeout_exception, msg_id)
            try:
                message = self.waiters.get(msg_id, timeout=timeout)
            except moves.queue.Empty:
                self._raise_timeout_exception(msg_id)
    
    
            reply, ending = self._process_reply(message)
            if not ending:
                final_reply = reply
        return final_reply