Skip to content

before

pika1.3.2 python3.10

# 队列相关

python
import pika
from pika.exceptions import ChannelClosedByBroker

config = {'host': '127.0.0.1', 'port': 5672, 'virtual_host': '/', 'credentials': pika.PlainCredentials('guest', 'guest')}
connection = pika.BlockingConnection(pika.ConnectionParameters(**config))
channel = connection.channel()

#  声明一个非持久化的队列,如果队列不存在则创建,如果队列存在则使用这个队列,而不是把之前的删掉再重新创建
new1_queue = channel.queue_declare(queue='news1')

#  声明一个持久化的队列,如果队列不存在则创建,如果队列存在则使用这个队列,而不是把之前的删掉再重新创建
new2_queue = channel.queue_declare(queue='news2', durable=True)

for i in range(10):
    # channel.basic_publish(  # 向指定队列发送消息,非持久化消息
    #     exchange='', routing_key='news1', body=f'new{i}'.encode('utf8')
    # )
    channel.basic_publish(  # 向指定队列发送消息,并且标记当前消息为持久化
        exchange='', routing_key='news2', body=f'new{i}'.encode('utf8'),
        properties=pika.BasicProperties(delivery_mode=2)
    )
    print(f'send new{i}')

# 获取队列中信息,注意,当auto_ack=True时,你执行一次这个代码,再获取队列信息的同时,也同时从队列的左边拿一个消息出来消费掉了,这个要特别注意
# 而当auto_ack=False时,即给你返回队列信息,也会从队列左边拿一个消息出来(反复执行,拿到的还是同一个消息),但该消息仍然还在队列中
method_frame, header_frame, body = channel.basic_get(queue='news2', auto_ack=False)
print(
    method_frame)  # <Basic.GetOk(['delivery_tag=1', 'exchange=', 'message_count=21', 'redelivered=False', 'routing_key=news2'])>
print(method_frame.routing_key)  # 绑定的路由的名字
print(method_frame.message_count)  # 当前队列的消息数量
print(method_frame.delivery_tag)  # 消息的唯一标识

print(header_frame)  # <BasicProperties(['delivery_mode=2'])>
print(header_frame.delivery_mode)  # 消息的持久化标识

# 队列的左边拿一个消息出来了
print(body.decode('utf8'))

print(
    new2_queue)  # <METHOD(['channel_number=1', 'frame_type=1', "method=<Queue.DeclareOk(['consumer_count=0', 'message_count=4', 'queue=news2'])>"])>
print(new2_queue.method.consumer_count)  # 当前队列的消费者数量
print(new2_queue.method.queue)  # 当前队列的名字
print(new2_queue.method.message_count)  # 当前队列的消息数量


# 确认某个队列是否存在,存在则返回队列信息,不存在则报错
try:
    res = channel.queue_declare(queue='news2', passive=True)
    print(res)  # <METHOD(['channel_number=1', 'frame_type=1', "method=<Queue.DeclareOk(['consumer_count=0', 'message_count=4', 'queue=news2'])>"])>
except ChannelClosedByBroker:
    print('队列不存在')



# 删除指定队列,如果队列存在则删除,不存在也不报错
# channel.queue_delete(queue='news1')

# 关闭连接
channel.close()
connection.close()