前言
大家好,我是倔强青铜三。欢迎关注我,微信公众号:倔强青铜三。点赞、收藏、关注,一键三连!
欢迎继续 苦练Python第64天。
今天咱们把“并发”这把瑞士军刀——threading 模块,从开箱到实战一次性讲透。全程只用 Python 自带标准库,代码复制即可运行!
一、为什么需要线程?
- I/O 密集场景:爬虫、文件下载、日志采集,CPU 在等网络/磁盘,闲着也是闲着。
- 共享内存:比多进程轻量,数据不用序列化来回拷贝。
- GIL?别慌:I/O 密集时线程照样提速;CPU 密集请转投
multiprocessing。
threading 常用 API 一览表
| API | 作用 | 关键入参 | 返回值/副作用 |
|---|---|---|---|
| threading.Thread(target, args=(), kwargs={}, name=None, daemon=None) | 创建线程对象 | target: 可调用对象;args/kwargs: 位置/关键字参数;name: 线程名;daemon: 是否为守护线程 | Thread 实例 |
| Thread.start() | 启动线程,底层调用 run() | — | 若重复调用抛 RuntimeError |
| Thread.run() | 线程真正执行的逻辑;可被子类重写 | — | 无 |
| Thread.join(timeout=None) | 阻塞等待线程结束 | timeout: 秒级浮点超时 | 总是 None;超时后仍需用 is_alive() 判断是否存活 |
| Thread.is_alive() | 线程是否存活 | — | True/False |
| Thread.name / Thread.ident / Thread.native_id | 线程名字/线程标识符/系统级线程 ID | — | str / int or None / int or None |
| Thread.daemon | 守护线程标志 | 可读写布尔值 | 设置前必须未启动 |
| threading.current_thread() | 获取当前线程对象 | — | Thread 实例 |
| threading.active_count() | 当前存活线程数量 | — | int |
| threading.enumerate() | 当前所有存活线程列表 | — | list[Thread] |
| threading.Lock() | 创建原始互斥锁 | — | Lock 实例 |
| Lock.acquire(blocking=True, timeout=-1) | 获取锁 | blocking=False 非阻塞;timeout 秒级超时 | 成功返回 True,否则 False |
| Lock.release() | 释放锁 | — | 若未持有锁抛 RuntimeError |
| Lock.locked() | 查询锁状态 | — | True/False |
| threading.RLock() | 创建可重入锁 | — | RLock 实例;方法同 Lock |
| threading.Event() | 事件对象 | — | Event 实例 |
| Event.set() / Event.clear() / Event.wait(timeout=None) | 置位/复位/等待事件 | timeout 秒级超时 | wait 返回 True(被 set)或 False(超时) |
| threading.Timer(interval, function, args=None, kwargs=None) | 延时线程 | interval: 延迟秒;function: 回调;args/kwargs: 参数 | Timer 实例,可 .cancel() |
| threading.local() | 线程局部数据容器 | — | local 实例,属性隔离 |
记住:带
timeout的阻塞方法都可被Ctrl+C中断;
任何锁/事件/信号量都支持with上下文管理器,推荐优先使用!
二、30 秒启动你的第一个线程
1# demo_hello_thread.py 2import threading 3import time 4 5def say_hello(name, delay): 6 time.sleep(delay) 7 print(f"你好,{name},来自线程 {threading.current_thread().name}") 8 9# 创建线程 10t = threading.Thread(target=say_hello, args=("倔强青铜三", 2), name="青铜线程") 11t.start() # 启动 12t.join() # 等它跑完 13print("主线程结束") 14
运行效果:
1你好,倔强青铜三,来自线程 青铜线程 2主线程结束 3
三、批量任务:线程池手写版
场景:并发爬 3 个网页(用 sleep 模拟 I/O)。
1# demo_pool.py 2import threading 3import time 4 5links = ["https://a.com", "https://b.com", "https://c.com"] 6 7def crawl(url): 8 print(f"开始 {url}") 9 time.sleep(2) # 模拟网络延迟 10 print(f"完成 {url}") 11 12threads = [] 13for link in links: 14 t = threading.Thread(target=crawl, args=(link,)) 15 t.start() 16 threads.append(t) 17 18for t in threads: 19 t.join() 20 21print("全部爬完!") 22
运行效果:
1开始 https://a.com 2开始 https://b.com 3开始 https://c.com 4完成 https://a.com 5完成 https://b.com 6完成 https://c.com 7全部爬完! 8
四、线程安全:Lock 互斥锁
卖票案例:100 张票,10 个窗口同时卖,不加锁会超卖。
1# demo_lock.py 2import threading 3 4tickets = 4 5lock = threading.Lock() 6 7def sell(window_id): 8 global tickets 9 while True: 10 with lock: # 推荐用 with,自动 acquire/release 11 if tickets <= 0: 12 break 13 tickets -= 1 14 print(f"窗口{window_id} 卖出 1 张,剩余 {tickets}") 15 # 临界区外可快速做其他事 16 print(f"窗口{window_id} 下班~") 17 18threads = [threading.Thread(target=sell, args=(i+1,)) for i in range(4)] 19for t in threads: 20 t.start() 21for t in threads: 22 t.join() 23
运行效果:
1窗口1 卖出 1 张,剩余 3 2窗口1 卖出 1 张,剩余 2 3窗口1 卖出 1 张,剩余 1 4窗口1 卖出 1 张,剩余 0 5窗口1 下班~ 6窗口2 下班~ 7窗口3 下班~ 8窗口4 下班~ 9
五、线程通信:Event 红绿灯
主线程发信号让子线程起跑。
1# demo_event.py 2import threading 3import time 4 5start_event = threading.Event() 6 7def runner(name): 8 print(f"{name} 就位,等发令枪") 9 start_event.wait() # 阻塞直到 set() 10 print(f"{name} 起跑!") 11 12for i in range(3): 13 threading.Thread(target=runner, args=(f"选手{i+1}",)).start() 14 15time.sleep(2) 16print("裁判:预备——跑!") 17start_event.set() # 一枪令下 18
运行效果:
1选手1 就位,等发令枪 2选手2 就位,等发令枪 3选手3 就位,等发令枪 4裁判:预备——跑! 5选手1 起跑! 6选手2 起跑! 7选手3 起跑! 8
六、定时器:Timer 秒杀闹钟
延迟 3 秒响铃,可中途取消。
1# demo_timer.py 2import threading 3 4def ring(): 5 print("⏰ 起床啦!!") 6 7alarm = threading.Timer(3, ring) 8alarm.start() 9print("3 秒后响铃,输入 c 取消") 10if input().strip() == "c": 11 alarm.cancel() 12 print("闹钟已取消") 13
七、线程局部变量:local 隔离数据
每个线程独享一份变量,互不打架。
1# demo_local.py 2import threading 3 4local = threading.local() 5 6def worker(num): 7 local.count = num # 线程独有属性 8 for _ in range(3): 9 local.count += 1 10 print(f"{threading.current_thread().name} -> {local.count}") 11 12for i in range(2): 13 threading.Thread(target=worker, args=(i*10,), name=f"线程{i+1}").start() 14
运行效果:
1线程1 -> 1 2线程1 -> 2 3线程1 -> 3 4线程2 -> 11 5线程2 -> 12 6线程2 -> 13 7
八、完整实战:多线程文件下载器(模拟)
功能:并发“下载”多个文件,统计总耗时。
1# demo_downloader.py 2import threading 3import time 4import random 5 6urls = [f"https://file{i}.bin" for i in range(5)] 7results = {} 8lock = threading.Lock() 9 10def download(url): 11 print(f"开始 {url}") 12 sec = random.randint(1, 3) 13 time.sleep(sec) 14 with lock: 15 results[url] = f"{sec}s" 16 print(f"{url} 完成,耗时 {sec}s") 17 18start = time.time() 19threads = [threading.Thread(target=download, args=(u,)) for u in urls] 20for t in threads: 21 t.start() 22for t in threads: 23 t.join() 24 25print("全部下载完毕!") 26for url, spent in results.items(): 27 print(url, "->", spent) 28print(f"总耗时 {time.time() - start:.2f}s") 29
运行效果:
1开始 https://file0.bin 2开始 https://file1.bin 3开始 https://file2.bin 4开始 https://file3.bin 5开始 https://file4.bin 6https://file1.bin 完成,耗时 1s 7https://file2.bin 完成,耗时 1s 8https://file4.bin 完成,耗时 2s 9https://file0.bin 完成,耗时 3s 10https://file3.bin 完成,耗时 3s 11全部下载完毕! 12https://file1.bin -> 1s 13https://file2.bin -> 1s 14https://file4.bin -> 2s 15https://file0.bin -> 3s 16https://file3.bin -> 3s 17总耗时 3.00s 18
九、with 上下文管理器 × threading:让锁像文件一样好写
还记得文件操作的 with open(...) as f: 吗?
threading 模块里的 Lock、RLock、Condition、Semaphore、BoundedSemaphore 全部支持 with 协议:
进入代码块自动 acquire(),退出时自动 release()——不会忘、不会漏、不会死锁!
9.1 原始锁 Lock 的两种写法对比
1# ❌ 传统写法:容易漏掉 release() 2lock = threading.Lock() 3lock.acquire() 4try: 5 # 临界区 6 global_num += 1 7finally: 8 lock.release() 9 10# ✅ with 写法:一行搞定,异常也不怕 11lock = threading.Lock() 12with lock: 13 global_num += 1 14
9.2 RLock 递归锁同样适用
1# demo_rlock_with.py 2import threading 3 4rlock = threading.RLock() 5 6def nested(): 7 with rlock: # 第一次获取 8 print("外层加锁") 9 with rlock: # 同一线程可再次获取 10 print("内层重入,不会死锁") 11 12threading.Thread(target=nested).start() 13
9.3 Condition 条件变量 + with 经典范式
1# demo_condition_with.py 2import threading 3 4cv = threading.Condition() 5flag = False 6 7def waiter(): 8 with cv: # 自动 acquire 9 cv.wait_for(lambda: flag) # 等待条件成立 10 print("waiter 收到通知!") 11 12def setter(): 13 global flag 14 with cv: 15 flag = True 16 cv.notify_all() 17 18threading.Thread(target=waiter).start() 19threading.Thread(target=setter).start() 20
9.4 Semaphore 资源池限流
1# demo_sema_with.py 2import threading 3import time 4 5pool = threading.Semaphore(value=3) # 并发 3 条 6 7def worker(i): 8 with pool: # 获取令牌 9 print(f"任务 {i} 进入") 10 time.sleep(2) 11 print(f"任务 {i} 完成") 12 13for i in range(5): 14 threading.Thread(target=worker, args=(i,)).start() 15
9.5 自定义类也能支持 with
只要实现 __enter__、__exit__ 即可:
1class MyLock: 2 def __init__(self): 3 self._lock = threading.Lock() 4 def __enter__(self): 5 self._lock.acquire() 6 return self 7 def __exit__(self, exc_type, exc_val, exc_tb): 8 self._lock.release() 9 10with MyLock(): 11 print("自定义锁也能 with!") 12
小结
| 武器 | 用途 | 备注 |
|---|---|---|
| Thread | 创建线程 | 始终记得 join |
| Lock/RLock | 临界区互斥 | 推荐 with lock: |
| Event | 线程间通知 | set/clear/wait |
| Timer | 延迟执行 | 可 cancel |
| local | 线程独享数据 | 替代全局变量 |
最后感谢阅读!欢迎关注我,微信公众号:
倔强青铜三。
欢迎点赞、收藏、关注,一键三连!!