Python多线程并发教程:从入门到实践
前言
在现代软件开发中,并发编程已经成为提高程序性能和响应性的重要手段。Python作为一种流行的高级编程语言,提供了多种实现并发的方式,其中多线程是最常用的方式之一。本文将系统地介绍Python中的多线程编程,从基础概念到实际应用,帮助初学者和中级开发者掌握这一重要技能。
本教程将涵盖以下内容:
- Python多线程的基本概念
- threading模块的使用方法
- concurrent.futures模块的使用方法
- asyncio模块的使用方法
- 多线程数据采集实战
- 多线程方式的对比与选择
- 多线程与多进程的区别
无论你是刚接触并发编程的新手,还是想深入了解Python多线程机制的中级开发者,本文都将为你提供有价值的指导和实践经验。
一、Python多线程基础概念
1.1 什么是线程?
线程(Thread)是操作系统能够进行运算调度的最小单位,它被包含在进程之中,是进程中的实际运作单位。一个进程可以并发多个线程,每条线程并行执行不同的任务。
在Python中,线程是通过标准库中的threading
模块来实现的。多线程编程允许程序同时执行多个操作,从而提高程序的执行效率和响应性。
1.2 Python中的GIL(全局解释器锁)
在讨论Python多线程之前,必须了解一个重要的概念:全局解释器锁(Global Interpreter Lock,简称GIL)。
GIL是CPython解释器(Python的默认实现)中的一个机制,它确保同一时刻只有一个线程在执行Python字节码。这意味着,即使在多核处理器上,Python的多线程也不能同时执行多个线程的Python代码。
GIL的影响:
- 对CPU密集型任务,多线程并不能提供真正的并行计算
- 对IO密集型任务(如网络请求、文件读写),多线程仍然可以显著提高程序性能
理解GIL的存在及其限制,对于正确使用Python多线程至关重要。
1.3 多线程适用场景
尽管有GIL的限制,Python的多线程在以下场景中仍然非常有用:
- IO密集型任务:当程序需要等待外部资源(如文件、网络连接)时,多线程可以让程序在等待过程中执行其他任务。
- 并发请求处理:如Web服务器处理多个客户端请求。
- 后台任务:在不阻塞主程序的情况下执行后台操作。
- 用户界面响应:保持GUI应用的响应性,同时执行耗时操作。
1.4 线程的生命周期
一个线程的生命周期包括以下几个状态:
- 新建(New):创建线程对象
- 就绪(Runnable):调用start()方法后,线程进入就绪状态,等待CPU调度
- 运行(Running):线程获得CPU时间片,执行run()方法中的代码
- 阻塞(Blocked):线程暂时停止执行,等待某个条件(如IO操作完成、锁释放等)
- 终止(Terminated):线程执行完毕或因异常退出
了解线程的生命周期有助于我们更好地控制和管理线程的执行。
二、threading模块详解
Python的threading
模块提供了创建和管理线程的功能。它是Python标准库的一部分,无需额外安装。
2.1 创建和启动线程
在Python中,创建线程主要有两种方式:
方式一:直接使用Thread类
import threading
import time
# 定义线程要执行的函数
def worker(name):
print(f"线程 {name} 开始工作")
time.sleep(2) # 模拟工作耗时
print(f"线程 {name} 工作完成")
# 创建线程
t = threading.Thread(target=worker, args=('Thread-1',))
# 启动线程
t.start()
# 等待线程结束
t.join()
print("主线程继续执行")
在上面的代码中:
target
参数指定线程要执行的函数args
参数以元组形式传递函数参数start()
方法启动线程join()
方法等待线程执行完毕
方式二:继承Thread类
import threading
import time
# 继承Thread类
class MyThread(threading.Thread):
def __init__(self, name):
super().__init__()
self.name = name
# 重写run方法
def run(self):
print(f"线程 {self.name} 开始工作")
time.sleep(2) # 模拟工作耗时
print(f"线程 {self.name} 工作完成")
# 创建线程实例
t = MyThread("Thread-2")
# 启动线程
t.start()
# 等待线程结束
t.join()
print("主线程继续执行")
继承Thread类的方式更加面向对象,适合复杂的线程行为。
2.2 线程同步机制
当多个线程同时访问共享资源时,可能会导致数据不一致或程序错误。Python的threading
模块提供了多种同步机制来解决这个问题:
2.2.1 Lock(锁)
Lock是最简单的同步机制,它确保同一时刻只有一个线程可以执行受保护的代码块:
import threading
import time
# 共享资源
counter = 0
# 创建锁
lock = threading.Lock()
def increment(n):
global counter
for _ in range(n):
# 获取锁
lock.acquire()
try:
# 临界区(受保护的代码)
counter += 1
finally:
# 释放锁
lock.release()
# 也可以使用with语句简化锁的使用
def increment_with(n):
global counter
for _ in range(n):
with lock: # 自动获取和释放锁
counter += 1
# 创建两个线程
t1 = threading.Thread(target=increment, args=(10000,))
t2 = threading.Thread(target=increment, args=(10000,))
# 启动线程
t1.start()
t2.start()
# 等待线程结束
t1.join()
t2.join()
print(f"最终计数: {counter}") # 应该是20000
2.2.2 RLock(可重入锁)
RLock允许同一个线程多次获取锁,而不会导致死锁:
import threading
# 创建可重入锁
rlock = threading.RLock()
def outer_function():
with rlock: # 第一次获取锁
print("外部函数获取锁")
inner_function()
print("外部函数释放锁")
def inner_function():
with rlock: # 同一线程再次获取锁
print("内部函数获取锁")
print("内部函数释放锁")
# 创建线程
t = threading.Thread(target=outer_function)
t.start()
t.join()
如果使用普通的Lock,上面的代码会导致死锁。
2.2.3 Condition(条件变量)
Condition提供了一种线程间通信的机制,允许线程等待特定条件发生:
import threading
import time
import random
# 创建条件变量
condition = threading.Condition()
# 共享资源
items = []
MAX_ITEMS = 10
# 生产者
def producer():
for i in range(20):
with condition: # 获取锁
while len(items) >= MAX_ITEMS: # 检查条件
print("队列已满,生产者等待...")
condition.wait() # 等待消费者消费
item = random.randint(1, 100)
items.append(item)
print(f"生产: {item}, 当前队列: {items}")
condition.notify() # 通知消费者
time.sleep(0.2) # 生产速度控制
# 消费者
def consumer():
for i in range(20):
with condition: # 获取锁
while not items: # 检查条件
print("队列为空,消费者等待...")
condition.wait() # 等待生产者生产
item = items.pop(0)
print(f"消费: {item}, 当前队列: {items}")
condition.notify() # 通知生产者
time.sleep(0.5) # 消费速度控制
# 创建线程
producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)
# 启动线程
producer_thread.start()
consumer_thread.start()
# 等待线程结束
producer_thread.join()
consumer_thread.join()
2.2.4 Semaphore(信号量)
Semaphore用于控制同时访问特定资源的线程数量:
import threading
import time
import random
# 创建信号量,限制最多3个线程同时执行
semaphore = threading.Semaphore(3)
def worker(id):
with semaphore: # 获取信号量
print(f"工作线程 {id} 开始执行")
time.sleep(random.uniform(1, 3)) # 模拟工作耗时
print(f"工作线程 {id} 执行完毕")
# 创建10个线程
threads = []
for i in range(10):
t = threading.Thread(target=worker, args=(i,))
threads.append(t)
t.start()
# 等待所有线程结束
for t in threads:
t.join()
print("所有工作线程执行完毕")
在上面的代码中,尽管创建了10个线程,但信号量确保同一时刻最多只有3个线程在执行。
2.2.5 Event(事件)
Event用于线程间的简单通信,一个线程发出事件信号,其他线程等待该信号:
import threading
import time
# 创建事件对象
event = threading.Event()
def waiter(id):
print(f"等待线程 {id} 等待事件...")
event.wait() # 等待事件被设置
print(f"等待线程 {id} 收到事件信号,继续执行")
def setter():
print("设置线程准备发出事件信号")
time.sleep(3) # 模拟一些准备工作
print("设置线程发出事件信号")
event.set() # 设置事件,通知所有等待的线程
# 创建多个等待线程
waiters = []
for i in range(5):
t = threading.Thread(target=waiter, args=(i,))
waiters.append(t)
t.start()
# 创建设置线程
setter_thread = threading.Thread(target=setter)
setter_thread.start()
# 等待所有线程结束
for t in waiters:
t.join()
setter_thread.join()
print("所有线程执行完毕")
2.3 线程本地数据
有时我们需要每个线程拥有自己的数据副本,而不是共享数据。Python的threading
模块提供了local
类来实现线程本地存储:
import threading
import time
import random
# 创建线程本地数据
local_data = threading.local()
def worker(id):
# 为当前线程设置本地数据
local_data.x = id
local_data.name = f"Thread-{id}"
# 模拟工作
time.sleep(random.uniform(0.5, 1.5))
# 访问线程本地数据
print(f"线程 {local_data.name} (ID: {local_data.x}) 执行完毕")
# 创建多个线程
threads = []
for i in range(5):
t = threading.Thread(target=worker, args=(i,))
threads.append(t)
t.start()
# 等待所有线程结束
for t in threads:
t.join()
print("所有线程执行完毕")
在上面的代码中,每个线程都有自己的local_data.x
和local_data.name
,互不干扰。
2.4 守护线程
守护线程(Daemon Thread)是一种特殊的线程,当主线程结束时,所有守护线程会自动终止,而不管它们是否执行完毕。这对于一些后台任务非常有用:
import threading
import time
def background_task():
while True:
print("后台任务执行中...")
time.sleep(1)
# 创建守护线程
daemon_thread = threading.Thread(target=background_task)
daemon_thread.daemon = True # 设置为守护线程
daemon_thread.start()
# 主线程工作
print("主线程开始工作")
time.sleep(5)
print("主线程工作完毕")
# 主线程结束后,守护线程会自动终止,无需join
三、concurrent.futures模块详解
Python 3.2引入的concurrent.futures
模块提供了更高级的异步执行接口,包括线程池和进程池。它简化了多线程编程,特别是在需要获取线程执行结果的场景中。
3.1 ThreadPoolExecutor基础
ThreadPoolExecutor
是concurrent.futures
模块中的线程池实现,它管理一组工作线程,将任务分配给这些线程,并跟踪任务的执行状态。
3.1.1 基本用法
import concurrent.futures
import time
def worker(id):
print(f"工作线程 {id} 开始执行")
time.sleep(id) # 模拟不同的工作耗时
return f"工作线程 {id} 的结果"
# 创建线程池
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
# 提交任务到线程池
futures = {executor.submit(worker, i): i for i in range(5)}
# 获取任务结果
for future in concurrent.futures.as_completed(futures):
id = futures[future]
try:
result = future.result()
print(f"任务 {id} 完成,结果: {result}")
except Exception as e:
print(f"任务 {id} 执行出错: {e}")
print("所有任务执行完毕")
在上面的代码中:
ThreadPoolExecutor
创建了一个最多包含3个线程的线程池submit()
方法提交任务到线程池,返回Future
对象as_completed()
函数按任务完成的顺序产生Future
对象future.result()
获取任务的返回值
3.1.2 使用map方法
ThreadPoolExecutor
的map
方法提供了一种更简洁的方式来并行执行任务:
import concurrent.futures
import time
def worker(id):
print(f"工作线程 {id} 开始执行")
time.sleep(id) # 模拟不同的工作耗时
return f"工作线程 {id} 的结果"
# 创建线程池
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
# 使用map方法并行执行任务
results = executor.map(worker, range(5))
# 获取结果(按提交顺序)
for id, result in enumerate(results):
print(f"任务 {id} 完成,结果: {result}")
print("所有任务执行完毕")
map
方法的结果是按照任务提交的顺序返回的,而不是按完成顺序。
3.2 Future对象
Future
对象表示异步执行的结果。它提供了检查任务是否完成、等待任务完成以及获取任务结果的方法。
import concurrent.futures
import time
def worker(id):
print(f"工作线程 {id} 开始执行")
time.sleep(id) # 模拟不同的工作耗时
if id == 3:
raise ValueError("模拟任务失败")
return f"工作线程 {id} 的结果"
# 创建线程池
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
# 提交任务
future = executor.submit(worker, 2)
# 检查任务是否完成
print(f"任务是否完成: {future.done()}")
# 等待任务完成(可选超时)
time.sleep(1)
print(f"1秒后,任务是否完成: {future.done()}")
# 获取任务结果(会阻塞直到任务完成)
result = future.result()
print(f"任务结果: {result}")
# 处理异常情况
error_future = executor.submit(worker, 3)
try:
error_result = error_future.result()
except Exception as e:
print(f"任务执行出错: {e}")
3.3 取消任务
Future
对象还允许取消尚未执行的任务:
import concurrent.futures
import time
def worker(id):
print(f"工作线程 {id} 开始执行")
time.sleep(3) # 模拟长时间运行的任务
print(f"工作线程 {id} 执行完毕")
return f"工作线程 {id} 的结果"
# 创建线程池
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
# 提交多个任务
futures = [executor.submit(worker, i) for i in range(5)]
# 取消部分任务
for i, future in enumerate(futures[2:]):
if future.cancel():
print(f"任务 {i+2} 已取消")
else:
print(f"任务 {i+2} 无法取消(可能已开始执行)")
# 等待剩余任务完成
for i, future in enumerate(futures[:2]):
if not future.cancelled():
result = future.result()
print(f"任务 {i} 完成,结果: {result}")
print("所有任务处理完毕")
注意,已经开始执行的任务无法取消。
3.4 超时控制
Future.result()
方法支持超时参数,可以避免长时间等待某个任务:
import concurrent.futures
import time
def worker(id, sleep_time):
print(f"工作线程 {id} 开始执行,预计耗时 {sleep_time} 秒")
time.sleep(sleep_time)
return f"工作线程 {id} 的结果"
# 创建线程池
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
# 提交任务
future1 = executor.submit(worker, 1, 2) # 2秒任务
future2 = executor.submit(worker, 2, 5) # 5秒任务
# 设置超时获取结果
try:
# 等待最多3秒
result1 = future1.result(timeout=3)
print(f"任务1结果: {result1}")
except concurrent.futures.TimeoutError:
print("任务1超时")
try:
# 等待最多3秒
result2 = future2.result(timeout=3)
print(f"任务2结果: {result2}")
except concurrent.futures.TimeoutError:
print("任务2超时")
# 可以选择取消超时任务
future2.cancel()
四、asyncio模块详解
Python 3.4引入的asyncio
模块提供了一种使用协程实现并发的方式,它与传统的多线程有很大不同。asyncio
使用事件循环来调度和管理协程,实现非阻塞的异步编程。
4.1 协程基础
协程(Coroutine)是一种特殊的函数,可以在执行过程中暂停并稍后恢复。在Python中,协程使用async def
语法定义,使用await
关键字暂停执行。
import asyncio
# 定义协程
async def hello_world():
print("Hello")
await asyncio.sleep(1) # 非阻塞的睡眠
print("World")
return "Done"
# 运行协程
async def main():
result = await hello_world()
print(f"结果: {result}")
# 使用asyncio.run运行主协程
asyncio.run(main())
在上面的代码中:
async def
定义了一个协程函数await
用于等待另一个协程的结果asyncio.sleep()
是一个非阻塞的睡眠函数asyncio.run()
用于运行主协程
4.2 并发执行多个协程
asyncio
的强大之处在于可以并发执行多个协程,而不需要创建多个线程:
import asyncio
import time
async def task(name, delay):
print(f"任务 {name} 开始执行")
await asyncio.sleep(delay) # 非阻塞的睡眠
print(f"任务 {name} 执行完毕")
return f"任务 {name} 的结果"
async def main():
start_time = time.time()
# 方法1:使用asyncio.create_task创建任务
task_a = asyncio.create_task(task("A", 2))
task_b = asyncio.create_task(task("B", 3))
# 等待任务完成
result_a = await task_a
result_b = await task_b
# 方法2:使用asyncio.gather并发执行多个协程
results = await asyncio.gather(
task("C", 1),
task("D", 2),
task("E", 3)
)
end_time = time.time()
print(f"所有任务完成,耗时: {end_time - start_time:.2f}秒")
print(f"任务A结果: {result_a}")
print(f"任务B结果: {result_b}")
print(f"任务C、D、E结果: {results}")
# 运行主协程
asyncio.run(main())
在上面的代码中,尽管有5个任务,总共需要11秒(2+3+1+2+3),但实际执行时间约为3秒,因为这些任务是并发执行的。
4.3 asyncio的同步原语
与threading
模块类似,asyncio
也提供了各种同步原语,如锁、事件、信号量等,但这些是专为协程设计的:
import asyncio
# 创建锁
lock = asyncio.Lock()
async def worker(id):
print(f"工作协程 {id} 尝试获取锁")
async with lock: # 异步上下文管理器
print(f"工作协程 {id} 获取了锁")
await asyncio.sleep(1) # 模拟工作
print(f"工作协程 {id} 释放了锁")
async def main():
# 并发执行多个工作协程
await asyncio.gather(
worker(1),
worker(2),
worker(3)
)
# 运行主协程
asyncio.run(main())
4.4 使用aiohttp进行异步HTTP请求
asyncio
最常见的应用场景之一是进行异步HTTP请求。aiohttp
是一个基于asyncio
的异步HTTP客户端/服务器库:
import asyncio
import aiohttp
import time
async def fetch(session, url):
async with session.get(url) as response:
return await response.text()
async def fetch_all(urls):
async with aiohttp.ClientSession() as session:
tasks = [fetch(session, url) for url in urls]
results = await asyncio.gather(*tasks)
return results
async def main():
urls = [
"https://www.python.org",
"https://www.github.com",
"https://www.stackoverflow.com",
"https://www.wikipedia.org",
"https://www.reddit.com"
]
start_time = time.time()
results = await fetch_all(urls)
end_time = time.time()
print(f"获取了 {len(results)} 个网页,总大小: {sum(len(r) for r in results)} 字节")
print(f"总耗时: {end_time - start_time:.2f}秒")
# 运行主协程
asyncio.run(main())
注意:使用上面的代码需要先安装aiohttp
库:pip install aiohttp
。
4.5 asyncio的超时控制
asyncio
提供了wait_for
函数来为协程设置超时:
import asyncio
async def long_running_task():
print("长时间运行的任务开始")
await asyncio.sleep(5)
print("长时间运行的任务完成")
return "任务结果"
async def main():
try:
# 设置3秒超时
result = await asyncio.wait_for(long_running_task(), timeout=3)
print(f"任务结果: {result}")
except asyncio.TimeoutError:
print("任务超时")
# 运行主协程
asyncio.run(main())
五、多线程数据采集实战
在本节中,我们将通过一个实际的数据采集案例,展示如何使用不同的多线程方式进行网页数据采集。
5.1 使用threading模块进行网页数据采集
以下是一个使用threading
模块创建多线程爬虫的示例:
import threading
import requests
import time
from queue import Queue
class WebCrawler:
def __init__(self, urls, max_threads=5):
self.urls = urls
self.results = {}
self.queue = Queue()
self.max_threads = max_threads
# 将URL放入队列
for url in urls:
self.queue.put(url)
def crawler_worker(self):
"""工作线程函数,不断从队列获取URL并处理"""
while not self.queue.empty():
# 从队列获取URL
url = self.queue.get()
try:
# 发送HTTP请求
start_time = time.time()
response = requests.get(url, timeout=10)
elapsed_time = time.time() - start_time
# 存储结果
thread_name = threading.current_thread().name
self.results[url] = {
'status_code': response.status_code,
'content_length': len(response.content),
'elapsed_time': elapsed_time,
'thread': thread_name
}
print(f"{thread_name} 完成抓取 {url}, 状态码: {response.status_code}, 用时: {elapsed_time:.2f}秒")
except Exception as e:
print(f"抓取 {url} 时出错: {e}")
self.results[url] = {'error': str(e)}
finally:
# 标记任务完成
self.queue.task_done()
def run(self):
"""启动爬虫"""
start_time = time.time()
# 创建并启动工作线程
threads = []
for i in range(min(self.max_threads, len(self.urls))):
thread = threading.Thread(target=self.crawler_worker, name=f"爬虫线程-{i}")
thread.daemon = True # 设置为守护线程
thread.start()
threads.append(thread)
# 等待队列中所有任务完成
self.queue.join()
# 计算总耗时
total_time = time.time() - start_time
print(f"所有URL抓取完成,总耗时: {total_time:.2f}秒")
return self.results
# 使用示例
if __name__ == "__main__":
# 待抓取的URL列表
urls = [
"https://www.python.org",
"https://www.github.com",
"https://www.stackoverflow.com",
"https://www.wikipedia.org",
"https://www.reddit.com"
]
# 创建爬虫实例并运行
crawler = WebCrawler(urls, max_threads=3)
results = crawler.run()
# 输出结果统计
success_count = sum(1 for r in results.values() if 'status_code' in r)
error_count = sum(1 for r in results.values() if 'error' in r)
print(f"\n抓取统计:")
print(f"成功: {success_count}, 失败: {error_count}")
# 计算平均响应时间
if success_count > 0:
avg_time = sum(r['elapsed_time'] for r in results.values() if 'elapsed_time' in r) / success_count
print(f"平均响应时间: {avg_time:.2f}秒")
这个爬虫使用了线程池和队列的设计模式,能够高效地并发抓取多个URL。
5.2 使用concurrent.futures进行数据采集
以下是使用concurrent.futures
模块的ThreadPoolExecutor
进行数据采集的示例:
import concurrent.futures
import requests
import time
def fetch_url(url):
"""抓取单个URL的函数"""
try:
start_time = time.time()
response = requests.get(url, timeout=10)
elapsed_time = time.time() - start_time
return {
'url': url,
'status_code': response.status_code,
'content_length': len(response.content),
'elapsed_time': elapsed_time
}
except Exception as e:
return {
'url': url,
'error': str(e)
}
def fetch_all_urls(urls, max_workers=5):
"""并发抓取多个URL"""
start_time = time.time()
results = {}
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
# 提交所有任务
future_to_url = {executor.submit(fetch_url, url): url for url in urls}
# 获取任务结果
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
result = future.result()
results[url] = result
if 'status_code' in result:
print(f"完成抓取 {url}, 状态码: {result['status_code']}, 用时: {result['elapsed_time']:.2f}秒")
else:
print(f"抓取 {url} 时出错: {result['error']}")
except Exception as e:
print(f"处理 {url} 结果时出错: {e}")
results[url] = {'error': str(e)}
# 计算总耗时
total_time = time.time() - start_time
print(f"所有URL抓取完成,总耗时: {total_time:.2f}秒")
return results
# 使用示例
if __name__ == "__main__":
# 待抓取的URL列表
urls = [
"https://www.python.org",
"https://www.github.com",
"https://www.stackoverflow.com",
"https://www.wikipedia.org",
"https://www.reddit.com"
]
# 并发抓取所有URL
results = fetch_all_urls(urls, max_workers=3)
# 输出结果统计
success_count = sum(1 for r in results.values() if 'status_code' in r)
error_count = sum(1 for r in results.values() if 'error' in r)
print(f"\n抓取统计:")
print(f"成功: {success_count}, 失败: {error_count}")
# 计算平均响应时间
if success_count > 0:
avg_time = sum(r['elapsed_time'] for r in results.values() if 'elapsed_time' in r) / success_count
print(f"平均响应时间: {avg_time:.2f}秒")
使用concurrent.futures
的代码更加简洁,不需要手动管理线程和队列。
5.3 使用asyncio进行异步数据采集
以下是使用asyncio
和aiohttp
进行异步数据采集的示例:
import asyncio
import aiohttp
import time
async def fetch_url(session, url):
"""异步抓取单个URL"""
try:
start_time = time.time()
async with session.get(url, timeout=10) as response:
content = await response.read()
elapsed_time = time.time() - start_time
return {
'url': url,
'status_code': response.status,
'content_length': len(content),
'elapsed_time': elapsed_time
}
except Exception as e:
return {
'url': url,
'error': str(e)
}
async def fetch_all_urls(urls):
"""异步抓取多个URL"""
start_time = time.time()
results = {}
# 创建会话
async with aiohttp.ClientSession() as session:
# 创建所有任务
tasks = [fetch_url(session, url) for url in urls]
# 等待所有任务完成
for task in asyncio.as_completed(tasks):
result = await task
url = result['url']
results[url] = result
if 'status_code' in result:
print(f"完成抓取 {url}, 状态码: {result['status_code']}, 用时: {result['elapsed_time']:.2f}秒")
else:
print(f"抓取 {url} 时出错: {result['error']}")
# 计算总耗时
total_time = time.time() - start_time
print(f"所有URL抓取完成,总耗时: {total_time:.2f}秒")
return results
# 使用示例
async def main():
# 待抓取的URL列表
urls = [
"https://www.python.org",
"https://www.github.com",
"https://www.stackoverflow.com",
"https://www.wikipedia.org",
"https://www.reddit.com"
]
# 异步抓取所有URL
results = await fetch_all_urls(urls)
# 输出结果统计
success_count = sum(1 for r in results.values() if 'status_code' in r)
error_count = sum(1 for r in results.values() if 'error' in r)
print(f"\n抓取统计:")
print(f"成功: {success_count}, 失败: {error_count}")
# 计算平均响应时间
if success_count > 0:
avg_time = sum(r['elapsed_time'] for r in results.values() if 'elapsed_time' in r) / success_count
print(f"平均响应时间: {avg_time:.2f}秒")
# 运行主协程
if __name__ == "__main__":
asyncio.run(main())
注意:使用上面的代码需要先安装aiohttp
库:pip install aiohttp
。
六、多线程方式的对比与选择
在Python中,我们有多种方式实现并发编程,每种方式都有其优缺点和适用场景。下面我们将对threading
、concurrent.futures
和asyncio
三种方式进行详细对比。
6.1 编程模型对比
| 特性 | threading | concurrent.futures | asyncio |
|——|———–|——————-|———|
| 编程模型 | 基于线程的并发 | 基于任务的并发 | 基于协程的并发 |
| 语法复杂度 | 中等 | 简单 | 较高(需要async/await) |
| 代码组织 | 需手动管理线程 | 自动管理线程池 | 基于事件循环和协程 |
| 调试难度 | 较难 | 中等 | 中等 |
| 学习曲线 | 平缓 | 平缓 | 陡峭 |
6.2 性能特性对比
| 特性 | threading | concurrent.futures | asyncio |
|——|———–|——————-|———|
| CPU密集型任务 | 受GIL限制,性能较差 | 受GIL限制,性能较差 | 不适合,单线程模型 |
| IO密集型任务 | 适合 | 适合 | 非常适合 |
| 内存占用 | 中等 | 中等 | 低 |
| 上下文切换开销 | 高 | 高 | 低(协程切换) |
| 并发连接数上限 | 受系统线程数限制 | 受线程池大小限制 | 可支持大量并发(数千级) |
| 启动开销 | 中等 | 中等 | 低 |
6.3 功能特性对比
| 特性 | threading | concurrent.futures | asyncio |
|——|———–|——————-|———|
| 线程同步机制 | 丰富(Lock, Event, Semaphore等) | 简单(Future对象) | 丰富(Lock, Event, Semaphore等) |
| 取消任务 | 困难 | 支持(cancel方法) | 支持(Task.cancel) |
| 获取返回值 | 需自行实现 | 内置支持(Future.result) | 内置支持(await) |
| 异常处理 | 复杂 | 简单(Future.exception) | 简单(try/except with await) |
| 超时控制 | 需自行实现 | 支持 | 内置支持(asyncio.wait_for) |
| 批量任务执行 | 需自行实现 | 支持(map, as_completed) | 支持(gather, as_completed) |
6.4 适用场景对比
| 场景 | threading | concurrent.futures | asyncio |
|——|———–|——————-|———|
| 网络爬虫 | 适合 | 适合 | 非常适合 |
| 文件IO操作 | 适合 | 适合 | 适合(需使用aiofiles) |
| 数据库操作 | 适合 | 适合 | 适合(需使用异步驱动) |
| 图像处理 | 不适合 | 不适合(建议用ProcessPoolExecutor) | 不适合 |
| 实时通信 | 适合 | 适合 | 非常适合 |
| 大量HTTP请求 | 适合 | 适合 | 非常适合 |
| GUI应用 | 适合 | 适合 | 不太适合 |
| 长连接服务 | 适合 | 适合 | 非常适合 |
6.5 选择建议
选择threading:
需要与现有阻塞式代码集成
任务涉及共享状态且需要精细控制
并发量较小(几十个线程)
主要是IO密集型任务
选择concurrent.futures:
需要简单易用的API
需要同时支持线程池和进程池
需要获取任务执行结果
任务相对独立,无需复杂协调
选择asyncio:
需要处理大量并发连接(数百或数千)
主要是网络IO或其他IO密集型任务
可以使用异步库(如aiohttp, asyncpg等)
对性能和资源利用率有较高要求
混合使用:
应用场景复杂,既有IO密集型又有CPU密集型任务
需要处理大量并发连接同时又有计算密集型处理
例如:使用asyncio处理网络请求,使用ProcessPoolExecutor处理数据分析
七、多线程与多进程的区别
在Python中,除了多线程外,多进程也是一种常用的并行处理方式。了解它们的区别有助于我们在不同场景下做出正确的选择。
7.1 概念区别
- 多线程:在同一进程内的多个执行流,共享内存空间
- 多进程:多个独立的进程,各自有独立的内存空间
7.2 GIL的影响
- 多线程:受Python的GIL限制,同一时刻只有一个线程执行Python字节码
- 多进程:每个进程有自己的Python解释器和GIL,可以真正并行执行
7.3 资源消耗
多线程:
内存占用小(共享进程内存)
创建和切换开销小
进程间通信简单(直接共享内存)
多进程:
内存占用大(每个进程独立内存)
创建和切换开销大
进程间通信复杂(需要IPC机制)
7.4 适用场景
多线程适合:
IO密集型任务(网络请求、文件读写等)
需要共享数据的任务
资源有限的环境
多进程适合:
CPU密集型任务(计算、图像处理等)
需要隔离的任务
多核CPU环境
7.5 实现方式
Python提供了multiprocessing
模块来实现多进程编程,其API设计与threading
模块类似:
import multiprocessing
import time
def worker(name):
print(f"进程 {name} 开始工作")
time.sleep(2) # 模拟工作耗时
print(f"进程 {name} 工作完成")
if __name__ == "__main__":
# 创建多个进程
processes = []
for i in range(3):
p = multiprocessing.Process(target=worker, args=(f"Process-{i}",))
processes.append(p)
p.start()
# 等待所有进程结束
for p in processes:
p.join()
print("所有进程执行完毕")
concurrent.futures
模块也提供了ProcessPoolExecutor
,使用方式与ThreadPoolExecutor
几乎相同:
import concurrent.futures
import time
def worker(id):
print(f"进程 {id} 开始执行")
time.sleep(id) # 模拟不同的工作耗时
return f"进程 {id} 的结果"
if __name__ == "__main__":
# 创建进程池
with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
# 提交任务到进程池
futures = {executor.submit(worker, i): i for i in range(5)}
# 获取任务结果
for future in concurrent.futures.as_completed(futures):
id = futures[future]
try:
result = future.result()
print(f"任务 {id} 完成,结果: {result}")
except Exception as e:
print(f"任务 {id} 执行出错: {e}")
print("所有任务执行完毕")
7.6 选择建议
选择多线程当:
任务主要是IO密集型
需要频繁共享数据
对资源消耗敏感
选择多进程当:
任务主要是CPU密集型
需要充分利用多核CPU
任务之间相对独立,通信需求少
混合使用当:
复杂应用场景,既有IO密集型又有CPU密集型任务
例如:使用多线程处理网络请求,使用多进程处理数据分析
总结
Python提供了多种实现并发的方式,每种方式都有其特点和适用场景:
threading模块:提供了基础的多线程支持,适合IO密集型任务和需要精细控制的场景。
concurrent.futures模块:提供了更高级的线程池和进程池接口,简化了并发编程,适合需要获取任务结果的场景。
asyncio模块:提供了基于协程的异步编程模型,适合处理大量并发连接的IO密集型任务。
multiprocessing模块:提供了多进程支持,适合CPU密集型任务和需要绕过GIL限制的场景。
在实际应用中,应根据任务特性、并发需求和性能要求选择合适的并发方式。对于复杂应用,混合使用多种并发方式往往能够获得最佳性能。
无论选择哪种方式,都需要注意线程安全、资源管理和异常处理等问题,以确保程序的正确性和稳定性。
希望本教程能够帮助你理解Python多线程并发编程的基本概念和实践方法,为你的Python开发之旅增添新的工具和视角。
参考资料
- Python官方文档 - threading模块: https://docs.python.org/3/library/threading.html
- Python官方文档 - concurrent.futures模块: https://docs.python.org/3/library/concurrent.futures.html
- Python官方文档 - asyncio模块: https://docs.python.org/3/library/asyncio.html
- Python官方文档 - multiprocessing模块: https://docs.python.org/3/library/multiprocessing.html
- Real Python - An Intro to Threading in Python: https://realpython.com/intro-to-python-threading/
1.5 并发 vs. 并行
在讨论多线程时,区分并发(Concurrency)和并行(Parallelism)非常重要:
- 并发:指系统能够处理多个任务的能力,这些任务可能在时间上重叠执行,但不一定同时执行。例如,通过快速切换在单个CPU核心上运行多个线程。
- 并行:指系统能够同时执行多个任务的能力,通常需要多个CPU核心。例如,在多核处理器上同时运行多个进程或(在没有GIL限制的情况下)多个线程。
由于Python的GIL,CPython中的多线程主要实现的是并发,而不是真正的并行(对于CPU密集型任务)。然而,对于IO密集型任务,线程在等待IO时会释放GIL,允许其他线程运行,从而实现有效的并发。
1.6 深入理解GIL
全局解释器锁(GIL)是CPython实现中的一个互斥锁,用于保护对Python对象的访问。它的存在主要是为了简化CPython的内存管理机制(特别是引用计数)。
为什么存在GIL?
- 简化内存管理:GIL使得CPython的引用计数机制变得线程安全,开发者不需要担心多个线程同时修改同一个对象的引用计数。
- 简化C扩展开发:许多Python的C扩展库依赖于GIL提供的线程安全保证,移除GIL可能会破坏这些库。
- 历史原因:在单核CPU时代,GIL提供了一种简单有效的方式来实现多线程,并且对于IO密集型任务性能良好。
GIL的深远影响:
- CPU密集型任务瓶颈:对于需要大量计算的任务,GIL使得多线程无法利用多核CPU的优势,性能甚至可能比单线程更差(因为线程切换开销)。
- IO密集型任务优势:对于涉及大量等待(如网络、磁盘IO)的任务,线程在等待时会释放GIL,允许其他线程运行Python代码,因此多线程可以显著提高效率。
- 解释器选择:其他Python解释器,如Jython(运行在JVM上)和IronPython(运行在.NET CLR上),没有GIL,可以在多核CPU上实现真正的线程并行。此外,CPython社区也在进行移除GIL的实验(如PEP 703),但目前仍处于实验阶段。
为什么仍然使用多线程?
尽管有GIL,但在以下情况下,多线程仍然是Python中并发编程的有效选择:
- 处理IO密集型任务:这是Python多线程最主要的优势所在。网络请求、文件读写、数据库交互等任务,大部分时间都在等待外部设备响应,此时线程会释放GIL,让其他线程有机会执行。
- 简化并发模型:相比多进程,多线程共享内存,数据共享和通信更简单直接(但也需要注意线程安全)。
- 资源消耗较低:线程的创建和上下文切换开销通常比进程小。
- 与现有库的兼容性:许多Python库是基于阻塞IO设计的,使用多线程可以方便地将这些库用于并发场景。
1.7 线程生命周期详解
让我们更详细地了解线程状态之间的转换:
- 新建(New):当创建
threading.Thread
对象时,线程处于新建状态。此时,操作系统尚未为其分配资源。t = threading.Thread(target=some_function) # 此时 t 处于新建状态
- 就绪(Runnable):调用线程对象的
start()
方法后,线程进入就绪状态。它已经具备了运行所需的所有资源,等待操作系统的CPU调度器分配时间片。t.start() # 此时 t 进入就绪状态,等待被调度
- 运行(Running):当CPU调度器选中一个就绪状态的线程时,该线程进入运行状态,开始执行其
run()
方法(或target
指定的函数)。线程可以在运行状态和就绪状态之间快速切换(时间片轮转)。 - 阻塞(Blocked):线程在运行过程中,如果遇到某些条件无法继续执行,就会进入阻塞状态。常见导致阻塞的情况包括:
- 调用
time.sleep()
- 等待IO操作完成(如
requests.get()
, 文件读写) - 尝试获取一个已被其他线程持有的锁(
lock.acquire()
) - 等待条件变量或事件(
condition.wait()
,event.wait()
) - 调用另一个线程的
join()
方法
当阻塞条件解除时(如IO完成、锁被释放、事件被设置),线程会重新回到就绪状态,等待再次被调度。
- 调用
- 终止(Terminated):线程的
run()
方法执行完毕,或者在执行过程中抛出未被捕获的异常,线程就进入终止状态。终止的线程无法再次启动。
理解这些状态转换对于分析多线程程序的行为和调试问题至关重要。
二、threading模块详解(续)
2.5 线程间通信 - Queue
除了同步原语外,线程间通信是多线程编程中的另一个重要方面。Python的queue
模块提供了线程安全的队列实现,是线程间传递数据的理想选择:
import threading
import queue
import time
import random
# 创建线程安全的队列
task_queue = queue.Queue()
result_queue = queue.Queue()
# 生产者线程:生成任务
def producer(num_tasks):
for i in range(num_tasks):
task = f"Task-{i}"
task_queue.put(task)
print(f"生产者生成任务: {task}")
time.sleep(random.uniform(0.1, 0.5)) # 随机延迟
# 添加结束标记
for _ in range(3): # 假设有3个消费者线程
task_queue.put(None) # 结束标记
# 消费者线程:处理任务
def consumer():
while True:
# 从队列获取任务
task = task_queue.get()
# 检查结束标记
if task is None:
task_queue.task_done()
break
# 处理任务
print(f"消费者 {threading.current_thread().name} 处理任务: {task}")
time.sleep(random.uniform(0.5, 1.5)) # 模拟处理时间
# 将结果放入结果队列
result = f"Result of {task}"
result_queue.put(result)
# 标记任务完成
task_queue.task_done()
# 创建生产者线程
producer_thread = threading.Thread(target=producer, args=(10,))
# 创建多个消费者线程
consumer_threads = []
for i in range(3):
t = threading.Thread(target=consumer, name=f"Consumer-{i}")
consumer_threads.append(t)
# 启动所有线程
producer_thread.start()
for t in consumer_threads:
t.start()
# 等待所有任务处理完毕
task_queue.join()
# 等待所有线程结束
producer_thread.join()
for t in consumer_threads:
t.join()
# 处理结果
print("\n处理结果:")
while not result_queue.empty():
result = result_queue.get()
print(f"获取结果: {result}")
Queue
类提供了几个重要方法:
put(item)
: 将项目放入队列,如果队列已满,会阻塞get()
: 从队列获取项目,如果队列为空,会阻塞task_done()
: 表示之前入队的一个任务已经完成join()
: 阻塞直到队列中所有项目都被处理完(即每个put()
都有对应的task_done()
调用)
除了基本的Queue
,queue
模块还提供了其他类型的队列:
LifoQueue
: 后进先出队列(栈)PriorityQueue
: 优先级队列,项目按优先级排序SimpleQueue
: 简单的FIFO队列,不支持task_done()
和join()
2.6 线程池实现
在使用concurrent.futures
模块之前,我们可以使用threading
模块自己实现一个简单的线程池:
import threading
import queue
import time
import random
class ThreadPool:
def __init__(self, num_threads):
self.task_queue = queue.Queue()
self.threads = []
self.shutdown_flag = threading.Event()
# 创建工作线程
for i in range(num_threads):
thread = threading.Thread(target=self._worker, name=f"Worker-{i}")
thread.daemon = True # 设置为守护线程
thread.start()
self.threads.append(thread)
def _worker(self):
"""工作线程函数,不断从队列获取任务并执行"""
while not self.shutdown_flag.is_set():
try:
# 从队列获取任务(最多等待1秒)
func, args, kwargs, result_container = self.task_queue.get(timeout=1)
try:
# 执行任务
result = func(*args, **kwargs)
# 存储结果
if result_container is not None:
result_container.append(result)
except Exception as e:
print(f"任务执行出错: {e}")
# 标记任务完成
self.task_queue.task_done()
except queue.Empty:
# 队列为空,继续等待
continue
def submit(self, func, *args, **kwargs):
"""提交任务到线程池"""
result_container = []
self.task_queue.put((func, args, kwargs, result_container))
return result_container # 可以通过这个容器获取结果
def shutdown(self, wait=True):
"""关闭线程池"""
self.shutdown_flag.set()
if wait:
# 等待所有任务完成
self.task_queue.join()
# 等待所有线程结束
for thread in self.threads:
thread.join()
# 示例任务函数
def task(id, sleep_time):
thread_name = threading.current_thread().name
print(f"{thread_name} 执行任务 {id},将耗时 {sleep_time:.2f} 秒")
time.sleep(sleep_time)
return f"任务 {id} 的结果"
# 使用线程池
pool = ThreadPool(3) # 创建包含3个线程的线程池
# 提交多个任务
results = []
for i in range(10):
sleep_time = random.uniform(0.5, 2.0)
result_container = pool.submit(task, i, sleep_time)
results.append(result_container)
# 等待所有任务完成并获取结果
time.sleep(1) # 等待一些任务开始执行
print("\n等待所有任务完成...")
pool.shutdown()
# 打印结果
print("\n所有任务结果:")
for i, result_container in enumerate(results):
if result_container: # 如果任务成功完成
print(f"任务 {i} 结果: {result_container[0]}")
else:
print(f"任务 {i} 未完成或出错")
这个简单的线程池实现展示了线程池的基本原理:维护一个工作线程队列和一个任务队列,工作线程不断从任务队列获取任务并执行。
2.7 线程安全与竞态条件
多线程编程中最常见的问题之一是竞态条件(Race Condition)。当多个线程同时访问和修改共享数据时,如果没有适当的同步机制,可能会导致数据不一致或程序错误。
以下是一个典型的竞态条件示例:
import threading
# 共享资源
counter = 0
def increment(n):
global counter
for _ in range(n):
# 以下三行代码不是原子操作,可能导致竞态条件
current = counter # 读取当前值
current += 1 # 增加
counter = current # 写回
# 创建两个线程
t1 = threading.Thread(target=increment, args=(100000,))
t2 = threading.Thread(target=increment, args=(100000,))
# 启动线程
t1.start()
t2.start()
# 等待线程结束
t1.join()
t2.join()
print(f"最终计数: {counter}") # 预期是200000,但实际可能小于这个值
在上面的代码中,increment
函数中的三行代码(读取、增加、写回)不是原子操作。如果两个线程几乎同时执行这些操作,可能会导致一个线程的更改被另一个线程覆盖,最终结果小于预期。
解决竞态条件的方法是使用同步机制,如锁:
import threading
# 共享资源
counter = 0
# 创建锁
lock = threading.Lock()
def increment(n):
global counter
for _ in range(n):
with lock: # 使用锁保护临界区
counter += 1 # 现在这个操作是线程安全的
# 创建两个线程
t1 = threading.Thread(target=increment, args=(100000,))
t2 = threading.Thread(target=increment, args=(100000,))
# 启动线程
t1.start()
t2.start()
# 等待线程结束
t1.join()
t2.join()
print(f"最终计数: {counter}") # 现在应该正确显示200000
2.8 死锁及其预防
死锁(Deadlock)是多线程编程中另一个常见问题。当两个或多个线程互相等待对方持有的资源时,就会发生死锁,导致所有相关线程永远阻塞。
以下是一个死锁示例:
import threading
import time
# 创建两个锁
lock1 = threading.Lock()
lock2 = threading.Lock()
def thread1_function():
print("线程1尝试获取锁1")
with lock1:
print("线程1获取了锁1")
time.sleep(0.5) # 模拟一些工作
print("线程1尝试获取锁2")
with lock2:
print("线程1获取了锁2")
print("线程1执行临界区代码")
def thread2_function():
print("线程2尝试获取锁2")
with lock2:
print("线程2获取了锁2")
time.sleep(0.5) # 模拟一些工作
print("线程2尝试获取锁1")
with lock1:
print("线程2获取了锁1")
print("线程2执行临界区代码")
# 创建线程
t1 = threading.Thread(target=thread1_function)
t2 = threading.Thread(target=thread2_function)
# 启动线程
t1.start()
t2.start()
# 等待线程结束
t1.join()
t2.join()
print("程序正常结束") # 如果发生死锁,这行代码可能永远不会执行
在上面的代码中,线程1先获取锁1,然后尝试获取锁2;同时,线程2先获取锁2,然后尝试获取锁1。如果两个线程的执行时机恰好导致它们各自获取了一个锁,然后等待另一个锁,就会发生死锁。
预防死锁的方法:
- 固定锁的获取顺序:确保所有线程按照相同的顺序获取锁。
def thread1_function():
with lock1:
with lock2:
# 临界区代码
pass
def thread2_function():
with lock1: # 注意这里也是先获取lock1
with lock2:
# 临界区代码
pass
- 使用超时:在获取锁时设置超时,如果超时未获取到锁,则释放已持有的锁并重试。
def thread_function():
while True:
if lock1.acquire(timeout=1):
try:
if lock2.acquire(timeout=1):
try:
# 临界区代码
break # 成功获取两个锁,执行完毕后退出循环
finally:
lock2.release()
else:
# 未能获取lock2,稍后重试
print("未能获取锁2,重试")
finally:
lock1.release()
else:
# 未能获取lock1,稍后重试
print("未能获取锁1,重试")
# 随机延迟,避免活锁
time.sleep(random.uniform(0, 0.1))
使用RLock:如果同一个线程需要多次获取同一个锁,使用
RLock
而不是Lock
。使用更高级的同步原语:如
Condition
、Semaphore
或Event
,它们可能更适合特定的同步需求。减少锁的粒度:尽量减小临界区的大小,只在必要的代码段使用锁。
三、concurrent.futures模块详解(续)
3.5 错误处理与异常传播
在使用concurrent.futures
时,任务中的异常不会立即抛出,而是存储在对应的Future
对象中,直到调用result()
方法时才会引发。这种设计允许我们以更可控的方式处理并发任务中的错误。
import concurrent.futures
import time
import random
def task(id):
print(f"任务 {id} 开始执行")
time.sleep(random.uniform(0.5, 1.5))
# 随机生成错误
if random.random() < 0.3: # 30%的概率出错
raise ValueError(f"任务 {id} 执行出错")
return f"任务 {id} 的结果"
# 创建线程池
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
# 提交多个任务
futures = [executor.submit(task, i) for i in range(10)]
# 处理结果和异常
for i, future in enumerate(futures):
try:
result = future.result()
print(f"任务 {i} 成功: {result}")
except Exception as e:
print(f"任务 {i} 失败: {e}")
# 另一种处理方式:使用as_completed
print("\n使用as_completed处理结果:")
for future in concurrent.futures.as_completed(futures):
# 找出对应的任务ID
for i, f in enumerate(futures):
if f is future:
task_id = i
break
try:
result = future.result()
print(f"任务 {task_id} 成功: {result}")
except Exception as e:
print(f"任务 {task_id} 失败: {e}")
3.6 上下文管理器与资源管理
ThreadPoolExecutor
实现了上下文管理器协议,可以与with
语句一起使用,确保线程池在使用后正确关闭:
import concurrent.futures
import time
def task(id):
print(f"任务 {id} 开始执行")
time.sleep(1)
return f"任务 {id} 的结果"
# 使用上下文管理器
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
futures = [executor.submit(task, i) for i in range(5)]
# 上下文管理器结束时,会自动调用executor.shutdown(wait=True)
# 不使用上下文管理器
executor = concurrent.futures.ThreadPoolExecutor(max_workers=3)
try:
futures = [executor.submit(task, i) for i in range(5)]
# 处理结果...
finally:
# 手动关闭线程池
executor.shutdown(wait=True)
shutdown(wait=True)
方法会等待所有提交的任务完成后再关闭线程池。如果设置wait=False
,则会立即返回,但线程池会继续在后台运行,直到所有任务完成。
3.7 线程池大小的选择
线程池大小的选择对性能有重要影响。太少的线程可能无法充分利用并发潜力,而太多的线程可能导致过多的上下文切换开销。
线程池大小的经验法则:
- IO密集型任务:线程池大小可以相对较大,通常是CPU核心数的2-4倍,因为大部分时间线程都在等待IO操作完成。
- CPU密集型任务:由于GIL的限制,对于纯Python代码,线程池大小通常不应超过CPU核心数(甚至可能是1)。如果任务涉及到释放GIL的C扩展(如NumPy操作),则可以适当增加线程数。
以下是一个根据任务类型动态调整线程池大小的示例:
import concurrent.futures
import os
import time
import numpy as np
def io_bound_task(id):
"""IO密集型任务"""
print(f"IO任务 {id} 开始执行")
time.sleep(1) # 模拟IO操作
return f"IO任务 {id} 完成"
def cpu_bound_task(id):
"""CPU密集型任务"""
print(f"CPU任务 {id} 开始执行")
# 执行一些计算密集型操作
result = 0
for i in range(10000000):
result += i
return f"CPU任务 {id} 完成,结果: {result}"
def numpy_task(id):
"""使用NumPy的CPU密集型任务(会释放GIL)"""
print(f"NumPy任务 {id} 开始执行")
# NumPy操作会释放GIL
a = np.random.rand(1000, 1000)
b = np.random.rand(1000, 1000)
c = np.dot(a, b) # 矩阵乘法
return f"NumPy任务 {id} 完成"
# 获取CPU核心数
cpu_count = os.cpu_count()
print(f"CPU核心数: {cpu_count}")
# 对于IO密集型任务,使用较多的线程
print("\n执行IO密集型任务:")
with concurrent.futures.ThreadPoolExecutor(max_workers=cpu_count * 4) as executor:
start_time = time.time()
results = list(executor.map(io_bound_task, range(20)))
end_time = time.time()
print(f"IO密集型任务总耗时: {end_time - start_time:.2f}秒")
# 对于纯Python的CPU密集型任务,使用较少的线程
print("\n执行CPU密集型任务:")
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
start_time = time.time()
results = list(executor.map(cpu_bound_task, range(4)))
end_time = time.time()
print(f"CPU密集型任务总耗时: {end_time - start_time:.2f}秒")
# 对于使用NumPy等释放GIL的CPU密集型任务,可以使用更多线程
print("\n执行NumPy任务:")
with concurrent.futures.ThreadPoolExecutor(max_workers=cpu_count) as executor:
start_time = time.time()
results = list(executor.map(numpy_task, range(cpu_count * 2)))
end_time = time.time()
print(f"NumPy任务总耗时: {end_time - start_time:.2f}秒")
四、asyncio模块详解(续)
4.6 事件循环深入理解
asyncio
的核心是事件循环(Event Loop),它负责调度和执行协程、处理IO事件、执行网络操作等。理解事件循环的工作原理对于高效使用asyncio
至关重要。
import asyncio
import time
async def demo():
print("协程开始执行")
await asyncio.sleep(1)
print("协程执行完毕")
return "结果"
# 获取事件循环
loop = asyncio.get_event_loop()
# 方法1:在事件循环中运行协程直到完成
result = loop.run_until_complete(demo())
print(f"结果: {result}")
# 方法2:创建任务并添加到事件循环
task = loop.create_task(demo())
loop.run_until_complete(task)
print(f"任务结果: {task.result()}")
# 方法3:使用asyncio.run(推荐,Python 3.7+)
# 这会创建一个新的事件循环,运行协程,然后关闭事件循环
result = asyncio.run(demo())
print(f"asyncio.run结果: {result}")
# 手动操作事件循环(不推荐,但有助于理解)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(demo())
finally:
loop.close()
在现代Python中(3.7+),推荐使用asyncio.run()
来运行协程,它会自动处理事件循环的创建和关闭。
4.7 协程与任务的区别
在asyncio
中,协程(Coroutine)和任务(Task)是两个相关但不同的概念:
- 协程:使用
async def
定义的函数,可以在执行过程中暂停(使用await
)并稍后恢复。协程本身不会自动执行,需要被事件循环调度。 - 任务:是对协程的包装,表示一个正在由事件循环调度的协程。任务是
Future
的子类,可以被等待、取消,并可以添加完成回调。
import asyncio
async def my_coroutine():
print("协程开始执行")
await asyncio.sleep(1)
print("协程执行完毕")
return "协程结果"
async def main():
# 直接等待协程
result1 = await my_coroutine()
print(f"直接等待结果: {result1}")
# 创建任务
task = asyncio.create_task(my_coroutine())
# 等待任务完成
result2 = await task
print(f"任务结果: {result2}")
# 创建多个任务并并发执行
task1 = asyncio.create_task(my_coroutine())
task2 = asyncio.create_task(my_coroutine())
# 等待所有任务完成
results = await asyncio.gather(task1, task2)
print(f"多任务结果: {results}")
asyncio.run(main())
使用任务的主要优势是可以实现真正的并发执行,而直接等待协程是顺序执行的。
4.8 异步上下文管理器
asyncio
支持异步上下文管理器,可以在进入和退出上下文时执行异步操作:
import asyncio
class AsyncContextManager:
async def __aenter__(self):
print("异步进入上下文")
await asyncio.sleep(1) # 模拟异步操作
return "上下文资源"
async def __aexit__(self, exc_type, exc_val, exc_tb):
print("异步退出上下文")
await asyncio.sleep(0.5) # 模拟异步清理操作
# 返回True表示异常已处理,False表示异常需要传播
return False
async def main():
# 使用异步上下文管理器
async with AsyncContextManager() as resource:
print(f"获取资源: {resource}")
await asyncio.sleep(1)
print("在上下文中执行操作")
print("上下文之外")
asyncio.run(main())
asyncio
库中的许多对象都实现了异步上下文管理器协议,如Lock
、Semaphore
等。
4.9 异步迭代器
asyncio
还支持异步迭代器,允许在for
循环中使用await
:
import asyncio
class AsyncIterator:
def __init__(self, limit):
self.limit = limit
self.counter = 0
def __aiter__(self):
return self
async def __anext__(self):
if self.counter < self.limit:
self.counter += 1
await asyncio.sleep(0.5) # 模拟异步操作
return f"Item {self.counter}"
else:
raise StopAsyncIteration
async def main():
# 使用异步for循环
async for item in AsyncIterator(5):
print(f"收到: {item}")
# 手动使用异步迭代器
iterator = AsyncIterator(3)
try:
while True:
item = await iterator.__anext__()
print(f"手动迭代: {item}")
except StopAsyncIteration:
print("迭代结束")
asyncio.run(main())
4.10 与其他异步库的集成
asyncio
可以与其他异步库集成,如aiohttp
(HTTP客户端/服务器)、aiofiles
(文件IO)、asyncpg
(PostgreSQL数据库)等。
以下是一个使用aiofiles
进行异步文件IO的示例:
import asyncio
import aiofiles
import time
async def read_file(filename):
async with aiofiles.open(filename, mode='r') as f:
content = await f.read()
return content
async def write_file(filename, content):
async with aiofiles.open(filename, mode='w') as f:
await f.write(content)
async def process_files(filenames):
# 并发读取多个文件
tasks = [read_file(filename) for filename in filenames]
contents = await asyncio.gather(*tasks)
# 处理内容
processed_contents = [content.upper() for content in contents]
# 并发写入处理后的内容
write_tasks = [
write_file(f"processed_{filename}", content)
for filename, content in zip(filenames, processed_contents)
]
await asyncio.gather(*write_tasks)
async def main():
# 创建一些测试文件
test_files = ["test1.txt", "test2.txt", "test3.txt"]
for i, filename in enumerate(test_files):
with open(filename, 'w') as f:
f.write(f"This is test file {i+1}.\nIt contains some text for testing async file IO.")
# 处理文件
start_time = time.time()
await process_files(test_files)
end_time = time.time()
print(f"异步处理 {len(test_files)} 个文件,耗时: {end_time - start_time:.2f}秒")
# 验证结果
for filename in test_files:
processed_filename = f"processed_{filename}"
with open(processed_filename, 'r') as f:
content = f.read()
print(f"{processed_filename} 内容: {content[:50]}...")
# 注意:需要先安装aiofiles库:pip install aiofiles
# asyncio.run(main())
print("注意:运行此代码需要先安装aiofiles库:pip install aiofiles")
八、多线程调试与常见问题
8.1 多线程程序的调试技巧
调试多线程程序比调试单线程程序更具挑战性,因为线程的执行顺序是不确定的,问题可能难以重现。以下是一些有用的调试技巧:
- 使用日志而不是print:使用Python的
logging
模块,它是线程安全的,并且可以包含线程ID、时间戳等信息。
import logging
import threading
import time
# 配置日志
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(threadName)s - %(levelname)s - %(message)s'
)
def worker(name):
logging.debug(f"线程 {name} 开始执行")
time.sleep(1)
logging.debug(f"线程 {name} 执行完毕")
# 创建线程
threads = []
for i in range(3):
t = threading.Thread(target=worker, name=f"Thread-{i}")
threads.append(t)
t.start()
# 等待所有线程结束
for t in threads:
t.join()
logging.debug("所有线程执行完毕")
使用线程名称:给线程设置有意义的名称,便于在日志和调试器中识别。
使用调试器:Python的调试器(如pdb)可以用于调试多线程程序,但需要注意线程切换可能导致的复杂性。
线程追踪:使用
threading.settrace()
和threading.setprofile()
可以为所有线程设置跟踪函数。使用
threading.enumerate()
和threading.current_thread()
:这些函数可以帮助你了解当前活动的线程。
import threading
import time
def list_threads():
"""列出当前所有活动线程"""
current = threading.current_thread()
print(f"当前线程: {current.name} (ID: {current.ident})")
print("\n所有活动线程:")
for thread in threading.enumerate():
print(f"- {thread.name} (ID: {thread.ident}, 守护线程: {thread.daemon})")
def worker(name, delay):
print(f"线程 {name} 开始执行")
time.sleep(delay)
list_threads() # 列出线程
print(f"线程 {name} 执行完毕")
# 主线程
list_threads()
# 创建线程
threads = []
for i in range(3):
t = threading.Thread(target=worker, name=f"Worker-{i}", args=(f"Worker-{i}", i+1))
threads.append(t)
t.start()
# 等待一段时间
time.sleep(0.5)
list_threads() # 列出线程
# 等待所有线程结束
for t in threads:
t.join()
print("所有线程执行完毕")
list_threads() # 列出线程
8.2 常见的多线程问题及解决方案
8.2.1 竞态条件(Race Condition)
问题:多个线程同时访问和修改共享数据,导致数据不一致。
解决方案:
- 使用锁(
Lock
、RLock
)保护共享资源 - 使用线程安全的数据结构(如
Queue
) - 使用线程本地存储(
threading.local()
)避免共享
8.2.2 死锁(Deadlock)
问题:两个或多个线程互相等待对方持有的资源,导致永久阻塞。
解决方案:
- 按固定顺序获取锁
- 使用超时机制
- 使用
RLock
代替Lock
(适用于同一线程多次获取锁的情况) - 减少锁的使用范围和持有时间
8.2.3 活锁(Livelock)
问题:线程不断响应彼此的操作,但无法向前推进。
解决方案:
- 在重试逻辑中添加随机延迟
- 实现退避策略(如指数退避)
import threading
import time
import random
def worker_with_backoff(id, shared_resource, lock):
backoff = 0.1 # 初始退避时间
max_backoff = 2.0 # 最大退避时间
while True:
if lock.acquire(timeout=0.1): # 尝试获取锁,设置超时
try:
# 使用共享资源
print(f"线程 {id} 获取了锁,正在使用共享资源")
shared_resource['value'] += 1
time.sleep(0.1) # 模拟工作
print(f"线程 {id} 完成工作,共享资源值: {shared_resource['value']}")
break # 成功完成,退出循环
finally:
lock.release()
else:
# 未能获取锁,实现指数退避
jitter = random.uniform(0, 0.1) # 添加随机抖动
sleep_time = min(backoff + jitter, max_backoff)
print(f"线程 {id} 未能获取锁,等待 {sleep_time:.2f} 秒后重试")
time.sleep(sleep_time)
backoff = min(backoff * 2, max_backoff) # 指数增长,但不超过最大值
# 共享资源
resource = {'value': 0}
lock = threading.Lock()
# 创建线程
threads = []
for i in range(5):
t = threading.Thread(target=worker_with_backoff, args=(i, resource, lock))
threads.append(t)
t.start()
# 等待所有线程结束
for t in threads:
t.join()
print(f"最终共享资源值: {resource['value']}")
8.2.4 线程饥饿(Starvation)
问题:某些线程长时间无法获取所需资源,导致任务无法进展。
解决方案:
- 使用公平锁(Python标准库中没有直接提供,但可以自行实现)
- 使用超时机制
- 适当调整线程优先级(Python中较难直接控制)
8.2.5 GIL相关问题
问题:Python的GIL限制了多线程在CPU密集型任务上的性能。
解决方案:
- 对于CPU密集型任务,使用多进程代替多线程
- 使用C扩展(如NumPy、Pandas)进行计算密集型操作,这些库在计算时会释放GIL
- 考虑使用其他Python实现(如Jython、IronPython)或其他语言
8.2.6 线程安全问题
问题:某些Python标准库和第三方库可能不是线程安全的。
解决方案:
- 查阅库文档,确认线程安全性
- 使用锁保护对非线程安全库的调用
- 考虑使用线程本地存储或每个线程一个实例
import threading
import sqlite3 # sqlite3在多线程环境中需要小心使用
# 使用线程本地存储
local_data = threading.local()
def get_connection():
"""获取线程本地的数据库连接"""
if not hasattr(local_data, 'connection'):
local_data.connection = sqlite3.connect(':memory:')
# 创建表
cursor = local_data.connection.cursor()
cursor.execute('CREATE TABLE IF NOT EXISTS test (id INTEGER PRIMARY KEY, value TEXT)')
local_data.connection.commit()
return local_data.connection
def worker(name, values):
"""每个线程使用自己的数据库连接"""
conn = get_connection()
cursor = conn.cursor()
# 插入数据
for value in values:
cursor.execute('INSERT INTO test (value) VALUES (?)', (f"{name}-{value}",))
conn.commit()
# 查询数据
cursor.execute('SELECT * FROM test')
results = cursor.fetchall()
print(f"线程 {name} 查询结果: {results}")
# 创建线程
threads = []
for i in range(3):
values = [f"value-{j}" for j in range(i*3, (i+1)*3)]
t = threading.Thread(target=worker, args=(f"Thread-{i}", values))
threads.append(t)
t.start()
# 等待所有线程结束
for t in threads:
t.join()
print("所有线程执行完毕")
8.3 性能监控与优化
监控和优化多线程程序的性能是一项重要任务。以下是一些有用的技巧:
- 使用
cProfile
进行性能分析:
import cProfile
import threading
import time
def worker(count):
"""模拟工作线程"""
total = 0
for i in range(count):
total += i
time.sleep(0.1) # 模拟IO操作
return total
def run_threads(num_threads, count_per_thread):
"""运行多个线程"""
threads = []
for i in range(num_threads):
t = threading.Thread(target=worker, args=(count_per_thread,))
threads.append(t)
t.start()
for t in threads:
t.join()
# 使用cProfile分析性能
cProfile.run('run_threads(5, 1000000)')
- 使用
time
模块测量执行时间:
import time
import threading
def measure_time(func, *args, **kwargs):
"""测量函数执行时间的装饰器"""
start_time = time.time()
result = func(*args, **kwargs)
end_time = time.time()
print(f"{func.__name__} 执行耗时: {end_time - start_time:.4f}秒")
return result
@measure_time
def run_in_threads(func, args_list, num_threads):
"""在线程池中运行多个任务"""
# 实现一个简单的线程池
results = [None] * len(args_list)
def worker(index, args):
results[index] = func(*args)
# 创建并启动线程
threads = []
for i, args in enumerate(args_list):
t = threading.Thread(target=worker, args=(i, args))
threads.append(t)
t.start()
# 控制并发线程数
if len(threads) >= num_threads:
threads[0].join()
threads.pop(0)
# 等待剩余线程完成
for t in threads:
t.join()
return results
# 测试函数
def compute_sum(n):
return sum(range(n))
# 准备参数
args_list = [(1000000,), (2000000,), (3000000,), (4000000,), (5000000,)]
# 使用不同数量的线程测试性能
for num_threads in [1, 2, 3, 4, 5]:
print(f"\n使用 {num_threads} 个线程:")
results = run_in_threads(compute_sum, args_list, num_threads)
print(f"结果: {results}")
- 使用
threading.stack_size()
调整线程栈大小:
import threading
import sys
# 查看默认栈大小
default_stack_size = threading.stack_size()
print(f"默认线程栈大小: {default_stack_size} 字节")
# 调整栈大小(仅在某些平台上支持)
try:
# 设置为2MB
new_stack_size = 2 * 1024 * 1024
threading.stack_size(new_stack_size)
print(f"线程栈大小已调整为: {threading.stack_size()} 字节")
except (ValueError, ThreadError) as e:
print(f"无法调整栈大小: {e}")
# 创建使用新栈大小的线程
def recursive_function(depth):
if depth <= 0:
return
# 创建一些局部变量,占用栈空间
data = [0] * 1000
recursive_function(depth - 1)
def thread_function():
try:
# 尝试较深的递归
recursive_function(500)
print("递归成功完成")
except RecursionError as e:
print(f"递归错误: {e}")
# 创建并启动线程
t = threading.Thread(target=thread_function)
t.start()
t.join()
# 恢复默认栈大小
threading.stack_size(default_stack_size)
print(f"已恢复默认栈大小: {threading.stack_size()} 字节")
- 使用
concurrent.futures.ThreadPoolExecutor
的max_workers
参数控制线程数:
import concurrent.futures
import time
import random
def task(id):
"""模拟任务,随机耗时"""
sleep_time = random.uniform(0.1, 1.0)
time.sleep(sleep_time)
return f"任务 {id} 完成,耗时 {sleep_time:.2f}秒"
def benchmark_thread_pool(num_tasks, max_workers):
"""测试不同线程池大小的性能"""
start_time = time.time()
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = [executor.submit(task, i) for i in range(num_tasks)]
results = [future.result() for future in concurrent.futures.as_completed(futures)]
end_time = time.time()
elapsed_time = end_time - start_time
print(f"线程池大小: {max_workers}, 任务数: {num_tasks}, 总耗时: {elapsed_time:.2f}秒")
return elapsed_time
# 测试不同线程池大小
num_tasks = 50
for max_workers in [1, 2, 5, 10, 20, 50, 100]:
benchmark_thread_pool(num_tasks, max_workers)
通过这些技术,你可以监控和优化多线程程序的性能,找到最适合你的应用场景的线程配置。