网站Logo 小李的博客

Python多线程并发教程:从入门到实践

xiaoli
11
2025-05-28

Python多线程并发教程:从入门到实践

前言

在现代软件开发中,并发编程已经成为提高程序性能和响应性的重要手段。Python作为一种流行的高级编程语言,提供了多种实现并发的方式,其中多线程是最常用的方式之一。本文将系统地介绍Python中的多线程编程,从基础概念到实际应用,帮助初学者和中级开发者掌握这一重要技能。

本教程将涵盖以下内容:

  • Python多线程的基本概念
  • threading模块的使用方法
  • concurrent.futures模块的使用方法
  • asyncio模块的使用方法
  • 多线程数据采集实战
  • 多线程方式的对比与选择
  • 多线程与多进程的区别

无论你是刚接触并发编程的新手,还是想深入了解Python多线程机制的中级开发者,本文都将为你提供有价值的指导和实践经验。

一、Python多线程基础概念

1.1 什么是线程?

线程(Thread)是操作系统能够进行运算调度的最小单位,它被包含在进程之中,是进程中的实际运作单位。一个进程可以并发多个线程,每条线程并行执行不同的任务。

在Python中,线程是通过标准库中的threading模块来实现的。多线程编程允许程序同时执行多个操作,从而提高程序的执行效率和响应性。

1.2 Python中的GIL(全局解释器锁)

在讨论Python多线程之前,必须了解一个重要的概念:全局解释器锁(Global Interpreter Lock,简称GIL)。

GIL是CPython解释器(Python的默认实现)中的一个机制,它确保同一时刻只有一个线程在执行Python字节码。这意味着,即使在多核处理器上,Python的多线程也不能同时执行多个线程的Python代码。

GIL的影响:

  • 对CPU密集型任务,多线程并不能提供真正的并行计算
  • 对IO密集型任务(如网络请求、文件读写),多线程仍然可以显著提高程序性能

理解GIL的存在及其限制,对于正确使用Python多线程至关重要。

1.3 多线程适用场景

尽管有GIL的限制,Python的多线程在以下场景中仍然非常有用:

  1. IO密集型任务:当程序需要等待外部资源(如文件、网络连接)时,多线程可以让程序在等待过程中执行其他任务。
  2. 并发请求处理:如Web服务器处理多个客户端请求。
  3. 后台任务:在不阻塞主程序的情况下执行后台操作。
  4. 用户界面响应:保持GUI应用的响应性,同时执行耗时操作。

1.4 线程的生命周期

一个线程的生命周期包括以下几个状态:

  1. 新建(New):创建线程对象
  2. 就绪(Runnable):调用start()方法后,线程进入就绪状态,等待CPU调度
  3. 运行(Running):线程获得CPU时间片,执行run()方法中的代码
  4. 阻塞(Blocked):线程暂时停止执行,等待某个条件(如IO操作完成、锁释放等)
  5. 终止(Terminated):线程执行完毕或因异常退出

了解线程的生命周期有助于我们更好地控制和管理线程的执行。

二、threading模块详解

Python的threading模块提供了创建和管理线程的功能。它是Python标准库的一部分,无需额外安装。

2.1 创建和启动线程

在Python中,创建线程主要有两种方式:

方式一:直接使用Thread类

import threading
import time

# 定义线程要执行的函数
def worker(name):
    print(f"线程 {name} 开始工作")
    time.sleep(2)  # 模拟工作耗时
    print(f"线程 {name} 工作完成")

# 创建线程
t = threading.Thread(target=worker, args=('Thread-1',))

# 启动线程
t.start()

# 等待线程结束
t.join()

print("主线程继续执行")

在上面的代码中:

  • target参数指定线程要执行的函数
  • args参数以元组形式传递函数参数
  • start()方法启动线程
  • join()方法等待线程执行完毕

方式二:继承Thread类

import threading
import time

# 继承Thread类
class MyThread(threading.Thread):
    def __init__(self, name):
        super().__init__()
        self.name = name

    # 重写run方法
    def run(self):
        print(f"线程 {self.name} 开始工作")
        time.sleep(2)  # 模拟工作耗时
        print(f"线程 {self.name} 工作完成")

# 创建线程实例
t = MyThread("Thread-2")

# 启动线程
t.start()

# 等待线程结束
t.join()

print("主线程继续执行")

继承Thread类的方式更加面向对象,适合复杂的线程行为。

2.2 线程同步机制

当多个线程同时访问共享资源时,可能会导致数据不一致或程序错误。Python的threading模块提供了多种同步机制来解决这个问题:

2.2.1 Lock(锁)

Lock是最简单的同步机制,它确保同一时刻只有一个线程可以执行受保护的代码块:

import threading
import time

# 共享资源
counter = 0
# 创建锁
lock = threading.Lock()

def increment(n):
    global counter
    for _ in range(n):
        # 获取锁
        lock.acquire()
        try:
            # 临界区(受保护的代码)
            counter += 1
        finally:
            # 释放锁
            lock.release()

# 也可以使用with语句简化锁的使用
def increment_with(n):
    global counter
    for _ in range(n):
        with lock:  # 自动获取和释放锁
            counter += 1

# 创建两个线程
t1 = threading.Thread(target=increment, args=(10000,))
t2 = threading.Thread(target=increment, args=(10000,))

# 启动线程
t1.start()
t2.start()

# 等待线程结束
t1.join()
t2.join()

print(f"最终计数: {counter}")  # 应该是20000

2.2.2 RLock(可重入锁)

RLock允许同一个线程多次获取锁,而不会导致死锁:

import threading

# 创建可重入锁
rlock = threading.RLock()

def outer_function():
    with rlock:  # 第一次获取锁
        print("外部函数获取锁")
        inner_function()
        print("外部函数释放锁")

def inner_function():
    with rlock:  # 同一线程再次获取锁
        print("内部函数获取锁")
        print("内部函数释放锁")

# 创建线程
t = threading.Thread(target=outer_function)
t.start()
t.join()

如果使用普通的Lock,上面的代码会导致死锁。

2.2.3 Condition(条件变量)

Condition提供了一种线程间通信的机制,允许线程等待特定条件发生:

import threading
import time
import random

# 创建条件变量
condition = threading.Condition()
# 共享资源
items = []
MAX_ITEMS = 10

# 生产者
def producer():
    for i in range(20):
        with condition:  # 获取锁
            while len(items) >= MAX_ITEMS:  # 检查条件
                print("队列已满,生产者等待...")
                condition.wait()  # 等待消费者消费

            item = random.randint(1, 100)
            items.append(item)
            print(f"生产: {item}, 当前队列: {items}")

            condition.notify()  # 通知消费者

        time.sleep(0.2)  # 生产速度控制

# 消费者
def consumer():
    for i in range(20):
        with condition:  # 获取锁
            while not items:  # 检查条件
                print("队列为空,消费者等待...")
                condition.wait()  # 等待生产者生产

            item = items.pop(0)
            print(f"消费: {item}, 当前队列: {items}")

            condition.notify()  # 通知生产者

        time.sleep(0.5)  # 消费速度控制

# 创建线程
producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)

# 启动线程
producer_thread.start()
consumer_thread.start()

# 等待线程结束
producer_thread.join()
consumer_thread.join()

2.2.4 Semaphore(信号量)

Semaphore用于控制同时访问特定资源的线程数量:

import threading
import time
import random

# 创建信号量,限制最多3个线程同时执行
semaphore = threading.Semaphore(3)

def worker(id):
    with semaphore:  # 获取信号量
        print(f"工作线程 {id} 开始执行")
        time.sleep(random.uniform(1, 3))  # 模拟工作耗时
        print(f"工作线程 {id} 执行完毕")

# 创建10个线程
threads = []
for i in range(10):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()

# 等待所有线程结束
for t in threads:
    t.join()

print("所有工作线程执行完毕")

在上面的代码中,尽管创建了10个线程,但信号量确保同一时刻最多只有3个线程在执行。

2.2.5 Event(事件)

Event用于线程间的简单通信,一个线程发出事件信号,其他线程等待该信号:

import threading
import time

# 创建事件对象
event = threading.Event()

def waiter(id):
    print(f"等待线程 {id} 等待事件...")
    event.wait()  # 等待事件被设置
    print(f"等待线程 {id} 收到事件信号,继续执行")

def setter():
    print("设置线程准备发出事件信号")
    time.sleep(3)  # 模拟一些准备工作
    print("设置线程发出事件信号")
    event.set()  # 设置事件,通知所有等待的线程

# 创建多个等待线程
waiters = []
for i in range(5):
    t = threading.Thread(target=waiter, args=(i,))
    waiters.append(t)
    t.start()

# 创建设置线程
setter_thread = threading.Thread(target=setter)
setter_thread.start()

# 等待所有线程结束
for t in waiters:
    t.join()
setter_thread.join()

print("所有线程执行完毕")

2.3 线程本地数据

