RabbitMQ基础操作
@ sasaba | 星期二,一月 26 日,2021 年 | 3 分钟阅读 | 更新于 星期二,一月 26 日,2021 年

RabbitMQ基础操作的python实现与例子。

基础操作

hello world

client
import pika
connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        host="192.168.8.101",
        port=5672,
        # 设置用户
        credentials=pika.credentials.PlainCredentials("admin", "199564"),
        # 设置虚拟机 相当于独立的mq
        virtual_host="/"
    ),
)
生产者
from mq.client import connection
# 打开一个通道
channel = connection.channel()

# queue, durable=False 是否持久, exclusive=False 是否独占(临时队列), auto_delete=False 自动删除
channel.queue_declare(queue="hello", durable=True)

# exchange 交换机 默认是'', routing_key 使用默认交换机要设置为队列
channel.basic_publish(exchange='', routing_key='hello',
                      body=b'Test message.')
connection.close()
消费者
from mq.client import connection

def message_callback(ch, method, properties, body):
    print("body is {}".format(body))

# 打开一个通道
channel = connection.channel()
# queue, durable=False 是否持久, exclusive=False 是否独占(临时队列), auto_delete=False 自动删除
channel.queue_declare(queue="hello", durable=True)
# queue, auto_ack 自动回复, on_message_callback 消息方法, consumer_tag 消费者标识
# 不自动回复的的话会一直存在
channel.basic_consume(queue="hello", auto_ack=True, on_message_callback=message_callback, consumer_tag="c1")
channel.start_consuming()

可以查看队列的消息数目

工作模式

交换机类型
  • fanout/发布订阅类型
  • direct/Routing工作模式
  • topic/通配符工作模式
  • headers/headers工作模式
工作队列模式

实际上就是hello world的版本,hello world多个消费者时,会采用轮训的方式依次把消息给消费者。

发布订阅
# 生产者
from mq.client import connection

QUEUE_INFORM_EMAIL = "queue_inform_email"
QUEUE_INFORM_SMS = "queue_inform_sms"
EXCHANGE_FANOUT_INFORM = "exchange_fanout_inform"

# 打开一个通道
channel = connection.channel()

# 声明两个队列
channel.queue_declare(queue=QUEUE_INFORM_SMS, durable=True)
channel.queue_declare(queue=QUEUE_INFORM_EMAIL, durable=True)

# 声明一个交换机
# exchange 名称, exchange_type="fanout"
channel.exchange_declare(EXCHANGE_FANOUT_INFORM, "fanout", durable=True)

# 交换机队列绑定
# queue 队列, exchange 交换机, routing_key  路由key 交换机根据路由key的值把消息发到指定队列 发布订阅为''
channel.queue_bind(QUEUE_INFORM_SMS, EXCHANGE_FANOUT_INFORM, "")
channel.queue_bind(QUEUE_INFORM_EMAIL, EXCHANGE_FANOUT_INFORM, "")


channel.basic_publish(exchange=EXCHANGE_FANOUT_INFORM, routing_key='',
                      body=b'Test 2222')
connection.close()
# 消费者email
from mq.client import connection

QUEUE_INFORM_EMAIL = "queue_inform_email"
EXCHANGE_FANOUT_INFORM = "exchange_fanout_inform"

def message_callback(ch, method, properties, body):
    print("body is {}".format(body))

# 打开一个通道
channel = connection.channel()
# 声明两个队列
channel.queue_declare(queue=QUEUE_INFORM_EMAIL, durable=True)
# 声明一个交换机
# exchange 名称, exchange_type="fanout"
channel.exchange_declare(EXCHANGE_FANOUT_INFORM, "fanout", durable=True)
# 交换机队列绑定
# queue 队列, exchange 交换机, routing_key  路由key 交换机根据路由key的值把消息发到指定队列 发布订阅为''
channel.queue_bind(QUEUE_INFORM_EMAIL, EXCHANGE_FANOUT_INFORM, "")

channel.basic_consume(queue=QUEUE_INFORM_EMAIL, auto_ack=True, on_message_callback=message_callback, consumer_tag="c1")
channel.start_consuming()


