内容参考自python - 操作RabbitMQ
0X00 安装环境
首先是在Linux上安装rabbitmq
1 2 3 4 5
| yum install rabbitmq-server systemctl start rabbitmq-server systemctl enable rabbitmq-server systemctl stop firewall-cmd
|
然后用pip安装Python3的开发包
安装好软件之后可以访问http://115.xx.xx.xx:15672/
来访问自带的web页面来查看和管理RabbitMQ。默认管理员的用户密码都是guest
0X01 简单的向队列中加入消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
|
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='115.xx.xx.xx'))
channel = connection.channel()
channel.queue_declare(queue='test_queue')
for i in range(10): channel.basic_publish(exchange='', routing_key='test_queue', body='hello,world' + str(i)) print("sent...")
connection.close()
|
0X02 简单的从队列中获取消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
|
import pika
credentials = pika.PlainCredentials('guest', 'guest')
connection = pika.BlockingConnection(pika.ConnectionParameters('115.xx.xx.xx', 5672, '/', credentials)) channel = connection.channel()
channel.queue_declare(queue='test_queue')
def callback(ch, method, properties, body): print(body.decode('utf-8'))
channel.basic_consume(callback, queue='test_queue', no_ack=False)
print('waiting...')
channel.start_consuming()
|
0X03 万一消费者掉线了
想象这样一种情况:
消费者从消息队列中获取了n条数据,正要处理呢结果宕机了,那该怎么办?在RabbieMQ中有一个ACK可以用来确认消费者处理结束。就有点类似网络中的ACK,消费者每次从队列中获取了数据之后队列不会立刻将数据移除,而是等待对应的ACK。消费者获取到数据并处理完成之后会向队列发送一个ACK包,通知RabbitMQ这堆消息已经处理妥当了,可以删除了,这时候RabbitMQ才会将数据从队列中移除。所以这种情况下即使消费者掉线也没有什么问题,数据依旧会在队列中存在,留给其他消费者处理。
在Python中这样实现:
消费者有这样一行代码channel.basic_consume(callback, queue='test_queue', no_ack=False)
,其中no_ack=False
表示不发送确认包。将其修改为no_ack=True
就会在每次处理完之后向RabbitMQ发送一个确认包,以确认消息处理完毕。
0X04 万一RabbitMQ宕机了呢
虽然有了ACK包,但是万一RabbitMQ挂了那数据还是会损失。所以我们可以给RabbitMQ设置一个数据持久化存储。RabbitMQ会将数据持久化存储到磁盘上,保证下次再启动的时候队列还在。
在Python中这样实现:
我们声明一个队列是这样的channel.queue_declare(queue='test_queue')
,如果需要持久化一个队列可以这样声明channel.queue_declare(queue='test_queue', durable=True)
。不过这行直接放在代码中是不能执行的,因为以前已经有了一个名为test_queue
的队列,RabbitMQ不允许用不同的方式声明同一个队列,所以可以换一个队列名新建来指定数据持久化存储。不过如果只是这样声明的话,在RabbitMQ宕机重启后确实队列还在,不过队列里的数据就没有了。除非我们这样来声明队列channel.basic_publish(exchange='', routing_key="test_queue", body=message, properties=pika.BasicProperties(delivery_mode = 2,))
。
0X05 最简单的发布订阅
最简单的发布订阅在RabbitMQ中称之为Fanout模式
。也就是说订阅者订阅某个频道,然后发布者向这个频道中发布消息,所有订阅者就都能接收到这条消息。不过因为发布者需要使用订阅者创建的随机队列所以需要先启动订阅者才能启动发布者。
发布者代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
|
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='115.xx.xx.xx'))
channel = connection.channel()
channel.exchange_declare(exchange='my_fanout', type='fanout')
message = 'Hello Python'
channel.basic_publish(exchange='my_fanout', routing_key='', body=message) connection.close()
|
订阅者代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
|
import pika
credentials = pika.PlainCredentials('guest', 'guest')
connection = pika.BlockingConnection(pika.ConnectionParameters('115.xx.xx.xx', 5672, '/', credentials)) channel = connection.channel()
channel.exchange_declare(exchange='my_fanout', type='fanout')
result = channel.queue_declare(exclusive=True) queue_name = result.method.queue
channel.queue_bind(exchange='my_fanout', queue=queue_name)
def callback(ch, method, properties, body): print(body.decode('utf-8'))
channel.basic_consume(callback, queue=queue_name, no_ack=True)
channel.start_consuming()
|