有时我们需要每个线程拥有自己的数据副本,而不是共享数据。Python的threading模块提供了local类来实现线程本地存储:

import threading
import time
import random

# 创建线程本地数据
local_data = threading.local()

def worker(id):
    # 为当前线程设置本地数据
    local_data.x = id
    local_data.name = f"Thread-{id}"

    # 模拟工作
    time.sleep(random.uniform(0.5, 1.5))

    # 访问线程本地数据
    print(f"线程 {local_data.name} (ID: {local_data.x}) 执行完毕")

# 创建多个线程
threads = []
for i in range(5):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()

# 等待所有线程结束
for t in threads:
    t.join()

print("所有线程执行完毕")

在上面的代码中,每个线程都有自己的local_data.xlocal_data.name,互不干扰。

2.4 守护线程

守护线程(Daemon Thread)是一种特殊的线程,当主线程结束时,所有守护线程会自动终止,而不管它们是否执行完毕。这对于一些后台任务非常有用:

import threading
import time

def background_task():
    while True:
        print("后台任务执行中...")
        time.sleep(1)

# 创建守护线程
daemon_thread = threading.Thread(target=background_task)
daemon_thread.daemon = True  # 设置为守护线程
daemon_thread.start()

# 主线程工作
print("主线程开始工作")
time.sleep(5)
print("主线程工作完毕")

# 主线程结束后,守护线程会自动终止,无需join

三、concurrent.futures模块详解

Python 3.2引入的concurrent.futures模块提供了更高级的异步执行接口,包括线程池和进程池。它简化了多线程编程,特别是在需要获取线程执行结果的场景中。

3.1 ThreadPoolExecutor基础

ThreadPoolExecutorconcurrent.futures模块中的线程池实现,它管理一组工作线程,将任务分配给这些线程,并跟踪任务的执行状态。

3.1.1 基本用法

import concurrent.futures
import time

def worker(id):
    print(f"工作线程 {id} 开始执行")
    time.sleep(id)  # 模拟不同的工作耗时
    return f"工作线程 {id} 的结果"

# 创建线程池
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # 提交任务到线程池
    futures = {executor.submit(worker, i): i for i in range(5)}

    # 获取任务结果
    for future in concurrent.futures.as_completed(futures):
        id = futures[future]
        try:
            result = future.result()
            print(f"任务 {id} 完成,结果: {result}")
        except Exception as e:
            print(f"任务 {id} 执行出错: {e}")

print("所有任务执行完毕")

在上面的代码中:

  • ThreadPoolExecutor创建了一个最多包含3个线程的线程池
  • submit()方法提交任务到线程池,返回Future对象
  • as_completed()函数按任务完成的顺序产生Future对象
  • future.result()获取任务的返回值

3.1.2 使用map方法

ThreadPoolExecutormap方法提供了一种更简洁的方式来并行执行任务:

import concurrent.futures
import time

def worker(id):
    print(f"工作线程 {id} 开始执行")
    time.sleep(id)  # 模拟不同的工作耗时
    return f"工作线程 {id} 的结果"

# 创建线程池
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # 使用map方法并行执行任务
    results = executor.map(worker, range(5))

    # 获取结果(按提交顺序)
    for id, result in enumerate(results):
        print(f"任务 {id} 完成,结果: {result}")

print("所有任务执行完毕")

map方法的结果是按照任务提交的顺序返回的,而不是按完成顺序。

3.2 Future对象

Future对象表示异步执行的结果。它提供了检查任务是否完成、等待任务完成以及获取任务结果的方法。

import concurrent.futures
import time

def worker(id):
    print(f"工作线程 {id} 开始执行")
    time.sleep(id)  # 模拟不同的工作耗时
    if id == 3:
        raise ValueError("模拟任务失败")
    return f"工作线程 {id} 的结果"

# 创建线程池
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    # 提交任务
    future = executor.submit(worker, 2)

    # 检查任务是否完成
    print(f"任务是否完成: {future.done()}")

    # 等待任务完成(可选超时)
    time.sleep(1)
    print(f"1秒后,任务是否完成: {future.done()}")

    # 获取任务结果(会阻塞直到任务完成)
    result = future.result()
    print(f"任务结果: {result}")

    # 处理异常情况
    error_future = executor.submit(worker, 3)
    try:
        error_result = error_future.result()
    except Exception as e:
        print(f"任务执行出错: {e}")

3.3 取消任务

Future对象还允许取消尚未执行的任务:

import concurrent.futures
import time

def worker(id):
    print(f"工作线程 {id} 开始执行")
    time.sleep(3)  # 模拟长时间运行的任务
    print(f"工作线程 {id} 执行完毕")
    return f"工作线程 {id} 的结果"

# 创建线程池
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    # 提交多个任务
    futures = [executor.submit(worker, i) for i in range(5)]

    # 取消部分任务
    for i, future in enumerate(futures[2:]):
        if future.cancel():
            print(f"任务 {i+2} 已取消")
        else:
            print(f"任务 {i+2} 无法取消(可能已开始执行)")

    # 等待剩余任务完成
    for i, future in enumerate(futures[:2]):
        if not future.cancelled():
            result = future.result()
            print(f"任务 {i} 完成,结果: {result}")

print("所有任务处理完毕")

注意,已经开始执行的任务无法取消。

3.4 超时控制

Future.result()方法支持超时参数,可以避免长时间等待某个任务:

import concurrent.futures
import time

def worker(id, sleep_time):
    print(f"工作线程 {id} 开始执行,预计耗时 {sleep_time} 秒")
    time.sleep(sleep_time)
    return f"工作线程 {id} 的结果"

# 创建线程池
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # 提交任务
    future1 = executor.submit(worker, 1, 2)  # 2秒任务
    future2 = executor.submit(worker, 2, 5)  # 5秒任务

    # 设置超时获取结果
    try:
        # 等待最多3秒
        result1 = future1.result(timeout=3)
        print(f"任务1结果: {result1}")
    except concurrent.futures.TimeoutError:
        print("任务1超时")

    try:
        # 等待最多3秒
        result2 = future2.result(timeout=3)
        print(f"任务2结果: {result2}")
    except concurrent.futures.TimeoutError:
        print("任务2超时")
        # 可以选择取消超时任务
        future2.cancel()

四、asyncio模块详解

Python 3.4引入的asyncio模块提供了一种使用协程实现并发的方式,它与传统的多线程有很大不同。asyncio使用事件循环来调度和管理协程,实现非阻塞的异步编程。

4.1 协程基础

协程(Coroutine)是一种特殊的函数,可以在执行过程中暂停并稍后恢复。在Python中,协程使用async def语法定义,使用await关键字暂停执行。

import asyncio

# 定义协程
async def hello_world():
    print("Hello")
    await asyncio.sleep(1)  # 非阻塞的睡眠
    print("World")
    return "Done"

# 运行协程
async def main():
    result = await hello_world()
    print(f"结果: {result}")

# 使用asyncio.run运行主协程
asyncio.run(main())

在上面的代码中:

  • async def定义了一个协程函数
  • await用于等待另一个协程的结果
  • asyncio.sleep()是一个非阻塞的睡眠函数
  • asyncio.run()用于运行主协程

4.2 并发执行多个协程

asyncio的强大之处在于可以并发执行多个协程,而不需要创建多个线程:

import asyncio
import time

async def task(name, delay):
    print(f"任务 {name} 开始执行")
    await asyncio.sleep(delay)  # 非阻塞的睡眠
    print(f"任务 {name} 执行完毕")
    return f"任务 {name} 的结果"

async def main():
    start_time = time.time()

    # 方法1:使用asyncio.create_task创建任务
    task_a = asyncio.create_task(task("A", 2))
    task_b = asyncio.create_task(task("B", 3))

    # 等待任务完成
    result_a = await task_a
    result_b = await task_b

    # 方法2:使用asyncio.gather并发执行多个协程
    results = await asyncio.gather(
        task("C", 1),
        task("D", 2),
        task("E", 3)
    )

    end_time = time.time()
    print(f"所有任务完成,耗时: {end_time - start_time:.2f}秒")
    print(f"任务A结果: {result_a}")
    print(f"任务B结果: {result_b}")
    print(f"任务C、D、E结果: {results}")

# 运行主协程
asyncio.run(main())

在上面的代码中,尽管有5个任务,总共需要11秒(2+3+1+2+3),但实际执行时间约为3秒,因为这些任务是并发执行的。

4.3 asyncio的同步原语

threading模块类似,asyncio也提供了各种同步原语,如锁、事件、信号量等,但这些是专为协程设计的:

import asyncio

# 创建锁
lock = asyncio.Lock()

async def worker(id):
    print(f"工作协程 {id} 尝试获取锁")
    async with lock:  # 异步上下文管理器
        print(f"工作协程 {id} 获取了锁")
        await asyncio.sleep(1)  # 模拟工作
        print(f"工作协程 {id} 释放了锁")

