博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RabbitMQ学习小结(二)—— Work Queues[Python]
阅读量:3512 次
发布时间:2019-05-20

本文共 5995 字,大约阅读时间需要 19 分钟。

1. 简介

在Hello World中,已经学会如何发送和接收消息,但是在实际的应用过程中,并不是简单的接收和发送。例如:当我们有复杂需求,我们需要提升效率,毕竟只有一个消费者难免处理不过来,就如官网中所提到的一样——
在这篇教程中,将创建一个工作队列(Work Queue),它会发送一些耗时的任务给多个工作者(Worker)。
工作队列(又称:任务队列——Task Queues)是为了避免等待一些占用大量资源、时间的操作。当我们把任务(Task)当作消息发送到队列中,一个运行在后台的工作者(worker)进程就会取出任务然后处理。当你运行多个工作者(workers),任务就会在它们之间共享。

2. 应用场景

在Hello World中只是单独的发送一个字符串,但是实际应用中可能是图片,可能是pdf处理等等,可能需要消耗的时间长短不一,根据官网示例不做复杂操作,而是使用符号“.”来代表任务的难易程度,从而用time.sleep()来控制任务的时间长短。一个点(.)将会耗时1秒钟。比如"Hello..."就会耗时3秒钟。

3.  Work Queues

我们需要在Hello World的基础上进行修改,结合上一章节未做解释的参数,一一处理,我们先介绍以下几个关键词,最后再进行总结和整合。

循环调度

使用工作队列的一个好处就是它能够并行的处理队列。如果堆积了很多任务,我们只需要添加更多的工作者(workers)就可以,然后扩展也很简单。
默认来说,RabbitMQ会按顺序得把消息发送给每个消费者(consumer)。平均每个消费者都会收到同等数量得消息。这种发送消息得方式叫做——轮询(round-robin)。

消息确认

简单来说,当RabbitMQ将任务丢给worker(也就是消费者)处理时,只有worker给RabbitMQ返回确认消息时,RabbitMQ才认为这条消息被正确处理完成,继而将消息移除。否则RabbitMQ认为消息没有处理还会继续保留,当worker挂掉时,RabbitMQ会将这个任务交给另一个worker来处理,直到处理完成。
基于此,RabbitMQ提供了消息响应(acknowledgments)。消费者会通过一个ack(响应),告诉RabbitMQ已经收到并处理了某条消息,然后RabbitMQ就会释放并删除这条消息。
消息是没有超时这个概念的;当工作者与它断开连的时候,RabbitMQ会重新发送消息。这样在处理一个耗时非常长的消息任务的时候就不会出问题了。
消息响应默认是开启的。之前的例子中我们可以使用no_ack=True标识把它关闭。我们下面需要移除这个标识,并在工作者(worker)完成任务后,就发送一个响应。
代码如下:
def callback(ch, method, properties, body):    """        @ch: channel 通道,是由外部调用时上送        out body        读取队列内容做不同的操作    """    print " [x] Recived %r" % (body, )    print " [x] ch {0}".format(ch)    print " [x] method {0}".format(method)    print " [x] properties {0}".format(properties)    time.sleep(body.count('.'))    print " [x] Done %r" % (body, )    ch.basic_ack(delivery_tag=method.delivery_tag)
使用basic_ack来发送确认消息。
注意:
如果忘记确认也是一件非常危险的事情。从技术上来讲,这条消息即使被处理也不会从内存移除,这就造成可用内存会越来越小,最终造成内存溢出。从业务上来来讲,同一条消息被处理两次,涉及到业务模型可能会造成重复处理,造成严重后果。
对于这种问题也可以通过rabbitmqctl输出messages_unacknowledged字段:
[tRabbitMQ@iZ250x18mnzZ src]$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledgedListing queues ...task_queue      0       0hello   0       0[tRabbitMQ@iZ250x18mnzZ src]$

持久化

