news 2026/6/13 17:52:21

Python 多线程 多任务 分布式进程 ThreadLocal

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Python 多线程 多任务 分布式进程 ThreadLocal

target: 传入 函数,开辟线程,这个线程要执行的任务


线程

import threading import time# 方法1:直接使用 Thread 类defworker(name,delay):print(f"线程 {name} 开始工作")time.sleep(delay)print(f"线程 {name} 完成工作")# 创建线程t1=threading.Thread(target=worker,args=("Thread-1",2))t2=threading.Thread(target=worker,args=("Thread-2",1))# 启动线程t1.start()t2.start()# 等待线程完成t1.join()t2.join()print("所有线程完成")### 线程同步 互斥锁importthreadingcounter=0lock=threading.Lock()defincrement():globalcounterfor_inrange(100000):lock.acquire()# 获取锁 counter+=1lock.release()# 释放锁# 或者使用 with 语句(推荐)# with lock:# counter += 1# 创建多个线程threads=[]for_inrange(5):t=threading.Thread(target=increment)threads.append(t)t.start()fortinthreads:t.join()print(f"最终计数: {counter}")#500000## RLock(可重入锁)# 可重入锁(Reentrant Lock) 允许同一个线程多次获取同一个锁,而不会造成死锁。它会记录锁的持有线程和获取次数。importthreadingrlock=threading.RLock()defrecursive_function(n):withrlock:ifn>0:print(f"递归深度: {n}")recursive_function(n-1)# 同一个线程可以多次获取RLockt=threading.Thread(target=recursive_function,args=(5,))t.start()t.join()## Semaphore(信号量)import threading import time# 限制同时最多3个线程访问semaphore=threading.Semaphore(3)deflimited_access(name):withsemaphore:print(f"{name} 获取访问权限")time.sleep(2)# 模拟工作print(f"{name} 释放访问权限")# 创建10个线程threads=[]foriinrange(10):t=threading.Thread(target=limited_access,args=(f"Thread-{i}",))threads.append(t)t.start()fortinthreads:t.join()

》》》普通的lock 和 Rlock

import threading# 普通 Lock - 同一个线程不能重复获取lock=threading.Lock()deflock_example():lock.acquire()print("第一次获取锁")# lock.acquire() # 死锁!线程会永远等待自己lock.release()# RLock - 同一个线程可以重复获取rlock=threading.RLock()defrlock_example():rlock.acquire()print("第一次获取锁")rlock.acquire()# 可以再次获取print("第二次获取锁")rlock.release()# 需要释放相同次数 rlock.release()

主线程和子线程 线程守护

默认情况下,子线程不会随这主线程执行完毕,而结束,所有主线程会等待所有子线程执行结束,才结束。
如主线程结束,子线程无论是否执行完,都是退出程序


创建线程时,指定daemon=True
或者
sub_thread.darmon=True
sub_thread.setDaemon(True)

线程直接执行是无序的是有CPU决定的

进程创建步骤



python 多进程 多线程 传参

args 元组传参:传参一定要和函数定义的参数顺序保持一致
kwargs字典传参:字典中的key一定要和参数名保持一致。 顺便随便。

元组为一个元素时,记得后面要有个 ,

my_tuple =(‘KEY’,)

获取进程编号

  1. 获取当前进程编号 整数 os.getpid()
  2. 获取当前父进程编号 整数 os.getppid()

子进程守护

默认主进程执行完,不会终止程序,而是等待所有子进程执行完毕,才能终止
sub_process.daemon = True, 这个子进程会守护,就是主进程终止,不需要等待这个子进程执行完,便可终止
这跟.NET 多进程效果一样。主进程终止,程序就终止了
创建子进程时,指定daemon=True
或者
sub_process.daemon=True

线程和进程区别


创建进程的的资源开销要比创建线程的资源开销大(每个进程里面至少存在一个线程)

进程是操作系统资源分配的基本单位、线程是CPU调度的基本单位