async def main():
    # 并发执行多个工作协程
    await asyncio.gather(
        worker(1),
        worker(2),
        worker(3)
    )

# 运行主协程
asyncio.run(main())

4.4 使用aiohttp进行异步HTTP请求

asyncio最常见的应用场景之一是进行异步HTTP请求。aiohttp是一个基于asyncio的异步HTTP客户端/服务器库:

import asyncio
import aiohttp
import time

async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()

async def fetch_all(urls):
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        return results

async def main():
    urls = [
        "https://www.python.org",
        "https://www.github.com",
        "https://www.stackoverflow.com",
        "https://www.wikipedia.org",
        "https://www.reddit.com"
    ]

    start_time = time.time()
    results = await fetch_all(urls)
    end_time = time.time()

    print(f"获取了 {len(results)} 个网页,总大小: {sum(len(r) for r in results)} 字节")
    print(f"总耗时: {end_time - start_time:.2f}秒")

# 运行主协程
asyncio.run(main())

注意:使用上面的代码需要先安装aiohttp库:pip install aiohttp

4.5 asyncio的超时控制

asyncio提供了wait_for函数来为协程设置超时:

import asyncio

async def long_running_task():
    print("长时间运行的任务开始")
    await asyncio.sleep(5)
    print("长时间运行的任务完成")
    return "任务结果"

async def main():
    try:
        # 设置3秒超时
        result = await asyncio.wait_for(long_running_task(), timeout=3)
        print(f"任务结果: {result}")
    except asyncio.TimeoutError:
        print("任务超时")

# 运行主协程
asyncio.run(main())

五、多线程数据采集实战

在本节中,我们将通过一个实际的数据采集案例,展示如何使用不同的多线程方式进行网页数据采集。

5.1 使用threading模块进行网页数据采集

以下是一个使用threading模块创建多线程爬虫的示例:

import threading
import requests
import time
from queue import Queue

class WebCrawler:
    def __init__(self, urls, max_threads=5):
        self.urls = urls
        self.results = {}
        self.queue = Queue()
        self.max_threads = max_threads

        # 将URL放入队列
        for url in urls:
            self.queue.put(url)

    def crawler_worker(self):
        """工作线程函数,不断从队列获取URL并处理"""
        while not self.queue.empty():
            # 从队列获取URL
            url = self.queue.get()
            try:
                # 发送HTTP请求
                start_time = time.time()
                response = requests.get(url, timeout=10)
                elapsed_time = time.time() - start_time

                # 存储结果
                thread_name = threading.current_thread().name
                self.results[url] = {
                    'status_code': response.status_code,
                    'content_length': len(response.content),
                    'elapsed_time': elapsed_time,
                    'thread': thread_name
                }
                print(f"{thread_name} 完成抓取 {url}, 状态码: {response.status_code}, 用时: {elapsed_time:.2f}秒")

            except Exception as e:
                print(f"抓取 {url} 时出错: {e}")
                self.results[url] = {'error': str(e)}

            finally:
                # 标记任务完成
                self.queue.task_done()

    def run(self):
        """启动爬虫"""
        start_time = time.time()

        # 创建并启动工作线程
        threads = []
        for i in range(min(self.max_threads, len(self.urls))):
            thread = threading.Thread(target=self.crawler_worker, name=f"爬虫线程-{i}")
            thread.daemon = True  # 设置为守护线程
            thread.start()
            threads.append(thread)

        # 等待队列中所有任务完成
        self.queue.join()

        # 计算总耗时
        total_time = time.time() - start_time
        print(f"所有URL抓取完成,总耗时: {total_time:.2f}秒")

        return self.results

# 使用示例
if __name__ == "__main__":
    # 待抓取的URL列表
    urls = [
        "https://www.python.org",
        "https://www.github.com",
        "https://www.stackoverflow.com",
        "https://www.wikipedia.org",
        "https://www.reddit.com"
    ]

    # 创建爬虫实例并运行
    crawler = WebCrawler(urls, max_threads=3)
    results = crawler.run()

    # 输出结果统计
    success_count = sum(1 for r in results.values() if 'status_code' in r)
    error_count = sum(1 for r in results.values() if 'error' in r)

    print(f"\n抓取统计:")
    print(f"成功: {success_count}, 失败: {error_count}")

    # 计算平均响应时间
    if success_count > 0:
        avg_time = sum(r['elapsed_time'] for r in results.values() if 'elapsed_time' in r) / success_count
        print(f"平均响应时间: {avg_time:.2f}秒")

这个爬虫使用了线程池和队列的设计模式,能够高效地并发抓取多个URL。

5.2 使用concurrent.futures进行数据采集

以下是使用concurrent.futures模块的ThreadPoolExecutor进行数据采集的示例:

import concurrent.futures
import requests
import time

def fetch_url(url):
    """抓取单个URL的函数"""
    try:
        start_time = time.time()
        response = requests.get(url, timeout=10)
        elapsed_time = time.time() - start_time

        return {
            'url': url,
            'status_code': response.status_code,
            'content_length': len(response.content),
            'elapsed_time': elapsed_time
        }

    except Exception as e:
        return {
            'url': url,
            'error': str(e)
        }

def fetch_all_urls(urls, max_workers=5):
    """并发抓取多个URL"""
    start_time = time.time()
    results = {}

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # 提交所有任务
        future_to_url = {executor.submit(fetch_url, url): url for url in urls}

        # 获取任务结果
        for future in concurrent.futures.as_completed(future_to_url):
            url = future_to_url[future]
            try:
                result = future.result()
                results[url] = result

                if 'status_code' in result:
                    print(f"完成抓取 {url}, 状态码: {result['status_code']}, 用时: {result['elapsed_time']:.2f}秒")
                else:
                    print(f"抓取 {url} 时出错: {result['error']}")

            except Exception as e:
                print(f"处理 {url} 结果时出错: {e}")
                results[url] = {'error': str(e)}

    # 计算总耗时
    total_time = time.time() - start_time
    print(f"所有URL抓取完成,总耗时: {total_time:.2f}秒")

    return results

# 使用示例
if __name__ == "__main__":
    # 待抓取的URL列表
    urls = [
        "https://www.python.org",
        "https://www.github.com",
        "https://www.stackoverflow.com",
        "https://www.wikipedia.org",
        "https://www.reddit.com"
    ]

    # 并发抓取所有URL
    results = fetch_all_urls(urls, max_workers=3)

    # 输出结果统计
    success_count = sum(1 for r in results.values() if 'status_code' in r)
    error_count = sum(1 for r in results.values() if 'error' in r)

    print(f"\n抓取统计:")
    print(f"成功: {success_count}, 失败: {error_count}")

    # 计算平均响应时间
    if success_count > 0:
        avg_time = sum(r['elapsed_time'] for r in results.values() if 'elapsed_time' in r) / success_count
        print(f"平均响应时间: {avg_time:.2f}秒")

使用concurrent.futures的代码更加简洁,不需要手动管理线程和队列。

5.3 使用asyncio进行异步数据采集

以下是使用asyncioaiohttp进行异步数据采集的示例:

import asyncio
import aiohttp
import time

async def fetch_url(session, url):
    """异步抓取单个URL"""
    try:
        start_time = time.time()
        async with session.get(url, timeout=10) as response:
            content = await response.read()
            elapsed_time = time.time() - start_time

            return {
                'url': url,
                'status_code': response.status,
                'content_length': len(content),
                'elapsed_time': elapsed_time
            }

    except Exception as e:
        return {
            'url': url,
            'error': str(e)
        }

async def fetch_all_urls(urls):
    """异步抓取多个URL"""
    start_time = time.time()
    results = {}

    # 创建会话
    async with aiohttp.ClientSession() as session:
        # 创建所有任务
        tasks = [fetch_url(session, url) for url in urls]

        # 等待所有任务完成
        for task in asyncio.as_completed(tasks):
            result = await task
            url = result['url']
            results[url] = result

            if 'status_code' in result:
                print(f"完成抓取 {url}, 状态码: {result['status_code']}, 用时: {result['elapsed_time']:.2f}秒")
            else:
                print(f"抓取 {url} 时出错: {result['error']}")

    # 计算总耗时
    total_time = time.time() - start_time
    print(f"所有URL抓取完成,总耗时: {total_time:.2f}秒")

    return results

# 使用示例
async def main():
    # 待抓取的URL列表
    urls = [
        "https://www.python.org",
        "https://www.github.com",
        "https://www.stackoverflow.com",
        "https://www.wikipedia.org",
        "https://www.reddit.com"
    ]

    # 异步抓取所有URL
    results = await fetch_all_urls(urls)

    # 输出结果统计
    success_count = sum(1 for r in results.values() if 'status_code' in r)
    error_count = sum(1 for r in results.values() if 'error' in r)

    print(f"\n抓取统计:")
    print(f"成功: {success_count}, 失败: {error_count}")

    # 计算平均响应时间
    if success_count > 0:
        avg_time = sum(r['elapsed_time'] for r in results.values() if 'elapsed_time' in r) / success_count
        print(f"平均响应时间: {avg_time:.2f}秒")