如果没有特意告诉RabbitMQ,那么在它退出或者崩溃的时候,将会丢失所有队列和消息。为了确保信息不会丢失,有两个事情是需要注意的:我们必须把“队列”和“消息”设为持久化。
队列持久化,需要在声明队列时,通过参数durable=True来声明队列是持久化
channel.queue_declare(queue='hello', durable=True)
尽管这行代码本身是正确的,但是仍然不会正确运行。因为我们已经定义过一个叫hello的非持久化队列。RabbitMq不允许你使用不同的参数重新定义一个队列,它会返回一个错误。但我们现在使用一个快捷的解决方法——用不同的名字,例如task_queue。
channel.queue_declare(queue='task_queue', durable=True)
后面我们在发送消息和接收消息中,再对相应声明队列的地方再做修改
消息持久化,我们需要将delivery_mode的属性设为2。
channel.basic_publish(exchange='',	                      routing_key="task_queue",	                      body=message,	                      properties=pika.BasicProperties(	                         delivery_mode = 2, # make message persistent	                      ))
注意:
将消息设为持久化并不能完全保证不会丢失。以上代码只是告诉了RabbitMq要把消息存到硬盘,但从RabbitMq收到消息到保存之间还是有一个很小的间隔时间。因为RabbitMq并不是所有的消息都使用fsync(2)——它有可能只是保存到缓存中,并不一定会写到硬盘中。并不能保证真正的持久化,但已经足够应付我们的简单工作队列。如果你一定要保证持久化,你需要改写你的代码来支持事务(transaction)。

公平调度

默认情况下,如果没有特殊说明,RabbitMQ只管将消息分配给woker,而rabbitMQ并不知道woker是否能够应付得了相应的消息,也不会关心有多少worker没有作出响应。它盲目的把第n-th条消息发给第n-th个消费者。
可以使用basic.qos方法,并设置prefetch_count=1。这样是告诉RabbitMQ,再同一时刻,不要发送超过1条消息给一个工作者(worker),直到它已经处理了上一条消息并且作出了响应。这样,RabbitMQ就会把消息分发给下一个空闲的工作者(worker)。
channel.basic_qos(prefetch_count=1)
注意:
关于工作队列的大小
如果所有的worker都处理繁忙状态,你的队列就会被填满。需要注意这个问题,要么添加更多的工作者(workers),要么使用其他策略。

总结

在第一章的基础上,我们在这里重点增加了以下几个概念
工作队列:实际应用过程中,队列是可以并行处理的,我们可以通过增加worker的方式提升工作效率
持久化:为了安全性,我们需要保证队列不能丢失,那么队列以及消息都必须要保持持久化
队列持久化:声明时,指定durable=True
消息持久化:指定消息属性,delivery_mode=2
消息确认:保持完整性,在worker处理完成后需要通知RabbitMQ已经处理,这样整个流程才能保持完整,避免风险。
worker处理时,通过basic_ack来进行确认消息
公平调度:RabbitMQ只负责分发,而不关注worker是否处理完成,所以worker中需要指定,我最多同时可以处理多少任务,处理多大的任务,否则rabbitMQ会无限制的发送,导致崩溃。
worker处理时,通过basic_qos(prefetch_count=1)来控制
整合代码
发送程序
#!/usr/bin/env python# -*- coding: utf-8 -*-# @Date    : 2016-02-28 21:28:17# @Author  : mx (mx472756841@gmail.com)# @Link    : http://www.shujutiyu.com.cn/# @Version : $Id$import osimport pikaimport sysconn = Nonemessage = ' '.join(sys.argv[1:]) or "Hello World!"try:    # 获取连接    conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))    # 获取通道    channel = conn.channel()    # 在发送队列前,需要确定队列是否存在,如果不存在RabbitMQ将会丢弃,先创建队列    # @param: durable 队列持久化, 默认False    channel.queue_declare('task_queue', durable=True)    # 在RabbitMQ中发送消息,不是直接发送队列,而是发送交换机(exchange),此处不多做研究    ret = channel.basic_publish(exchange='',                                routing_key='task_queue',                                body=message,                                properties=pika.BasicProperties(                                    delivery_mode=2, # 消息持久化                                ) )    print " [x] Sent '{0}'".format(message)    print retexcept Exception, e:    raise efinally:    if conn:        conn.close()
接收程序
#!/usr/bin/env python# -*- coding: utf-8 -*-# @Date    : 2016-02-29 16:30:21# @Author  : mx (mx472756841@gmail.com)# @Link    : http://www.shujutiyu.com/# @Version : $Id$import osimport pikaimport timeconn = Nonedef callback(ch, method, properties, body):    """        @ch: channel 通道,是由外部调用时上送        out body        读取队列内容做不同的操作    """    print " [x] Recived %r" % (body, )    print " [x] ch {0}".format(ch)    print " [x] method {0}".format(method)    print " [x] properties {0}".format(properties)    time.sleep(body.count('.'))    print " [x] Done %r" % (body, )    ch.basic_ack(delivery_tag=method.delivery_tag)try:    # get connection    conn = pika.BlockingConnection(pika.ConnectionParameters(        'localhost')    )    # get channel    channel = conn.channel()    # declare queue, 重复声明不会报错,但是没有队列的话直接取用会报错    # @param: durable  队列持久化    channel.queue_declare('task_queue', durable=True)    # set channel 在同一时刻处理一条消息    channel.basic_qos(prefetch_count=1)    # get message    # @no_ack,不需要返回确认消息, 默认是False,设置的话,有可能会丢包,不设置的话,    # 当不返回确认消息时,有可能重复处理或者是内存溢出。    channel.basic_consume(callback, queue='task_queue')    print ' [*] Waiting for messages. To exit press CTRL+C'    channel.start_consuming()except Exception, e:    raise efinally:    if conn:        conn.close()
本章就介绍到这里,其实还有好多可用的东西,可以到下面提供的参考资料查看,也可以查看pika的源码,自取所需。

4.  参考资料

中文资料:
官网资料:

转载地址:http://mfbqj.baihongyu.com/

你可能感兴趣的文章
插入排序
查看>>
哈夫曼树java代码实现
查看>>
快速排序
查看>>
vue路由高亮的两种方式
查看>>
vue router 报错: Uncaught (in promise) NavigationDuplicated {_name:""NavigationDuplicated"... 的解决方法
查看>>
vue跳转页面的两种方式
查看>>
存储器题目解析(持续更新中....)
查看>>
存储器知识要点
查看>>
Cache模拟器的实现
查看>>
实验2:MIPS指令系统和MIPS体系结构
查看>>
设计模式七大原则
查看>>
手写 | spring事务
查看>>
AndroidStudio Gradle手动下载
查看>>
SpringBoot入门(二)场景启动器
查看>>
SpringBoot入门--自动配置
查看>>
springboot读取配置文件 例:读取配置文件的优先顺序;在主配置文件中激活其他配置文件;加载非主配置文件
查看>>
自动配置原理
查看>>
TCP协议
查看>>
关于Linux系统使用遇到的问题-1:vi 打开只读(readonly)文件如何退出保存?
查看>>
redis 持久化详解,RDB和AOF是什么?他们优缺点是什么?运行流程是什么?
查看>>