### 软硬件环境 * ubuntu 18.04 64bit * anaconda3 with python 3.6.4 * RabbitMQ * pika 0.12.0 ### AMQP是什么 `AMQP`(`Advanced Message Queuing Protocol`),顾名思义,它是一个消息协议,能够使得遵循该协议的客户端和消息中间件(`Broker`)进行通讯。 下图是官方给出的模型示意图,中间框内的就是`Broker` ![rabbitmq_hello](https://code.xugaoxiang.com/xugaoxiang/blog/raw/master/images/python/rabbitmq_hello-world-example-routing.png) 消息发布给`Exchange`,`Exchange`相当于邮局或者信箱,它接收到消息后会根据不同的规则(称为`Bindings`)把消息发送出去。 `Exchange`总共有`4`种类型,分别是 * `Direct Exchange` 将消息中的`Routing key`与该`exchange`关联的所有`Binding`中的`Routing key`进行比较,如果相等,则发送到该`Binding`对应的`Queue`中。 * `Topic Exchange` 将消息中的`Routing key`与该`Exchange`关联的所有`Binding`中的`Routing key`进行对比,如果匹配上了,则发送到该`Binding`对应的`Queue`中。 * `Fanout Exchange` 直接将消息转发到所有`Binding`的对应`Queue`中,这种`Exchange`在路由转发的时候,忽略`Routing key`。 * `Headers Exchange` 将消息中的`headers`与该`Exchange`相关联的所有`Binging`中的参数进行匹配,如果匹配上了,则发送到该`Binding`对应的`Queue`中 ### RabbitMQ简介 `RabbitMQ`是当下最流行的开源`Message Broker`,`Broker`中文是经纪、掮客的意思,它非常轻量且容易部署,不管是在本地还是在云端,同时支持多种消息协议,而且支持多种编程语言。`RabbitMQ`是针对`AMQP`的一种实现。 * `Producing`意为发送,发送的程序就是`producer`生产者 * `Queue`即队列,相当于`RabbitMQ`中的邮箱名称,`producer`发送的消息就是存储在这里,队列受制于主机的内存大小和磁盘空间,本质上队列就是缓存。多个`producer`可以发送数据到同一个队列,而`consumer`可以从同一个队列中接收数据 * `Consuming`意为接收,接收的程序就是`consumer`消费者 ### RabbitMQ的消息处理流程 ![rabbitmq_hello](https://code.xugaoxiang.com/xugaoxiang/blog/raw/master/images/python/rabbitmq_arch.png) * `producer`发送消息到`Exchange` * `Exchange`接收消息并且准备将消息根据`Routing_key`路由到`Queue` * 绑定`Queue`和`Exchange`根据`binding_key`(在上图中绑定了两个`Queue`到这个`Exchange`),`Exchange`将消息路由到`Queue` * 消息在被`consumer`接收处理之前,一直在`Queue`中 * `consumer`接收处理消息 ### 安装RabbitMQ 通过`apt`安装 ```bash sudo apt install rabbitmq-server ``` 接下来安装`python`语言的支持包`pika`,`pika`同时支持`python2`和`python3` ```bash pip install pika ``` 开启`RabbitMQ`的`web`管理界面 ```bash sudo rabbitmq-plugins enable rabbitmq_management ``` 之后可以登录浏览器打开`http://localhost:15672`,查看`API`的话打开`http://localhost:15672/api` ### RabbitMQ中的Hello world 消息发送端代码`producer.py` ```python # -*- coding: utf-8 -*- # @Time : 18-12-21 下午11:12 # @Author : xugaoxiang # @Email : djstava@gmail.com # @Website : http://www.xugaoxiang.com # @File : producer.py # @Software: PyCharm import pika import sys message = sys.argv[1] # 可以连接远程,localhost换成ip地址或者域名 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() # 发送消息之前需要确认接收队列是否存在? 如果不存在,RabbitMQ就会丢弃。下面语句是我们创建`hello`队列 channel.queue_declare(queue='hello') # 最后是发送 channel.basic_publish(exchange='', routing_key='hello', body=message) print('Sent message: {}'.format(message)) connection.close() ``` 消息接收端代码`consumer.py` ```python # -*- coding: utf-8 -*- # @Time : 18-12-21 下午11:18 # @Author : xugaoxiang # @Email : djstava@gmail.com # @Website : http://www.xugaoxiang.com # @File : consumer.py # @Software: PyCharm import pika def callback(ch, method, properties, body): print('Receive: {}'.format(body)) # 建立连接跟发送端的一样 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.queue_declare(queue='hello') # 注册回调函数callback, no_ack表示不支持消息反馈,在多任务的情况下,这个很有用,`producer`能够告诉RabbitMQ消息正被接收和处理。如果处理该消息的进程挂了,可以通过这个机制 channel.basic_consume(consumer_callback=callback, queue='hello', no_ack=True) print('Waiting for messages. To exit press CTRL+C') # 进入事件循环 channel.start_consuming() ``` 执行顺序是: `consumer.py` -> `producer.py` ![rabbitmq_code](https://code.xugaoxiang.com/xugaoxiang/blog/raw/master/images/python/rabbitmq_hello-world.png) 在上面的示例中,我们使用的是`BlockingConnection`适配器,除此以外,`pika`还提供了其它几个适配器,根据需要选择使用 * BlockingConnection 最简单的一种,同步、阻塞 * AsyncioConnection 适用于`python3`异步`IO`事件循环 * SelectConnection 同样是异步,但不依赖与第三方库 * TornadoConnection 依赖于`Tornado` * TwistedProtocolConnection 适用于`Twisted`异步包 在安装`RabbitMQ`中,额外会生成命令行工具`rabbitmqctl`和`rabbitmqadmin`,这两个命令非常有用,在代码调试中也可以通过它们来查看当前系统状态,具体的参数可以通过`--help`来查看。 ### 参考资料 * <https://www.rabbitmq.com/> * <https://www.rabbitmq.com/tutorials/tutorial-one-python.html> * <https://github.com/pika/pika> * <https://www.rabbitmq.com/tutorials/tutorial-three-python.html>


Comments

comments powered by Disqus