# 运行主协程
if __name__ == "__main__":
    asyncio.run(main())

注意:使用上面的代码需要先安装aiohttp库:pip install aiohttp

六、多线程方式的对比与选择

在Python中,我们有多种方式实现并发编程,每种方式都有其优缺点和适用场景。下面我们将对threadingconcurrent.futuresasyncio三种方式进行详细对比。

6.1 编程模型对比

| 特性 | threading | concurrent.futures | asyncio |
|——|———–|——————-|———|
| 编程模型 | 基于线程的并发 | 基于任务的并发 | 基于协程的并发 |
| 语法复杂度 | 中等 | 简单 | 较高(需要async/await) |
| 代码组织 | 需手动管理线程 | 自动管理线程池 | 基于事件循环和协程 |
| 调试难度 | 较难 | 中等 | 中等 |
| 学习曲线 | 平缓 | 平缓 | 陡峭 |

6.2 性能特性对比

| 特性 | threading | concurrent.futures | asyncio |
|——|———–|——————-|———|
| CPU密集型任务 | 受GIL限制,性能较差 | 受GIL限制,性能较差 | 不适合,单线程模型 |
| IO密集型任务 | 适合 | 适合 | 非常适合 |
| 内存占用 | 中等 | 中等 | 低 |
| 上下文切换开销 | 高 | 高 | 低(协程切换) |
| 并发连接数上限 | 受系统线程数限制 | 受线程池大小限制 | 可支持大量并发(数千级) |
| 启动开销 | 中等 | 中等 | 低 |

6.3 功能特性对比

| 特性 | threading | concurrent.futures | asyncio |
|——|———–|——————-|———|
| 线程同步机制 | 丰富(Lock, Event, Semaphore等) | 简单(Future对象) | 丰富(Lock, Event, Semaphore等) |
| 取消任务 | 困难 | 支持(cancel方法) | 支持(Task.cancel) |
| 获取返回值 | 需自行实现 | 内置支持(Future.result) | 内置支持(await) |
| 异常处理 | 复杂 | 简单(Future.exception) | 简单(try/except with await) |
| 超时控制 | 需自行实现 | 支持 | 内置支持(asyncio.wait_for) |
| 批量任务执行 | 需自行实现 | 支持(map, as_completed) | 支持(gather, as_completed) |

6.4 适用场景对比

| 场景 | threading | concurrent.futures | asyncio |
|——|———–|——————-|———|
| 网络爬虫 | 适合 | 适合 | 非常适合 |
| 文件IO操作 | 适合 | 适合 | 适合(需使用aiofiles) |
| 数据库操作 | 适合 | 适合 | 适合(需使用异步驱动) |
| 图像处理 | 不适合 | 不适合(建议用ProcessPoolExecutor) | 不适合 |
| 实时通信 | 适合 | 适合 | 非常适合 |
| 大量HTTP请求 | 适合 | 适合 | 非常适合 |
| GUI应用 | 适合 | 适合 | 不太适合 |
| 长连接服务 | 适合 | 适合 | 非常适合 |

6.5 选择建议

  1. 选择threading:

  2. 需要与现有阻塞式代码集成

  3. 任务涉及共享状态且需要精细控制

  4. 并发量较小(几十个线程)

  5. 主要是IO密集型任务

  6. 选择concurrent.futures:

  7. 需要简单易用的API

  8. 需要同时支持线程池和进程池

  9. 需要获取任务执行结果

  10. 任务相对独立,无需复杂协调

  11. 选择asyncio:

  12. 需要处理大量并发连接(数百或数千)

  13. 主要是网络IO或其他IO密集型任务

  14. 可以使用异步库(如aiohttp, asyncpg等)

  15. 对性能和资源利用率有较高要求

  16. 混合使用:

  17. 应用场景复杂,既有IO密集型又有CPU密集型任务

  18. 需要处理大量并发连接同时又有计算密集型处理

  19. 例如:使用asyncio处理网络请求,使用ProcessPoolExecutor处理数据分析

七、多线程与多进程的区别

在Python中,除了多线程外,多进程也是一种常用的并行处理方式。了解它们的区别有助于我们在不同场景下做出正确的选择。

7.1 概念区别

  • 多线程:在同一进程内的多个执行流,共享内存空间
  • 多进程:多个独立的进程,各自有独立的内存空间

7.2 GIL的影响

  • 多线程:受Python的GIL限制,同一时刻只有一个线程执行Python字节码
  • 多进程:每个进程有自己的Python解释器和GIL,可以真正并行执行

7.3 资源消耗

  • 多线程

  • 内存占用小(共享进程内存)

  • 创建和切换开销小

  • 进程间通信简单(直接共享内存)

  • 多进程

  • 内存占用大(每个进程独立内存)

  • 创建和切换开销大

  • 进程间通信复杂(需要IPC机制)

7.4 适用场景

  • 多线程适合

  • IO密集型任务(网络请求、文件读写等)

  • 需要共享数据的任务

  • 资源有限的环境

  • 多进程适合

  • CPU密集型任务(计算、图像处理等)

  • 需要隔离的任务

  • 多核CPU环境

7.5 实现方式

Python提供了multiprocessing模块来实现多进程编程,其API设计与threading模块类似:

import multiprocessing
import time

def worker(name):
    print(f"进程 {name} 开始工作")
    time.sleep(2)  # 模拟工作耗时
    print(f"进程 {name} 工作完成")

if __name__ == "__main__":
    # 创建多个进程
    processes = []
    for i in range(3):
        p = multiprocessing.Process(target=worker, args=(f"Process-{i}",))
        processes.append(p)
        p.start()

    # 等待所有进程结束
    for p in processes:
        p.join()

    print("所有进程执行完毕")

concurrent.futures模块也提供了ProcessPoolExecutor,使用方式与ThreadPoolExecutor几乎相同:

import concurrent.futures
import time

def worker(id):
    print(f"进程 {id} 开始执行")
    time.sleep(id)  # 模拟不同的工作耗时
    return f"进程 {id} 的结果"

if __name__ == "__main__":
    # 创建进程池
    with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
        # 提交任务到进程池
        futures = {executor.submit(worker, i): i for i in range(5)}

        # 获取任务结果
        for future in concurrent.futures.as_completed(futures):
            id = futures[future]
            try:
                result = future.result()
                print(f"任务 {id} 完成,结果: {result}")
            except Exception as e:
                print(f"任务 {id} 执行出错: {e}")

    print("所有任务执行完毕")

7.6 选择建议

  1. 选择多线程当

  2. 任务主要是IO密集型

  3. 需要频繁共享数据

  4. 对资源消耗敏感

  5. 选择多进程当

  6. 任务主要是CPU密集型

  7. 需要充分利用多核CPU

  8. 任务之间相对独立,通信需求少

  9. 混合使用当

  10. 复杂应用场景,既有IO密集型又有CPU密集型任务

  11. 例如:使用多线程处理网络请求,使用多进程处理数据分析

总结

Python提供了多种实现并发的方式,每种方式都有其特点和适用场景:

  1. threading模块:提供了基础的多线程支持,适合IO密集型任务和需要精细控制的场景。

  2. concurrent.futures模块:提供了更高级的线程池和进程池接口,简化了并发编程,适合需要获取任务结果的场景。

  3. asyncio模块:提供了基于协程的异步编程模型,适合处理大量并发连接的IO密集型任务。

  4. multiprocessing模块:提供了多进程支持,适合CPU密集型任务和需要绕过GIL限制的场景。

在实际应用中,应根据任务特性、并发需求和性能要求选择合适的并发方式。对于复杂应用,混合使用多种并发方式往往能够获得最佳性能。

无论选择哪种方式,都需要注意线程安全、资源管理和异常处理等问题,以确保程序的正确性和稳定性。

希望本教程能够帮助你理解Python多线程并发编程的基本概念和实践方法,为你的Python开发之旅增添新的工具和视角。

参考资料

  1. Python官方文档 - threading模块: https://docs.python.org/3/library/threading.html
  2. Python官方文档 - concurrent.futures模块: https://docs.python.org/3/library/concurrent.futures.html
  3. Python官方文档 - asyncio模块: https://docs.python.org/3/library/asyncio.html
  4. Python官方文档 - multiprocessing模块: https://docs.python.org/3/library/multiprocessing.html
  5. Real Python - An Intro to Threading in Python: https://realpython.com/intro-to-python-threading/

1.5 并发 vs. 并行

在讨论多线程时,区分并发(Concurrency)和并行(Parallelism)非常重要:

  • 并发:指系统能够处理多个任务的能力,这些任务可能在时间上重叠执行,但不一定同时执行。例如,通过快速切换在单个CPU核心上运行多个线程。
  • 并行:指系统能够同时执行多个任务的能力,通常需要多个CPU核心。例如,在多核处理器上同时运行多个进程或(在没有GIL限制的情况下)多个线程。

由于Python的GIL,CPython中的多线程主要实现的是并发,而不是真正的并行(对于CPU密集型任务)。然而,对于IO密集型任务,线程在等待IO时会释放GIL,允许其他线程运行,从而实现有效的并发。

1.6 深入理解GIL

全局解释器锁(GIL)是CPython实现中的一个互斥锁,用于保护对Python对象的访问。它的存在主要是为了简化CPython的内存管理机制(特别是引用计数)。

为什么存在GIL?

  • 简化内存管理:GIL使得CPython的引用计数机制变得线程安全,开发者不需要担心多个线程同时修改同一个对象的引用计数。
  • 简化C扩展开发:许多Python的C扩展库依赖于GIL提供的线程安全保证,移除GIL可能会破坏这些库。
  • 历史原因:在单核CPU时代,GIL提供了一种简单有效的方式来实现多线程,并且对于IO密集型任务性能良好。

GIL的深远影响:

  • CPU密集型任务瓶颈:对于需要大量计算的任务,GIL使得多线程无法利用多核CPU的优势,性能甚至可能比单线程更差(因为线程切换开销)。
  • IO密集型任务优势:对于涉及大量等待(如网络、磁盘IO)的任务,线程在等待时会释放GIL,允许其他线程运行Python代码,因此多线程可以显著提高效率。
  • 解释器选择:其他Python解释器,如Jython(运行在JVM上)和IronPython(运行在.NET CLR上),没有GIL,可以在多核CPU上实现真正的线程并行。此外,CPython社区也在进行移除GIL的实验(如PEP 703),但目前仍处于实验阶段。

为什么仍然使用多线程?

尽管有GIL,但在以下情况下,多线程仍然是Python中并发编程的有效选择:

  1. 处理IO密集型任务:这是Python多线程最主要的优势所在。网络请求、文件读写、数据库交互等任务,大部分时间都在等待外部设备响应,此时线程会释放GIL,让其他线程有机会执行。
  2. 简化并发模型:相比多进程,多线程共享内存,数据共享和通信更简单直接(但也需要注意线程安全)。
  3. 资源消耗较低:线程的创建和上下文切换开销通常比进程小。
  4. 与现有库的兼容性:许多Python库是基于阻塞IO设计的,使用多线程可以方便地将这些库用于并发场景。

1.7 线程生命周期详解