# 消费者sms
from mq.client import connection

QUEUE_INFORM_SMS = "queue_inform_sms"
EXCHANGE_FANOUT_INFORM = "exchange_fanout_inform"

def message_callback(ch, method, properties, body):
    print("body is {}".format(body))


# 打开一个通道
channel = connection.channel()

# 声明两个队列
channel.queue_declare(queue=QUEUE_INFORM_SMS, durable=True)

# 声明一个交换机
# exchange 名称, exchange_type="fanout"
channel.exchange_declare(EXCHANGE_FANOUT_INFORM, "fanout", durable=True)

# 交换机队列绑定
# queue 队列, exchange 交换机, routing_key  路由key 交换机根据路由key的值把消息发到指定队列 发布订阅为''
channel.queue_bind(QUEUE_INFORM_SMS, EXCHANGE_FANOUT_INFORM, "")

channel.basic_consume(queue=QUEUE_INFORM_SMS, auto_ack=True, on_message_callback=message_callback, consumer_tag="c1")
channel.start_consuming()

查看交换机绑定的队列

交换机

路由模式

跟发布订阅的区别主要是使用direct,并且需要指定路由。

# pub
from mq.client import connection

QUEUE_INFORM_EMAIL = "queue_inform_email"
QUEUE_INFORM_SMS = "queue_inform_sms"
EXCHANGE_ROUTING_INFORM = "exchange_routing_inform"
ROUTING_EMAIL = "routing_email"
ROUTING_SMS = "routing_sms"

# 打开一个通道
channel = connection.channel()

# 声明两个队列
channel.queue_declare(queue=QUEUE_INFORM_SMS, durable=True)
channel.queue_declare(queue=QUEUE_INFORM_EMAIL, durable=True)

channel.exchange_declare(EXCHANGE_ROUTING_INFORM, "direct", durable=True)

# 交换机队列绑定
# queue 队列, exchange 交换机, routing_key  路由key 交换机根据路由key的值把消息发到指定队列 发布订阅为''
channel.queue_bind(QUEUE_INFORM_SMS, EXCHANGE_ROUTING_INFORM, ROUTING_SMS)
channel.queue_bind(QUEUE_INFORM_EMAIL, EXCHANGE_ROUTING_INFORM, ROUTING_EMAIL)


channel.basic_publish(exchange=EXCHANGE_ROUTING_INFORM, routing_key=ROUTING_SMS,
                      body=b'Test 22223')
connection.close()
# sub email
from mq.client import connection


def message_callback(ch, method, properties, body):
    print("body is {}".format(body))


QUEUE_INFORM_EMAIL = "queue_inform_email"
EXCHANGE_ROUTING_INFORM = "exchange_routing_inform"
ROUTING_EMAIL = "routing_email"

# 打开一个通道
channel = connection.channel()

# 声明两个队列
channel.queue_declare(queue=QUEUE_INFORM_EMAIL, durable=True)

channel.exchange_declare(EXCHANGE_ROUTING_INFORM, "direct", durable=True)

# 交换机队列绑定
# queue 队列, exchange 交换机, routing_key  路由key 交换机根据路由key的值把消息发到指定队列 发布订阅为''
channel.queue_bind(QUEUE_INFORM_EMAIL, EXCHANGE_ROUTING_INFORM, ROUTING_EMAIL)

channel.basic_consume(queue=QUEUE_INFORM_EMAIL, auto_ack=True, on_message_callback=message_callback, consumer_tag="email")
channel.start_consuming()

# sub sms
from mq.client import connection


def message_callback(ch, method, properties, body):
    print("body is {}".format(body))


QUEUE_INFORM_SMS = "queue_inform_sms"
EXCHANGE_ROUTING_INFORM = "exchange_routing_inform"
ROUTING_SMS = "routing_sms"

# 打开一个通道
channel = connection.channel()

# 声明两个队列
channel.queue_declare(queue=QUEUE_INFORM_SMS, durable=True)

channel.exchange_declare(EXCHANGE_ROUTING_INFORM, "direct", durable=True)

