news 2026/6/10 19:15:10

Python进程间通信与消息队列

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Python进程间通信与消息队列

Python进程间通信与消息队列

一、进程间通信概述

进程间通信(IPC)是多进程协作的基础。Python提供了多种IPC机制:
- 管道(Pipe)
- 队列(Queue)
- 共享内存(SharedMemory)
- 信号(Signal)
- 套接字(Socket)
- 内存映射文件(mmap)


二、multiprocessing.Queue

from multiprocessing import Process, Queue
import time

class TaskQueue:
"""基于多进程队列的任务分发系统"""
def __init__(self, num_workers=4):
self.task_queue = Queue()
self.result_queue = Queue()
self.num_workers = num_workers
self.workers = []

def start(self):
for i in range(self.num_workers):
p = Process(target=self._worker, args=(i,))
p.daemon = True
p.start()
self.workers.append(p)

def _worker(self, worker_id):
while True:
task = self.task_queue.get()
if task is None: # 毒丸信号
break
task_id, func, args, kwargs = task
try:
result = func(*args, **kwargs)
self.result_queue.put((task_id, 'success', result))
except Exception as e:
self.result_queue.put((task_id, 'error', str(e)))

def submit(self, task_id, func, *args, **kwargs):
self.task_queue.put((task_id, func, args, kwargs))

def get_result(self, timeout=None):
return self.result_queue.get(timeout=timeout)

def shutdown(self):
for _ in self.workers:
self.task_queue.put(None)
for p in self.workers:
p.join()

# 使用
def heavy_computation(n):
return sum(i * i for i in range(n))

tq = TaskQueue(num_workers=4)
tq.start()

for i in range(10):
tq.submit(i, heavy_computation, 1000000)

for i in range(10):
task_id, status, result = tq.get_result()
print(f"任务 {task_id}: {status} = {result}")

tq.shutdown()


三、Pipe管道

from multiprocessing import Process, Pipe

def sender(conn, messages):
for msg in messages:
conn.send(msg)
time.sleep(0.1)
conn.send(None) # 结束信号
conn.close()

def receiver(conn):
while True:
msg = conn.recv()
if msg is None:
break
print(f"收到: {msg}")
conn.close()

# Pipe返回两个连接对象
parent_conn, child_conn = Pipe()

p1 = Process(target=sender, args=(parent_conn, ["hello", "world", "python"]))
p2 = Process(target=receiver, args=(child_conn,))

p1.start()
p2.start()
p1.join()
p2.join()


四、共享内存(Python 3.8+)

from multiprocessing import shared_memory, Process
import numpy as np

def worker_with_shared_memory(shm_name, shape, dtype):
"""工作进程访问共享内存"""
existing_shm = shared_memory.SharedMemory(name=shm_name)
array = np.ndarray(shape, dtype=dtype, buffer=existing_shm.buf)

# 直接修改共享数组
array *= 2

existing_shm.close()

# 主进程创建共享内存
data = np.array([1, 2, 3, 4, 5], dtype=np.int64)
shm = shared_memory.SharedMemory(create=True, size=data.nbytes)
shared_array = np.ndarray(data.shape, dtype=data.dtype, buffer=shm.buf)
shared_array[:] = data[:]

# 启动工作进程
p = Process(target=worker_with_shared_memory,
args=(shm.name, data.shape, data.dtype))
p.start()
p.join()

print(shared_array) # [2, 4, 6, 8, 10]

# 清理
shm.close()
shm.unlink()


五、Manager对象

from multiprocessing import Manager, Process

def worker(shared_dict, shared_list, lock, worker_id):
with lock:
shared_dict[f'worker_{worker_id}'] = f'result_{worker_id}'
shared_list.append(worker_id)

# Manager提供进程安全的共享对象
with Manager() as manager:
shared_dict = manager.dict()
shared_list = manager.list()
lock = manager.Lock()

processes = []
for i in range(5):
p = Process(target=worker, args=(shared_dict, shared_list, lock, i))
processes.append(p)
p.start()