让我们更详细地了解线程状态之间的转换:

  1. 新建(New):当创建threading.Thread对象时,线程处于新建状态。此时,操作系统尚未为其分配资源。
    t = threading.Thread(target=some_function)
    # 此时 t 处于新建状态
    
  2. 就绪(Runnable):调用线程对象的start()方法后,线程进入就绪状态。它已经具备了运行所需的所有资源,等待操作系统的CPU调度器分配时间片。
    t.start()
    # 此时 t 进入就绪状态,等待被调度
    
  3. 运行(Running):当CPU调度器选中一个就绪状态的线程时,该线程进入运行状态,开始执行其run()方法(或target指定的函数)。线程可以在运行状态和就绪状态之间快速切换(时间片轮转)。
  4. 阻塞(Blocked):线程在运行过程中,如果遇到某些条件无法继续执行,就会进入阻塞状态。常见导致阻塞的情况包括:
    • 调用time.sleep()
    • 等待IO操作完成(如requests.get(), 文件读写)
    • 尝试获取一个已被其他线程持有的锁(lock.acquire()
    • 等待条件变量或事件(condition.wait(), event.wait()
    • 调用另一个线程的join()方法
      当阻塞条件解除时(如IO完成、锁被释放、事件被设置),线程会重新回到就绪状态,等待再次被调度。
  5. 终止(Terminated):线程的run()方法执行完毕,或者在执行过程中抛出未被捕获的异常,线程就进入终止状态。终止的线程无法再次启动。

理解这些状态转换对于分析多线程程序的行为和调试问题至关重要。

二、threading模块详解(续)

2.5 线程间通信 - Queue

除了同步原语外,线程间通信是多线程编程中的另一个重要方面。Python的queue模块提供了线程安全的队列实现,是线程间传递数据的理想选择:

import threading
import queue
import time
import random

# 创建线程安全的队列
task_queue = queue.Queue()
result_queue = queue.Queue()

# 生产者线程:生成任务
def producer(num_tasks):
    for i in range(num_tasks):
        task = f"Task-{i}"
        task_queue.put(task)
        print(f"生产者生成任务: {task}")
        time.sleep(random.uniform(0.1, 0.5))  # 随机延迟

    # 添加结束标记
    for _ in range(3):  # 假设有3个消费者线程
        task_queue.put(None)  # 结束标记

# 消费者线程:处理任务
def consumer():
    while True:
        # 从队列获取任务
        task = task_queue.get()

        # 检查结束标记
        if task is None:
            task_queue.task_done()
            break

        # 处理任务
        print(f"消费者 {threading.current_thread().name} 处理任务: {task}")
        time.sleep(random.uniform(0.5, 1.5))  # 模拟处理时间

        # 将结果放入结果队列
        result = f"Result of {task}"
        result_queue.put(result)

        # 标记任务完成
        task_queue.task_done()

# 创建生产者线程
producer_thread = threading.Thread(target=producer, args=(10,))

# 创建多个消费者线程
consumer_threads = []
for i in range(3):
    t = threading.Thread(target=consumer, name=f"Consumer-{i}")
    consumer_threads.append(t)

# 启动所有线程
producer_thread.start()
for t in consumer_threads:
    t.start()

# 等待所有任务处理完毕
task_queue.join()

# 等待所有线程结束
producer_thread.join()
for t in consumer_threads:
    t.join()

# 处理结果
print("\n处理结果:")
while not result_queue.empty():
    result = result_queue.get()
    print(f"获取结果: {result}")

Queue类提供了几个重要方法:

  • put(item): 将项目放入队列,如果队列已满,会阻塞
  • get(): 从队列获取项目,如果队列为空,会阻塞
  • task_done(): 表示之前入队的一个任务已经完成
  • join(): 阻塞直到队列中所有项目都被处理完(即每个put()都有对应的task_done()调用)

除了基本的Queuequeue模块还提供了其他类型的队列:

  • LifoQueue: 后进先出队列(栈)
  • PriorityQueue: 优先级队列,项目按优先级排序
  • SimpleQueue: 简单的FIFO队列,不支持task_done()join()

2.6 线程池实现

在使用concurrent.futures模块之前,我们可以使用threading模块自己实现一个简单的线程池:

import threading
import queue
import time
import random

class ThreadPool:
    def __init__(self, num_threads):
        self.task_queue = queue.Queue()
        self.threads = []
        self.shutdown_flag = threading.Event()

        # 创建工作线程
        for i in range(num_threads):
            thread = threading.Thread(target=self._worker, name=f"Worker-{i}")
            thread.daemon = True  # 设置为守护线程
            thread.start()
            self.threads.append(thread)

    def _worker(self):
        """工作线程函数,不断从队列获取任务并执行"""
        while not self.shutdown_flag.is_set():
            try:
                # 从队列获取任务(最多等待1秒)
                func, args, kwargs, result_container = self.task_queue.get(timeout=1)

                try:
                    # 执行任务
                    result = func(*args, **kwargs)
                    # 存储结果
                    if result_container is not None:
                        result_container.append(result)
                except Exception as e:
                    print(f"任务执行出错: {e}")

                # 标记任务完成
                self.task_queue.task_done()

            except queue.Empty:
                # 队列为空,继续等待
                continue

    def submit(self, func, *args, **kwargs):
        """提交任务到线程池"""
        result_container = []
        self.task_queue.put((func, args, kwargs, result_container))
        return result_container  # 可以通过这个容器获取结果

    def shutdown(self, wait=True):
        """关闭线程池"""
        self.shutdown_flag.set()

        if wait:
            # 等待所有任务完成
            self.task_queue.join()
            # 等待所有线程结束
            for thread in self.threads:
                thread.join()

# 示例任务函数
def task(id, sleep_time):
    thread_name = threading.current_thread().name
    print(f"{thread_name} 执行任务 {id},将耗时 {sleep_time:.2f} 秒")
    time.sleep(sleep_time)
    return f"任务 {id} 的结果"

# 使用线程池
pool = ThreadPool(3)  # 创建包含3个线程的线程池

# 提交多个任务
results = []
for i in range(10):
    sleep_time = random.uniform(0.5, 2.0)
    result_container = pool.submit(task, i, sleep_time)
    results.append(result_container)

# 等待所有任务完成并获取结果
time.sleep(1)  # 等待一些任务开始执行
print("\n等待所有任务完成...")
pool.shutdown()

# 打印结果
print("\n所有任务结果:")
for i, result_container in enumerate(results):
    if result_container:  # 如果任务成功完成
        print(f"任务 {i} 结果: {result_container[0]}")
    else:
        print(f"任务 {i} 未完成或出错")

这个简单的线程池实现展示了线程池的基本原理:维护一个工作线程队列和一个任务队列,工作线程不断从任务队列获取任务并执行。

2.7 线程安全与竞态条件

多线程编程中最常见的问题之一是竞态条件(Race Condition)。当多个线程同时访问和修改共享数据时,如果没有适当的同步机制,可能会导致数据不一致或程序错误。

以下是一个典型的竞态条件示例:

import threading

# 共享资源
counter = 0

def increment(n):
    global counter
    for _ in range(n):
        # 以下三行代码不是原子操作,可能导致竞态条件
        current = counter  # 读取当前值
        current += 1       # 增加
        counter = current  # 写回

# 创建两个线程
t1 = threading.Thread(target=increment, args=(100000,))
t2 = threading.Thread(target=increment, args=(100000,))

# 启动线程
t1.start()
t2.start()

# 等待线程结束
t1.join()
t2.join()

print(f"最终计数: {counter}")  # 预期是200000,但实际可能小于这个值

在上面的代码中,increment函数中的三行代码(读取、增加、写回)不是原子操作。如果两个线程几乎同时执行这些操作,可能会导致一个线程的更改被另一个线程覆盖,最终结果小于预期。

解决竞态条件的方法是使用同步机制,如锁:

import threading

# 共享资源
counter = 0
# 创建锁
lock = threading.Lock()

def increment(n):
    global counter
    for _ in range(n):
        with lock:  # 使用锁保护临界区
            counter += 1  # 现在这个操作是线程安全的

# 创建两个线程
t1 = threading.Thread(target=increment, args=(100000,))
t2 = threading.Thread(target=increment, args=(100000,))

# 启动线程
t1.start()
t2.start()

# 等待线程结束
t1.join()
t2.join()

print(f"最终计数: {counter}")  # 现在应该正确显示200000

2.8 死锁及其预防

死锁(Deadlock)是多线程编程中另一个常见问题。当两个或多个线程互相等待对方持有的资源时,就会发生死锁,导致所有相关线程永远阻塞。

以下是一个死锁示例:

import threading
import time

# 创建两个锁
lock1 = threading.Lock()
lock2 = threading.Lock()

def thread1_function():
    print("线程1尝试获取锁1")
    with lock1:
        print("线程1获取了锁1")
        time.sleep(0.5)  # 模拟一些工作

        print("线程1尝试获取锁2")
        with lock2:
            print("线程1获取了锁2")
            print("线程1执行临界区代码")

def thread2_function():
    print("线程2尝试获取锁2")
    with lock2:
        print("线程2获取了锁2")
        time.sleep(0.5)  # 模拟一些工作

        print("线程2尝试获取锁1")
        with lock1:
            print("线程2获取了锁1")
            print("线程2执行临界区代码")

# 创建线程
t1 = threading.Thread(target=thread1_function)
t2 = threading.Thread(target=thread2_function)

# 启动线程
t1.start()
t2.start()

# 等待线程结束
t1.join()
t2.join()

print("程序正常结束")  # 如果发生死锁,这行代码可能永远不会执行

在上面的代码中,线程1先获取锁1,然后尝试获取锁2;同时,线程2先获取锁2,然后尝试获取锁1。如果两个线程的执行时机恰好导致它们各自获取了一个锁,然后等待另一个锁,就会发生死锁。

预防死锁的方法:

  1. 固定锁的获取顺序:确保所有线程按照相同的顺序获取锁。
def thread1_function():
    with lock1:
        with lock2:
            # 临界区代码
            pass

def thread2_function():
    with lock1:  # 注意这里也是先获取lock1
        with lock2:
            # 临界区代码
            pass
  1. 使用超时:在获取锁时设置超时,如果超时未获取到锁,则释放已持有的锁并重试。
def thread_function():
    while True:
        if lock1.acquire(timeout=1):
            try:
                if lock2.acquire(timeout=1):
                    try:
                        # 临界区代码
                        break  # 成功获取两个锁,执行完毕后退出循环
                    finally:
                        lock2.release()
                else:
                    # 未能获取lock2,稍后重试
                    print("未能获取锁2,重试")
            finally:
                lock1.release()
        else:
            # 未能获取lock1,稍后重试
            print("未能获取锁1,重试")

        # 随机延迟,避免活锁
        time.sleep(random.uniform(0, 0.1))
  1. 使用RLock:如果同一个线程需要多次获取同一个锁,使用RLock而不是Lock

  2. 使用更高级的同步原语:如ConditionSemaphoreEvent,它们可能更适合特定的同步需求。

  3. 减少锁的粒度:尽量减小临界区的大小,只在必要的代码段使用锁。

三、concurrent.futures模块详解(续)

3.5 错误处理与异常传播

在使用concurrent.futures时,任务中的异常不会立即抛出,而是存储在对应的Future对象中,直到调用result()方法时才会引发。这种设计允许我们以更可控的方式处理并发任务中的错误。

import concurrent.futures
import time
import random

def task(id):
    print(f"任务 {id} 开始执行")
    time.sleep(random.uniform(0.5, 1.5))

    # 随机生成错误
    if random.random() < 0.3:  # 30%的概率出错
        raise ValueError(f"任务 {id} 执行出错")

    return f"任务 {id} 的结果"

# 创建线程池
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # 提交多个任务
    futures = [executor.submit(task, i) for i in range(10)]

    # 处理结果和异常
    for i, future in enumerate(futures):
        try:
            result = future.result()
            print(f"任务 {i} 成功: {result}")
        except Exception as e:
            print(f"任务 {i} 失败: {e}")

    # 另一种处理方式:使用as_completed
    print("\n使用as_completed处理结果:")
    for future in concurrent.futures.as_completed(futures):
        # 找出对应的任务ID
        for i, f in enumerate(futures):
            if f is future:
                task_id = i
                break

        try:
            result = future.result()
            print(f"任务 {task_id} 成功: {result}")
        except Exception as e:
            print(f"任务 {task_id} 失败: {e}")

3.6 上下文管理器与资源管理

ThreadPoolExecutor实现了上下文管理器协议,可以与with语句一起使用,确保线程池在使用后正确关闭:

import concurrent.futures
import time

def task(id):
    print(f"任务 {id} 开始执行")
    time.sleep(1)
    return f"任务 {id} 的结果"

# 使用上下文管理器
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(task, i) for i in range(5)]
    # 上下文管理器结束时,会自动调用executor.shutdown(wait=True)

# 不使用上下文管理器
executor = concurrent.futures.ThreadPoolExecutor(max_workers=3)
try:
    futures = [executor.submit(task, i) for i in range(5)]
    # 处理结果...
finally:
    # 手动关闭线程池
    executor.shutdown(wait=True)

shutdown(wait=True)方法会等待所有提交的任务完成后再关闭线程池。如果设置wait=False,则会立即返回,但线程池会继续在后台运行,直到所有任务完成。

3.7 线程池大小的选择

线程池大小的选择对性能有重要影响。太少的线程可能无法充分利用并发潜力,而太多的线程可能导致过多的上下文切换开销。

线程池大小的经验法则:

  1. IO密集型任务:线程池大小可以相对较大,通常是CPU核心数的2-4倍,因为大部分时间线程都在等待IO操作完成。
  2. CPU密集型任务:由于GIL的限制,对于纯Python代码,线程池大小通常不应超过CPU核心数(甚至可能是1)。如果任务涉及到释放GIL的C扩展(如NumPy操作),则可以适当增加线程数。

以下是一个根据任务类型动态调整线程池大小的示例:

import concurrent.futures
import os
import time
import numpy as np

def io_bound_task(id):
    """IO密集型任务"""
    print(f"IO任务 {id} 开始执行")
    time.sleep(1)  # 模拟IO操作
    return f"IO任务 {id} 完成"

def cpu_bound_task(id):
    """CPU密集型任务"""
    print(f"CPU任务 {id} 开始执行")
    # 执行一些计算密集型操作
    result = 0
    for i in range(10000000):
        result += i
    return f"CPU任务 {id} 完成,结果: {result}"

def numpy_task(id):
    """使用NumPy的CPU密集型任务(会释放GIL)"""
    print(f"NumPy任务 {id} 开始执行")
    # NumPy操作会释放GIL
    a = np.random.rand(1000, 1000)
    b = np.random.rand(1000, 1000)
    c = np.dot(a, b)  # 矩阵乘法
    return f"NumPy任务 {id} 完成"

# 获取CPU核心数
cpu_count = os.cpu_count()
print(f"CPU核心数: {cpu_count}")

# 对于IO密集型任务,使用较多的线程
print("\n执行IO密集型任务:")
with concurrent.futures.ThreadPoolExecutor(max_workers=cpu_count * 4) as executor:
    start_time = time.time()
    results = list(executor.map(io_bound_task, range(20)))
    end_time = time.time()
    print(f"IO密集型任务总耗时: {end_time - start_time:.2f}秒")

# 对于纯Python的CPU密集型任务,使用较少的线程
print("\n执行CPU密集型任务:")
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    start_time = time.time()
    results = list(executor.map(cpu_bound_task, range(4)))
    end_time = time.time()
    print(f"CPU密集型任务总耗时: {end_time - start_time:.2f}秒")

# 对于使用NumPy等释放GIL的CPU密集型任务,可以使用更多线程
print("\n执行NumPy任务:")
with concurrent.futures.ThreadPoolExecutor(max_workers=cpu_count) as executor:
    start_time = time.time()
    results = list(executor.map(numpy_task, range(cpu_count * 2)))
    end_time = time.time()
    print(f"NumPy任务总耗时: {end_time - start_time:.2f}秒")

四、asyncio模块详解(续)

4.6 事件循环深入理解

asyncio的核心是事件循环(Event Loop),它负责调度和执行协程、处理IO事件、执行网络操作等。理解事件循环的工作原理对于高效使用asyncio至关重要。

import asyncio
import time

async def demo():
    print("协程开始执行")
    await asyncio.sleep(1)
    print("协程执行完毕")
    return "结果"

# 获取事件循环
loop = asyncio.get_event_loop()

# 方法1:在事件循环中运行协程直到完成
result = loop.run_until_complete(demo())
print(f"结果: {result}")

# 方法2:创建任务并添加到事件循环
task = loop.create_task(demo())
loop.run_until_complete(task)
print(f"任务结果: {task.result()}")

# 方法3:使用asyncio.run(推荐,Python 3.7+)
# 这会创建一个新的事件循环,运行协程,然后关闭事件循环
result = asyncio.run(demo())
print(f"asyncio.run结果: {result}")

# 手动操作事件循环(不推荐,但有助于理解)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

try:
    loop.run_until_complete(demo())
finally:
    loop.close()

在现代Python中(3.7+),推荐使用asyncio.run()来运行协程,它会自动处理事件循环的创建和关闭。

4.7 协程与任务的区别

asyncio中,协程(Coroutine)和任务(Task)是两个相关但不同的概念:

  • 协程:使用async def定义的函数,可以在执行过程中暂停(使用await)并稍后恢复。协程本身不会自动执行,需要被事件循环调度。
  • 任务:是对协程的包装,表示一个正在由事件循环调度的协程。任务是Future的子类,可以被等待、取消,并可以添加完成回调。
import asyncio

async def my_coroutine():
    print("协程开始执行")
    await asyncio.sleep(1)
    print("协程执行完毕")
    return "协程结果"

async def main():
    # 直接等待协程
    result1 = await my_coroutine()
    print(f"直接等待结果: {result1}")

    # 创建任务
    task = asyncio.create_task(my_coroutine())

    # 等待任务完成
    result2 = await task
    print(f"任务结果: {result2}")

    # 创建多个任务并并发执行
    task1 = asyncio.create_task(my_coroutine())
    task2 = asyncio.create_task(my_coroutine())

    # 等待所有任务完成
    results = await asyncio.gather(task1, task2)
    print(f"多任务结果: {results}")

asyncio.run(main())

使用任务的主要优势是可以实现真正的并发执行,而直接等待协程是顺序执行的。

4.8 异步上下文管理器

asyncio支持异步上下文管理器,可以在进入和退出上下文时执行异步操作:

import asyncio

class AsyncContextManager:
    async def __aenter__(self):
        print("异步进入上下文")
        await asyncio.sleep(1)  # 模拟异步操作
        return "上下文资源"

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("异步退出上下文")
        await asyncio.sleep(0.5)  # 模拟异步清理操作
        # 返回True表示异常已处理,False表示异常需要传播
        return False

async def main():
    # 使用异步上下文管理器
    async with AsyncContextManager() as resource:
        print(f"获取资源: {resource}")
        await asyncio.sleep(1)
        print("在上下文中执行操作")

    print("上下文之外")

asyncio.run(main())

asyncio库中的许多对象都实现了异步上下文管理器协议,如LockSemaphore等。

4.9 异步迭代器

asyncio还支持异步迭代器,允许在for循环中使用await

import asyncio

class AsyncIterator:
    def __init__(self, limit):
        self.limit = limit
        self.counter = 0

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.counter < self.limit:
            self.counter += 1
            await asyncio.sleep(0.5)  # 模拟异步操作
            return f"Item {self.counter}"
        else:
            raise StopAsyncIteration

async def main():
    # 使用异步for循环
    async for item in AsyncIterator(5):
        print(f"收到: {item}")

    # 手动使用异步迭代器
    iterator = AsyncIterator(3)
    try:
        while True:
            item = await iterator.__anext__()
            print(f"手动迭代: {item}")
    except StopAsyncIteration:
        print("迭代结束")

asyncio.run(main())

4.10 与其他异步库的集成

asyncio可以与其他异步库集成,如aiohttp(HTTP客户端/服务器)、aiofiles(文件IO)、asyncpg(PostgreSQL数据库)等。

以下是一个使用aiofiles进行异步文件IO的示例:

import asyncio
import aiofiles
import time

async def read_file(filename):
    async with aiofiles.open(filename, mode='r') as f:
        content = await f.read()
    return content

async def write_file(filename, content):
    async with aiofiles.open(filename, mode='w') as f:
        await f.write(content)

async def process_files(filenames):
    # 并发读取多个文件
    tasks = [read_file(filename) for filename in filenames]
    contents = await asyncio.gather(*tasks)

    # 处理内容
    processed_contents = [content.upper() for content in contents]

    # 并发写入处理后的内容
    write_tasks = [
        write_file(f"processed_{filename}", content)
        for filename, content in zip(filenames, processed_contents)
    ]
    await asyncio.gather(*write_tasks)

async def main():
    # 创建一些测试文件
    test_files = ["test1.txt", "test2.txt", "test3.txt"]
    for i, filename in enumerate(test_files):
        with open(filename, 'w') as f:
            f.write(f"This is test file {i+1}.\nIt contains some text for testing async file IO.")

    # 处理文件
    start_time = time.time()
    await process_files(test_files)
    end_time = time.time()

    print(f"异步处理 {len(test_files)} 个文件,耗时: {end_time - start_time:.2f}秒")

    # 验证结果
    for filename in test_files:
        processed_filename = f"processed_{filename}"
        with open(processed_filename, 'r') as f:
            content = f.read()
            print(f"{processed_filename} 内容: {content[:50]}...")

# 注意:需要先安装aiofiles库:pip install aiofiles
# asyncio.run(main())
print("注意:运行此代码需要先安装aiofiles库:pip install aiofiles")

八、多线程调试与常见问题

8.1 多线程程序的调试技巧

调试多线程程序比调试单线程程序更具挑战性,因为线程的执行顺序是不确定的,问题可能难以重现。以下是一些有用的调试技巧:

  1. 使用日志而不是print:使用Python的logging模块,它是线程安全的,并且可以包含线程ID、时间戳等信息。
import logging
import threading
import time

# 配置日志
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(threadName)s - %(levelname)s - %(message)s'
)