# 交换机队列绑定
# queue 队列, exchange 交换机, routing_key  路由key 交换机根据路由key的值把消息发到指定队列 发布订阅为''
channel.queue_bind(QUEUE_INFORM_SMS, EXCHANGE_ROUTING_INFORM, ROUTING_SMS)

channel.basic_consume(queue=QUEUE_INFORM_SMS, auto_ack=True, on_message_callback=message_callback, consumer_tag="sms")
channel.start_consuming()

查看绑定的路由

通配符 Topic

使用topic,并且需要指定通配符路由。# 表示匹配一个或多个词,*只匹配一个词。

跟mqtt类似。

# pub
from mq.client import connection

QUEUE_INFORM_EMAIL = "queue_inform_email"
QUEUE_INFORM_SMS = "queue_inform_sms"
EXCHANGE_ROUTING_INFORM = "exchange_topic_inform"

ROUTING_EMAIL = "inform.#.email.#"
ROUTING_SMS = "inform.#.sms.#"

# 打开一个通道
channel = connection.channel()

# 声明两个队列
channel.queue_declare(queue=QUEUE_INFORM_SMS, durable=True)
channel.queue_declare(queue=QUEUE_INFORM_EMAIL, durable=True)

channel.exchange_declare(EXCHANGE_ROUTING_INFORM, "topic", durable=True)

# 交换机队列绑定
# queue 队列, exchange 交换机, routing_key  路由key 交换机根据路由key的值把消息发到指定队列 发布订阅为''
channel.queue_bind(QUEUE_INFORM_SMS, EXCHANGE_ROUTING_INFORM, ROUTING_SMS)
channel.queue_bind(QUEUE_INFORM_EMAIL, EXCHANGE_ROUTING_INFORM, ROUTING_EMAIL)


channel.basic_publish(exchange=EXCHANGE_ROUTING_INFORM, routing_key="inform.sms.email",
                      body=b'Test 22223')
connection.close()
# sub mail
from mq.client import connection


def message_callback(ch, method, properties, body):
    print("body is {}".format(body))


QUEUE_INFORM_EMAIL = "queue_inform_email"
EXCHANGE_ROUTING_INFORM = "exchange_topic_inform"
ROUTING_EMAIL = "inform.#.email.#"

# 打开一个通道
channel = connection.channel()

# 声明两个队列
channel.queue_declare(queue=QUEUE_INFORM_EMAIL, durable=True)

channel.exchange_declare(EXCHANGE_ROUTING_INFORM, "topic", durable=True)

# 交换机队列绑定
# queue 队列, exchange 交换机, routing_key  路由key 交换机根据路由key的值把消息发到指定队列 发布订阅为''
channel.queue_bind(QUEUE_INFORM_EMAIL, EXCHANGE_ROUTING_INFORM, ROUTING_EMAIL)

channel.basic_consume(queue=QUEUE_INFORM_EMAIL, auto_ack=True, on_message_callback=message_callback, consumer_tag="email")
channel.start_consuming()


# sub sms
from mq.client import connection


def message_callback(ch, method, properties, body):
    print("body is {}".format(body))


QUEUE_INFORM_SMS = "queue_inform_sms"
EXCHANGE_ROUTING_INFORM = "exchange_topic_inform"
ROUTING_SMS = "inform.#.sms.#"

# 打开一个通道
channel = connection.channel()

# 声明两个队列
channel.queue_declare(queue=QUEUE_INFORM_SMS, durable=True)

channel.exchange_declare(EXCHANGE_ROUTING_INFORM, "topic", durable=True)

# 交换机队列绑定
# queue 队列, exchange 交换机, routing_key  路由key 交换机根据路由key的值把消息发到指定队列 发布订阅为''
channel.queue_bind(QUEUE_INFORM_SMS, EXCHANGE_ROUTING_INFORM, ROUTING_SMS)

channel.basic_consume(queue=QUEUE_INFORM_SMS, auto_ack=True, on_message_callback=message_callback, consumer_tag="sms")
channel.start_consuming()