TTL队列
什么是TTL队列
TTL(Time To Live)队列,也叫做过期队列,它是rabbitmq 中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间,单位是毫秒。换句话说,如果一条消息设置了 TTL 属性或者进入了设置TTL 属性的队列,那么这条消息如果在 TTL 设置的时间内没有被消费,则会成为"死信",默认的,它将会从队列中删除。
什么是死信队列?
在消息队列中,执行异步任务时,通常是将消息生产者发布的消息存储在队列中,由消费者从队列中获取并处理这些消息。但是,在某些情况下,消息可能无法正常地被处理和消耗,例如:格式错误、设备故障等,这些未成功处理的消息就被称为"死信",更多关于"死信"队列的介绍,在本篇死信队列部分会有详细介绍。
如何设置TTL属性
有两种方式可以设置TTL属性:
- 在声明队列时,就设置好超时时间,这样每一个往该队列中发布的消息都会有TTL属性了,这种方式适用于每个消息的TTL时间是一致的场景。
- 声明队列时正常声明即可,但是生产者往队列中发布消息时为当前消息单独设置TTL属性,这种情况适合根据消息不同,需要设置不同的超时时间的场景。
如果同时配置了队列的 TTL 和消息的TTL,那么较小的那个值将会被使用。
import time
import pika
RABBITMQ_CONFIG = {
'host': '127.0.0.1', 'port': 5672, 'virtual_host': '/',
'credentials': pika.PlainCredentials('guest', 'guest'), # RabbitMQ 用户名和密码
}
class MQ(object):
connection_parameters = pika.ConnectionParameters(**RABBITMQ_CONFIG)
def __init__(self):
self.connection = pika.BlockingConnection(pika.ConnectionParameters(**RABBITMQ_CONFIG))
self.channel = self.connection.channel()
def callback(self, ch, method, properties, body):
print(" [消费者] Received %r" % body.decode("utf8"))
ch.basic_ack(delivery_tag=method.delivery_tag)
# 在声明队列时设置ttl,并发布消息
def ttl_for_queue_declare_basic_publish(self):
queue_name = "ttl_for_queue_declare"
self.channel.queue_declare(queue=queue_name, durable=True, arguments={'x-message-ttl': 10000}) # 设置队列中所有的消息过期时间为10秒
for i in range(10):
self.channel.basic_publish(
exchange="",
routing_key=queue_name,
body=f'message {i}'.encode('utf8'),
properties=pika.BasicProperties(delivery_mode=2)
)
# 接收来自ttl_for_queue_declare_basic_publish发布的消息
def ttl_for_queue_declare_recv_msg(self):
# 下面这两行代码是以防万一,防止消费者没有启动就接收消息,那就和消费者声明队列代码保持一致,这样保证无论谁先启动,队列都是一样的
queue_name = "ttl_for_queue_declare"
self.channel.queue_declare(queue=queue_name, durable=True,arguments={'x-message-ttl': 10000}) # 设置队列中所有的消息过期时间为10秒
# 开始接收消息
self.channel.basic_consume(
queue=queue_name,
auto_ack=False,
on_message_callback=self.callback
)
self.channel.start_consuming()
if __name__ == '__main__':
mq = MQ()
# 如果只运行ttl_for_queue_declare_basic_publish,你可以通过web管理面板观察到有10个元素在10秒钟之后自动删除掉了
mq.ttl_for_queue_declare_basic_publish()
# 如果同时有消费者在运行,那么在未超时之前,则会消费掉所有的消息
# mq.ttl_for_queue_declare_recv_msg()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# -*- coding = utf-8 -*-
import time
import pika
RABBITMQ_CONFIG = {
'host': '127.0.0.1', 'port': 5672, 'virtual_host': '/',
'credentials': pika.PlainCredentials('guest', 'guest'), # RabbitMQ 用户名和密码
}
class MQ(object):
connection_parameters = pika.ConnectionParameters(**RABBITMQ_CONFIG)
def __init__(self):
self.connection = pika.BlockingConnection(pika.ConnectionParameters(**RABBITMQ_CONFIG))
self.channel = self.connection.channel()
def callback(self, ch, method, properties, body):
print(" [消费者] Received %r" % body.decode("utf8"))
ch.basic_ack(delivery_tag=method.delivery_tag)
# 生命队列照常声明,发布消息时指定ttl
def ttl_for_basic_publish(self):
queue_name = "ttl_for_basic_publish"
# self.channel.queue_declare(queue=queue_name, durable=True, arguments={'x-message-ttl': 10000}) # 设置队列中所有的消息过期时间为10秒
# 声明队列时,不设置过期时间,消息的过期时间在basic_publish中指定,当然如果两处都指定,那么较小的那个值将会被使用。
self.channel.queue_declare(queue=queue_name, durable=True)
for i in range(10):
self.channel.basic_publish(
exchange="",
routing_key=queue_name,
body=f'message {i}'.encode('utf8'),
properties=pika.BasicProperties(delivery_mode=2, expiration='10000') # 注意expiration的值必须是字符串类型
)
# 接收来自ttl_for_basic_publish发布的消息
def ttl_for_basic_publish_recv_msg(self):
# 下面这两行代码是以防万一,防止消费者没有启动就接收消息,那就和消费者声明队列代码保持一致,这样保证无论谁先启动,队列都是一样的
queue_name = "ttl_for_basic_publish"
self.channel.queue_declare(queue=queue_name, durable=True)
# 开始接收消息
self.channel.basic_consume(
queue=queue_name,
auto_ack=False,
on_message_callback=self.callback
)
self.channel.start_consuming()
if __name__ == '__main__':
mq = MQ()
# 如果只运行ttl_for_basic_publish,你可以通过web管理面板观察到有10个元素在10秒钟之后自动删除掉了
mq.ttl_for_basic_publish()
# 如果同时有消费者在运行,那么在未超时之前,则会消费掉所有的消息
mq.ttl_for_basic_publish_recv_msg()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
延迟队列
https://www.bmabk.com/index.php/post/171223.html
惰性队列
https://www.bmabk.com/index.php/post/191078.html
死信队列
死信(Dead Letter),是RabbitMQ中的一种消息机制,当你在消费消息时,如果队列里的消息出现以下情况:
- 消息被否定确认,使用
channel.basicNack
或channel.basicReject
,并且此时requeue
属性被设置为false
。 - 消息在队列的存活时间超过设置的TTL时间。
- 消息队列的消息数量已经超过最大队列长度。
那么该消息将成为"死信"。"死信"消息会被RabbitMQ进行特殊处理,如果配置了死信队列信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃。
下面两个示例演示了发布订阅模式和关键字模式下的死信队列的使用。
import random
import pika
import arrow
from pika.exchange_type import ExchangeType
RABBITMQ_CONFIG = {
'host': '127.0.0.1', 'port': 5672, 'virtual_host': '/',
'credentials': pika.PlainCredentials('guest', 'guest'), # RabbitMQ 用户名和密码
}
class MQ(object):
connection_parameters = pika.ConnectionParameters(**RABBITMQ_CONFIG)
def __init__(self):
self.connection = pika.BlockingConnection(pika.ConnectionParameters(**RABBITMQ_CONFIG))
self.channel = self.connection.channel()
self.dead_exchange_name = 'my_dead_exchange_direct' # 死信交换机
self.dead_queue_name = 'my_dead_queue_direct' # 死信队列
self.dlx_exchange_name = "my_dlx_exchange_direct" # 普通队列所需要绑定的交换机
self.dlx_queue_name = "my_dlx_queue_direct" # 普通的队列,只不过和死信队列做了绑定关系
def time_difference_value(self):
""" 计算当前距离凌晨23:59:59的时间差值 """
now = arrow.now()
end_of_day = now.replace(hour=23, minute=59, second=59)
# 计算的差值是秒,但ttl那里要求的是毫秒,所以要乘以1000
# 而且要求是字符串格式,所以要转换一下
return str(int((end_of_day - now).total_seconds()) * 1000)
def dead_queue(self):
# 创建死信交换机和死信队列,并且将死信队列和死信交换机绑定
self.channel.exchange_declare(exchange=self.dead_exchange_name, exchange_type=ExchangeType.direct, durable=True)
# 声明死信队列,并设置持久化
self.channel.queue_declare(queue=self.dead_queue_name, durable=True)
# 将死信队列和死信交换机绑定
self.channel.queue_bind(queue=self.dead_queue_name, exchange=self.dead_exchange_name,
routing_key=self.dead_queue_name)
def dlx_queue(self):
# 创建普通消息队列,让其和死信队列绑定
self.channel.queue_declare(queue=self.dlx_queue_name, durable=True, arguments={
"x-dead-letter-exchange": self.dead_exchange_name,
# 上面的交换机是direct,那这里,就加上x-dead-letter-routing-key,表示将来普通队列中的消息需要放到死信队列,可以直接通过该路由放进去
'x-dead-letter-routing-key': self.dead_queue_name,
'x-max-length': 100, # 队列长度
})
def send_dlx_message(self):
""" 正常队列的生产者 """
for i in range(40):
self.channel.basic_publish(
exchange="", # 如果有交换机可以绑定,没有就留空
routing_key=self.dlx_queue_name,
body=f"message {i}".encode('utf8'),
# properties=pika.BasicProperties(delivery_mode=2, expiration=self.time_difference_value()), # 这个是让其凌晨23:59:59秒过期
properties=pika.BasicProperties(delivery_mode=2, expiration="20000") # 每个消息都固定设置20秒过期
)
def mq_dlx_callback(self, ch, method, properties, body):
""" 正常队列的消费者的回调函数 """
# 模拟不同情况的消息处理
if random.randint(0, 1) == 0:
# 已经确认的消息不会进入死信队列
ch.basic_ack(delivery_tag=method.delivery_tag)
print(" Received %r" % body.decode("utf8"))
else:
print("Received %r" % body.decode("utf8"))
# 拿到消息了,但是由于其它原因需要将这个消息重新进入普通队列,再次尝试
# self.channel.basic_reject(delivery_tag=method.delivery_tag, requeue=True)
# 拿到消息了,但是由于其它原因需要将这个丢到死信队列中
# self.channel.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
# 拿到消息了,但是拒收,不会重新进入队列,而是进入死信队列
self.channel.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
def mq_dead_callback(self, ch, method, properties, body):
""" 死信队列的消费者的回调函数 """
# 对于死信队列中的消息,我们可以进行相应的处理,比如记录日志、发送通知,更改一些状态信息等
ch.basic_ack(delivery_tag=method.delivery_tag)
print("Received %r" % body.decode("utf8"))
def receive_dead_message(self):
""" 死信队列的消费者 """
# 死信队列也是普通队列,所以我们可以配置一个或者多个消费者来消费该死信队列中的消息
self.channel.basic_consume(
queue=self.dead_queue_name,
on_message_callback=self.mq_dead_callback,
auto_ack=False
)
self.channel.start_consuming()
def receive_dlx_queue_message(self):
""" 正常队列的消费者 """
# 正常的队列也照样可以配置一个或者多个消费者来消费该队列中的消息
self.channel.basic_consume(
queue=self.dlx_queue_name,
on_message_callback=self.mq_dlx_callback,
auto_ack=False
)
self.channel.start_consuming()
if __name__ == '__main__':
mq = MQ()
mq.dead_queue() # 创建死信交换机和死信队列
mq.dlx_queue() # 创建普通队列并且和死信交换机绑定
mq.send_dlx_message() # 发送消息到普通队列
# mq.receive_dead_message() # 接收死信队列中的消息
# mq.receive_dlx_queue_message() # 接收普通队列中的消息
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import random
import pika
import arrow
from pika.exchange_type import ExchangeType
RABBITMQ_CONFIG = {
'host': '127.0.0.1', 'port': 5672, 'virtual_host': '/',
'credentials': pika.PlainCredentials('guest', 'guest'), # RabbitMQ 用户名和密码
}
class MQ(object):
connection_parameters = pika.ConnectionParameters(**RABBITMQ_CONFIG)
def __init__(self):
self.connection = pika.BlockingConnection(pika.ConnectionParameters(**RABBITMQ_CONFIG))
self.channel = self.connection.channel()
self.dead_exchange_name = 'my_dead_exchange_fanout' # 死信交换机
self.dead_queue_name1 = 'my_dead_queue1_fanout' # 死信队列
self.dead_queue_name2 = 'my_dead_queue2_fanout' # 死信队列
self.dlx_exchange_name = "my_dlx_exchange_fanout" # 普通队列所需要绑定的交换机
self.dlx_queue_name = "my_dlx_queue_fanout" # 普通的队列,只不过和死信队列做了绑定关系
def time_difference_value(self):
""" 计算当前距离凌晨23:59:59的时间差值 """
now = arrow.now()
end_of_day = now.replace(hour=23, minute=59, second=59)
# 计算的差值是秒,但ttl那里要求的是毫秒,所以要乘以1000
# 而且要求是字符串格式,所以要转换一下
return str(int((end_of_day - now).total_seconds()) * 1000)
def dead_queue(self):
# 创建死信交换机和死信队列,并且将死信队列和死信交换机绑定
# exchange_type的值是fanout的话,那么你可以搞若干个队列绑定该交换机,然后只要有消息该交换机发送过来,这些队列就都能收到
self.channel.exchange_declare(exchange=self.dead_exchange_name, exchange_type=ExchangeType.fanout, durable=True)
# 声明死信队列,并设置持久化
self.channel.queue_declare(queue=self.dead_queue_name1, durable=True)
# 将死信队列和死信交换机绑定
self.channel.queue_bind(queue=self.dead_queue_name1, exchange=self.dead_exchange_name,
routing_key=self.dead_queue_name1)
# 声明死信队列,并设置持久化
self.channel.queue_declare(queue=self.dead_queue_name2, durable=True)
# 将死信队列和死信交换机绑定
self.channel.queue_bind(queue=self.dead_queue_name2, exchange=self.dead_exchange_name,
routing_key=self.dead_queue_name2)
def dlx_queue(self):
# 创建普通消息队列,让其和死信队列绑定
# self.channel.exchange_declare(exchange=self.dlx_exchange_name, exchange_type=ExchangeType.direct, durable=True)
self.channel.queue_declare(queue=self.dlx_queue_name, durable=True, arguments={
"x-dead-letter-exchange": self.dead_exchange_name,
# 上面的交换机是fanout,那这里,就可以省略掉路由参数了,我们直接将需要放入死信的消息,直接转发到发布订阅模式的交换机,那么谁跟
# 该交换机绑定,谁就可以收到消息
# 'x-dead-letter-routing-key': self.dead_queue_name,
'x-max-length': 100, # 队列长度
})
def send_dlx_message(self):
""" 正常队列的生产者 """
for i in range(40):
self.channel.basic_publish(
# exchange=self.dlx_exchange_name, # 如果有交换机可以绑定,没有就留空
exchange="", # 如果有交换机可以绑定,没有就留空
routing_key=self.dlx_queue_name,
body=f"message {i}".encode('utf8'),
# properties=pika.BasicProperties(delivery_mode=2, expiration=self.time_difference_value()), # 这个是让其凌晨23:59:59秒过期
properties=pika.BasicProperties(delivery_mode=2, expiration="20000") # 每个消息都固定设置20秒过期
)
def mq_dlx_callback(self, ch, method, properties, body):
""" 正常队列的消费者的回调函数 """
# 模拟不同情况的消息处理
if random.randint(0, 1) == 0:
# 已经确认的消息不会进入死信队列
ch.basic_ack(delivery_tag=method.delivery_tag)
print(" Received %r" % body.decode("utf8"))
else:
print("Received %r" % body.decode("utf8"))
# 拿到消息了,但是由于其它原因需要将这个消息重新进入普通队列,再次尝试
# self.channel.basic_reject(delivery_tag=method.delivery_tag, requeue=True)
# 拿到消息了,但是由于其它原因需要将这个丢到死信队列中
# self.channel.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
# 拿到消息了,但是拒收,不会重新进入队列,而是进入死信队列
self.channel.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
def mq_dead_callback(self, ch, method, properties, body):
""" 死信队列的消费者的回调函数 """
# 对于死信队列中的消息,我们可以进行相应的处理,比如记录日志、发送通知,更改一些状态信息等
ch.basic_ack(delivery_tag=method.delivery_tag)
print("Received %r" % body.decode("utf8"))
def receive_dead_message(self):
""" 死信队列的消费者 """
# 死信队列也是普通队列,所以我们可以配置一个或者多个消费者来消费该死信队列中的消息
self.channel.basic_consume(
queue=self.dead_queue_name1,
on_message_callback=self.mq_dead_callback,
auto_ack=False
)
self.channel.start_consuming()
def receive_dlx_queue_message(self):
""" 正常队列的消费者 """
# 正常的队列也照样可以配置一个或者多个消费者来消费该队列中的消息
self.channel.basic_consume(
queue=self.dlx_queue_name,
on_message_callback=self.mq_dlx_callback,
auto_ack=False
)
self.channel.start_consuming()
if __name__ == '__main__':
mq = MQ()
mq.dead_queue() # 创建死信交换机和死信队列
mq.dlx_queue() # 创建普通队列并且和死信交换机绑定
mq.send_dlx_message() # 发送消息到普通队列
# mq.receive_dead_message() # 接收死信队列中的消息
# mq.receive_dlx_queue_message() # 接收普通队列中的消息
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
死信队列结合TTL实现消息重试限制
有个很常见的需求:消费者在消费消息时,拿到消息在干些别的业务,从而产生失败的情况,那就需要把消息从新放入消息队列,然后再重新拿到这个消息进行重新尝试。
对于这样的需求,如果不限制重新尝试次数的话,我们可以很方便的使用channel.basic_reject
方法结合requeue=True
参数,很方便的把消息重新入队。
def mq_dlx_callback(ch, method, properties, body):
""" 正常队列的消费者的回调函数 """
# 模拟不同情况的消息处理
if random.randint(0, 1) == 0:
# 已经确认的消息算是消费成功,可以写入日志或者执行别的逻辑了
ch.basic_ack(delivery_tag=method.delivery_tag)
print(" Received %r" % body.decode("utf8"))
else:
print("Received %r" % body.decode("utf8"))
# 对于拿到消息了,但是由于其它原因需要将这个消息重新进入普通队列,再次尝试
channel.basic_reject(delivery_tag=method.delivery_tag, requeue=True)
2
3
4
5
6
7
8
9
10
11
但问题也是有的,按照上面这种方式,这个消息有可能会永远消费失败从而永远处于重新尝试状态.....这无疑就增加很多额外的开销。
对于这种情况,我们很容易想到解决办法,那就是限制重新尝试的次数,但很遗憾,我只查到Java实现的重新尝试方案,貌似Python的pika模块中没有开箱即用的配置或者方法(也可有可能我没查到,有大佬查到或者有更优的实现可以告诉我哦),那怎么办?自己写代码实现呗。
下面的示例,实现了上述这个功能,也就是结合ttl实现,在当天23:59:59
之前,允许消息重复尝试指定次数,如果在达到指定次数限制之前,消费成功了,就走消费成功的逻辑。一旦达到指定次数限制,直接将该消息打入死信队列就完了。
实现原理就是发送生产者发布消息时,为消息的请求头添加一个键值对,记录该消息被消费的次数,消费者拿到消息就就去请求头中判断下这个值,当值没有达到指定次数限制之前,就手动的把这个消息确认下(不然会出现很多未确认的情况),然后请求头中的键值对的值加一,再手动往队列中发布下这个消息。
""" 生产者这里主要代码就是发消息时在请求头中带上初始的retry标记值 """
import random
import pika
import arrow
from pika.exchange_type import ExchangeType
RABBITMQ_CONFIG = {
'host': '127.0.0.1', 'port': 5672, 'virtual_host': '/',
'credentials': pika.PlainCredentials('guest', 'guest'), # RabbitMQ 用户名和密码
}
class MQ(object):
connection_parameters = pika.ConnectionParameters(**RABBITMQ_CONFIG)
def __init__(self):
self.connection = pika.BlockingConnection(pika.ConnectionParameters(**RABBITMQ_CONFIG))
self.channel = self.connection.channel()
self.dead_exchange_name = 'my_dead_exchange_direct' # 死信交换机
self.dead_queue_name = 'my_dead_queue_direct' # 死信队列
self.dlx_exchange_name = "my_dlx_exchange_direct" # 普通队列所需要绑定的交换机
self.dlx_queue_name = "my_dlx_queue_direct" # 普通的队列,只不过和死信队列做了绑定关系
def time_difference_value(self):
""" 计算当前距离凌晨23:59:59的时间差值 """
now = arrow.now()
end_of_day = now.replace(hour=23, minute=59, second=59)
return str(int((end_of_day - now).total_seconds()) * 1000)
def dead_queue(self):
# 创建死信交换机和死信队列,并且将死信队列和死信交换机绑定
self.channel.exchange_declare(exchange=self.dead_exchange_name, exchange_type=ExchangeType.direct, durable=True)
# 声明死信队列,并设置持久化
self.channel.queue_declare(queue=self.dead_queue_name, durable=True)
# 将死信队列和死信交换机绑定
self.channel.queue_bind(queue=self.dead_queue_name, exchange=self.dead_exchange_name, routing_key=self.dead_queue_name)
def dlx_queue(self):
# 创建普通的消息队列
self.channel.queue_declare(queue=self.dlx_queue_name, durable=True, arguments={
"x-dead-letter-exchange": self.dead_exchange_name,
'x-dead-letter-routing-key': self.dead_queue_name,
# 'x-max-length': 100, # 队列长度
})
def send_dlx_message(self):
""" 正常队列的生产者 """
for i in range(2):
self.channel.basic_publish(
exchange="", # 如果有交换机可以绑定,没有就留空
routing_key=self.dlx_queue_name,
body=f"message {i}".encode('utf8'),
properties=pika.BasicProperties(
delivery_mode=2,
expiration=self.time_difference_value(), # 这个是让其凌晨23:59:59秒过期
headers={'x-retry-count': 1} # 重点代码就是这个,每次发送请求时在请求头中添加个键值对记录该消息被消费的次数
),
)
if __name__ == '__main__':
mq = MQ()
mq.dead_queue() # 创建死信交换机和死信队列
mq.dlx_queue() # 创建普通队列并且和死信交换机绑定
mq.send_dlx_message() # 发送消息到普通队列
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
""" 重点代码在mq_dlx_callback方法和retry方法中 """
import random
import pika
import arrow
from pika.exchange_type import ExchangeType
RABBITMQ_CONFIG = {
'host': '127.0.0.1', 'port': 5672, 'virtual_host': '/',
'credentials': pika.PlainCredentials('guest', 'guest'), # RabbitMQ 用户名和密码
}
class MQ(object):
retry_max_count = 3 # 消息重新尝试次数限制
connection_parameters = pika.ConnectionParameters(**RABBITMQ_CONFIG)
def __init__(self):
self.connection = pika.BlockingConnection(pika.ConnectionParameters(**RABBITMQ_CONFIG))
self.channel = self.connection.channel()
self.dead_exchange_name = 'my_dead_exchange_direct' # 死信交换机
self.dead_queue_name = 'my_dead_queue_direct' # 死信队列
self.dlx_exchange_name = "my_dlx_exchange_direct" # 普通队列所需要绑定的交换机
self.dlx_queue_name = "my_dlx_queue_direct" # 普通的队列,只不过和死信队列做了绑定关系
def time_difference_value(self):
""" 计算当前距离凌晨23:59:59的时间差值 """
now = arrow.now()
end_of_day = now.replace(hour=23, minute=59, second=59)
# 计算的差值是秒,但ttl那里要求的是毫秒,所以要乘以1000
# 而且要求是字符串格式,所以要转换一下
return str(int((end_of_day - now).total_seconds()) * 1000)
def dead_queue(self):
# 创建死信交换机和死信队列,并且将死信队列和死信交换机绑定
self.channel.exchange_declare(exchange=self.dead_exchange_name, exchange_type=ExchangeType.direct, durable=True)
# 声明死信队列,并设置持久化
self.channel.queue_declare(queue=self.dead_queue_name, durable=True)
# 将死信队列和死信交换机绑定
self.channel.queue_bind(queue=self.dead_queue_name, exchange=self.dead_exchange_name,
routing_key=self.dead_queue_name)
def dlx_queue(self):
self.channel.queue_declare(queue=self.dlx_queue_name, durable=True, arguments={
"x-dead-letter-exchange": self.dead_exchange_name,
'x-dead-letter-routing-key': self.dead_queue_name,
# 'x-max-length': 100, # 队列长度
})
def retry(self, ch, body, count, method):
""" 消息尝试次数 """
if count >= self.retry_max_count:
self.channel.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
else:
# 首先确认下消息,也就是保证不要有未确认的消息
ch.basic_ack(delivery_tag=method.delivery_tag)
# 然后手动的往队列中发消息
self.channel.basic_publish(
exchange="", # 如果有交换机可以绑定,没有就留空
routing_key=self.dlx_queue_name,
body=body, # 消息内容还是原来的消息内容
properties=pika.BasicProperties(
delivery_mode=2,
expiration=self.time_difference_value(), # 过期时间
headers={'x-retry-count': count + 1} # 让重试次数加一
),
)
def mq_dlx_callback(self, ch, method, properties, body):
""" 正常队列的消费者的回调函数 """
# 模拟不同情况的消息处理
print(f"Received {body.decode('utf8')}, retry: {properties.headers.get('x-retry-count')}")
if body.decode("utf8").endswith("1"):
ch.basic_ack(delivery_tag=method.delivery_tag) # 认为是消费成功,手动确认消息就完了
else:
# 消费失败的话就走重新尝试逻辑
self.retry(ch, body, properties.headers.get('x-retry-count'), method)
def mq_dead_callback(self, ch, method, properties, body):
""" 死信队列的消费者的回调函数 """
# 对于死信队列中的消息,我们可以进行相应的处理,比如记录日志、发送通知,更改一些状态信息等
ch.basic_ack(delivery_tag=method.delivery_tag)
print("Received %r" % body.decode("utf8"))
def receive_dead_message(self):
""" 死信队列的消费者 """
# 死信队列也是普通队列,所以我们可以配置一个或者多个消费者来消费该死信队列中的消息
self.channel.basic_consume(
queue=self.dead_queue_name,
on_message_callback=self.mq_dead_callback,
auto_ack=False
)
self.channel.start_consuming()
def receive_dlx_queue_message(self):
""" 正常队列的消费者 """
# 正常的队列也照样可以配置一个或者多个消费者来消费该队列中的消息
self.channel.basic_consume(
queue=self.dlx_queue_name,
on_message_callback=self.mq_dlx_callback,
auto_ack=False
)
self.channel.start_consuming()
if __name__ == '__main__':
mq = MQ()
mq.dead_queue() # 创建死信交换机和死信队列
mq.dlx_queue() # 创建普通队列并且和死信交换机绑定
# mq.receive_dead_message() # 接收死信队列中的消息
mq.receive_dlx_queue_message() # 接收普通队列中的消息
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
参考: