before
pika1.3.2 python3.10
# 队列相关
python
import pika
from pika.exceptions import ChannelClosedByBroker
config = {'host': '127.0.0.1', 'port': 5672, 'virtual_host': '/', 'credentials': pika.PlainCredentials('guest', 'guest')}
connection = pika.BlockingConnection(pika.ConnectionParameters(**config))
channel = connection.channel()
# 声明一个非持久化的队列,如果队列不存在则创建,如果队列存在则使用这个队列,而不是把之前的删掉再重新创建
new1_queue = channel.queue_declare(queue='news1')
# 声明一个持久化的队列,如果队列不存在则创建,如果队列存在则使用这个队列,而不是把之前的删掉再重新创建
new2_queue = channel.queue_declare(queue='news2', durable=True)
for i in range(10):
# channel.basic_publish( # 向指定队列发送消息,非持久化消息
# exchange='', routing_key='news1', body=f'new{i}'.encode('utf8')
# )
channel.basic_publish( # 向指定队列发送消息,并且标记当前消息为持久化
exchange='', routing_key='news2', body=f'new{i}'.encode('utf8'),
properties=pika.BasicProperties(delivery_mode=2)
)
print(f'send new{i}')
# 获取队列中信息,注意,当auto_ack=True时,你执行一次这个代码,再获取队列信息的同时,也同时从队列的左边拿一个消息出来消费掉了,这个要特别注意
# 而当auto_ack=False时,即给你返回队列信息,也会从队列左边拿一个消息出来(反复执行,拿到的还是同一个消息),但该消息仍然还在队列中
method_frame, header_frame, body = channel.basic_get(queue='news2', auto_ack=False)
print(
method_frame) # <Basic.GetOk(['delivery_tag=1', 'exchange=', 'message_count=21', 'redelivered=False', 'routing_key=news2'])>
print(method_frame.routing_key) # 绑定的路由的名字
print(method_frame.message_count) # 当前队列的消息数量
print(method_frame.delivery_tag) # 消息的唯一标识
print(header_frame) # <BasicProperties(['delivery_mode=2'])>
print(header_frame.delivery_mode) # 消息的持久化标识
# 队列的左边拿一个消息出来了
print(body.decode('utf8'))
print(
new2_queue) # <METHOD(['channel_number=1', 'frame_type=1', "method=<Queue.DeclareOk(['consumer_count=0', 'message_count=4', 'queue=news2'])>"])>
print(new2_queue.method.consumer_count) # 当前队列的消费者数量
print(new2_queue.method.queue) # 当前队列的名字
print(new2_queue.method.message_count) # 当前队列的消息数量
# 确认某个队列是否存在,存在则返回队列信息,不存在则报错
try:
res = channel.queue_declare(queue='news2', passive=True)
print(res) # <METHOD(['channel_number=1', 'frame_type=1', "method=<Queue.DeclareOk(['consumer_count=0', 'message_count=4', 'queue=news2'])>"])>
except ChannelClosedByBroker:
print('队列不存在')
# 删除指定队列,如果队列存在则删除,不存在也不报错
# channel.queue_delete(queue='news1')
# 关闭连接
channel.close()
connection.close()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60