def worker(name):
    logging.debug(f"线程 {name} 开始执行")
    time.sleep(1)
    logging.debug(f"线程 {name} 执行完毕")

# 创建线程
threads = []
for i in range(3):
    t = threading.Thread(target=worker, name=f"Thread-{i}")
    threads.append(t)
    t.start()

# 等待所有线程结束
for t in threads:
    t.join()

logging.debug("所有线程执行完毕")
  1. 使用线程名称:给线程设置有意义的名称,便于在日志和调试器中识别。

  2. 使用调试器:Python的调试器(如pdb)可以用于调试多线程程序,但需要注意线程切换可能导致的复杂性。

  3. 线程追踪:使用threading.settrace()threading.setprofile()可以为所有线程设置跟踪函数。

  4. 使用threading.enumerate()threading.current_thread():这些函数可以帮助你了解当前活动的线程。

import threading
import time

def list_threads():
    """列出当前所有活动线程"""
    current = threading.current_thread()
    print(f"当前线程: {current.name} (ID: {current.ident})")

    print("\n所有活动线程:")
    for thread in threading.enumerate():
        print(f"- {thread.name} (ID: {thread.ident}, 守护线程: {thread.daemon})")

def worker(name, delay):
    print(f"线程 {name} 开始执行")
    time.sleep(delay)
    list_threads()  # 列出线程
    print(f"线程 {name} 执行完毕")

# 主线程
list_threads()

# 创建线程
threads = []
for i in range(3):
    t = threading.Thread(target=worker, name=f"Worker-{i}", args=(f"Worker-{i}", i+1))
    threads.append(t)
    t.start()

# 等待一段时间
time.sleep(0.5)
list_threads()  # 列出线程

