背景

kombu 是一个 python 消息库,提供 AMQP 协议的高级封装。

题外话:目前 kombu 也支持对非 AMQP 协议的封装,支持 Redis,MongoDB,ZeroMQ 等。

概念

参见 AMQP 协议,协议中的几个关键元素在 kombu 中都有对应的封装,下面把关系
先列出来:

publisher: kombu.Producer()
exchange: kombu.Exchange()
queue: kombu.Queue()
consumer: kombu.Consumer()

使用

建立连接

kombu 的封装抽象中建立一个连接即是建立一个 Connection 实例。

创建一个 Connection:

from kombu import Connection
connection = Connection('amqp://guest:guest@localhost:5672//')

Connection 的初始化参数,是消息队列中间人的 URL,针对 AMQP 类型(其它类型
本文不作讨论)的中间人,有以下几种可能的消息库作为后端:

  • pyamqp: 使用纯 python 的 amqp 消息库 amqp,安装 kombu 时已自动安装。
  • librabbitmq: C 版本的高性能 amqp 消息库,需要安装 librabbitmq Python 包。
  • amqp: 尝试使用 librabbitmq,如果没有则使用 pyamqp。
  • qpid: 使用纯 Python 消息库 qpid.messaging,安装 kombu 时已自动安装。

URL 的格式为:transport://userid:password@hostname:port/virtual_host

由此可以看出,’amqp://guest:guest@localhost:5672//’ 是代表要连接本地 localhost
的 5672 端口,使用的用户为 ‘guest’,密码为 ‘guest’,而 virtual_host 为 ‘/’。

关于用户名,密码以及 virtual_host 的概念,参见RabbitMQ 配置初步

除了通过 URL,也可通过分别指定中间人的连接信息来初始化 Connection。
这些参数为:

  • hostname:主机名或 IP
  • userid:用户名
  • password:密码
  • virtual_host:虚拟主机
  • port:端口
  • transport: pyamqp, librabbitmq, redis, qpid, memory 等。
  • ssl:是否使用 SSL 来连接到服务器。默认为 False,只有 amqp 和 qpid 支持。
  • connect_timeout:尝试连接的超时时限,以秒为单位。
  • transport_options:依 transport 类型不同而不同的一些额外参数。

Connection 的实例化并不代表着对消息中间人连接的建立,而是在需要时 Connection
才会发起对消息中间人的连接。消息中间人通常就是 RabbitMQ 服务器或者 qpid 服务器。

可以人为显式的发起连接:

connection.connect()

查看连接是否已建立:

connection.connected

关闭连接时必须调用:

connection.close()

不过更为合理的方法是使用 release 方法,该方法会判断 connection 是否属于一个
连接池,如果是则只释放连接资源,如果不是则调用 close 方法。能够更好在使用连接池
的情况下工作。

connection.release()

可以将 connection 的使用封装在 with 语句中,避免显式调用 release 或 close

with Connection('amqp://guest:guest@localhost:5672//') as connection:
    # ...

发送消息

kombu 对于消息生产方有一个封装为 Producer

通常生成一个 Producer 的步骤为:

  1. 建立连接
  2. 创建消息交换机实例,即 Exchange,如果在消息服务器上 (RabbitMQ) 上相应 exchange 不存在,则新建。
  3. 生成 Producer 实例,指定其目标交换机为 Exchange。

代码示例:

    from kombu import Connection
    from kombu import Exchange
    from kombu import Producer

    connection = Connection('amqp://guest:guest@localhost:5672//')
    exchange = Exchange('test', type='direct')
    producer = Producer(connection, exchange, routing_key='test')

    # 也可以使用 Connection.Producer 生成 Producer 实例
    producer2 = connection.Producer()

Exchange 参数简介:

  • name:消息交换机名称
  • type:消息交换机类型,direct、topic、fanout 等
  • channel:可选参数,一个 Connection 可有多个 channel,指定 Exchange 对应 channel
  • durable:是否为持久化消息交换机,在消息服务器的重启前后能够保持存在
  • auto_delete:当所有队列都不再使用该交换机时,是否自动删除
  • delivery_mode:是否持久化交换机中的消息

Producer 参数简介:

  • channel:传递 Connection 实例,或 Connection 中某一个 channel
  • exchange:默认往哪个消息交换机上发送消息,Exchange 实例
  • routing_key:发送出去消息的默认 routing_key,标识了消息将分发到哪些 queue
  • serializer:数据序列化方式,默认为 json,参考更多类型
  • compression:消息压缩方式
  • auto_declare:实例化时是否自动 declare 所对应的 Exchange,declare 操作在消息
    服务器上建立相应的消息交换机

