概述
队列是一种常用的生产者、消费者通信机制,在业务中,常常需要投递延时消息,也就需要延时队列。比如rabbitmq原生是不支持延时队列的,要实现延时队列功能,常用的有两种方式:1、使用死信队列;2、使用延时插件。
python标准库Queue是python提供的队列,同样原生是不支持延时队列的。下面借鉴了libevent中的将timeout融合到事件循环中的思想,提供了一种python延时队列的实现方式。
代码
from queue import Queue, PriorityQueue, Empty
import time
from datetime import datetime, timedelta
import threading
class Delay_Queue:
def __init__(self):
self._real_q = Queue()
self._notify_q = Queue()
# 存储timeout需要一个小根堆,这里使用PriorityQueue去替代,更加方便
self._delay_q = PriorityQueue()
self._delay_queue_status = dict()
# 是否有既不在_delay_q也不在_real_q中的消息,用于正确计算qsize
self._delay_queue_status.update({"has_waiting_msg": False})
self._thread = threading.Thread(target=Delay_Queue._deal_delay, args=(
self._real_q, self._delay_q, self._notify_q, self._delay_queue_status))
self._thread.start()
@staticmethod
def _deal_delay(q, delay_q, notify_q, delay_queue_status):
while True:
(that_time, msg) = delay_q.get()
delay_queue_status.update({"has_waiting_msg": True})
if datetime.now() >= that_time:
q.put_nowait(msg)
else:
try:
delta_time = (that_time - datetime.now())
delta_days = delta_time.days
delta_seconds = delta_time.seconds
delta_microseconds = delta_time.microseconds
total_delta_seconds = delta_days * 24 * 60 * 60 + \
delta_seconds + delta_microseconds / 1000000
# 这里使用queue的超时机制去做延时,借用了libevent中使用epoll的超时机制去做延时,可能让新的延时消息进来,重新调整延时时间优先级。
notify_q.get(timeout=total_delta_seconds)
# 超时时间没到,且中间有延时消息插进来,重新将该消息放回延时队列,并重新评判优先级。(调整堆)
delay_q.put_nowait((that_time, msg))
continue
except Empty as e:
# 等待超时,说明等待时间内没有新的延时消息进来,并且当前消息延时时间已到,放入real_q
q.put_nowait(msg)
delay_queue_status.update({"has_waiting_msg": False})
def qsize(self):
if self._delay_queue_status.get("has_waiting_msg"):
return self._real_q.qsize() + self._delay_q.qsize() + 1
else:
return self._real_q.qsize() + self._delay_q.qsize()
def empty(self):
if not self._delay_queue_status.get("has_waiting_msg"):
return self._real_q.empty() and self._delay_q.empty()
else:
return False
def full(self):
pass
def put(self, item, block=True, timeout=None, delay_s=None):
if delay_s is None:
return self._real_q.put(item, block, timeout)
elif isinstance(delay_s, int):
# 发送延时消息时,通知_deal_delay线程有一个新的延时消息进来,timeout优先级可能发生变化,线程去调整一下timeout堆
self._notify_q.put(1)
return self._delay_q.put((datetime.now() + timedelta(seconds=delay_s), item), block, timeout)
else:
raise Exception("put error.")
def put_nowait(self, item, delay_s=None):
return self.put(item, block=False, delay_s=delay_s)
def get(self, block=True, timeout=None):
return self._real_q.get(block, timeout)
def get_nowait(self):
return self.get(False)
def task_done(self):
pass
def join(self):
pass
版权声明:本文为qq_37264095原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。