消息队列
通过Redis的列表可以实现消息队列。
在消息队列中,常用的方法:
python
消息分发
默认的消息分发模式是循环分发,这是理想的情况,也就是多个消费者同时监听同一个消息队列,而且每个消费者的能力都是一样的,那么,队列中的消息则会循环的分发给每一个消费者。
但当某个消费者能力不同时,往往能力强的消费者会拿到更多的消息,我将这种机制称之为公平分发,也就是性能强的多干活,性能差的少干活。
循环分发
python
"""
生产者正常的往队列中放消息
"""
import redis
conn = redis.Redis(
connection_pool=redis.ConnectionPool(
host="127.0.0.1", port=6379, password="1234", db=0, max_connections=100, decode_responses=True
)
)
def foo():
for i in range(100):
conn.lpush('test_queue', f"message{i}")
if __name__ == '__main__':
foo()
python
import redis
conn = redis.Redis(
connection_pool=redis.ConnectionPool(
host="127.0.0.1", port=6379, password="1234", db=0, max_connections=100, decode_responses=True
)
)
def foo():
while True:
res = conn.brpop("test_queue", timeout=5)
if res:
print(f"消费者1,队列名称:{res[0]}, 消息:{res[1]}")
if __name__ == '__main__':
foo()
"""
消费者1,队列名称:test_queue, 消息:message2
消费者1,队列名称:test_queue, 消息:message5
消费者1,队列名称:test_queue, 消息:message8
"""
python
import redis
conn = redis.Redis(
connection_pool=redis.ConnectionPool(
host="127.0.0.1", port=6379, password="1234", db=0, max_connections=100, decode_responses=True
)
)
def foo():
while True:
res = conn.brpop("test_queue", timeout=5)
if res:
print(f"消费者2,队列名称:{res[0]}, 消息:{res[1]}")
if __name__ == '__main__':
foo()
"""
消费者2,队列名称:test_queue, 消息:message0
消费者2,队列名称:test_queue, 消息:message3
消费者2,队列名称:test_queue, 消息:message6
消费者2,队列名称:test_queue, 消息:message9
"""
python
import redis
conn = redis.Redis(
connection_pool=redis.ConnectionPool(
host="127.0.0.1", port=6379, password="1234", db=0, max_connections=100, decode_responses=True
)
)
def foo():
while True:
res = conn.brpop("test_queue", timeout=5)
if res:
print(f"消费者3,队列名称:{res[0]}, 消息:{res[1]}")
if __name__ == '__main__':
foo()
"""
消费者3,队列名称:test_queue, 消息:message1
消费者3,队列名称:test_queue, 消息:message4
消费者3,队列名称:test_queue, 消息:message7
"""
公平分发
公平分发这里,下面示例中,通过sleep来模拟各自的能力不同,然后可以观察各自的拿到的消息数量。
python
"""
生产者每0.01秒往队列中放一个消息,一共500个消息
"""
import time
import redis
conn = redis.Redis(
connection_pool=redis.ConnectionPool(
host="127.0.0.1", port=6379, password="1234", db=0, max_connections=100, decode_responses=True
)
)
def foo():
for i in range(500):
conn.lpush('test_queue', f"message{i}")
time.sleep(0.01)
if __name__ == '__main__':
foo()
python
"""
消费者1每消费1个消息,就sleep0.1秒,最终的count结果是:319
"""
import time
import redis
conn = redis.Redis(
connection_pool=redis.ConnectionPool(
host="127.0.0.1", port=6379, password="1234", db=0, max_connections=100, decode_responses=True
)
)
def foo():
count = 0
while True:
res = conn.brpop("test_queue", timeout=5)
if res:
print(f"消费者1,队列名称:{res[0]}, 消息:{res[1]}")
count += 1
print(count)
time.sleep(0.1)
if __name__ == '__main__':
foo()
python
"""
消费者2每消费1个消息,就sleep0.3秒,最终的count结果是:112
"""
import time
import redis
conn = redis.Redis(
connection_pool=redis.ConnectionPool(
host="127.0.0.1", port=6379, password="1234", db=0, max_connections=100, decode_responses=True
)
)
def foo():
count = 0
while True:
res = conn.brpop("test_queue", timeout=5)
if res:
print(f"消费者2,队列名称:{res[0]}, 消息:{res[1]}")
count += 1
print(count)
time.sleep(0.3)
if __name__ == '__main__':
foo()
python
"""
消费者3每消费1个消息,就sleep0.5秒,最终的count结果是:69
"""
import time
import redis
conn = redis.Redis(
connection_pool=redis.ConnectionPool(
host="127.0.0.1", port=6379, password="1234", db=0, max_connections=100, decode_responses=True
)
)
def foo():
count = 0
while True:
res = conn.brpop("test_queue", timeout=5)
if res:
print(f"消费者3,队列名称:{res[0]}, 消息:{res[1]}")
count += 1
print(count)
time.sleep(0.5)
if __name__ == '__main__':
foo()
效果肯定是消费能力越强,拿到的消息就越多。
将队列中的消息写入到本地文件
python
import redis
conn = redis.Redis(
connection_pool=redis.ConnectionPool(
host="127.0.0.1", port=6379, password="1234", db=0, max_connections=100, decode_responses=True
)
)
q = "test_queue"
# 将队列中的元素取出来写入到本地的csv文件中
def foo1():
# 获取队列中的所有元素,但元素都还在队列中
# 队列中元素数量少的话,这种方式最方便
li = conn.lrange(q, 0, -1)
with open("queue_data.csv", "w", encoding='utf8') as f:
for item in li:
f.write(item + "\n")
def foo2():
# 这种方式无论队列中有多少元素,都会从队列中一个一个的取出来中有多少元素,都会从队列中一个一个的取出来,注意取出来之后,队列中就没这个元素了
# 适合队列中元素数量很多的情况
with open("queue_data.csv", "w", encoding='utf8') as f:
count = 0
while True:
res = conn.brpop("test_queue", timeout=5)
if res:
f.write(res[1] + "\n")
count += 1
if count >= 10: # 如果每5秒一共经历十轮都没有再从队列中拿到数据了,我们认为队列中的元素都已经取出来了,就结束掉就行了
break
if __name__ == '__main__':
# foo1()
foo2()