Skip to content

消息队列

通过Redis的列表可以实现消息队列。

在消息队列中,常用的方法:

python

消息分发

默认的消息分发模式是循环分发,这是理想的情况,也就是多个消费者同时监听同一个消息队列,而且每个消费者的能力都是一样的,那么,队列中的消息则会循环的分发给每一个消费者。

但当某个消费者能力不同时,往往能力强的消费者会拿到更多的消息,我将这种机制称之为公平分发,也就是性能强的多干活,性能差的少干活。

循环分发

python
"""
生产者正常的往队列中放消息
"""
import redis

conn = redis.Redis(
    connection_pool=redis.ConnectionPool(
        host="127.0.0.1", port=6379, password="1234", db=0, max_connections=100, decode_responses=True
    )
)


def foo():
    for i in range(100):
        conn.lpush('test_queue', f"message{i}")


if __name__ == '__main__':
    foo()
python
import redis

conn = redis.Redis(
    connection_pool=redis.ConnectionPool(
        host="127.0.0.1", port=6379, password="1234", db=0, max_connections=100, decode_responses=True
    )
)

def foo():
    while True:
        res = conn.brpop("test_queue", timeout=5)
        if res:
            print(f"消费者1,队列名称:{res[0]}, 消息:{res[1]}")
if __name__ == '__main__':
    foo()
"""
消费者1,队列名称:test_queue, 消息:message2
消费者1,队列名称:test_queue, 消息:message5
消费者1,队列名称:test_queue, 消息:message8
"""
python
import redis

conn = redis.Redis(
    connection_pool=redis.ConnectionPool(
        host="127.0.0.1", port=6379, password="1234", db=0, max_connections=100, decode_responses=True
    )
)

def foo():
    while True:
        res = conn.brpop("test_queue", timeout=5)
        if res:
            print(f"消费者2,队列名称:{res[0]}, 消息:{res[1]}")

if __name__ == '__main__':
    foo()
"""
消费者2,队列名称:test_queue, 消息:message0
消费者2,队列名称:test_queue, 消息:message3
消费者2,队列名称:test_queue, 消息:message6
消费者2,队列名称:test_queue, 消息:message9
"""
python
import redis

conn = redis.Redis(
    connection_pool=redis.ConnectionPool(
        host="127.0.0.1", port=6379, password="1234", db=0, max_connections=100, decode_responses=True
    )
)

def foo():
    while True:
        res = conn.brpop("test_queue", timeout=5)
        if res:
            print(f"消费者3,队列名称:{res[0]}, 消息:{res[1]}")

if __name__ == '__main__':
    foo()
"""
消费者3,队列名称:test_queue, 消息:message1
消费者3,队列名称:test_queue, 消息:message4
消费者3,队列名称:test_queue, 消息:message7
"""

公平分发

公平分发这里,下面示例中,通过sleep来模拟各自的能力不同,然后可以观察各自的拿到的消息数量。

python
"""
生产者每0.01秒往队列中放一个消息,一共500个消息
"""
import time
import redis

conn = redis.Redis(
    connection_pool=redis.ConnectionPool(
        host="127.0.0.1", port=6379, password="1234", db=0, max_connections=100, decode_responses=True
    )
)

def foo():
    for i in range(500):
        conn.lpush('test_queue', f"message{i}")
        time.sleep(0.01)

if __name__ == '__main__':
    foo()
python
"""
消费者1每消费1个消息,就sleep0.1秒,最终的count结果是:319
"""
import time
import redis

conn = redis.Redis(
    connection_pool=redis.ConnectionPool(
        host="127.0.0.1", port=6379, password="1234", db=0, max_connections=100, decode_responses=True
    )
)


def foo():
    count = 0
    while True:
        res = conn.brpop("test_queue", timeout=5)
        if res:
            print(f"消费者1,队列名称:{res[0]}, 消息:{res[1]}")
            count += 1
        print(count)
        time.sleep(0.1)


if __name__ == '__main__':
    foo()
python
"""
消费者2每消费1个消息,就sleep0.3秒,最终的count结果是:112
"""
import time
import redis

conn = redis.Redis(
    connection_pool=redis.ConnectionPool(
        host="127.0.0.1", port=6379, password="1234", db=0, max_connections=100, decode_responses=True
    )
)

def foo():
    count = 0
    while True:
        res = conn.brpop("test_queue", timeout=5)
        if res:
            print(f"消费者2,队列名称:{res[0]}, 消息:{res[1]}")
            count += 1
        print(count)
        time.sleep(0.3)

if __name__ == '__main__':
    foo()
python
"""
消费者3每消费1个消息,就sleep0.5秒,最终的count结果是:69
"""
import time
import redis

conn = redis.Redis(
    connection_pool=redis.ConnectionPool(
        host="127.0.0.1", port=6379, password="1234", db=0, max_connections=100, decode_responses=True
    )
)

def foo():
    count = 0
    while True:
        res = conn.brpop("test_queue", timeout=5)
        if res:
            print(f"消费者3,队列名称:{res[0]}, 消息:{res[1]}")
            count += 1
        print(count)
        time.sleep(0.5)

if __name__ == '__main__':
    foo()

效果肯定是消费能力越强,拿到的消息就越多。

将队列中的消息写入到本地文件

python
import redis

conn = redis.Redis(
    connection_pool=redis.ConnectionPool(
        host="127.0.0.1", port=6379, password="1234", db=0, max_connections=100, decode_responses=True
    )
)

q = "test_queue"

# 将队列中的元素取出来写入到本地的csv文件中
def foo1():
    # 获取队列中的所有元素,但元素都还在队列中
    # 队列中元素数量少的话,这种方式最方便
    li = conn.lrange(q, 0, -1)
    with open("queue_data.csv", "w", encoding='utf8') as f:
        for item in li:
            f.write(item + "\n")
def foo2():
    # 这种方式无论队列中有多少元素,都会从队列中一个一个的取出来中有多少元素,都会从队列中一个一个的取出来,注意取出来之后,队列中就没这个元素了
    # 适合队列中元素数量很多的情况
    with open("queue_data.csv", "w", encoding='utf8') as f:
        count = 0
        while True:
            res = conn.brpop("test_queue", timeout=5)
            if res:
                f.write(res[1] + "\n")
            count += 1
            if count >= 10:  # 如果每5秒一共经历十轮都没有再从队列中拿到数据了,我们认为队列中的元素都已经取出来了,就结束掉就行了
                break


if __name__ == '__main__':
    # foo1()
    foo2()