oslo.messaging 中的 kombu
代码结构图如下:
+----------------------------------------+
| |
nova.rpc.get_server --> messaging.server.get_rpc_server --> | messaging.server.MessageHandlingServer |
| |
| self.transport |
| | |
+---------+------------------------------+
|
v
+----------------------------------------------+ +----------------------------------------+ +--------------------------+
| | | | | |
| messaging._drivers.impl_rabbit.RabbitDriver | | messaging.transport.Transport | | messaging.rpc.RPCClient | <---- nova.rpc.get_client
| | | | | |
| self._connection_pool | <----------+-- self._driver | <---------+---- self.transport |
| | | | | | |
+------------+---------------------------------+ +----------------------------------------+ | self.call() |
| | self.cast() |
v | |
+----------------------------------------------+ +-------------------------------------------+ +--------------------------+
| | | |
| messaging._driver.amqp.ConnectionPool | | messaging._drivers.impl_rabbit.Connection |
| | | |
| self.connection_cls -----------------------+----------> | self.connection ------------------------+------------> kombu.connection.Connection()
| self.create() | | |
| self.get() | | self.declare_consumer() | | +------------------------------------------------+
| self.put() | | self.declare_direct_consumer() +--------+------------> | |
+----------------------------------------------+ | self.declare_fanout_consumer() | | | messaging._drivers.impl_rabbit.DirectConsumer |
| self.declare_topic_consumer() | | | messaging._drivers.impl_rabbit.FanoutConsumer |
| | | messaging._drivers.impl_rabbit.TopicConsumer |
| self.consumers | | |
| self.consume() | | self.exchange -----------------------------+--------> kombu.entity.Exchange()
| | | |
| | | self.queue ---------------------------------+--------> kombu.entity.Queue()
| |self.direct_send() | | |
| |self.fanout_send() | | self.consume() ----------------------------+--------> kombu.entity.Queue().consume()
| |self.topic_send() | | |
| |self.notify_send() | +------------------------------------------------+
| |self.publisher_send() |
| | |
+-----------+-------------------------------+
|
|
v
+------------------------------------------------+
| |
| messaging._drivers.impl_rabbit.DirectPublisher |
| messaging._drivers.impl_rabbit.FanoutPublisher |
| messaging._drivers.impl_rabbit.TopicPublisher |
| messaging._drivers.impl_rabbit.NotifyPublisher |
| |
| self.exchange -------------------------------+---------> kombu.entity.Exchange()
| |
| self.producer -------------------------------+---------> kombu.messaging.Producer()
| |
| self.send() ---------------------------------+---------> kombu.messaging.Producer().publish()
| |
+------------------------------------------------+
Transport
以 nova 为例,每个 nova-* 进程有一个全局的 Transport 实例,该实例是对消息库如
kombu 的封装。该全局的实例在 nova/rpc.py 中创建。
每个进程的 main() 都会有这一句:
config.parse_args(sys.argv)
parse_args 中调用了rpc.init:
def parse_args(argv, default_config_files=None):
... ...
rpc.init(CONF)
下面看一下 rpc.init 如何初始化 Transport 实例:
def init(conf):
global TRANSPORT, NOTIFIER
exmods = get_allowed_exmods()
# 调用 oslo.messaging 接口来创建 Transport 实例。
TRANSPORT = messaging.get_transport(conf,
allowed_remote_exmods=exmods,
aliases=TRANSPORT_ALIASES)
get_transport 根据配置加载相应的消息库 driver,配置与 driver 的对应关系
在 setup.cfg 中定义:
# oslo.messaging/setup.cfg
[entry_points]
oslo.messaging.drivers =
rabbit = oslo.messaging._drivers.impl_rabbit:RabbitDriver
kombu = oslo.messaging._drivers.impl_rabbit:RabbitDriver
# oslo/messaging/transport.py
# get_transport 使用 stevedore 来管理消息库模块
from stevedore import driver
def get_transport(conf, url=None, allowed_remote_exmods=None, aliases=None):
try:
mgr = driver.DriverManager('oslo.messaging.drivers',
url.transport.split('+')[0],
invoke_on_load=True,
invoke_args=[conf, url],
invoke_kwds=kwargs)
except RuntimeError as ex:
raise DriverLoadFailure(url.transport, ex)
return Transport(mgr.driver)
Transport 实例的几个方法,分别去直接调用相应消息库 driver 的方法。
# oslo/messaging/transport.py
class Transport(object):
def _send(self, target, ctxt, message, wait_for_reply=None, timeout=None,
retry=None):
if not target.topic:
raise exceptions.InvalidTarget('A topic is required to send',
target)
return self._driver.send(target, ctxt, message,
wait_for_reply=wait_for_reply,
timeout=timeout, retry=retry)
def _listen(self, target):
if not (target.topic and target.server):
raise exceptions.InvalidTarget('A server\'s target must have '
'topic and server names specified',
target)
return self._driver.listen(target)
MessageHandlingServer
每个充当 rpcserver 角色的 nova-* 进程,都有自己的 MessageHandlingServer 实例。
创建 MessageHandlingServer 实例时将全局的 Transport 实例传递进去,
MessageHandlingServer 对与消息服务器的交互都通过 Transport 实例的方法来完成,
具体来说主要就是 Transport()._listen()。
#nova/service.py
class Service(service.Service):
... ...
def start(self):
... ...
self.rpcserver = rpc.get_server(target, endpoints, serializer)
self.rpcserver.start()
# nova/rpc.py
def get_server(target, endpoints, serializer=None):
assert TRANSPORT is not None
serializer = RequestContextSerializer(serializer)
return messaging.get_rpc_server(TRANSPORT,
target,
endpoints,
executor='eventlet',
serializer=serializer)
# oslo/messaging/rpc/server.py
def get_rpc_server(transport, target, endpoints,
executor='blocking', serializer=None):
dispatcher = rpc_dispatcher.RPCDispatcher(target, endpoints, serializer)
return msg_server.MessageHandlingServer(transport, dispatcher, executor)
在服务启动的时候,调用 Transport 实例的 _listen 方法。该方法会通过底层的
消息库在消息服务器上创建消息交换机与相关队列。
# MessageHandlingServer
def start(self):
try:
listener = self.dispatcher._listen(self.transport)
except driver_base.TransportDriverError as ex:
raise ServerListenError(self.target, ex)
# RPCDispatcher
def _listen(self, transport):
return transport._listen(self._target)
RPCClient
需要往消息服务器发送消息时,生成 RPCClient 的实例,然后调用实例的 call 与 cast
方法来进行消息的发送。而这两个方法,则直接调用 Transport()._send() 方法。
# oslo/messaging/rpc/client.py
class RPCClient(object):
def call(self, ctxt, method, **kwargs):
... ...
try:
result = self.transport._send(self.target, msg_ctxt, msg,
wait_for_reply=True, timeout=timeout,
retry=self.retry)
except driver_base.TransportDriverError as ex:
raise ClientSendError(self.target, ex)
... ...
def cast(self, ctxt, method, **kwargs):
... ...
try:
self.transport._send(self.target, ctxt, msg, retry=self.retry)
except driver_base.TransportDriverError as ex:
raise ClientSendError(self.target, ex)
... ...
RabbitDriver
RabbitDriver 是对 kombu 消息库封装的一部分,结合连接封装 Connection 和连接池封装
ConnectionPool 对外提供消息的控制接口。 接口交付于 Transport 实例。
# oslo/messaging/_drivers/impl_rabbit.py 和 oslo/messaging/_drivers/amqpdriver.py
class RabbitDriver(amqpdriver.AMQPDriverBase):
def __init__(self, conf, url,
default_exchange=None,
allowed_remote_exmods=None):
# RabbitDriver 实例在初始化时,创建一个连接池,连接池 ConnectionPool
# 的实现后续介绍
connection_pool = rpc_amqp.get_connection_pool(conf, url, Connection)
... ...
self._connection_pool = connection_pool
# 发送消息的接口, 上文已提到 Transport 实例的 _send 方法调用该方法。
def send(self, target, ctxt, message, wait_for_reply=None, timeout=None,
retry=None):
return self._send(target, ctxt, message, wait_for_reply, timeout,
retry=retry)
def _send(self, target, ctxt, message,
wait_for_reply=None, timeout=None,
envelope=True, notify=False, retry=None):
try:
# get_connection 从 connection_pool 中获取一个可用连接 Connection 实例。
with self._get_connection() as conn:
# 对应不同的消息发送方式,分别调用 Connection 实例的发送方法。
if notify:
conn.notify_send(self._get_exchange(target),
target.topic, msg, retry=retry)
elif target.fanout:
conn.fanout_send(target.topic, msg, retry=retry)
else:
topic = target.topic
if target.server:
topic = '%s.%s' % (target.topic, target.server)
conn.topic_send(exchange_name=self._get_exchange(target),
topic=topic, msg=msg, timeout=timeout,
retry=retry)
... ...
# 建立对消息的监听,上文已提到 Transport 实例的 _listen 方法会调用该方法。
def listen(self, target):
# 首先从连接池中取到一个可用连接。
conn = self._get_connection(pooled=False)
# 创建一个工厂类来做为后续所有 Consumer 的回调函数,负责处理收到的消息。
# AMQPListener 定义了 __call__ 方法,可以直接调用。
listener = AMQPListener(self, conn)
# 通过 Connection 实例的 declare_* 方法创建消息消费者
conn.declare_topic_consumer(exchange_name=self._get_exchange(target),
topic=target.topic,
callback=listener)
conn.declare_topic_consumer(exchange_name=self._get_exchange(target),
topic='%s.%s' % (target.topic,
target.server),
callback=listener)
conn.declare_fanout_consumer(target.topic, listener)
return listener
Connection 和 ConnectionPool
前方已提到 Oslo.messaging 实现了自己的连接池机制,而没有使用 kombu 所提供的。
Oslo.messaging 的连接池是 collections.deque() 的数据结构。
ConnectionPool 的实现如下:
# oslo/messaging/_drivers/pool.py 和 oslo/messaging/_drivers/amqp.py
class ConnectionPool(pool.Pool):
def __init__(self, max_size=4):
... ...
# 定义连接池连接数最大值,默认为 4
self._max_size = max_size
# 定前连接数
self._current_size = 0
self._cond = threading.Condition()
# 连接池数据结构,放置所有的 Connection 实例。
self._items = collections.deque()
# 从连接池中获取一个连接的接口,注意该接口同时也是往连接池中新建连接的接口
# 接口中没有明显的将连接添加进 self._items 数据结构,实际上一个连接在此方法
# 中新建,当连接使用完毕进行释放时,会由 put 接口加到 self._items 中。
def get(self):
with self._cond:
while True:
try:
# 首先尝试从连接池数据结构中获取新的可用连接。
return self._items.popleft()
except IndexError:
pass
# 如果没有可用连接,且连接数未达上限,则允许跳出循环下文调用
# create 方法新建连接。
if self._current_size < self._max_size:
self._current_size += 1
break
# 如果没有可用连接,且连接数已达上限,则只能等待连接放回连接池
# 的信号,注意有超时限值。
self._cond.wait(timeout=1)
try:
# 新建一个连接 Connection 实例
return self.create()
except Exception:
with self._cond:
# 建立失败,回滚连接池当前数量
self._current_size -= 1
raise
# 新使用完的连接放回 self._items 数据结构。
def put(self, item):
"""Return an item to the pool."""
with self._cond:
self._items.appendleft(item)
self._cond.notify()
# 新建连接实例
def create(self):
# connection_cls 对于 Rabbitmq 来说就是 messaging._drivers.impl_rabbit.Connection
return self.connection_cls(self.conf, self.url)
ConnectionPool 与 Connection 实例之间有一个类型用于衔接,
上文略过未提的 get_connection 方法,可以看到它拿到的是 ConnectionContext 实例。
# oslo/messaging/_drivers/amqpdriver.py
def _get_connection(self, pooled=True):
return rpc_amqp.ConnectionContext(self.conf,
self._url,
self._connection_pool,
pooled=pooled)
ConnectionContext 实例所起的最主要作用即是协同维护 ConntionPool 的数据结构。
class ConnectionContext(rpc_common.Connection):
def __init__(self, conf, url, connection_pool, pooled=True):
... ...
self.connection_pool = connection_pool
if pooled:
# 从连接池获取连接
self.connection = connection_pool.get()
else:
# ConnectionContext 的另一部分功能,处理不使用连接池的情况。
# 直接创建真正的连接实例。
self.connection = connection_pool.connection_cls(conf, url)
# 标识是否使用了连接池,默认为 True。
self.pooled = pooled
def _done(self):
# 还记得前文所说,连接池中的连接都是由 put 方法放回连接池结构中的吗。
# 在各种场景下,销毁一个 ConnectionContext 都会走到这里,实际将 Connection
# 实例通过 connection_pool.put() 方法将实例放回连接池。
if self.connection:
if self.pooled:
# Reset the connection so it's ready for the next caller
# to grab from the pool
self.connection.reset()
self.connection_pool.put(self.connection)
else:
# 如果不使用连接池,则可以真正去关闭连接。
try:
self.connection.close()
except Exception:
pass
self.connection = None
# 以下三个方法,保证各种销毁 ConnectionContext 时,都不真得销毁,而是
# 调用 _done 方法。
def __exit__(self, exc_type, exc_value, tb):
# 对于与 with 语句搭配使用的情况时调用,释放 Connection
self._done()
def __del__(self):
# 直接清理 ConnectionContext 实例时调用,释放 Connection
self._done()
def close(self):
# 显式关闭连接时调用
self._done()
通过以上代码的分析,可以理清连接池 ConnectionPool 的维护逻辑。下面可以看一下
Connection 实例的结构如何。
下面以 publisher_send 发送消息,与 declare_consumer 创建消息消费者为例
分别分析一下 Connection 实例。 direct_send 与 declare_direct_consumer
等都是调用这两个方法来工作的。
class Connection(object):
def __init__(self, conf, url):
... ...
# 真正的 kombu Connection 是本 Connection 实例封装的对象。
self.connection = kombu.connection.Connection(
self._url, ssl=self._ssl_params, login_method=self._login_method,
failover_strategy="shuffle")
... ...
def publisher_send(self, cls, topic, msg, timeout=None, retry=None,
**kwargs):
... ...
def _publish():
# 每次发送消息时,都会生成一个 Publisher 实例,并执行消息的发送动作
# Publisher 实例实现见下文。
publisher = cls(self.conf, self.channel, topic=topic, **kwargs)
publisher.send(msg, timeout)
# self.ensure 处理发送消息的重试与错误处理。
self.ensure(_error_callback, _publish, retry=retry)
def declare_consumer(self, consumer_cls, topic, callback):
def _declare_consumer():
# 创建 Consumer 实例,具体实例为 DirectConsumer 或 TopicConsumer 等。
# DirectConsumer 见下文。
consumer = consumer_cls(self.conf, self.channel, topic, callback,
six.next(self.consumer_num))
# 维护了 consumers 列表,包含本连接所有 Consumer
self.consumers.append(consumer)
return consumer
# self.ensure 处理发送消息的重试与错误处理。
return self.ensure(_connect_error, _declare_consumer)
class DirectPublisher(Publisher):
def __init__(self, channel, exchange_name, routing_key, **kwargs):
... ...
# 初始化时调用重连逻辑。
self.reconnect(channel)
def reconnect(self, channel):
... ...
# 将创建 Exchange 与 Producer 的逻辑直接写在重连方法中,故初次连接与故障
# 重连可以使用相同的一条逻辑。
self.exchange = kombu.entity.Exchange(name=self.exchange_name,
**self.kwargs)
self.producer = kombu.messaging.Producer(exchange=self.exchange,
channel=channel,
routing_key=self.routing_key)
... ...
def send(self, msg, timeout=None):
... ...
# 调用底层的 Producer 来发送消息。
self.producer.publish(msg, headers={'ttl': (timeout * 1000)})
class DirectConsumer(ConsumerBase):
def __init__(self, conf, channel, msg_id, callback, tag, **kwargs):
... ...
# 创建消息交换机。
self.exchange = kombu.entity.Exchange(name=msg_id,
type='direct',
durable=options['durable'],
auto_delete=options['auto_delete'])
... ...
def reconnect(self, channel):
# 将创建 Queue 的逻辑直接写在重连方法中,故初次连接与故障
# 重连可以使用相同的一条逻辑。
self.channel = channel
self.kwargs['channel'] = channel
self.queue = kombu.entity.Queue(**self.kwargs)
try:
# 主动建立将要订阅的队列。
self.queue.declare()
except Exception as e:
... ...
def consume(self, *args, **kwargs):
... ...
# 获取一个队列的消息,注意这里省略了 kombu.Consumer,而是直接使用 Queue 的底层接口。
self.queue.consume(*args, callback=_callback, **options)
... ...