for p in processes:
p.join()

print(dict(shared_dict))
print(list(shared_list))


六、信号处理

import signal
import sys

class GracefulShutdown:
"""优雅关闭处理"""
def __init__(self):
self.shutdown_requested = False
signal.signal(signal.SIGTERM, self._handle_signal)
signal.signal(signal.SIGINT, self._handle_signal)

def _handle_signal(self, signum, frame):
print(f"\n收到信号 {signum},准备关闭...")
self.shutdown_requested = True

def should_continue(self):
return not self.shutdown_requested

# 使用
shutdown = GracefulShutdown()

while shutdown.should_continue():
# 执行工作
process_next_task()
time.sleep(1)

print("清理资源...")
cleanup()
print("已安全关闭")


七、简易消息队列实现

import json
import threading
from queue import Queue, Empty
from typing import Callable, Any

class MessageBroker:
"""简易发布/订阅消息代理"""
def __init__(self):
self._topics = {}
self._subscribers = {}
self._lock = threading.Lock()

def create_topic(self, topic_name, max_size=1000):
with self._lock:
if topic_name not in self._topics:
self._topics[topic_name] = Queue(maxsize=max_size)
self._subscribers[topic_name] = []

def publish(self, topic_name, message):
if topic_name not in self._topics:
raise ValueError(f"主题不存在: {topic_name}")
self._topics[topic_name].put(message)
self._notify_subscribers(topic_name, message)

def subscribe(self, topic_name, callback: Callable):
if topic_name not in self._subscribers:
raise ValueError(f"主题不存在: {topic_name}")
self._subscribers[topic_name].append(callback)

def _notify_subscribers(self, topic_name, message):
for callback in self._subscribers[topic_name]:
threading.Thread(target=callback, args=(message,)).start()

def consume(self, topic_name, timeout=None):
if topic_name not in self._topics:
raise ValueError(f"主题不存在: {topic_name}")
try:
return self._topics[topic_name].get(timeout=timeout)
except Empty:
return None

# 使用
broker = MessageBroker()
broker.create_topic('orders')
broker.create_topic('notifications')

# 订阅
broker.subscribe('orders', lambda msg: print(f"处理订单: {msg}"))

# 发布
broker.publish('orders', {'order_id': 1, 'amount': 99.99})


八、使用Redis作为消息队列

class RedisMessageQueue:
"""基于Redis的消息队列(示意实现)"""
def __init__(self, redis_client, queue_name):
self.redis = redis_client
self.queue_name = queue_name

def publish(self, message):
data = json.dumps(message)
self.redis.lpush(self.queue_name, data)

def consume(self, timeout=0):
result = self.redis.brpop(self.queue_name, timeout=timeout)
if result:
_, data = result
return json.loads(data)
return None

def consume_batch(self, batch_size=10):
pipe = self.redis.pipeline()
for _ in range(batch_size):
pipe.rpop(self.queue_name)
results = pipe.execute()
return [json.loads(r) for r in results if r]

def length(self):
return self.redis.llen(self.queue_name)

class RedisPubSub:
"""Redis发布/订阅"""
def __init__(self, redis_client):
self.redis = redis_client
self.pubsub = redis_client.pubsub()

def subscribe(self, channel, callback):
self.pubsub.subscribe(**{channel: callback})
thread = self.pubsub.run_in_thread(sleep_time=0.01)
return thread

def publish(self, channel, message):
self.redis.publish(channel, json.dumps(message))


九、异步消息处理

import asyncio

class AsyncMessageQueue:
"""异步消息队列"""
def __init__(self, maxsize=0):
self.queue = asyncio.Queue(maxsize=maxsize)
self.handlers = []
self._running = False

def register_handler(self, handler):
self.handlers.append(handler)

async def publish(self, message):
await self.queue.put(message)

async def start_consuming(self, num_consumers=3):
self._running = True
consumers = [
asyncio.create_task(self._consumer(f"consumer-{i}"))
for i in range(num_consumers)
]
await asyncio.gather(*consumers)