frommultiprocessing importProcessli=['aa']# 这个列表在主进程中defwrite():foriinrange(10):li.append(i)# 这是在子进程 p1 中修改自己的liprint(li)# p1 进程中的 li:['aa',0,1,2,...]defread():print(li)# 这是在子进程 p2 中打印自己的 li,只有['aa']if__name__=='__main__':p1=Process(target=write)p2=Process(target=read)p1.start()p1.join()# 等待 p1 执行完 p2.start()# p2 读取的是自己的内存空间,不是 p1 修改后的# 输出:# ['aa', 0, 1, 2, 3, 4, 5, 6, 7, 8, 9] (p1的输出)# ['aa'] (p2的输出)

# 使用 multiprocessing.Manager 共享数据frommultiprocessingimportProcess,Managerdefwrite(shared_list):foriinrange(10):shared_list.append(i)print(f"write进程中的列表: {shared_list}")defread(shared_list):print(f"read进程中的列表: {shared_list}")if__name__=='__main__':# 创建共享列表manager=Manager()li=manager.list(['aa'])p1=Process(target=write,args=(li,))p2=Process(target=read,args=(li,))p1.start()p1.join()p2.start()p2.join()print(f"主进程中的列表: {li}")# 输出:# write进程中的列表: ['aa', 0, 1, 2, 3, 4, 5, 6, 7, 8, 9]# read进程中的列表: ['aa', 0, 1, 2, 3, 4, 5, 6, 7, 8, 9]# 主进程中的列表: ['aa', 0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
# 使用 Queue 传递数据frommultiprocessingimportProcess,Queuedefwrite(q):li=['aa']foriinrange(10):li.append(i)q.put(li)# 将结果放入队列defread(q):li=q.get()# 从队列获取数据print(li)if__name__=='__main__':q=Queue()p1=Process(target=write,args=(q,))p2=Process(target=read,args=(q,))p1.start()p1.join()p2.start()p2.join()# 输出: ['aa', 0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

分布式进程

ThreadLocal

ThreadLocal 是一种线程局部存储(Thread-Local Storage, TLS) 机制。简单说,它让你在不同线程中拥有相互隔离的变量副本——每个线程访问到的都是自己的那份数据,互不干扰。

importthreadingimporttime# 创建一个 ThreadLocal 对象local_data=threading.local()defworker(name):# 为当前线程设置属性local_data.user=nameprint(f"线程{name}设置 user ={local_data.user}")# 模拟一些操作time.sleep(0.1)# 再次读取,仍然是刚才设置的值print(f"线程{name}再次读取 user ={local_data.user}")# 创建多个线程t1=threading.Thread(target=worker,args=("Alice",))t2=threading.Thread(target=worker,args=("Bob",))t1.start()t2.start()t1.join()t2.join()线程 Alice 设置 user=Alice 线程 Bob 设置 user=Bob 线程 Alice 再次读取 user=Alice 线程 Bob 再次读取 user=Bob 可以看到,两个线程各自操作的 local_data.user 是完全隔离的。

线程通信

## 1. 共享变量 + 锁(threading.Lock) 最推荐的线程间通信方式,无需手动加锁。importthreading shared_data=0lock=threading.Lock()defworker():globalshared_datafor_inrange(10000):withlock:shared_data+=1t1=threading.Thread(target=worker)t2=threading.Thread(target=worker)t1.start();t2.start()t1.join();t2.join()print(shared_data)# 20000# 2. queue.Queue(线程安全队列)importthreading,queue q=queue.Queue()defproducer():foriinrange(5):q.put(i)q.put(None)# 结束信号defconsumer():whileTrue:item=q.get()ifitemisNone:breakprint(item)threading.Thread(target=producer).start()threading.Thread(target=consumer).start()# 3. 事件(threading.Event) 用于线程间的信号通知(一个线程等待另一个线程的“标志”)。importthreading event=threading.Event()defwaiter():event.wait()# 阻塞直到 event.set()print("Go!")defsetter():importtime time.sleep(2)event.set()threading.Thread(target=waiter).start()threading.Thread(target=setter).start()

进程通信(IPC)

进程不共享内存,不能直接访问对方变量。Python 的 multiprocessing 模块提供了多种 IPC 机制。

