一、理论基础
1.1 线程与进程的区别
进程:拥有独立的内存空间、数据栈等,系统开销大,进程间通信复杂。
线程:依附于进程,共享进程的内存空间,创建和切换开销小,但需要自己处理共享数据的同步问题。
1.2 Python 的 GIL(全局解释器锁)
CPython 解释器中有一个 GIL,同一时刻只允许一个线程执行 Python 字节码。
因此,Python 的多线程并不能利用多核 CPU 并行计算(计算密集型任务反而可能变慢)。
适用场景:I/O 密集型任务(网络请求、文件读写、用户输入等),线程在等待 I/O 时会释放 GIL,从而真正并发。
1.3 两个线程模块
thread(Python 2 中的名字,Python 3 中为_thread):底层、原始,不推荐直接使用。threading:基于_thread封装,提供了更强大的Thread类和各种同步工具,推荐使用。
二、实操环境准备
Python 3.6+(任何较新版本都可以)
任意代码编辑器(VS Code、PyCharm、甚至记事本)
命令行 / 终端
验证 Python 版本:
python --version # 应显示 3.x三、实操步骤 1:创建并启动线程(两种方式)
3.1 方式一:函数式(使用threading.Thread(target=...))
新建文件thread_func.py,写入以下代码:
import threading import time def print_time(thread_name, delay, counter): """线程函数:每隔 delay 秒打印一次当前时间,重复 counter 次""" while counter > 0: time.sleep(delay) print(f"{thread_name}: {time.ctime()}") counter -= 1 # 创建两个线程 t1 = threading.Thread(target=print_time, args=("Thread-1", 2, 5)) t2 = threading.Thread(target=print_time, args=("Thread-2", 4, 5)) # 启动线程 t1.start() t2.start() # 等待两个线程都结束(否则主线程可能提前退出) t1.join() t2.join() print("所有线程执行完毕,主线程退出。")运行:
python thread_func.py预期输出(时间会变化,但顺序类似):
Thread-1: Thu Jun 11 15:42:17 2026 Thread-1: Thu Jun 11 15:42:19 2026 Thread-2: Thu Jun 11 15:42:19 2026 Thread-1: Thu Jun 11 15:42:21 2026 Thread-2: Thu Jun 11 15:42:23 2026 ...
说明:
threading.Thread(target=函数名, args=元组)创建线程对象。start()使线程开始运行(调用run(),默认run()会执行target指定的函数)。join()阻塞主线程,直到被调用的线程结束,避免主线程提前退出。
3.2 方式二:继承threading.Thread类
新建文件thread_class.py:
import threading import time class MyThread(threading.Thread): def __init__(self, thread_id, name, delay, counter): super().__init__() # 调用父类初始化 self.thread_id = thread_id self.name = name self.delay = delay self.counter = counter def run(self): """线程启动后自动执行的方法""" print(f"开始线程:{self.name}") while self.counter > 0: time.sleep(self.delay) print(f"{self.name}: {time.ctime()}") self.counter -= 1 print(f"退出线程:{self.name}") # 创建线程实例 thread1 = MyThread(1, "Thread-1", 1, 5) thread2 = MyThread(2, "Thread-2", 2, 5) thread1.start() thread2.start() thread1.join() thread2.join() print("主线程结束")运行:
python thread_class.py效果与函数式类似,但更符合面向对象风格,适合需要保存更多状态或复用的场景。
四、实操步骤 2:线程同步(使用锁)
当多个线程修改同一份数据时,会发生“竞态条件”。下面用两个线程同时对一个全局变量加 1000000 次来演示问题,然后用锁解决。
4.1 无锁的错误示例
新建race_condition.py:
import threading # 共享资源 counter = 0 def increment(thread_name, times): global counter for _ in range(times): counter += 1 # 这不是原子操作!可能被线程切换中断 # 创建两个线程,每个加 1000000 次 t1 = threading.Thread(target=increment, args=("A", 1000000)) t2 = threading.Thread(target=increment, args=("B", 1000000)) t1.start() t2.start() t1.join() t2.join() print(f"期望结果: 2000000") print(f"实际结果: {counter}")运行几次,你会发现结果往往小于 2000000(如 1845214 等),这就是因为线程交替执行时丢失了更新。
4.2 使用锁(threading.Lock)修复
新建with_lock.py:
import threading counter = 0 lock = threading.Lock() # 创建一个锁对象 def increment_safe(thread_name, times): global counter for _ in range(times): lock.acquire() # 获取锁(如果已被其他线程持有,则阻塞) # 以下临界区代码同一时刻只有一个线程能执行 counter += 1 lock.release() # 释放锁 # 也可以使用 with 语句(更推荐): def increment_safe_with(thread_name, times): global counter for _ in range(times): with lock: # 自动 acquire 和 release counter += 1 t1 = threading.Thread(target=increment_safe, args=("A", 1000000)) t2 = threading.Thread(target=increment_safe, args=("B", 1000000)) t1.start() t2.start() t1.join() t2.join() print(f"加锁后的结果: {counter}") # 稳定输出 2000000说明:
lock.acquire()和lock.release()之间的代码称为“临界区”,一次只允许一个线程进入。使用
with lock:更加简洁安全,避免忘记释放锁。
五、实操步骤 3:线程间通信 - 队列(Queue)
queue.Queue是线程安全的 FIFO 队列,非常适合生产者-消费者模式。
示例:三个工人线程处理一个任务队列
新建queue_demo.py:
import threading import queue import time import random # 任务队列,最大长度 5 task_queue = queue.Queue(maxsize=5) def producer(name, total_tasks): """生产者:往队列中放任务""" for i in range(total_tasks): task = f"任务-{i}" task_queue.put(task) # 如果队列满,会阻塞直到有空间 print(f"{name} 生产了 {task}") time.sleep(random.uniform(0.2, 0.5)) # 生产结束后发送“毒丸”信号,通知消费者退出(数量等于消费者个数) for _ in range(3): task_queue.put(None) def consumer(name): """消费者:从队列中取任务并处理""" while True: task = task_queue.get() # 如果队列空,会阻塞直到有新任务 if task is None: # 收到毒丸,退出 break print(f"{name} 开始处理 {task}") time.sleep(random.uniform(0.3, 0.7)) # 模拟处理耗时 print(f"{name} 完成 {task}") task_queue.task_done() # 告诉队列该任务已完成 # 创建线程 prod = threading.Thread(target=producer, args=("生产者", 10)) cons1 = threading.Thread(target=consumer, args=("消费者-1",)) cons2 = threading.Thread(target=consumer, args=("消费者-2",)) cons3 = threading.Thread(target=consumer, args=("消费者-3",)) # 启动消费者(先启动,会阻塞等待任务) cons1.start() cons2.start() cons3.start() # 启动生产者 prod.start() # 等待生产者结束 prod.join() # 等待所有任务被处理(队列为空且 task_done 调用次数匹配) task_queue.join() print("所有任务处理完毕")运行:
bash
python queue_demo.py
你会看到三个消费者并发地从队列中取出任务处理,整个过程自动同步,无需手动加锁。
关键方法:
put(item):放入元素(队列满时阻塞)get():取出元素(队列空时阻塞)task_done():表示之前取出的任务已完成join():阻塞直到队列中所有元素都被task_done()确认
六、实操步骤 4:使用threading的其他常用功能
6.1 获取当前线程信息
import threading def show_info(): t = threading.current_thread() print(f"当前线程: {t.name}, ID: {t.ident}, 是否存活: {t.is_alive()}") t = threading.Thread(target=show_info, name="我的线程") t.start() t.join()运行结果为:
6.2 线程枚举
print(f"活动线程数: {threading.active_count()}") for t in threading.enumerate(): print(t.name)6.3 守护线程(Daemon Thread)
主线程结束时,守护线程会自动被终止。适合后台监控任务。
d = threading.Thread(target=some_task, daemon=True) d.start() # 主线程结束后,d 会立即被强制结束七、注意事项与最佳实践
| 场景 | 推荐做法 |
|---|---|
避免使用_thread模块 | 始终使用threading和queue |
| 共享可变数据 | 使用Lock、RLock或Queue |
| 生产者-消费者模型 | 优先使用queue.Queue,不要自己用锁实现 |
| 线程池 | 使用concurrent.futures.ThreadPoolExecutor |
| 计算密集型任务 | 改用multiprocessing模块利用多核 CPU |
| 线程间通信 | 使用Queue或Event、Condition等 |
八、扩展学习:线程池(ThreadPoolExecutor)
对于大量短期任务,频繁创建销毁线程开销大,线程池可以复用线程。
from concurrent.futures import ThreadPoolExecutor import time def task(n): time.sleep(1) return n * n with ThreadPoolExecutor(max_workers=3) as executor: results = executor.map(task, [1, 2, 3, 4, 5]) print(list(results)) # [1, 4, 9, 16, 25]map方法自动将迭代器的元素分配给线程池中的线程执行,非常方便