async def _consumer(self, name):
while self._running:
try:
message = await asyncio.wait_for(self.queue.get(), timeout=1.0)
for handler in self.handlers:
try:
await handler(message)
except Exception as e:
print(f"{name} 处理消息失败: {e}")
self.queue.task_done()
except asyncio.TimeoutError:
continue

async def stop(self):
self._running = False
await self.queue.join()

# 使用
async def order_handler(message):
print(f"处理订单: {message}")
await asyncio.sleep(0.5)

async def main():
mq = AsyncMessageQueue()
mq.register_handler(order_handler)

# 启动消费者
consumer_task = asyncio.create_task(mq.start_consuming(3))

# 发布消息
for i in range(10):
await mq.publish({'order_id': i, 'amount': i * 10})

await mq.queue.join()
await mq.stop()


十、总结

IPC选择建议:
- 简单数据传递 -> Queue或Pipe
- 大量数据共享 -> SharedMemory
- 复杂共享对象 -> Manager
- 分布式系统 -> Redis/RabbitMQ/Kafka
- 异步场景 -> asyncio.Queue

设计原则:
1. 优先使用Queue,它是进程安全的
2. 共享内存性能最高但需要手动同步
3. 消息队列解耦生产者和消费者
4. 考虑消息丢失、重复消费、顺序性问题
5. 生产环境使用成熟的消息中间件

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/6/10 19:05:15

人员轨迹溯源算法升级|人员动态管理视频孪生应用优化方案

一、升级背景当前司法监区智慧化建设已全面普及视频监控与基础智能分析系统,但在服刑人员动态精细化管控、全时空行为追溯、异动还原复盘等核心业务上,行业通用算法普遍存在明显工程短板:多镜头切换下轨迹断裂、人员密集遮挡轨迹漂移、换装/侧…

作者头像 李华
网站建设 2026/6/10 19:01:50

微信AI不是聊天助手,是跑腿调度员

微信AI不是聊天助手,是跑腿调度员 一个人 AI 一家公司。 你好,我是 joe45。 昨天微信AI生态指引发了。有人说“连个聊天助手都没有”,有人说“小程序要起飞了”。 我看了三遍,得出一个不太一样的结论。 微信AI不是来替你聊天的&…

作者头像 李华
网站建设 2026/6/10 19:01:45

收银机用途------自动点赞评论

这个机器如果不用就太可惜了,用起来,就能持续发挥作用。点赞评论是比较简单的功能,几乎没什么压力。---------------------------可以的,36GB的固态硬盘完全可以运行Ubuntu 22.04。无论你打算安装带图形界面的桌面版,还…

作者头像 李华
网站建设 2026/6/10 19:01:44

出海企业如何高效匹配全球市场调研供应商?

面向出海企业在海外调研中供应商难筛选、信息难核验的场景,梳理如何搭建可验证的服务商数据库,快速匹配多个国家和行业资源,借助覆盖全球的供应网络提升数据收集效率,让市场研究更精准、更省时。一、海外调研起步:先把…

作者头像 李华
网站建设 2026/6/10 19:01:22

如何让刻度数字自动变化?solidworks直尺模型建模给你答案

1.基础轮廓构建 在前视基准面上建立草图,然后进行利用拉伸特征命令进行拉伸处理,拉伸长度305mm,形成直尺的主体轮廓。 2.刻度线绘制 2.1长刻度线绘制 以直尺模型的“倒角面”为参考基准,创建新的草图,尺寸见下,然后进行拉伸处理,拉伸深度为0.1mm。 2.2短刻度线绘制 再次…

作者头像 李华
网站建设 2026/6/10 18:59:17

我把常用算法做成了动画,聊聊“可视化学习“到底有没有用

先抛观点:算法是"过程 空间结构"的东西,而文字和静态图天然丢失了"过程"这一维。这就是为什么很多人"看懂了题解却写不出代码"——你记住了结论,没记住状态怎么变。 这段时间我深度用了一个算法可视化站点&am…

作者头像 李华