# 1. multiprocessing.Queue 类似 queue.Queue,但数据会通过 pickle 序列化后在进程间传递。frommultiprocessingimportProcess,Queuedefworker(q):q.put("Hello from child")if__name__=="__main__":q=Queue()p=Process(target=worker,args=(q,))p.start()print(q.get())# Hello from childp.join()# 2. multiprocessing.Pipe 返回一对连接对象(conn1, conn2) 是一对 一对 ,类似双向管道。比 Queue 更轻量。frommultiprocessingimportProcess,Pipedefchild(conn):conn.send("Hi parent")conn.close()parent_conn,child_conn=Pipe()p=Process(target=child,args=(child_conn,))p.start()print(parent_conn.recv())# Hi parentp.join()# 3. 共享内存(multiprocessing.Value / Array) 通过共享内存区域实现进程间共享数据,需要配合锁避免竞争。frommultiprocessingimportProcess,Value,Lockdefadd(shared_val,lock):for_inrange(1000):# with自动加锁自动释放锁 这比手动调用 lock.acquire() 和 lock.release() 更安全、简洁,能避免因忘记释放锁或异常导致锁未释放的问题。withlock:shared_val.value+=1if__name__=="__main__":v=Value('i',0)# 'i' 表示有符号整型lock=Lock()p1=Process(target=add,args=(v,lock))p2=Process(target=add,args=(v,lock))p1.start();p2.start()p1.join();p2.join()print(v.value)# 2000# 4. multiprocessing.Manager 提供更高级的共享对象(如列表、字典、队列等),但性能相对较低。frommultiprocessingimportProcess,Managerdefworker(d,key,value):d[key]=valueif__name__=="__main__":withManager()asmanager:d=manager.dict()p=Process(target=worker,args=(d,"name","Alice"))p.start()p.join()print(d)# {'name': 'Alice'}

分布式通信


celery

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/6/13 17:51:42

VR-Reversal:零成本解锁3D视频观看体验的智能播放方案

VR-Reversal:零成本解锁3D视频观看体验的智能播放方案 【免费下载链接】VR-reversal VR-Reversal - Player for conversion of 3D video to 2D with optional saving of head tracking data and rendering out of 2D copies. 项目地址: https://gitcode.com/gh_mi…

作者头像 李华
网站建设 2026/6/13 17:48:52

如何用MOFA2破解多组学数据的隐藏维度?3个关键应用场景解析

如何用MOFA2破解多组学数据的隐藏维度?3个关键应用场景解析 【免费下载链接】MOFA2 Multi-Omics Factor Analysis 项目地址: https://gitcode.com/gh_mirrors/mo/MOFA2 当我们面对基因组学、转录组学、蛋白质组学等多维度生物数据时,传统分析方法…

作者头像 李华
网站建设 2026/6/13 17:42:01

Artisan烘焙软件:如何用开源技术重塑咖啡烘焙的数据化未来?

Artisan烘焙软件:如何用开源技术重塑咖啡烘焙的数据化未来? 【免费下载链接】artisan artisan: the worlds most trusted roasting software 项目地址: https://gitcode.com/gh_mirrors/ar/artisan 你是否曾想过,一杯完美的咖啡背后需…

作者头像 李华
网站建设 2026/6/13 17:34:02

MCU定时器中断与PWM生成实战:从MC9RS08KB12 TPM模块到通用原理

1. 项目概述:从芯片手册到实战经验如果你曾经在嵌入式项目中尝试过用MCU的定时器生成一个精准的PWM信号来控制电机转速或者LED亮度,大概率会翻过芯片的参考手册。手册里那些关于中断标志、计数器溢出、通道比较寄存器的描述,字都认识&#xf…

作者头像 李华
网站建设 2026/6/13 17:31:30

嵌入式汇编器配置实战:环境变量、编辑器集成与项目构建优化

1. 汇编器配置的核心价值与工作流定位在嵌入式开发和底层系统编程的日常里,汇编器是我们与硬件直接对话的桥梁。它不像高级语言编译器那样有复杂的语法糖和抽象层,它的工作简单而纯粹:将我们写的助记符(MOV, ADD, JMP)…

作者头像 李华
网站建设 2026/6/13 17:31:30

10分钟快速搭建学之思开源考试系统:从零到上线完整指南

10分钟快速搭建学之思开源考试系统:从零到上线完整指南 【免费下载链接】xzs 在线考试系统 项目地址: https://gitcode.com/gh_mirrors/xz/xzs 学之思开源考试系统是一款功能强大的在线考试平台,支持题库管理、试卷创建、在线考试和成绩分析等核心…

作者头像 李华