Consumers

Basics

The Consumer takes a connection (or channel) and a list of queues to consume from. Several consumers can be mixed to consume from different channels, as they all bind to the same connection, and drain_events will drain events from all channels on that connection.

Note

Since version 3.0, Kombu only accepts json/binary or text messages by default. To allow deserialization of other formats, you have to specify them in the accept argument:

# NOTE: pickle can execute arbitrary code while deserializing; only accept
# it from producers you trust.
Consumer(
    conn,
    accept=['json', 'pickle', 'msgpack', 'yaml'],
)

Draining events from a single consumer:

# Drain events once (timeout=1) while the consumer context is active.
with Consumer(
    connection,
    queues,
    accept=['json'],
):
    connection.drain_events(timeout=1)

Draining events from several consumers:

from kombu.utils import nested

# Each channel needs its own ``as`` target; ``with a, b as (x, y)`` would
# bind nothing for ``a`` and try to unpack only ``b`` into two names.
with connection.channel() as channel1, connection.channel() as channel2:
    with nested(Consumer(channel1, queues1, accept=['json']),
                Consumer(channel2, queues2, accept=['json'])):
        connection.drain_events(timeout=1)

Or using ConsumerMixin:

from kombu.mixins import ConsumerMixin

class C(ConsumerMixin):
    """Example consumer that prints and acknowledges each message."""

    def __init__(self, connection):
        self.connection = connection

    def get_consumers(self, Consumer, channel):
        """Return a single JSON-only consumer for ``queues``."""
        json_consumer = Consumer(
            queues,
            callbacks=[self.on_message],
            accept=['json'],
        )
        return [json_consumer]

    def on_message(self, body, message):
        """Print the message body, then acknowledge the message."""
        text = "RECEIVED MESSAGE: %r" % (body,)
        print(text)
        message.ack()

C(connection).run()

And with multiple channels again:

from kombu import Consumer
from kombu.mixins import ConsumerMixin

class C(ConsumerMixin):
    """Example consumer that reads from two channels on one connection."""

    # Second channel; opened in get_consumers and closed in on_consumer_end.
    channel2 = None

    def __init__(self, connection):
        self.connection = connection

    def get_consumers(self, _, default_channel):
        """Return one consumer per channel.

        The consumer factory passed as the first argument is ignored;
        ``kombu.Consumer`` is called directly so that each consumer can be
        given its own channel.
        """
        # Open an additional channel on the default channel's connection.
        self.channel2 = default_channel.connection.channel()
        return [Consumer(default_channel, queues1,
                         callbacks=[self.on_message],
                         accept=['json']),
                Consumer(self.channel2, queues2,
                         callbacks=[self.on_special_message],
                         accept=['json'])]

    def on_message(self, body, message):
        """Print and acknowledge a message delivered to the first consumer."""
        print("RECEIVED MESSAGE: %r" % (body, ))
        message.ack()

    def on_special_message(self, body, message):
        """Print and acknowledge a message delivered to the second consumer."""
        print("RECEIVED SPECIAL MESSAGE: %r" % (body, ))
        message.ack()

    def on_consumer_end(self, connection, default_channel):
        """Close the extra channel if one was opened."""
        if self.channel2:
            self.channel2.close()

C(connection).run()

Reference

Table Of Contents

Previous topic

Producers

Next topic

Examples

This Page