一、消息公平分发
如果Rabbit只管按顺序把消息发到各个消费者身上,不考虑消费者负载的话,很可能出现,一个机器配置不高的消费者那里堆积了很多消息处理不完,同时配置高的消费者却一直很轻松。为解决此问题,可以在各个消费者端,配置perfetch=1,意思就是告诉RabbitMQ在我这个消费者当前消息还没处理完的时候就不要再给我发新消息了。
channel.basic_qos(prefetch_count=1)
注意了,这种公平指的是你消费者有多大本事,就干多少活,你消费者处理的越慢,我就分发的少,你消费者处理的越多,处理的快,我就多发点消息。我server端给客户端发消息的时候,先检查一下,你现在还有多少消息,你如果处理的消息超过1条,我就不给你发了,就是你当前消息没有处理完毕,我就不给你发消息了,没有消息,我就给你发。
二、fanout广播模式
之前的例子都基本都是1对1的消息发送和接收,即消息只能发送到指定的queue里,但有些时候你想让你的消息被所有的Queue收到,类似广播的效果,这时候就要用到exchange了,先来说说exchange的官方说明:
An exchange is a very simple thing. On one side it receives messages from producers and the other side it pushes them to queues. The exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it get discarded. The rules for that are defined by the exchange type.
Exchange在定义的时候是有类型的,以决定到底是哪些Queue符合条件,可以接收消息
- fanout:所有bind到此exchange的queue都可以接收消息(纯广播的,所有消费者都能收到消息)
- direct:通过routingKey和exchange决定的那个唯一的queue可以接收消息
- topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息
- headers:通过headers 来决定把消息发给哪些queue(这个很少用,一般情况下,我们用不到)
2.1、fanout广播模式
说明:fanout这种模式是所有绑定exchange的queue都可以接收到消息。exchange=>转换器
1、生产者(fanout_publiser)
说明:跟之前写的不同,生产者这边并没有声明queue,因为生产者是以广播的形式,所以这边不需要声明queue
import pika#创建socket连接connection = pika.BlockingConnection(pika.ConnectionParameters (host='localhost'))#创建管道channel = connection.channel()#声明exchange,且exchange的名字是logs,exchange的类型为fanoutchannel.exchange_declare(exchange='logs',exchange_type="fanout")#发送的消息message = "info:hello world"#生产一个消息channel.basic_publish( exchange="logs", routing_key='', body=message)print("[X] Send {0}".format(message))#关闭连接connection.close()
注:这边的exchange的名字logs是随便起的
2、消费者(fanout_consumer)
说明:消费者这边要声明一个唯一的queue_name的对象,并且从对象中获取queue名
import pika#创建一个socketconnection = pika.BlockingConnection(pika.ConnectionParameters( host="localhost"))#创建一个管道channel = connection.channel()#声明exchange,exchange的名字logs,类型是fanout广播模式channel.exchange_declare(exchange="logs", exchange_type="fanout")#不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除,result是queue的对象result = channel.queue_declare(exclusive=True) #exclusive=>排他的,唯一的#获取queue名queue_name = result.method.queue#绑定exchangechannel.queue_bind(exchange="logs", queue=queue_name)print(' [*] Waiting for logs. To exit press CTRL+C')#声明回调函数def callback(ch,method,properties,body): "回调函数" print("[X] {0}".format(body))#消费者消费channel.basic_consume(callback, queue=queue_name, no_ack=True)#启动消费模式channel.start_consuming()
3、代码逻辑图
①服务端没有声明queue,为什么客户端要声明一个queue?
答:生产者发消息到exchange上,exchange就会遍历一遍,所有绑定它的哪些queue,然后把消息发到queue里面,它发了queue就不管了,消费者从queue里面去收,所以就收到广播了,而不是说exchange直接就把消息发给消费者,消费者只会从queue里去读消息,且拿着queue去绑定exchange。
②为什么queue要自动生成,而不是自己手动去写?
答:我这个queue只是为了收广播的,所以如果我消费者不收了,这个queue就不需要了,所以就让它自动生成了,不需要的了,就自动销毁
2.2、广播实时性
广播是实时的,你不在的时候,就是你消费者没有开启的时候,发消息的时候,就没有收到,这个时候就没有了。如果消费者开启了,生产者发消息时,消费者是收的到的,这个又叫订阅发布,收音机模式
1、消费者断开->生产者发消息->消费再连接
①消费者全部断开,生产者发消息
②消费再连接
结论:很显然,消费者是无法收到消息的。
2、消费者连接->生产者发消息
①消费者处于连接状态,生产者发消息
②查看各个消费者
结论:只有在消费者处于连接状态,才能接受消息。