before
本篇来介绍下Python在多线程、线程池、锁等相关的内容。
注意:
- 我的环境是Python3.12的解释器。
- 没有特别强调的话,示例代码默认在Windows11系统中进行测试的。
- 本篇只是列出来在Python3.12版本的解释器中,跟线程相关的常见的知识点和常见的应用,而不是包含全部的内容。出现歧义的话,你应该以根据你的解释器版本去查对应的官档:
多线程是指在一个进程内创建多个线程,从而实现多个任务在同一时间段内并发执行的能力。线程是一个轻量级的执行单元,可以同时运行多个线程,并且共享相同的内存空间。与单线程相比,多线程可以提高程序的执行效率,特别是在处理/O密集型任务时,如网络请求、文件读写等。
一个进程中的线程角色,通常有三种:
- 一个主线程,或者称之为父线程、main threading,它可以独立完成程序,也就是说一个进程中只有一个主线程在干活。注意主线程一定是费守护线程。
- 一个或多个子线程,多个子线程之间是平级关系,共享进程的资源,协助主线程来干活,也是干活的主力。默认的子线程也都是非守护线程。
- 一个或多个子守护线程,对普通的子线程进行
t.daemon=True
设置之后,那这个线程就是守护线程了,守护线程通常是默默无闻的配角,主要做一些后台任务、定时更新配置、垃圾回收之类的,通常也是为整个程序或者其他线程提供一些便利和服务。
快速上手
想要使用多线程,通常会选择threading
模块来做。
当你运行如下脚本时,Python解释器会向操作系统申请资源创建一个进程来运行脚本中的代码,而这个进程会默认创建一个主线程从上到下运行代码,当执行到创建子线程的代码时,会创建一个子线程对象,所有需要子线程干的事儿,都需要封装到这个对象中,比如让这个子线程调用哪个函数去干什么事儿,传递什么参数等等,都要处理好,然后通过start将这个子线程提交到操作系统中,由CPU根据自己的调度来执行这个线程。
而此时的主线程则继续往下执行代码逻辑,如果没有别的代码要执行了,就会等待子线程干完活,然后一起结束程序.......后续操作系统释放相关资源之类的则不再展开,因为跟这个程序就无关了。
import threading
def task(arg1, arg2): # 允许传递多个参数
pass
# 创建一个Thread对象(子线程),并封装线程被CPU调度时应该执行的任务和相关参数。
# 注意args的值必须是元组的形式,即如果有1个值的话,你应该args=("x1", ) 而不是 args=("x1")
t = threading.Thread(target=task, args=('x1', 'x2'))
# 子线程准备就绪(等待CPU调度),主线程继续往下执行代码。
t.start()
print("继续执行...") # 主线程执行完所有代码,不结束,而是等待子线程执行完,一起结束
你可以创建多个子线程来进一步提高效率。
import threading
def task(arg1, arg2): # 允许传递多个参数
pass
for i in range(10):
t = threading.Thread(target=task, args=('x1', 'x2'))
t.start()
print("继续执行...") # 主线程执行完所有代码,不结束,而是等待子线程执行完,一起结束
使用多线程中的常见姿势
t.start()
import threading
def task(arg1, arg2):
pass
t = threading.Thread(target=task, args=('x1', 'x2'))
t.start()
print("继续执行...")
当封装好了一个子线程对象之后,调用start方法表示,当前线程准备就绪了,然后被提交到操作系统,由CPU的调度系统进行安排,择机执行该线程。
也就是说,这个过程由start发起一个任务,但CPU并不是立即执行的,而是CPU要执行的任务有成千上万,它会排队执行这些任务,我们发起的任务也需要排队,等前面的任务被CPU执行完,轮到这个咱们这任务才能被执行,而且一次还可能执行不完,因为CPU执行任务有自己的执行逻辑。比如说你这个任务需要耗时5分钟,那么你不可能独占CPU5分钟的时间,所以,它的执行逻辑简单来说就是执行a任务一会儿,无论是否执行完,都会切换到b任务,然后切换到c任务,然后在切换到a任务,这么切换着来执行的。只不过CPU所谓的一会儿,那时间单位则是非常非常短了,例如主频是3.2GHz的CPU,那么每秒钟则会执行32亿个时钟周期,而在每个时钟周期内,可以完成一个或者多个指令.....
参考:https://www.intel.cn/content/www/cn/zh/gaming/resources/cpu-clock-speed.html
额,越说越远了,这里不再展开说了,有兴趣可以学习下计算机组成原理、操作系统相关的知识,就能知道CPU的工作原理了。
t.join()
t.join()
等待当前线程的任务执行完毕后再向下继续执行。
也就是说,我子线程执行不完,你主线程也不能往下走,你要等我完事才能往下走。
import threading
number = 0
def _add():
global number
for i in range(10000000):
number += 1
t = threading.Thread(target=_add)
t.start()
# join必须在start之后,不然子线程都没启动呢,你先join上了肯定不合理
t.join() # 主线程等待中...
print(number)
import threading
number = 0
def _add():
global number
for i in range(10000000):
number += 1
def _sub():
global number
for i in range(10000000):
number -= 1
t1 = threading.Thread(target=_add)
t2 = threading.Thread(target=_sub)
t1.start()
t1.join() # t1线程执行完毕,才继续往后走
t2.start()
t2.join() # t2线程执行完毕,才继续往后走
print(number)
import threading
loop = 10000000
number = 0
def _add(count):
global number
for i in range(count):
number += 1
def _sub(count):
global number
for i in range(count):
number -= 1
t1 = threading.Thread(target=_add, args=(loop,))
t2 = threading.Thread(target=_sub, args=(loop,))
t1.start()
t2.start()
t1.join() # t1线程执行完毕,才继续往后走
t2.join() # t2线程执行完毕,才继续往后走
print(number)
"""
如果有很多线程需要启动和join,你可以这样
"""
import threading
def _add():
pass
start_list = []
join_list = []
for i in range(100): # 先把线程对象准备好
t = threading.Thread(target=_add)
start_list.append(t)
join_list.append(t)
for t in start_list: # 统一启动
t.start()
for t in join_list: # 统一join
t.join()
print("over")
t.name(getName,setName)
官档:https://docs.python.org/zh-cn/3/library/threading.html#threading.Thread.setName
我们可以为子线程起一个名字,甚至可以为多个线程赋予相同的名字,所以它只是用来我们在程序中识别它,即对我们有用,但对于Python解释器来说没有特别的意义。
import threading
import time
def task(arg):
# 获取当前线程对象的名称,3.10开始弃用
print(threading.current_thread().getName())
# 获取当前线程对象的名称,3.10之后,推荐使用,当然3.9版本这么写也没问题
print(threading.current_thread().name)
t1 = threading.Thread(target=task, args=(11,))
t1.setName("线程1") # 设置线程名称,3.10开始弃用
t1.start()
t2 = threading.Thread(target=task, args=(22,))
t2.name = "线程2" # 3.10之后,设置线程名称推荐使用name属性,当然3.9版本这么写也没问题
t2.start()
t3 = threading.Thread(target=task, args=(22,), name='线程3')
# t3.name = "线程2" # 实例化的时候传值了,这里就不要再设置name属性了,否则会覆盖掉
t3.start()
t.daemon(setDaemon)
官档:https://docs.python.org/zh-cn/3/library/threading.html#threading.Thread.setDaemon
默认的,主线程要等子线程结束才能结束,这就会产生一个问题,如果某个子线程无法结束的话,那么主线程也就无法结束。
对于需要某个子线程在当主线程结束时,非常识相的自动结束,那对于一些特殊的场景下,就非常完美了,对于这类识相的线程,我们统称为守护线程。
守护线程作用是为其他线程提供便利服务,守护线程最典型的应用就是 GC (垃圾收集器)、后台任务等等。
那么,守护线程它的工作原理是这样的:
- 默认创建的子线程是非守护线程,只要当前主线程中尚存任何一个非守护线程没有结束,守护线程就一直工作;
- 如果将子线程设置为守护线程之后,那么只有当最后一个非守护线程结束时,守护线程随着主线程一同结束。
守护线程设置方式:
import threading
import time
def task(arg):
print(threading.current_thread().name)
t1 = threading.Thread(target=task, args=(11,))
t2 = threading.Thread(target=task, args=(22,))
# 设置方式1
t1.setDaemon(True) # 设置为守护线程,主线程执行完毕后,子线程也自动关闭,自 3.10 版本弃用
t2.daemon = False # 默认的就是非守护线程,主线程等待子线程,子线程执行完毕后,主线程才结束。3.10版本推荐这么写,当然3.9版本这么写也没问题
t1.start()
t2.start()
# 设置方式2
t3 = threading.Thread(target=task, args=(33,), daemon=True)
t3.start()
来一个示例,有三个普通子线程负责从货架上中购买商品,直到货架为空,当前子线程就结束了。而守护线程则是在后台默默地监测货架,只要监测到某个商品售卖完毕,则将该商品下架。当所有的非守护线程都执行完毕,主线程结束时,守护线程也随之结束。
Details
import threading
import time
import random
# 货架
goods_dict = {"milk": 1, "bread": 2, "butter": 3, "eggs": 4}
def shop_cleaner():
""" 守护线程在工作 """
while True: # 无限循环监测货架中的商品,符合条件的就下架
if goods_dict:
for k, v in list(goods_dict.items()):
if v == 0:
goods_dict.pop(k)
print(f"{threading.current_thread().name}检测到 {k} 售卖完了,已将 {k} 下架!")
time.sleep(0.1)
def task():
""" 非守护线程在工作 """
while True: # 当货架上没有商品了,就结束了
if not goods_dict:
break
key = random.choice(list(goods_dict.keys())) # 随机选择一个商品进行消费
if goods_dict.get(key, 0) > 0:
goods_dict[key] -= 1
print(f"{threading.current_thread().name}买了一个 {key} ,剩余 {goods_dict[key]} 个!")
time.sleep(0.5)
if __name__ == '__main__':
daemon_t = threading.Thread(target=shop_cleaner, daemon=True, name='守护线程')
daemon_t.start()
join_list = []
for i in range(3):
t = threading.Thread(target=task, daemon=False, name=f"{i}号线程")
t.start()
join_list.append(t)
for t in join_list:
t.join()
print("所有商品都已售完,程序结束")
"""
0号线程买了一个 bread ,剩余 1 个!
1号线程买了一个 eggs ,剩余 3 个!
2号线程买了一个 butter ,剩余 2 个!
2号线程买了一个 eggs ,剩余 2 个!
1号线程买了一个 eggs ,剩余 1 个!
0号线程买了一个 bread ,剩余 0 个!
守护线程检测到 bread 售卖完了,已将 bread 下架!
0号线程买了一个 milk ,剩余 0 个!
1号线程买了一个 butter ,剩余 1 个!
2号线程买了一个 eggs ,剩余 0 个!
守护线程检测到 milk 售卖完了,已将 milk 下架!
守护线程检测到 eggs 售卖完了,已将 eggs 下架!
1号线程买了一个 butter ,剩余 0 个!
守护线程检测到 butter 售卖完了,已将 butter 下架!
所有商品都已售完,程序结束
"""
自定义线程类
直接将线程需要做的事写到run方法中。
import threading
# 类名叫啥无所谓,但必须继承threading.Thread
class MyThread(threading.Thread):
def run(self): # 重写父类的run方法
print('执行此线程', self._args)
t = MyThread(args=(100,))
t.start()
import requests
import threading
class DouYinThread(threading.Thread):
def run(self):
file_name, video_url = self._args
res = requests.get(video_url, stream=True)
if res.headers['content-type'] == 'video/mp4':
with open(file_name, mode='wb') as f:
for chunk in res.iter_content(chunk_size=512):
f.write(chunk)
print('下载完成:', file_name)
else:
print('视频链接貌似失效了,下载失败')
url_list = [
("过年.mp4", "https://video.pearvideo.com/mp4/adshort/20210105/cont-1715046-15562045_adpkg-ad_hd.mp4"),
("涪江.mp4", "https://video.pearvideo.com/mp4/adshort/20210105/cont-1715020-15561817_adpkg-ad_hd.mp4"),
("九峰山.mp4", "https://video.pearvideo.com/mp4/adshort/20210105/cont-1715031-15561980_adpkg-ad_hd.mp4")
]
for item in url_list:
t = DouYinThread(args=(item[0], item[1]))
t.start()
"""
有的地址失效了的话,可以从下面的链接中找到
河北大学取消考试学生紧急离校,老师:回不了家的到老师家过年,https://video.pearvideo.com/mp4/adshort/20210105/cont-1715046-15562045_adpkg-ad_hd.mp4
重庆两口子因琐事吵架,男子怒将自家车推进涪江,https://video.pearvideo.com/mp4/adshort/20210105/cont-1715020-15561817_adpkg-ad_hd.mp4
成都九峰山因雪景引游客暴增,致垃圾遍地野猴觅食,https://video.pearvideo.com/mp4/adshort/20210105/cont-1715031-15561980_adpkg-ad_hd.mp4
女子子宫摘除32年后CT报告称未见异常,医生:贴的模版忘删了,https://video.pearvideo.com/mp4/adshort/20210105/cont-1715014-15561686_adpkg-ad_hd.mp4
监控画面曝光!甘肃天水一公交车与救护车相撞后坠桥,https://video.pearvideo.com/mp4/adshort/20210105/cont-1715025-15561875_adpkg-ad_hd.mp4
男子称退伍后发现被贷款100万:征信逾期数十次,非自己签名,https://video.pearvideo.com/mp4/adshort/20210105/cont-1715010-15561845_adpkg-ad_hd.mp4
东北老交警零下43度执勤落下老寒腿:穿2斤重棉裤,已习以为常,https://video.pearvideo.com/mp4/adshort/20210105/cont-1715007-15561958_adpkg-ad_hd.mp4
女教师公寓熟睡被同事弟弟连砍数刀:全身刀疤,不敢告诉父母,https://video.pearvideo.com/mp4/adshort/20210105/cont-1715011-15561664_adpkg-ad_hd.mp4
网曝江西一村庄现两千平违建,房主回应:建给村里当文化中心,https://video.pearvideo.com/mp4/adshort/20210105/cont-1714970-15561752_adpkg-ad_hd.mp4
河南一新建足球场内惊现坟墓,官方:会尽快迁坟,https://video.pearvideo.com/mp4/adshort/20210105/cont-1715006-15561679_adpkg-ad_hd.mp4
老师收到毕业24年学生送的定制台历:他高考失利,我开导过,https://video.pearvideo.com/mp4/adshort/20210105/cont-1715009-15561658_adpkg-ad_hd.mp4
尚德机构回应未兑现宝马奖励:名单仍在确认中,会负责到底,https://video.pearvideo.com/mp4/adshort/20210105/cont-1715000-15561545_adpkg-ad_hd.mp4
沈阳重点管控区日常产90吨生活垃圾,重点疫点垃圾由专人运走,https://video.pearvideo.com/mp4/adshort/20210105/cont-1714993-15561434_adpkg-ad_hd.mp4
消费者称遭移动外呼10088套路换套餐,客服致歉:口径有问题,https://video.pearvideo.com/mp4/adshort/20210105/cont-1714995-1427-174135_adpkg-ad_hd.mp4
"""
线程安全和锁
线程安全
一个进程中可以有多个线程,且线程共享所有进程中的资源。
多个线程同时去操作一个"东西",可能会存在数据混乱的情况,例如:
import threading
num = 0
def task():
global num
for i in range(1000000):
num += 1
print(num)
for i in range(2):
t = threading.Thread(target=task)
t.start()
"""
1385257
2000000
"""
按道理来说,第一个线程执行完,打印num值应该是一百万,但很明显打印的结果是1385257
,这是因为两个线程同时操作一个对象,第一个线程执行完时,第二个线程也执行了一部分了,所以打印的结果才不是整数一百万。
在有些场景下,是不允许有这种情况出现的,也就是不允许某个线程在操作某个对象时,有别的线程也在同时对该对象做操作。那么怎么做呢?很简单,就是加锁:
import threading
lock = threading.Lock()
num = 0
def task():
# 加锁,也叫做申请锁,如果当前线程申请到了锁,那么别的线程就只能等待
# 只有当前线程释放锁后,别的线程才有机会申请到锁
# 那在加锁到释放锁期间,当前线程就能够保证对共享资源的独占访问
lock.acquire()
global num
for i in range(1000000):
num += 1
print(num)
lock.release() # 释放锁
for i in range(2):
t = threading.Thread(target=task)
t.start()
"""
1000000
2000000
"""
Lock和RLock
在程序中,需要我们自己加锁一般有两种锁,
- 同步锁Lock,也叫做互斥锁、线程的原始锁对象。一旦一个线程获得一个锁,会阻塞随后尝试获得锁的线程,直到它被释放;任何线程都可以释放它。https://docs.python.org/zh-cn/3/library/threading.html#threading.Lock
- 递归锁RLock,也叫做重入锁,或者可重入锁,重入锁是一个可以被同一个线程多次获取的同步基元组件。在内部,它在基元锁的锁定/非锁定状态上附加了 "所属线程" 和 "递归等级" 的概念。在锁定状态下,某些线程拥有锁 ; 在非锁定状态下, 没有线程拥有它。https://docs.python.org/zh-cn/3/library/threading.html#rlock-objects
import threading
# 全局实例化一把锁,多个线程来申请这把锁
# 千万不能一个线程搞一把锁,或者创建多把锁,否则会乱套
lock = threading.Lock()
num = 0
def task():
# --------- 加锁方式1 ---------
# 加锁,也叫做申请锁,如果当前线程申请到了锁,那么别的线程就只能等待
# 也不能重复加锁
# 只有当前线程释放锁后,别的线程才有机会申请到锁
# 那在加锁到释放锁期间,当前线程就能够保证对共享资源的独占访问
lock.acquire()
global num
for i in range(1000000):
num += 1
print(num)
lock.release() # 释放锁
# --------- 加锁方式2 ---------
# 基于上下文自动管理锁,进入with语句加上锁,出with语句,释放锁
# with lock:
# global num
# for i in range(1000000):
# num += 1
# print(num)
for i in range(2):
t = threading.Thread(target=task)
t.start()
"""
1000000
2000000
"""
import threading
# 递归锁跟同步锁基本用法一致,都是先要实例化一把锁
lock = threading.RLock()
num = 0
def task():
# --------- 加锁方式1 ---------
lock.acquire()
global num
for i in range(1000000):
num += 1
print(num)
lock.release() # 释放锁
# --------- 加锁方式2 ---------
# 基于上下文自动管理锁,进入with语句加上锁,出with语句,释放锁
# with lock:
# global num
# for i in range(1000000):
# num += 1
# print(num)
for i in range(2):
t = threading.Thread(target=task)
t.start()
"""
1000000
2000000
"""
"""
同步锁和递归锁有啥区别呢,通过下面示例来看下
1. RLock支持多次申请锁和多次释放;Lock不支持
"""
import threading
lock = threading.RLock()
num = 0
def task():
lock.acquire() # 加锁
lock.acquire() # 加锁
global num
for i in range(1000000):
num += 1
print(num)
lock.release() # 释放锁
lock.release() # 释放锁
for i in range(2):
t = threading.Thread(target=task)
t.start()
"""
1000000
2000000
"""
import threading
lock = threading.RLock()
# 程序员A开发了一个函数,函数可以被其他开发者调用,内部需要基于锁保证数据安全。
def func():
with lock:
pass
# 程序员B开发了一个函数,可以直接调用这个函数。
def run():
print("其他功能")
func() # 调用程序员A写的func函数,内部用到了锁。
print("其他功能")
# 程序员C开发了一个函数,自己需要加锁,同时也需要调用func函数。
def process():
with lock:
print("其他功能")
func() # ----------------> 此时就会出现多次锁的情况,只有RLock支持(Lock不支持)。
print("其他功能")
RLock和Lock的区别就是:
- RLock可以连续多次acquire,简单来说,在内部每次acquire都会将其递归级别加一,而执行release则递归级别减一,只要递归级别不为0,就会阻塞值该锁解锁。
- Lock则只能加锁acquire一次,想要再加锁acquire,那么就必须先执行一次release,这就是为啥无法多次加锁的原因。
注意,加了锁就一定要注意释放锁,不然就容易造成死锁。
死锁现象
死锁是一个资源被多次调用,而多次调用方都未能释放该资源而造成的一种阻塞的现象,程序表现就是卡住了......
下面来看两段死锁的示例:
import threading
num = 0
lock_object = threading.Lock()
def task():
print("开始", threading.current_thread().name)
lock_object.acquire() # 第1个抵达的线程进入并上锁,其他线程就需要再此等待。
print("上锁", threading.current_thread().name)
lock_object.acquire() # 第1个抵达的线程进入并上锁,其他线程就需要再此等待。
print("干活", threading.current_thread().name)
global num
for i in range(1000000):
num += 1
lock_object.release() # 线程出去,并解开锁,其他线程就可以进入并执行了
lock_object.release() # 线程出去,并解开锁,其他线程就可以进入并执行了
print(num)
for i in range(2):
t = threading.Thread(target=task)
t.start()
"""
开始 Thread-1 (task) # 第一个线程准备加锁了
上锁 Thread-1 (task) # 第一个线程加上锁了,但不往下走了,因为Lock锁不能重复加锁,该线程就卡住了,也走不到释放锁的代码,导致锁无法释放
开始 Thread-2 (task) # 第二线程准备加锁了,但发现锁已经被别人申请走了,那这个线程就在这里等锁释放
"""
"""
最终的结果就是:只有第一个线程能执行到下面的代码,而第二个线程只能等待第一个线程释放锁之后才能执行。但第一个线程走不到释放锁的代码了,也就是自己卡住自己了...
最终都组赛了,也就是都卡住了
"""
import threading
import time
lock_1 = threading.Lock()
lock_2 = threading.Lock()
def task1():
print("准备上锁 %s" % threading.current_thread().name)
lock_1.acquire()
time.sleep(1)
print("上锁lock_1 %s" % threading.current_thread().name)
lock_2.acquire()
print("上锁lock_2 %s" % threading.current_thread().name)
lock_2.release()
print("释放锁lock_2 %s" % threading.current_thread().name)
lock_1.release()
print("释放锁lock_1 %s" % threading.current_thread().name)
def task2():
print("准备上锁 %s" % threading.current_thread().name)
lock_2.acquire()
time.sleep(1)
print("上锁lock_2 %s" % threading.current_thread().name)
lock_1.acquire()
print("上锁lock_1 %s" % threading.current_thread().name)
lock_1.release()
print("释放锁lock_1 %s" % threading.current_thread().name)
lock_2.release()
print("释放锁lock_2 %s" % threading.current_thread().name)
t1 = threading.Thread(target=task1)
t1.start()
t2 = threading.Thread(target=task2)
t2.start()
"""
准备上锁 Thread-1 (task1) # 线程1准备上锁
准备上锁 Thread-2 (task2) # 线程2准备上锁
上锁lock_1 Thread-1 (task1) # 线程1加上了锁lock_1,然后又要去加锁lock_2,但是此时锁lock_2已经被线程2加上了,所以线程1阻塞
上锁lock_2 Thread-2 (task2) # 线程2加上了锁lock_2,然后又要去加锁lock_1,但是此时锁lock_1已经被线程1加上了,所以线程2阻塞
"""
"""
最终的结果就是我阻塞了,因为我需要锁lock_2,但是此时锁lock_2已经被线程2加上了,所以线程2阻塞了,
然后线程2阻塞了,线程1也阻塞了,所以线程1和线程2都阻塞了,然后就卡在这里出不来了。
"""
在开发时,应尽量避免死锁现象。
当然,对于上面的两个死锁示例,对于线程等待线程自己的情况,通常可以用可重入锁来解决,也就是将Lock改为RLock就能解决问题。但RLock不能解决所有的问题,比如上例第二个死锁就不能通过RLock解决,你还是想想优化代码吧。
import threading
num = 0
# lock_object = threading.Lock()
lock_object = threading.RLock()
def task():
print("开始", threading.current_thread().name)
lock_object.acquire() # 第1个抵达的线程进入并上锁,其他线程就需要再此等待。
print("上锁", threading.current_thread().name)
lock_object.acquire() # 第1个抵达的线程进入并上锁,其他线程就需要再此等待。
print("干活", threading.current_thread().name)
global num
for i in range(1000000):
num += 1
lock_object.release() # 线程出去,并解开锁,其他线程就可以进入并执行了
lock_object.release() # 线程出去,并解开锁,其他线程就可以进入并执行了
print(num)
for i in range(2):
t = threading.Thread(target=task)
t.start()
"""
开始 Thread-1 (task)
上锁 Thread-1 (task)
干活 Thread-1 (task)
开始 Thread-2 (task)
1000000上锁
Thread-2 (task)
干活 Thread-2 (task)
2000000
"""
无论是哪种现象产生的死锁,我们第一反应不是应该如何解决死锁,而是应该想着如何避免死锁产生,不会产生死锁,也就不存在如何解决死锁。尤其是平常开发中,我们一般用锁的话,也就是非常简单的使用,别整那些骚操作,宁愿代码多几行、low一点都为所谓,总之,苦逼程序员从来不会给自己添麻烦......
条件对象Condition
参考:http://www.coolpython.net/python_senior/concurrent/multithreading_semaphore.html
条件对象Condition,也叫做条件变量,它它也可以实现多线程对共享资源的互斥访问,相比于Lock,RLock,它更加灵活,其灵活之处在于它提供了wait操作和notify操作。
常用的几个方法,参考官网:https://docs.python.org/zh-cn/3/library/threading.html#threading.Condition
acquire
,请求底层锁。此方法调用底层锁的相应方法,返回值是底层锁相应方法的返回值。release()
,释放底层锁。此方法调用底层锁的相应方法。没有返回值。wait()
,等待直到被通知或发生超时。如果线程在调用此方法时没有获得锁,将会引发RuntimeError
异常。这个方法释放底层锁,然后阻塞,直到在另外一个线程中调用同一个条件变量的
notify()
或notify_all()
唤醒它,或者直到可选的超时发生。一旦被唤醒或者超时,它重新获得锁并返回。当提供了 timeout 参数且不是
None
时,它应该是一个浮点数,代表操作的超时时间,以秒为单位(可以为小数)。当底层锁是个
RLock
,不会使用它的release()
方法释放锁,因为当它被递归多次获取时,实际上可能无法解锁。相反,使用了RLock
类的内部接口,即使多次递归获取它也能解锁它。 然后,在重新获取锁时,使用另一个内部接口来恢复递归级别。返回
True
,除非提供的 timeout 过期,这种情况下返回False
。notify()
,默认唤醒一个等待这个条件的线程。如果调用线程在没有获得锁的情况下调用这个方法,会引发RuntimeError
异常。这个方法唤醒最多 n 个正在等待这个条件变量的线程;如果没有线程在等待,这是一个空操作。
当前实现中,如果至少有 n 个线程正在等待,准确唤醒 n 个线程。但是依赖这个行为并不安全。未来,优化的实现有时会唤醒超过 n 个线程。
注意:被唤醒的线程并没有真正恢复到它调用的
wait()
,直到它可以重新获得锁。 因为notify()
不释放锁,其调用者才应该这样做。notify_all()
,唤醒所有正在等待这个条件的线程。这个方法行为与notify()
相似,但并不只唤醒单一线程,而是唤醒所有等待线程。如果调用线程在调用这个方法时没有获得锁,会引发RuntimeError
异常。notifyAll
方法是此方法的已弃用别名。
假设这样一个场景,你有2个生产者线程,10个消费者线程,当商品数量大于10个的时候,生产者不再生产,商品数量为0时,不再消费,如果是用RLock来做线程间互斥,当商品数量为0时,按道理说消费者线程是不进行任何消费活动的,但是,这些消费者线程仍然在拼命的尝试来获得锁,得到锁以后,发现商品数量为0,于是再释放掉锁,紧接着去争抢锁。
如果是用Condition来做呢,消费者发现商品数量为0时,可以进行wait操作,此时,线程进入等待状态,再没有被唤醒之前,它是不会去争抢锁的,极端的情况是商品为0时,10个消费者都进入到wait状态,而这时,生产者获得锁,生产了商品,然后进行notify操作,去唤醒一个线程(甚至是所有的线程),当然这次唤醒的可能是另一个生产者,不过没关系,总会有一次唤醒的是消费者线程。
condition和Lock,RLock相比较,在生产者和消费者模型中,避免了不必要的对锁的争抢,更加高效的调用线程资源。
import time
import threading
# 实例化一个条件变量的对象
condition = threading.Condition(lock=threading.RLock()) # lock可传可不传,不传默认就是RLock
buffer = [] # 搞个库存
def produce():
""" 生产者 """
# 条件变量申请锁,底层申请的是我们上面传进去的RLock锁
condition.acquire()
item = time.strftime("%H:%M:%S", time.localtime())
buffer.append(item)
print(f"生产者生产了一个数据: {item}")
condition.notify() # 通知另一个等待的线程
# 然后释放锁,进入等待状态,直到在另外一个线程中调用同一个条件变量的 notify() 或 notify_all() 唤醒它
condition.wait()
# 走到这里说明另一个线程消费了数据,然后告知自己可以继续生产了
item = time.strftime("%H:%M:%S", time.localtime())
print(f"生产者又生产了一个数据: {item}")
buffer.append(item)
# 通知另一个等待的线程可以继续消费
condition.notify()
# 当需要结束该线程时,就不用wait了,直接释放锁,该线程结束
condition.release()
def consume():
""" 消费者 """
# 消费者线程调用同一个条件变量去申请同一把锁
# 直到生产者执行notify释放锁并且告知咱们可以消费了
condition.acquire()
print(f"消费者开始消费, {buffer.pop()}")
# 消费完毕后告知生产者可以继续生产,生产者线程从wait那里继续往下执行
condition.notify()
# 消费者这里继续等待生产者的通知,直到得到通知之后,才往下走
condition.wait()
print(f"消费者再次消费, {buffer.pop()}")
# 消费完毕后再次通知生产者可以继续生产,但根据代码逻辑,生产者不在生产了,而是结束线程了
# 咱们这里也直接释放锁,结束线程就完了
condition.release()
if __name__ == '__main__':
producer = threading.Thread(target=produce)
consumer = threading.Thread(target=consume)
producer.start()
consumer.start()
"""
生产者生产了一个数据: 11:49:50
消费者开始消费, 11:49:50
生产者又生产了一个数据: 11:49:50
消费者再次消费, 11:49:50
"""
"""
这个示例模拟的场景,就是起两个线程
生产者线程(守护线程),生产一个商品,就告诉消费者可以消费了,但生产者生产的快;当库存有3个商品时,生产暂停
告诉消费者去消费,直到把库存消耗完,消费者再告诉生产者继续生产。
当没有消费者时,程序结束,生产者线程也随之结束。
消费者线程(非守护线程),该线程启动就去等生产者生产商品,有了商品就消费,消费完一个继续消费,如果此时库存空了,那么
消费者就告知生产者让其继续生产,直到得到生产者告知可以消费了,整个消费活动继续。
消费者有一个总钱数,比如3块钱,花完了,就不在消费,线程结束
当你研究明白这个示例之后,你可以适当增加生产者和消费者的数量(比如2个生产者,3个消费者)和sleep的时间,这样能
进一步理解条件变量在其中起到的作用。
"""
import threading
import multiprocessing
import random
import time
condition_obj = threading.Condition(lock=threading.RLock())
buffer = []
def produce():
""" 生产者 """
while True: # 生产无限个商品
condition_obj.acquire() # 生产商品时申请锁
if len(buffer) == 3: # 如果生产了3个商品还没有人消费,说明商品积压了,就暂停生产,通知所有的消费者来消费
print(f'生产者{threading.current_thread().name}检测到库存已满,等待消费者消费商品...')
condition_obj.notify_all() # 通知所有的消费者来消费
condition_obj.wait() # 直到商品消费完了,有消费者通知生产者继续生产,这里才继续
else: # 只要当库存不满时才继续生产
item = time.strftime("%H:%M:%S", time.localtime())
buffer.append(item)
print(f'生产者{threading.current_thread().name}生产了一个产品{item},此时总商品数{len(buffer)}')
# condition_obj.notify_all()
condition_obj.notify()
condition_obj.release()
time.sleep(0.1)
def consumer(money, amount):
""" 消费者 """
while money: # 只要有钱就能消费
condition_obj.acquire() # 拿到条件变量的锁
while not buffer: # 如果库存为空,也就是没有可消费的商品了,就通知生产者生产
print(f'消费者{threading.current_thread().name}检测到库存为空,等待生产者生产商品...')
# wait解锁并等待生产者告知我有商品可消费了,本线程才继续执行
condition_obj.notify_all()
condition_obj.wait()
# 走这里表示有商品可以直接消费
money -= 1
print(f'消费者{threading.current_thread().name}有{amount}块钱,消费了1个商品: {buffer.pop(0)},还剩{money}块钱')
# 本线程消费完一个商品,线程释放锁,睡0.1秒之后,再次尝试消费
condition_obj.release()
time.sleep(0.5)
print(f'消费者{threading.current_thread().name}的{amount}块钱花完了,回家!')
if __name__ == '__main__':
# 创建1个守护线程作为生产者,只要还有消费者再消费,生产者就不会停止,当没有消费者再消费时,生产者会停止
for i in range(1):
t = threading.Thread(target=produce, name=str(i), daemon=True)
t.start()
# 创建1个消费者,每个消费者都有一个初始的金额,花完了就结束
for i in range(1):
money = 3
t = threading.Thread(target=consumer,args=(money, money), name=str(i))
t.start()
"""
生产者0生产了一个产品16:01:32,此时总商品数1 # 第一个商品总是能立即消费掉,因为是程序启动消费者就就绪了
消费者0有3块钱,消费了1个商品: 16:01:32,还剩2块钱 # 当生产者生产一个商品后,就立即通知消费者来消费
生产者0生产了一个产品16:01:32,此时总商品数1 # 因为消费者消费的慢,0.5秒才消费一个,而生产者
生产者0生产了一个产品16:01:32,此时总商品数2 # 每0.1秒就能生产一个商品,所以当消费者还在sleep的时候,生产者是不会
生产者0生产了一个产品16:01:32,此时总商品数3 # 停止的,直到库存满了,生产停止
生产者0检测到库存已满,等待消费者消费商品...
消费者0有3块钱,消费了1个商品: 16:01:32,还剩1块钱 # 生产者那边库存慢了,生产停了,消费者这边消费的慢也要慢慢消费
消费者0有3块钱,消费了1个商品: 16:01:32,还剩0块钱 # 直到将库存消耗一空,才继续通知生产者继续生产
消费者0的3块钱花完了,回家! # 直到消费者花完钱,整个程序执行结束
"""
信号量
TIP
信号量由E.Dijkstra发明并第一次应用在操作系统中,信号量是由操作系统管理的一种抽象数据类型,用于在多线程中同步对共享资源的使用。本质上说,信号量是一个内部数据,用于标明当前的共享资源可以有多少并发读取。
同样的,在threading模块中,信号量的操作有两个函数,即 acquire()
和 release()
,解释如下:
- 每当线程想要读取关联了信号量的共享资源时,必须调用
acquire()
,此操作减少信号量的内部变量, 如果此变量的值非负,那么分配该资源的权限。如果是负值,那么线程被挂起,直到有其他的线程释放资源。 - 当线程不再需要该共享资源,必须通过
release()
释放。这样,信号量的内部变量增加,在信号量等待队列中排在最前面的线程会拿到共享资源的权限。
大白话解释就是,信号量Semaphore内部实现了一个计数器:
- 初始值,默认为1,也可以传递大于等于1的正整数,总之不能小于0,否则会报错。
- 信号量每次acquire操作都会使计数器加一,release操作会使其减一。
- 当计数器为0时,任何线程的acquire操作都不会成功,也就是都会处于阻塞状态。
这么设计的目的是什么?使用信号量的目的为了控制对于资源的访问有个上限,简单来说可以控制并发的数量。
假设这样一个场景,有个多线程爬虫,起10个线程去爬页面,但目标网站检测到访问过于频繁,进行反扒了,但把线程数量见到3个就没问题了。
对于这样的场景来说,用Lock和RLock是不行的,它只能有一个线程去访问资源。
而如果用信号量的话,我们仍然可以起10个线程,然后通过信号量来控制并发的数量为3,这样就可以了。
import threading
import multiprocessing
import random
import time
# 实例化一个信号量对象,它最多允许3个线程同时访问资源
semaphore = threading.Semaphore(3)
def task():
# 基本写法
# semaphore.acquire()
# print(f'线程{threading.current_thread().name}在访问资源')
# time.sleep(1)
# semaphore.release()
# 基于上下文管的信号量
with semaphore:
print(f'线程{threading.current_thread().name}在访问资源')
time.sleep(1)
if __name__ == '__main__':
for i in range(7):
t = threading.Thread(target=task, name=str(i))
t.start()
"""
# 根据运行结果,你会看到每次打印都是有三个线程在访问资源
线程0在访问资源
线程1在访问资源
线程2在访问资源
线程3在访问资源
线程4在访问资源
线程5在访问资源
线程6在访问资源
"""
事件
线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就会变得非常棘手。
为了解决这些问题,我们可以使用threading中的Event来处理,该事件对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。
在初始情况下,Event对象中的信号标志被设置为假。
如果有线程等待一个Event对象,而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。
一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。
如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件,继续执行。
参考官网:https://docs.python.org/zh-cn/3/library/threading.html#event-objects
实现事件对象的类。事件对象管理一个内部标识,调用 set()
方法可将其设置为true。调用 clear()
方法可将其设置为 false 。调用 wait()
方法将进入阻塞直到标识为true。这个标识初始时为 false 。
is_set()
当且仅当内部标识为 true 时返回
True
。isSet
方法是此方法的已弃用别名。set()
将内部标识设置为 true 。所有正在等待这个事件的线程将被唤醒。当标识为 true 时,调用
wait()
方法的线程不会被被阻塞。clear()
将内部标识设置为 false 。之后调用
wait()
方法的线程将会被阻塞,直到调用set()
方法将内部标识再次设置为 true 。wait(timeout=None)
只要内部旗标为为假值且未超出所给出的 timeout 值就保持阻塞。 返回值表示阻塞方法返回的原因;如果返回是因为内部旗标被设为真值则为
True
,如果给出了 timeout 而内部旗标在给定的等待时间内没有变成真值则为False
。当提供了 timeout 参数且不为None
时,它应当为一个指定操作的超时限制秒数的浮点值,也可以为分数。在 3.1 版本发生变更: 很明显,方法总是返回None
。
来个例子,有多个工作线程尝试链接MySQL,我们想要在链接前确保MySQL服务正常才让那些工作线程去连接MySQL服务器,如果连接不成功,都会去尝试重新连接。
那么我们就可以采用threading.Event机制来协调各个工作线程的连接操作。
import threading
import time
import random
from threading import Thread, Event
# 创建事件对象
event = Event()
def conn_mysql():
count = 1
# 事件的初始状态为False,这里一直尝试链接
while not event.is_set():
if count > 3:
raise TimeoutError('链接超时')
print('<%s>第%s次尝试链接' % (threading.current_thread().name, count))
event.wait(1)
count += 1
# 直到check_mysql那里将事件对象的状态值设置为True,这里才能继续执行
print('<%s>链接成功' % threading.current_thread().name)
def check_mysql():
print('\033[45m[%s]正在检查mysql\033[0m' % threading.current_thread().name)
time.sleep(1.5)
# 检查没问题之后,设置event的状态值为True,其它conn就可以正常链接了
event.set()
if __name__ == '__main__':
conn1 = Thread(target=conn_mysql, name='conn1')
conn2 = Thread(target=conn_mysql, name='conn2')
check = Thread(target=check_mysql, name='check')
conn1.start()
conn2.start()
check.start()
"""
<conn1>第1次尝试链接
<conn2>第1次尝试链接
[check]正在检查mysql
<conn2>第2次尝试链接
<conn1>第2次尝试链接
<conn2>链接成功
<conn1>链接成功
"""
参考:https://blog.csdn.net/u013210620/article/details/78736153
线程池
Python3中官方才正式提供线程池。
这里继续了解一些概念
开启线程本身是需要一定资源开销的,开启线程、运行线程、干完活之后销毁线程,这些都是需要资源开销的,所以线程不是无限开的。开的多了,可能会导致系统的负载过大,反而影响性能。
那线程开多少合适呢?这个没有具体的规定和建议,因为大家的硬件性能不一致。但根据经验来说,线程数量处于CPU核心数量的5倍,最多不超过10倍。
线程池
来个场景,比如有一万块砖要搬:
- 那么如果顺序执行,就是一个人一次搬一块,要搬一万次,这个方案效率有点低了。
- 使用多线程呢,也就是找一万个人,每个人只需要搬一次就好了,但这个方案有点浪费资源,因为开启一万个线程,然后再销毁,也是有很大的资源开销。
- 那线程池呢,这就很好,本质上来说,线程池就是相当于维护固定数量的线程,放到一个线程池中,比如100个,那么100个线程去搬一万块砖,每个线程搬完一块砖,不销毁线程,而是还回线程池,然后再次参与搬砖任务,直到所有的砖搬完,大家一起结束。
线程池的基本写法
"""
这个示例线程池在干活时,主线程是继续往下走的
"""
import time
from concurrent.futures import ThreadPoolExecutor
# 基本用法
# pool = ThreadPoolExecutor(100)
# pool.submit(函数名,参数1,参数2,参数...)
def task(video_url, num):
print("开始执行任务", video_url)
time.sleep(1)
# 创建线程池,最多维护2个线程。
pool = ThreadPoolExecutor(2)
# 任务列表
url_list = ["www.xxxx-{}.com".format(i) for i in range(10)]
for url in url_list:
# 在线程池中提交一个任务,线程池中如果有空闲线程,则分配一个线程去执行,执行完毕后再将线程交还给线程池;如果没有空闲线程,则等待。
pool.submit(task, url, 2)
print("END")
"""
这个示例线程池在干活时,主线程则是等待线程池中的任务执行完毕后,再往下走
"""
import time
from concurrent.futures import ThreadPoolExecutor
def task(video_url, num):
print("开始执行任务", video_url)
time.sleep(1)
# 创建线程池,最多维护2个线程。
pool = ThreadPoolExecutor(2)
# 任务列表
url_list = ["www.xxxx-{}.com".format(i) for i in range(10)]
for url in url_list:
# 在线程池中提交一个任务,线程池中如果有空闲线程,则分配一个线程去执行,执行完毕后再将线程交还给线程池;如果没有空闲线程,则等待。
pool.submit(task, url, 2)
print("执行中...")
pool.shutdown(True) # 等待线程池中的任务执行完毕后,在继续执行
print('继续往下走')
回调函数的使用
当每个线程拿到请求的结果后,我们可以将结果交给回调函数来统一处理,比如对请求结果进行数据清洗、保存到本地等动作。
目的是为了做代码解耦合,让逻辑更为顺畅。
"""
这个示例主要演示线程池中增加回调函数,以及回调函数的固定写法
注意,回调函数默认接收一个形参,而传值是内部传递的,我们只管add_done_callback(done)
不需要考虑def done(response):这个response谁给它传的问题,你要问,那就是内部在传递,我们不管
你可以认为下面示例是回调函数的典型写法
"""
import time
from concurrent.futures import ThreadPoolExecutor
def task(video_url, num):
print("开始执行任务", video_url)
time.sleep(1)
return "{}执行完毕".format(video_url)
def done(response):
# 注意,这个response是个task返回的返回值对象
# 如果真的要获取真正的返回值则需要.result()才行
# 算是一种固定写法
print("任务执行完成后的回调", response.result())
if __name__ == '__main__':
# 创建线程池,最多维护2个线程。
pool = ThreadPoolExecutor(2)
url_list = ["www.xxxx-{}.com".format(i) for i in range(10)]
for url in url_list:
# 在线程池中提交一个任务,线程池中如果有空闲线程,则分配一个线程去执行,执行完毕后再将线程交还给线程池;如果没有空闲线程,则等待。
future = pool.submit(task, url, 2)
# 设置回调函数,当任务完成时,自动触发。
# 注意这个回调函数是主线程在执行的,而且默认只能传递一个参数给回调函数,而且参数不用显式传递
# 内部会自动传递
# 如果需要传递多个参数,则可以使用partial函数来处理。
future.add_done_callback(done)
"""
这个示例主要演示,线程池中应用回调函数,且需要额外给回调函数传递多个形参的场景,也算是一种固定写法示范
需要你掌握偏函数partial的用法
"""
import time
from functools import partial
from concurrent.futures import ThreadPoolExecutor
def task(video_url, num):
print("开始执行任务", video_url)
time.sleep(1)
return "{}执行完毕".format(video_url)
def done(*args, **kwargs):
print("任务执行完成后的回调", args, kwargs)
"""
注意,打印结果长这样
('x1', 'x2', <Future at 0x142891881c0 state=finished returned str>) {'x3': 'x3', 'x4': 'x4'}
其中,args中,x1和x2是我们在partial中传递的两个额外的参数
另一个 <Future at 0x142891881c0 state=finished returned str> 参数是done函数内部接收到的return对象
也就是task函数的返回值对象,需要点result()才能拿到task函数的返回值,取值方式如下
x1, x2, response = args
print(response.result())
kwargs中的字典,无需多说,也是额外传给回调函数的形参一种传参形式,主要是为了给你演示能这么传参
"""
if __name__ == '__main__':
# 创建线程池,最多维护2个线程。
pool = ThreadPoolExecutor(2)
url_list = ["www.xxxx-{}.com".format(i) for i in range(10)]
for url in url_list:
# 在线程池中提交一个任务,线程池中如果有空闲线程,则分配一个线程去执行,执行完毕后再将线程交还给线程池;如果没有空闲线程,则等待。
future = pool.submit(task, url, 2)
# 设置回调函数,当任务完成时,自动触发。
# 如果需要传递多个参数给回调函数,则可以使用partial函数来处理。
future.add_done_callback(partial(done, 'x1', 'x2', x3='x3', x4='x4'))
"""
来个回调函数需要多个形参的实战案例
"""
import time
import requests
from functools import partial
from concurrent.futures import ThreadPoolExecutor
def task(url):
response = requests.get(url)
# 拿到图片的二进制数据并返回给回调函数done
return response.content
def done(*args, **kwargs):
# filename是额外传递过来的文件名
# response则是task函数的返回值
filename, response = args
with open(filename, 'wb') as fp:
# 想要取返回值的具体内容,则需要点result()才行
fp.write(response.result())
print(f"{filename}下载完成")
if __name__ == '__main__':
# 创建线程池,最多维护2个线程。
pool = ThreadPoolExecutor(2)
url_list = [
"https://hbimg.huabanimg.com/51d46dc32abe7ac7f83b94c67bb88cacc46869954f478-aP4Q3V",
"https://hbimg.huabanimg.com/703fdb063bdc37b11033ef794f9b3a7adfa01fd21a6d1-wTFbnO",
"https://hbimg.huabanimg.com/b438d8c61ed2abf50ca94e00f257ca7a223e3b364b471-xrzoQd",
"https://hbimg.huabanimg.com/4edba1ed6a71797f52355aa1de5af961b85bf824cb71-px1nZz",
]
for url in url_list:
future = pool.submit(task, url)
filename = f"{url[-2:]}.jpg"
# 回调函数done,按照默认来说,只能默认接收一个参数,但这还需要额外的传递一个图片名字
# 那多个参数就需要用partial特殊处理了
future.add_done_callback(partial(done, filename))
"""
如果对偏函数partial不熟悉,对于回调函数需要多个参数的场景,用闭包也能解决,希望你会闭包....
"""
import time
import requests
from functools import partial
from concurrent.futures import ThreadPoolExecutor
def task(url):
response = requests.get(url)
# 拿到图片的二进制数据并返回给回调函数done
return response.content
def closure(filename):
def done(response):
# filename是额外传递过来的文件名
# response则是task函数的返回值
with open(filename, 'wb') as fp:
# 想要取返回值的具体内容,则需要点result()才行
fp.write(response.result())
print(f"{filename}下载完成")
return done
if __name__ == '__main__':
# 创建线程池,最多维护2个线程。
pool = ThreadPoolExecutor(2)
url_list = [
"https://hbimg.huabanimg.com/51d46dc32abe7ac7f83b94c67bb88cacc46869954f478-aP4Q3V",
"https://hbimg.huabanimg.com/703fdb063bdc37b11033ef794f9b3a7adfa01fd21a6d1-wTFbnO",
"https://hbimg.huabanimg.com/b438d8c61ed2abf50ca94e00f257ca7a223e3b364b471-xrzoQd",
"https://hbimg.huabanimg.com/4edba1ed6a71797f52355aa1de5af961b85bf824cb71-px1nZz",
]
for url in url_list:
future = pool.submit(task, url)
filename = f"{url[-2:]}.jpg"
# 回调函数done,按照默认来说,只能默认接收一个参数,但这还需要额外的传递一个图片名字
# 如果对partial函数不熟悉,可以用闭包来解决这个问题
# 不过如果对闭包不熟悉的话,额.....没有什么特别好的办法了啊.....
# 说下闭包在这里的作用,closure函数在执行时,先把额外的参数传递进去
# 然后closure函数的返回值done函数是才是真正的回调函数,那么默认的形参我们也就无需处理了
# future.add_done_callback(closure(filename))
# 如果还是不太理解的话,可以把上面一行代码,拆成两行,应该好理解点,代码的逻辑是等价的
done = closure(filename)
future.add_done_callback(done)
# 总之闭包就实现把额外的参数接收了,然后回调函数那里就按照最开始的回调函数写的写法来就好了
对线程的执行结果的统一处理的另一种方式
除了上面通过回调函数来统一处理线程的执行结果,还有其他写法,上代码:
import time
import requests
from functools import partial
from concurrent.futures import ThreadPoolExecutor
def task(url):
response = requests.get(url)
# 拿到图片的二进制数据并返回给回调函数done
return response.content
def done(filename, future):
# filename是额外传递过来的文件名
# response则是task函数的返回值
with open(filename, 'wb') as fp:
# 想要取返回值的具体内容,则需要点result()才行
fp.write(future.result())
print(f"{filename}下载完成")
if __name__ == '__main__':
# 创建线程池,最多维护2个线程。
pool = ThreadPoolExecutor(2)
url_list = [
"https://hbimg.huabanimg.com/51d46dc32abe7ac7f83b94c67bb88cacc46869954f478-aP4Q3V",
"https://hbimg.huabanimg.com/703fdb063bdc37b11033ef794f9b3a7adfa01fd21a6d1-wTFbnO",
"https://hbimg.huabanimg.com/b438d8c61ed2abf50ca94e00f257ca7a223e3b364b471-xrzoQd",
"https://hbimg.huabanimg.com/4edba1ed6a71797f52355aa1de5af961b85bf824cb71-px1nZz",
]
future_list = []
for url in url_list:
future = pool.submit(task, url)
# 将每个任务的结果保存起来,后续统一处理
# future_list.append(future)
# 但示例还需要filename,我这里再封装成元组,重点还是future的用法
filename = f"{url[-2:]}.jpg"
future_list.append((filename, future))
pool.shutdown(True) # 等所有的任务都执行完
# 循环着拿所有任务的执行结果,也就是task的返回值
for f in future_list:
filename, future = f
# print(future.result()) # 也是点result()固定写法
# 也可以单独封装,这就跟回调函数差不多了,做个了解就完了
done(filename, future)
单利模式
单利模式也是面试常问和实际开发中常用的一个知识点,这里来介绍一下单例模式在多线程中的应用。
回顾下单利模式吧
之前写一个类,每次执行 类()
都会实例化一个类的对象。
单例模式的话,就是每次实例化类的对象时,都是最开始创建的那个对象,不再重复创建对象。
class Foo:
pass
obj1 = Foo()
obj2 = Foo()
print(obj1, obj2)
print(id(obj1), id(obj2))
class Singleton:
instance = None
def __init__(self, name):
self.name = name
def __new__(cls, *args, **kwargs):
# 返回空对象
if cls.instance:
return cls.instance
cls.instance = object.__new__(cls)
return cls.instance
obj1 = Singleton('张开')
obj2 = Singleton('李开')
print(obj1, obj2)
print(id(obj1), id(obj2))
单利模式在多线程中的应用
多线程中,不加处理的话,会有bug。
import threading
import time
class Singleton:
instance = None
def __init__(self, name):
self.name = name
def __new__(cls, *args, **kwargs):
if cls.instance:
return cls.instance
# 正常情况下,不加sleep,bug效果基本不出现,因为如果第一个线程执行的特别特别快
# 导致后面的线程在判断instance的时候,instance已经存在了,就直接返回了
# 然后看着单例模式是没问题的
# 这里但凡加个sleep,让各个线程走完instance判断之后
# 那么在创建对象之前都停一下,此时对象都没有创建出来,所以上面的if判断都会失败
# 然后sleep完事之后,每个线程都会创建一个对象,这样就破坏了单例模式
time.sleep(0.000001) # 这里不加就没问题,加了bug就出来了
cls.instance = object.__new__(cls)
return cls.instance
def task():
obj = Singleton('x')
print(id(obj))
for i in range(10):
t = threading.Thread(target=task)
t.start()
"""
2571315164688
2572141587152
2572142639040
2572142638992
2572142640000
2572142640144
2572142805680
2572142805824
2571317513856
2572142640096
"""
import threading
import time
lock = threading.RLock()
class Singleton:
instance = None
def __init__(self, name):
self.name = name
def __new__(cls, *args, **kwargs):
with lock:
if cls.instance:
return cls.instance
# time.sleep(0.000001) # 睡不睡都没问题了
cls.instance = object.__new__(cls)
return cls.instance
def task():
obj = Singleton('x')
print(id(obj))
for i in range(10):
t = threading.Thread(target=task)
t.start()
"""
1998150499488
1998150499488
1998150499488
1998150499488
1998150499488
1998150499488
1998150499488
1998150499488
1998150499488
1998150499488
"""
import threading
import time
lock = threading.RLock()
class Singleton:
instance = None
def __init__(self, name):
self.name = name
def __new__(cls, *args, **kwargs):
# 锁本身也是有开销的,如果instance有值了,压根不需要再加锁往下走了
if cls.instance:
return cls.instance
with lock:
if cls.instance:
return cls.instance
# time.sleep(0.000001) # 睡不睡都没问题了
cls.instance = object.__new__(cls)
return cls.instance
def task():
obj = Singleton('x')
print(id(obj))
for i in range(10):
t = threading.Thread(target=task)
t.start()
"""
1625916816592
1625916816592
1625916816592
1625916816592
1625916816592
1625916816592
1625916816592
1625916816592
1625916816592
1625916816592
"""
线程安全的扩展
https://www.cnblogs.com/traditional/p/13649359.html
https://blog.csdn.net/qq_27283619/article/details/106021295
https://segmentfault.com/q/1010000041987131
线程状态和全局解释器锁:https://docs.python.org/zh-cn/3/c-api/init.html#thread-state-and-the-global-interpreter-lock
字节码: