oslo.messaging 0x03: rpc 客户端
在前一篇 中已经提到,oslo.messaging 中的客户端实例 RPCClient 有两个方法
来完成消息的发送,它们分别是 call 和 cast。且这两个方法最终都是调用的
Transport 的 _send 方法。这一篇梳理一下这两个方法的内部实现细节。
-
RPCClient
回到 RPCClient 的实现。RPCClient 与 transport 之间其实还隔着一个 _CallContext
实例,但这里我们略过这一层。直接看 call 与 cast 方法的实现。
首先看一下 oslo.messaging 开发者文档所说:A cast() invocation just sends the request and returns immediately.
A call() invocation waits for the server to send a return value.
就是说, call 与 cast 在使用上的差异就是 call 是需要等待 rpc 服务端回复的,而
cast 则不需要。# oslo/messaging/rpc/client.py class RPCClient(object): def call(self, ctxt, method, **kwargs): ... ... try: # 注意看这里的 call 方法,在使用 transport._send() 的时候,指定 # 了 wait_for_reply = True。 result = self.transport._send(self.target, msg_ctxt, msg, wait_for_reply=True, timeout=timeout, retry=self.retry) except driver_base.TransportDriverError as ex: raise ClientSendError(self.target, ex) ... ...
def cast(self, ctxt, method, **kwargs): ... ... try: # cast 方法则没有 wait_for_reply 参数。 _send() 方法的 wait_for_reply # 参数默认为 False。所以 cast 不需要等待服务端的返回消息。 self.transport._send(self.target, ctxt, msg, retry=self.retry) except driver_base.TransportDriverError as ex: raise ClientSendError(self.target, ex) ... ...
-
RabbitDriver
无论是 RPCClient 的 call 方法还是 cast 方法,都是通过 transport._send() 方法来达成消息
的发送的。区别只是调用时传递的参数 wait_for_reply。在前一篇中已提到过 transport
的方法都是直接转换为 RabbitDriver 的方法调用的。直接来看一下 RabbitDriver 的 _send() 方
法。# RabbitDriver 继承自 amqpdriver.AMQPDriverBase,_send() 方法就是来自 AMQPDriverBase class RabbitDriver(amqpdriver.AMQPDriverBase): def _get_reply_q(self): # 前已述及,对于一个 openstack 服务,例如 nova-compute, # neutron-server 等,都有一个全局唯一的 Transport 实例, # 对应的,就有一个全局唯一的 RabbitDriver 实例和 # reply_queue。 而 kombu 是不能在多线程环境中使用的, # 所以这里要为这个 self._reply_q 加锁。 with self._reply_q_lock: if self._reply_q is not None: # 如果全局唯一的那个 reply_queue self._reply_q 已创建, # 则直接将其返回。 return self._reply_q # 如果 reply_queue 还未创建,则下面创建一下。 reply_q = 'reply_' + uuid.uuid4().hex # 从连接池中获取一个连接,以下用这个连接来创建一个 reply_queue conn = self._get_connection(rpc_amqp.PURPOSE_LISTEN) # ReplyWaiter 实例负责 reply_queue 的创建与消息的监听。 self._waiter = ReplyWaiter(reply_q, conn, self._allowed_remote_exmods) self._reply_q = reply_q self._reply_q_conn = conn return self._reply_q def _send(self, target, ctxt, message, wait_for_reply=None, timeout=None, envelope=True, notify=False, retry=None): # 以下将关于 wait_for_reply 的地方摘出进行分析 msg = message if wait_for_reply: # 从这里可以看出,对于 call 方法,代码往消息体中 # 加入了 msg_id 与 _reply_q (reply_queue)。 # 这些信息,都是服务端作回复时所必须的信息。 msg_id = uuid.uuid4().hex msg.update({'_msg_id': msg_id}) LOG.debug('MSG_ID is %s', msg_id) # _get_reply_q() 方法会获取一个 reply_queue。 # 这里将 reply_queue 的名称填写进消息体中,后续 # 服务就只要将回复发送至这个 reply_queue 即可。 msg.update({'_reply_q': self._get_reply_q()})
if wait_for_reply: # 在将 _msg_id 为 msg_id 的消息发送出去之前, # 这里先手将 reply_queue 建立,并且开始监听 # 回复给 msg_id 消息。 self._waiter.listen(msg_id) try: with self._get_connection(rpc_amqp.PURPOSE_SEND) as conn: # 下面是具体的发送消息,不同的消息发送方式,采用不同的 # 方法。不过对于需要 wait_for_reply 的消息,显然不会是 # notify 和 fanout。 if notify: conn.notify_send(self._get_exchange(target), target.topic, msg, retry=retry) elif target.fanout: conn.fanout_send(target.topic, msg, retry=retry) else: topic = target.topic if target.server: topic = '%s.%s' % (target.topic, target.server) conn.topic_send(exchange_name=self._get_exchange(target), topic=topic, msg=msg, timeout=timeout, retry=retry) if wait_for_reply: # 这里是具体等待回复的方法。如果该方法没有返回,那么 call # 方法也不会返回的。 result = self._waiter.wait(msg_id, timeout) if isinstance(result, Exception): raise result return result finally: if wait_for_reply: # 最后,要取消对 msg_id 回复消息的监听。 # 无论前面出没出错,回复消息收没收到,这里实际上一个清理工作。 self._waiter.unlisten(msg_id)
-
ReplyWaiter
ReplyWaiter 负责监听消息的回复。它其实是一个简化版的 rpc 服务端。
如同一个 rpc 服务端一样,它也在 rabbitmq 上建立 Consumer ,并且开启一个
独立线程去监听消息。class ReplyWaiter(object):
def __init__(self, reply_q, conn, allowed_remote_exmods): self.conn = conn # self.waiters 是 监听到的消息的一个中转站。 # 每到有新消息来到,首先放置到中转站中,当 # 有 wait(msg_id) 方法调用的时候,去中转站中 # 取消息。 self.waiters = ReplyWaiters() # 这里建立 Consumer,对于收取回复消息使用 DirectConsumer # 就足够了。回调方法为 self,也就是 ReplyWaiter 自身。 # 所以,ReplyWaiter 提供了 __call__ 方法,见下文。 self.conn.declare_direct_consumer(reply_q, self) # 下面几行是在处理监听消息的协程。 # 协程运行的目标为 self.poll,就是不停的 consume 操作。 self._thread_exit_event = threading.Event() self._thread = threading.Thread(target=self.poll) self._thread.daemon = True self._thread.start() def poll(self): # self._thread_exit_event 是停止 poll 协程的标志位。 # 在 ReplyWaiter.stop() 时会设定。 while not self._thread_exit_event.is_set(): try: # limit=1 是说每次只作一个 consume 动作。 # 具体可以查看 consume 的实现,其中允许一次 consume # 调用作多次 consume 动作。 self.conn.consume(limit=1) except Exception: LOG.exception("Failed to process incoming message, " "retrying...") def __call__(self, message): # 回调方法,如同 AMQPListener 一样,收到消息先确认 message.acknowledge() incoming_msg_id = message.pop('_msg_id', None) # 取出 msg_id,将其与对应消息放到消息中转站之中。 # 由于 msg_id 与消息是对存在,便于获取某一个 msg_id # 的回复消息。 self.waiters.put(incoming_msg_id, message) def listen(self, msg_id): # listen 时相当于在消息中转站 self.waiters 中开设了一个新 # 仓库,专门放置 msg_id 的回复消息。 self.waiters.add(msg_id) def unlisten(self, msg_id): self.waiters.remove(msg_id) def wait(self, msg_id, timeout): # 设定时器 timer = rpc_common.DecayingTimer(duration=timeout) timer.start() final_reply = None # 每个回复消息由两个部分组成,一部分是消息内容,一部分是 # 标识消息发送完毕。如同警方对话机,先说内容,再报 "over" # 在这里,"over" 就是 ending。 ending = False while not ending: # 检查定时器,是否已到超时时间。 timeout = timer.check_return(self._raise_timeout_exception, msg_id) try: message = self.waiters.get(msg_id, timeout=timeout) except moves.queue.Empty: self._raise_timeout_exception(msg_id) reply, ending = self._process_reply(message) if not ending: final_reply = reply return final_reply