建立了 Producer 实例之后,发送消息的 API 很简单:

msg = {'name':'test'}
producer.publish(msg)

queue = Queue('test', exchange, routing_key='test')

# 或者也可以在 publish 时指定 exchange 或 routing_key 等参数
# declare 参数用于确保目标队列的声明,这样能确保消息是有接收方的,理论上发
# 送消息时不需要确保目标队列的存在,但这是一个好的习惯做法。
producer2.publish(msg, exchange=exchange, routing_key='test', declare=[queue])

# 或者也可以手动声明队列
queue.declare()

publish 接受很多可选参数,来替代实例化 Producer 时所指定的默认值,如默认
exchange,routing_key 等。 详情参考 Producers

接收消息

kombu 对于消息消费方的一个封装为 Consumer

通常生成一个 Consumer 的步骤为:

  1. 建立连接
  2. 创建消息交换机实例,即 Exchange,如果在消息服务器上 (RabbitMQ) 上相应 exchange 不存在,则新建。
  3. 创建消息队列实例 Queue,如果消息服务器上尚没有该队列,则新建
  4. 实例化 Consumer,指定其订阅到的消息队列

代码示例:

    from kombu import Connection
    from kombu import Exchange
    from kombu import Queue
    from kombu import Consumer

    def process_msg(body, message):
        print body
        message.ack()

    connection = Connection('amqp://guest:guest@localhost:5672//')
    exchange = Exchange('test', type='direct')
    queue = Queue('test', exchange, routing_key='test')
    consumer = Consumer(queue, callbacks=[process_msg])

消息处理的逻辑在 process_msg 这个回调函数之中。

Consumer 参数简介:

  • channel:传递 Connection 实例,或 Connection 中某一个 channel
  • queues:Consumer 默认订阅到的队列
  • no_ack:消息自动确认,如果打开,消息中间人对消息自动确认,相比于让客户端确
    认,这样效率更高。带来的问题是客户端对消息失去部分控制,无法把握消息在消息
    服务器上的删除时机。
  • auto_declare:是否自动在消息服务器上建立订阅到的队列,默认为 True
  • callbacks:回调函数列表,当有新消息到达时,调用回调函数来处理
  • on_message:与 callbacks 只能二选一,也是处理消息的回调函数,只是该回调函
    数所收到的消息是未经解码的
  • on_decode_error:当消息不能解码时指定调用的回调函数

创建了 Consumer 实例之后,接收消息:

    consumer.consume()

不再使用的 Consumer 需要清除:

    consumer.cancel()

与 Connection 相同,Consumer 的使用也封装在 with 语句之中:

with Consumer(queue, callbacks=[process_msg]) as consumer:
    # ...

连接池与生产者池

kombu 提供了连接池与生产者池的封装,不过 OpenStack 模块 Oslo.messaging 中并没有使用,在这里简单介绍一下。

连接池:

from kombu import Connection
from kombu import pools
from kombu.pools import connections

# 设置池的上限,运行时,上限可加大,不过缩小
pools.set_limit(100)
# 创建一个连接
connection = Connection('amqp://guest:guest@localhost:5672//')
# 基于一个连接,创建连接池
conn_pool = connections[connection]

# 从连接池中获取一个连接,block=True 意思是阻塞直到池中有一个可分配的空
# 闲连接。如果为 False 而池中没有空闲连接,则会抛出异常:
# kombu.exceptions.ConnectionLimitExceeded
with conn_pool.acquire(block=True) as conn:
    print('Got connection: %r' % (connection.as_uri(), ))

生产者池:

from kombu import Connection
from kombu.pools import producers

connection = Connection('amqp://guest:guest@localhost:5672//')
producer_pool = producers[connection]
with producer_pool.acquire(block=True) as producer:
    producer.publish(...)

kombu 的这两个池都是全局的,要清空所有池:

kombu.pools.reset()

如果想要在各线程空间中各自维护池,需要自定义池,步骤如下:

from kombu import pools
from kombu import Connection

connections = pools.Connection(limit=100)
producers = pools.Producers(limit=connections.limit)

connection = Connection('amqp://guest:guest@localhost:5672//')

with connections[connection].acquire(block=True):
    # ...