AMQP 协议消息库 kombu
背景
kombu 是一个 python 消息库,提供 AMQP 协议的高级封装。
题外话:目前 kombu 也支持对非 AMQP 协议的封装,支持 Redis,MongoDB,ZeroMQ 等。
概念
参见 AMQP 协议,协议中的几个关键元素在 kombu 中都有对应的封装,下面把关系
先列出来:
publisher: kombu.Producer()
exchange: kombu.Exchange()
queue: kombu.Queue()
consumer: kombu.Consumer()
使用
建立连接
kombu 的封装抽象中建立一个连接即是建立一个 Connection 实例。
创建一个 Connection:
from kombu import Connection
connection = Connection('amqp://guest:guest@localhost:5672//')
Connection 的初始化参数,是消息队列中间人的 URL,针对 AMQP 类型(其它类型
本文不作讨论)的中间人,有以下几种可能的消息库作为后端:
- pyamqp: 使用纯 python 的 amqp 消息库 amqp,安装 kombu 时已自动安装。
- librabbitmq: C 版本的高性能 amqp 消息库,需要安装 librabbitmq Python 包。
- amqp: 尝试使用 librabbitmq,如果没有则使用 pyamqp。
- qpid: 使用纯 Python 消息库 qpid.messaging,安装 kombu 时已自动安装。
URL 的格式为:transport://userid:password@hostname:port/virtual_host
由此可以看出,’amqp://guest:guest@localhost:5672//’ 是代表要连接本地 localhost
的 5672 端口,使用的用户为 ‘guest’,密码为 ‘guest’,而 virtual_host 为 ‘/’。
关于用户名,密码以及 virtual_host 的概念,参见RabbitMQ 配置初步
除了通过 URL,也可通过分别指定中间人的连接信息来初始化 Connection。
这些参数为:
- hostname:主机名或 IP
- userid:用户名
- password:密码
- virtual_host:虚拟主机
- port:端口
- transport: pyamqp, librabbitmq, redis, qpid, memory 等。
- ssl:是否使用 SSL 来连接到服务器。默认为 False,只有 amqp 和 qpid 支持。
- connect_timeout:尝试连接的超时时限,以秒为单位。
- transport_options:依 transport 类型不同而不同的一些额外参数。
Connection 的实例化并不代表着对消息中间人连接的建立,而是在需要时 Connection
才会发起对消息中间人的连接。消息中间人通常就是 RabbitMQ 服务器或者 qpid 服务器。
可以人为显式的发起连接:
connection.connect()
查看连接是否已建立:
connection.connected
关闭连接时必须调用:
connection.close()
不过更为合理的方法是使用 release 方法,该方法会判断 connection 是否属于一个
连接池,如果是则只释放连接资源,如果不是则调用 close 方法。能够更好在使用连接池
的情况下工作。
connection.release()
可以将 connection 的使用封装在 with 语句中,避免显式调用 release 或 close
with Connection('amqp://guest:guest@localhost:5672//') as connection:
# ...
发送消息
kombu 对于消息生产方有一个封装为 Producer
通常生成一个 Producer 的步骤为:
- 建立连接
- 创建消息交换机实例,即 Exchange,如果在消息服务器上 (RabbitMQ) 上相应 exchange 不存在,则新建。
- 生成 Producer 实例,指定其目标交换机为 Exchange。
代码示例:
from kombu import Connection
from kombu import Exchange
from kombu import Producer
connection = Connection('amqp://guest:guest@localhost:5672//')
exchange = Exchange('test', type='direct')
producer = Producer(connection, exchange, routing_key='test')
# 也可以使用 Connection.Producer 生成 Producer 实例
producer2 = connection.Producer()
Exchange 参数简介:
- name:消息交换机名称
- type:消息交换机类型,direct、topic、fanout 等
- channel:可选参数,一个 Connection 可有多个 channel,指定 Exchange 对应 channel
- durable:是否为持久化消息交换机,在消息服务器的重启前后能够保持存在
- auto_delete:当所有队列都不再使用该交换机时,是否自动删除
- delivery_mode:是否持久化交换机中的消息
Producer 参数简介:
- channel:传递 Connection 实例,或 Connection 中某一个 channel
- exchange:默认往哪个消息交换机上发送消息,Exchange 实例
- routing_key:发送出去消息的默认 routing_key,标识了消息将分发到哪些 queue
- serializer:数据序列化方式,默认为 json,参考更多类型
- compression:消息压缩方式
- auto_declare:实例化时是否自动 declare 所对应的 Exchange,declare 操作在消息
服务器上建立相应的消息交换机
建立了 Producer 实例之后,发送消息的 API 很简单:
msg = {'name':'test'}
producer.publish(msg)
queue = Queue('test', exchange, routing_key='test')
# 或者也可以在 publish 时指定 exchange 或 routing_key 等参数
# declare 参数用于确保目标队列的声明,这样能确保消息是有接收方的,理论上发
# 送消息时不需要确保目标队列的存在,但这是一个好的习惯做法。
producer2.publish(msg, exchange=exchange, routing_key='test', declare=[queue])
# 或者也可以手动声明队列
queue.declare()
publish 接受很多可选参数,来替代实例化 Producer 时所指定的默认值,如默认
exchange,routing_key 等。 详情参考 Producers
接收消息
kombu 对于消息消费方的一个封装为 Consumer
通常生成一个 Consumer 的步骤为:
- 建立连接
- 创建消息交换机实例,即 Exchange,如果在消息服务器上 (RabbitMQ) 上相应 exchange 不存在,则新建。
- 创建消息队列实例 Queue,如果消息服务器上尚没有该队列,则新建
- 实例化 Consumer,指定其订阅到的消息队列
代码示例:
from kombu import Connection
from kombu import Exchange
from kombu import Queue
from kombu import Consumer
def process_msg(body, message):
print body
message.ack()
connection = Connection('amqp://guest:guest@localhost:5672//')
exchange = Exchange('test', type='direct')
queue = Queue('test', exchange, routing_key='test')
consumer = Consumer(queue, callbacks=[process_msg])
消息处理的逻辑在 process_msg 这个回调函数之中。
Consumer 参数简介:
- channel:传递 Connection 实例,或 Connection 中某一个 channel
- queues:Consumer 默认订阅到的队列
- no_ack:消息自动确认,如果打开,消息中间人对消息自动确认,相比于让客户端确
认,这样效率更高。带来的问题是客户端对消息失去部分控制,无法把握消息在消息
服务器上的删除时机。 - auto_declare:是否自动在消息服务器上建立订阅到的队列,默认为 True
- callbacks:回调函数列表,当有新消息到达时,调用回调函数来处理
- on_message:与 callbacks 只能二选一,也是处理消息的回调函数,只是该回调函
数所收到的消息是未经解码的 - on_decode_error:当消息不能解码时指定调用的回调函数
创建了 Consumer 实例之后,接收消息:
consumer.consume()
不再使用的 Consumer 需要清除:
consumer.cancel()
与 Connection 相同,Consumer 的使用也封装在 with 语句之中:
with Consumer(queue, callbacks=[process_msg]) as consumer:
# ...
连接池与生产者池
kombu 提供了连接池与生产者池的封装,不过 OpenStack 模块 Oslo.messaging 中并没有使用,在这里简单介绍一下。
连接池:
from kombu import Connection
from kombu import pools
from kombu.pools import connections
# 设置池的上限,运行时,上限可加大,不过缩小
pools.set_limit(100)
# 创建一个连接
connection = Connection('amqp://guest:guest@localhost:5672//')
# 基于一个连接,创建连接池
conn_pool = connections[connection]
# 从连接池中获取一个连接,block=True 意思是阻塞直到池中有一个可分配的空
# 闲连接。如果为 False 而池中没有空闲连接,则会抛出异常:
# kombu.exceptions.ConnectionLimitExceeded
with conn_pool.acquire(block=True) as conn:
print('Got connection: %r' % (connection.as_uri(), ))
生产者池:
from kombu import Connection
from kombu.pools import producers
connection = Connection('amqp://guest:guest@localhost:5672//')
producer_pool = producers[connection]
with producer_pool.acquire(block=True) as producer:
producer.publish(...)
kombu 的这两个池都是全局的,要清空所有池:
kombu.pools.reset()
如果想要在各线程空间中各自维护池,需要自定义池,步骤如下:
from kombu import pools
from kombu import Connection
connections = pools.Connection(limit=100)
producers = pools.Producers(limit=connections.limit)
connection = Connection('amqp://guest:guest@localhost:5672//')
with connections[connection].acquire(block=True):
# ...