1. 核心概念对比
| 特性维度 | 进程 (Process) | 线程 (Thread) | 协程 (Coroutine) |
|---|---|---|---|
| 基本定义 | 资源分配的基本单位,拥有独立的地址空间 | CPU调度的基本单位,共享进程的资源 | 用户态的轻量级线程,在单线程内通过协作进行任务切换 |
| 隔离性 | 强,一个进程崩溃通常不会影响其他进程 | 弱,一个线程崩溃可能导致整个进程退出,影响同进程所有线程。 | 无,所有协程在同一线程内运行。 |
| 开销 | 大,创建、销毁和上下文切换(涉及页表、寄存器等)成本高。 | 中等,创建和切换开销比进程小,但仍需内核介入。 | 极小,切换由程序控制,无需内核参与,只需保存少量寄存器上下文。 |
| 数据共享 | 复杂,需要进程间通信(IPC),如管道、消息队列、共享内存。 | 简单,可直接读写进程的全局变量和堆内存,但需同步机制(如锁)。 | 极简单,可直接访问共享变量,通常在单线程内通过事件循环协作,无需锁。 |
| 并发性 | 多进程可利用多核CPU实现真正并行。 | 多线程可并发,但因GIL限制,在CPython中难以充分利用多核进行CPU计算。 | 单线程内可实现极高并发,尤其适合I/O密集型任务。 |
| Python库 | multiprocessing | threading | asyncio |
2. 三者间的联系
尽管存在差异,但进程、线程和协程之间存在着清晰的层次和协作关系
- 包含关系:一个操作系统由多个进程组成;一个进程包含一个或多个线程;一个线程可以运行成千上万个协程。
- 协作共存:现代复杂应用(如Nginx、数据库)常采用混合模型。例如,采用多进程架构保证稳定性,每个进程内使用多线程处理事务,而在线程内部又使用协程来处理海量网络连接,以兼顾隔离性、效率和并发能力
3. python 代码
在Python中,multiprocessing、threading和asyncio这三个库分别是处理进程、线程和协程的核心工具。
3.1 简单代码
- 使用 multiprocessing进行CPU密集型计算
multiprocessing通过创建子进程绕过GIL限制,充分利用多核CPU,适合计算密集型任务
1import multiprocessing 2import time 3from typing import List 4 5def calculate_square_chunk(chunk: List[int]) -> List[int]: 6 """计算列表中数字的平方(分块处理)""" 7 return [n * n for n in chunk] 8 9def calculate_square_no_process(numbers: List[int]) -> float: 10 """不使用多进程的计算函数""" 11 start_time = time.time() 12 result = [n * n for n in numbers] 13 end_time = time.time() 14 execution_time = end_time - start_time 15 print(f"非多进程执行时间: {execution_time:.6f} 秒") 16 return execution_time 17 18def calculate_with_multiprocessing_pool(numbers: List[int], num_processes: int = None) -> float: 19 """使用进程池进行多进程计算""" 20 if num_processes is None: 21 num_processes = multiprocessing.cpu_count() 22 23 print(f"使用 {num_processes} 个进程") 24 25 # 将数据分成多个块 26 chunk_size = len(numbers) // num_processes 27 chunks = [numbers[i:i + chunk_size] for i in range(0, len(numbers), chunk_size)] 28 29 start_time = time.time() 30 31 with multiprocessing.Pool(processes=num_processes) as pool: 32 results = pool.map(calculate_square_chunk, chunks) 33 34 # 合并结果(如果需要的话) 35 final_result = [] 36 for chunk_result in results: 37 final_result.extend(chunk_result) 38 39 multiprocess_time = time.time() - start_time 40 return multiprocess_time 41 42def calculate_with_single_process(numbers: List[int]) -> float: 43 """使用单个进程进行计算(对比基准)""" 44 start_time = time.time() 45 46 p = multiprocessing.Process(target=calculate_square_chunk, args=(numbers,)) 47 p.start() 48 p.join() 49 50 return time.time() - start_time 51 52if __name__ == '__main__': 53 # 优化数据生成,使用更合理的数据量 54 base_numbers = list(range(1, 21)) 55 numbers = base_numbers * 2000000 # 调整为更合理的数量 56 57 print(f"数据量: {len(numbers):,} 个元素") 58 print(f"CPU核心数: {multiprocessing.cpu_count()}") 59 print(f"\n内存使用:") 60 print(f"原始数据大小: {len(numbers) * 4 / (1024*1024):.1f} MB") 61 62 # 测试1: 不使用多进程 63 print("\n=== 测试1: 不使用多进程 ===") 64 non_multiprocess_time = calculate_square_no_process(numbers) 65 66 # 测试2: 使用进程池(多进程) 67 print("\n=== 测试2: 使用进程池(多进程) ===") 68 multiprocess_pool_time = calculate_with_multiprocessing_pool(numbers) 69 print(f"多进程池执行时间: {multiprocess_pool_time:.6f} 秒") 70 71 # 测试3: 使用单个进程(对比) 72 print("\n=== 测试3: 使用单个进程(对比) ===") 73 single_process_time = calculate_with_single_process(numbers) 74 print(f"单进程执行时间: {single_process_time:.6f} 秒") 75 76 # 性能比较 77 print("\n=== 性能比较 ===") 78 print(f"非多进程 vs 多进程池: {non_multiprocess_time/multiprocess_pool_time:.2f}x 加速") 79 print(f"单进程 vs 多进程池: {single_process_time/multiprocess_pool_time:.2f}x 加速") 80
输出结果:
1数据量: 40,000,000 个元素 2CPU核心数: 16 3 4内存使用: 5原始数据大小: 152.6 MB 6 7=== 测试1: 不使用多进程 === 8非多进程执行时间: 0.579534 秒 9 10=== 测试2: 使用进程池(多进程) === 11使用 16 个进程 12多进程池执行时间: 1.508401 秒 13 14=== 测试3: 使用单个进程(对比) === 15单进程执行时间: 0.715703 秒 16 17=== 性能比较 === 18非多进程 vs 多进程池: 0.38x 加速 19单进程 vs 多进程池: 0.47x 加速 20
- 使用 asyncio处理高并发I/O操作
asyncio非常适合I/O密集型任务,如网络请求、文件读写等,能在单线程内实现高并发
1import asyncio 2import time 3 4async def say_after(delay, what): 5 await asyncio.sleep(delay) # 模拟I/O操作,如网络请求 6 print(what) 7 8async def main(): 9 print(f"Started at {time.strftime('%X')}") 10 start_time = time.time() 11 # 顺序执行两个协程 12 await say_after(1, 'Hello') 13 await say_after(2, 'World') 14 15 print(f"Finished at {time.strftime('%X')}") 16 total_time = time.time() - start_time 17 18 print(f"顺序执行时间: {total_time:.4f} 秒") 19 20# 运行异步主函数 21asyncio.run(main()) 22
输出:
1Started at 16:22:11 2Hello 3World 4Finished at 16:22:14 5顺序执行时间: 3.0037 秒 6
上面的例子是顺序执行,asyncio的强大之处在于能并发执行多个I/O任务:
1import asyncio 2import time 3 4async def fetch_data(task_id, seconds): 5 print(f"Task {task_id} started.") 6 await asyncio.sleep(seconds) # 模拟耗时的I/O操作 7 print(f"Task {task_id} completed after {seconds} second(s).") 8 return f"Data from task {task_id}" 9 10async def main(): 11 # 创建多个任务并发执行 12 print(f"Started at {time.strftime('%X')}") 13 start_time = time.time() 14 tasks = [ 15 asyncio.create_task(fetch_data(1, 2)), 16 asyncio.create_task(fetch_data(2, 1)), 17 asyncio.create_task(fetch_data(3, 3)) 18 ] 19 20 # 等待所有任务完成并收集结果 21 results = await asyncio.gather(*tasks) 22 total_time = time.time() - start_time 23 print(f"Finished at {time.strftime('%X')}") 24 print(f"Total time taken: {total_time:.4f} seconds") 25 print(f"All tasks done. Results: {results}") 26 27asyncio.run(main()) 28
输出结果:
1Started at 16:29:31 2Task 1 started. 3Task 2 started. 4Task 3 started. 5Task 2 completed after 1 second(s). 6Task 1 completed after 2 second(s). 7Task 3 completed after 3 second(s). 8Finished at 16:29:34 9Total time taken: 3.0019 seconds 10All tasks done. Results: ['Data from task 1', 'Data from task 2', 'Data from task 3'] 11
- 多线程生产环境使用ThreadPoolExecutor
1from concurrent.futures import ThreadPoolExecutor 2import time 3import random 4 5def process_item(item): 6 """处理单个项目的函数""" 7 process_time = random.uniform(0.5, 2.0) 8 time.sleep(process_time) 9 result = f"已处理: {item} (耗时: {process_time:.2f}秒)" 10 return result 11 12def main(): 13 # 要处理的项目列表 14 items = [f"项目--> {i}" for i in range(1,13)] 15 16 # 使用线程池(最大4个线程) 17 with ThreadPoolExecutor(max_workers=4) as executor: 18 # 提交所有任务 19 future_to_item = { 20 executor.submit(process_item, item): item 21 for item in items 22 } 23 24 # 收集结果 25 results = [] 26 for future in future_to_item: 27 try: 28 result = future.result() 29 results.append(result) 30 print(result) 31 except Exception as e: 32 print(f"处理出错: {e}") 33 34 print(f"\n总共处理了 {len(results)} 个项目") 35 36if __name__ == "__main__": 37 main() 38
输出:
1已处理: 项目--> 2 (耗时: 1.77秒) 2已处理: 项目--> 3 (耗时: 0.63秒) 3已处理: 项目--> 4 (耗时: 1.06秒) 4已处理: 项目--> 5 (耗时: 1.11秒) 5已处理: 项目--> 6 (耗时: 1.22秒) 6已处理: 项目--> 7 (耗时: 1.08秒) 7已处理: 项目--> 8 (耗时: 1.04秒) 8已处理: 项目--> 9 (耗时: 0.99秒) 9已处理: 项目--> 10 (耗时: 1.67秒) 10已处理: 项目--> 11 (耗时: 1.56秒) 11已处理: 项目--> 12 (耗时: 0.93秒) 12 13总共处理了 12 个项目 14
3. 实战建议与选型
在实际项目中如何选择?下面这个决策树可以帮你快速判断
- 任务是CPU密集型吗?(例如,计算圆周率、数据编码/解码、图像处理)
是 → 选择 multiprocessing。创建多个进程,充分利用多核CPU。
否(是I/O密集型,如网络爬虫、Web服务、数据库查询)→ 进入第2步。 - 需要处理非常高并发(如超过1000)的连接吗?并且希望代码效率最高、开销最小?
是 → 选择 asyncio(协程)。单线程内处理海量连接,切换开销极小。
否(并发量不高,或对代码结构有偏好)→ 进入第3步。 - I/O操作是阻塞式的吗?(例如,使用了一些不支持异步的传统库)
是,且不想或不能修改 → 选择 threading(线程)。线程池是一个简单有效的选择。
否,或愿意使用异步库 → 仍然推荐 asyncio,因为它通常能提供更好的性能。
简单来说:
计算用进程:把繁重的计算任务分给多个进程并行处理。
I/O用协程:在等待网络、磁盘时,用协程并发处理其他任务,效率最高。
兼容用线程:当遇到不支持异步的库时,线程是折中方案。
《进程、线程、协程三者的区别和联系》 是转载文章,点击查看原文。