# 等待所有线程结束
for t in threads:
    t.join()

print("所有线程执行完毕")
list_threads()  # 列出线程

8.2 常见的多线程问题及解决方案

8.2.1 竞态条件(Race Condition)

问题:多个线程同时访问和修改共享数据,导致数据不一致。

解决方案

  • 使用锁(LockRLock)保护共享资源
  • 使用线程安全的数据结构(如Queue
  • 使用线程本地存储(threading.local())避免共享

8.2.2 死锁(Deadlock)

问题:两个或多个线程互相等待对方持有的资源,导致永久阻塞。

解决方案

  • 按固定顺序获取锁
  • 使用超时机制
  • 使用RLock代替Lock(适用于同一线程多次获取锁的情况)
  • 减少锁的使用范围和持有时间

8.2.3 活锁(Livelock)

问题:线程不断响应彼此的操作,但无法向前推进。

解决方案

  • 在重试逻辑中添加随机延迟
  • 实现退避策略(如指数退避)
import threading
import time
import random

def worker_with_backoff(id, shared_resource, lock):
    backoff = 0.1  # 初始退避时间
    max_backoff = 2.0  # 最大退避时间

    while True:
        if lock.acquire(timeout=0.1):  # 尝试获取锁,设置超时
            try:
                # 使用共享资源
                print(f"线程 {id} 获取了锁,正在使用共享资源")
                shared_resource['value'] += 1
                time.sleep(0.1)  # 模拟工作
                print(f"线程 {id} 完成工作,共享资源值: {shared_resource['value']}")
                break  # 成功完成,退出循环
            finally:
                lock.release()
        else:
            # 未能获取锁,实现指数退避
            jitter = random.uniform(0, 0.1)  # 添加随机抖动
            sleep_time = min(backoff + jitter, max_backoff)
            print(f"线程 {id} 未能获取锁,等待 {sleep_time:.2f} 秒后重试")
            time.sleep(sleep_time)
            backoff = min(backoff * 2, max_backoff)  # 指数增长,但不超过最大值

# 共享资源
resource = {'value': 0}
lock = threading.Lock()

# 创建线程
threads = []
for i in range(5):
    t = threading.Thread(target=worker_with_backoff, args=(i, resource, lock))
    threads.append(t)
    t.start()

# 等待所有线程结束
for t in threads:
    t.join()

print(f"最终共享资源值: {resource['value']}")

8.2.4 线程饥饿(Starvation)

问题:某些线程长时间无法获取所需资源,导致任务无法进展。

解决方案

  • 使用公平锁(Python标准库中没有直接提供,但可以自行实现)
  • 使用超时机制
  • 适当调整线程优先级(Python中较难直接控制)

8.2.5 GIL相关问题

问题:Python的GIL限制了多线程在CPU密集型任务上的性能。

解决方案

  • 对于CPU密集型任务,使用多进程代替多线程
  • 使用C扩展(如NumPy、Pandas)进行计算密集型操作,这些库在计算时会释放GIL
  • 考虑使用其他Python实现(如Jython、IronPython)或其他语言

8.2.6 线程安全问题

问题:某些Python标准库和第三方库可能不是线程安全的。

解决方案

  • 查阅库文档,确认线程安全性
  • 使用锁保护对非线程安全库的调用
  • 考虑使用线程本地存储或每个线程一个实例
import threading
import sqlite3  # sqlite3在多线程环境中需要小心使用

# 使用线程本地存储
local_data = threading.local()

def get_connection():
    """获取线程本地的数据库连接"""
    if not hasattr(local_data, 'connection'):
        local_data.connection = sqlite3.connect(':memory:')
        # 创建表
        cursor = local_data.connection.cursor()
        cursor.execute('CREATE TABLE IF NOT EXISTS test (id INTEGER PRIMARY KEY, value TEXT)')
        local_data.connection.commit()
    return local_data.connection

def worker(name, values):
    """每个线程使用自己的数据库连接"""
    conn = get_connection()
    cursor = conn.cursor()

    # 插入数据
    for value in values:
        cursor.execute('INSERT INTO test (value) VALUES (?)', (f"{name}-{value}",))
    conn.commit()

    # 查询数据
    cursor.execute('SELECT * FROM test')
    results = cursor.fetchall()
    print(f"线程 {name} 查询结果: {results}")

# 创建线程
threads = []
for i in range(3):
    values = [f"value-{j}" for j in range(i*3, (i+1)*3)]
    t = threading.Thread(target=worker, args=(f"Thread-{i}", values))
    threads.append(t)
    t.start()

# 等待所有线程结束
for t in threads:
    t.join()

print("所有线程执行完毕")

8.3 性能监控与优化

监控和优化多线程程序的性能是一项重要任务。以下是一些有用的技巧:

  1. 使用cProfile进行性能分析
import cProfile
import threading
import time

def worker(count):
    """模拟工作线程"""
    total = 0
    for i in range(count):
        total += i
    time.sleep(0.1)  # 模拟IO操作
    return total

def run_threads(num_threads, count_per_thread):
    """运行多个线程"""
    threads = []
    for i in range(num_threads):
        t = threading.Thread(target=worker, args=(count_per_thread,))
        threads.append(t)
        t.start()

    for t in threads:
        t.join()

# 使用cProfile分析性能
cProfile.run('run_threads(5, 1000000)')
  1. 使用time模块测量执行时间
import time
import threading

def measure_time(func, *args, **kwargs):
    """测量函数执行时间的装饰器"""
    start_time = time.time()
    result = func(*args, **kwargs)
    end_time = time.time()
    print(f"{func.__name__} 执行耗时: {end_time - start_time:.4f}秒")
    return result

@measure_time
def run_in_threads(func, args_list, num_threads):
    """在线程池中运行多个任务"""
    # 实现一个简单的线程池
    results = [None] * len(args_list)

    def worker(index, args):
        results[index] = func(*args)

    # 创建并启动线程
    threads = []
    for i, args in enumerate(args_list):
        t = threading.Thread(target=worker, args=(i, args))
        threads.append(t)
        t.start()

        # 控制并发线程数
        if len(threads) >= num_threads:
            threads[0].join()
            threads.pop(0)

    # 等待剩余线程完成
    for t in threads:
        t.join()

    return results

# 测试函数
def compute_sum(n):
    return sum(range(n))

# 准备参数
args_list = [(1000000,), (2000000,), (3000000,), (4000000,), (5000000,)]

# 使用不同数量的线程测试性能
for num_threads in [1, 2, 3, 4, 5]:
    print(f"\n使用 {num_threads} 个线程:")
    results = run_in_threads(compute_sum, args_list, num_threads)
    print(f"结果: {results}")
  1. 使用threading.stack_size()调整线程栈大小
import threading
import sys

# 查看默认栈大小
default_stack_size = threading.stack_size()
print(f"默认线程栈大小: {default_stack_size} 字节")

# 调整栈大小(仅在某些平台上支持)
try:
    # 设置为2MB
    new_stack_size = 2 * 1024 * 1024
    threading.stack_size(new_stack_size)
    print(f"线程栈大小已调整为: {threading.stack_size()} 字节")
except (ValueError, ThreadError) as e:
    print(f"无法调整栈大小: {e}")

# 创建使用新栈大小的线程
def recursive_function(depth):
    if depth <= 0:
        return
    # 创建一些局部变量,占用栈空间
    data = [0] * 1000
    recursive_function(depth - 1)

def thread_function():
    try:
        # 尝试较深的递归
        recursive_function(500)
        print("递归成功完成")
    except RecursionError as e:
        print(f"递归错误: {e}")

# 创建并启动线程
t = threading.Thread(target=thread_function)
t.start()
t.join()

# 恢复默认栈大小
threading.stack_size(default_stack_size)
print(f"已恢复默认栈大小: {threading.stack_size()} 字节")
  1. 使用concurrent.futures.ThreadPoolExecutormax_workers参数控制线程数
import concurrent.futures
import time
import random

def task(id):
    """模拟任务,随机耗时"""
    sleep_time = random.uniform(0.1, 1.0)
    time.sleep(sleep_time)
    return f"任务 {id} 完成,耗时 {sleep_time:.2f}秒"

def benchmark_thread_pool(num_tasks, max_workers):
    """测试不同线程池大小的性能"""
    start_time = time.time()

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(task, i) for i in range(num_tasks)]
        results = [future.result() for future in concurrent.futures.as_completed(futures)]

    end_time = time.time()
    elapsed_time = end_time - start_time

    print(f"线程池大小: {max_workers}, 任务数: {num_tasks}, 总耗时: {elapsed_time:.2f}秒")
    return elapsed_time

# 测试不同线程池大小
num_tasks = 50
for max_workers in [1, 2, 5, 10, 20, 50, 100]:
    benchmark_thread_pool(num_tasks, max_workers)

通过这些技术,你可以监控和优化多线程程序的性能,找到最适合你的应用场景的线程配置。

动物装饰