一、Profiler深度使用指南
1.1 Profiler完整配置与数据收集
# profiler_setup.py import mindspore as ms import mindspore.nn as nn from mindspore import Profiler, GpuMemoryProfiler, AscendCollector import os import json class ComprehensiveProfiler: """全面性能分析工具类""" def __init__(self, output_path: str = "./profiling_results", profile_memory: bool = True, profile_communication: bool = True, profile_ai_core: bool = True, profile_framework: bool = True): self.output_path = output_path os.makedirs(output_path, exist_ok=True) # 1. 基础Profiler配置 self.profiler = Profiler( output_path=output_path, profile_memory=profile_memory, # 内存分析 profile_communication=profile_communication, # 通信分析 start_profile=False, # 手动启动 data_sink_mode=False, # 数据下沉模式分析 parallel_strategy={"sharding_profiling": True} # 并行策略分析 ) # 2. GPU内存分析器(如果是GPU) self.memory_profiler = GpuMemoryProfiler() if hasattr(ms, 'GpuMemoryProfiler') else None # 3. 昇腾专用分析器 self.ascend_collector = AscendCollector( output_path=output_path, data_path="/var/log/npu/slog/", # 昇腾日志路径 profile_ai_core=profile_ai_core, # AI Core分析 profile_l2_cache=True, # L2缓存分析 profile_hbm=True, # HBM分析 profile_task_queue=True # 任务队列分析 ) # 4. 分析配置 self.profile_config = { "start_step": 10, # 跳过热身步骤 "stop_step": 110, # 分析100个step "profile_framework": profile_framework, "ai_core_metrics": "PipeUtilization", # 流水线利用率 "l2_cache": "Bandwidth", # L2带宽 "data_format": "json" # 输出格式 } def start_profiling(self, model, dataloader): """启动性能分析""" print("=" * 60) print("启动MindSpore性能分析...") print("=" * 60) # 1. 启动Profiler self.profiler.start() # 2. 执行训练并分析 self._profile_training_loop(model, dataloader) # 3. 停止并分析 self.profiler.stop() self.profiler.analyse() # 4. 
生成昇腾特定报告 if self.ascend_collector: self._collect_ascend_metrics() return self._generate_report() def _profile_training_loop(self, model, dataloader, num_steps: int = 100): """分析训练循环""" for step, batch in enumerate(dataloader): if step < self.profile_config["start_step"]: continue # 跳过热身 if step >= self.profile_config["stop_step"]: break # 停止分析 # 记录步骤开始 if step == self.profile_config["start_step"]: print(f"开始分析步骤 {step}") # 开始详细分析 self.profiler.start_profiling() # 执行训练步骤 loss = model(*batch) # 每10步输出状态 if step % 10 == 0: print(f"步骤 {step}: 损失 = {loss.asnumpy():.4f}") if self.memory_profiler: memory_info = self.memory_profiler.sample() print(f" 内存使用: {memory_info['used']/1024**3:.2f} GB") def _collect_ascend_metrics(self): """收集昇腾特定指标""" try: # 使用npu-smi获取硬件指标 import subprocess # 1. AI Core利用率 cmd = ["npu-smi", "info", "-t", "usages", "-i", "0"] result = subprocess.run(cmd, capture_output=True, text=True) ai_core_usage = self._parse_npu_smi_output(result.stdout) # 2. 温度监控 cmd = ["npu-smi", "info", "-t", "temperature", "-i", "0"] result = subprocess.run(cmd, capture_output=True, text=True) temperature = self._parse_npu_smi_output(result.stdout) # 3. 
功耗监控 cmd = ["npu-smi", "info", "-t", "power", "-i", "0"] result = subprocess.run(cmd, capture_output=True, text=True) power = self._parse_npu_smi_output(result.stdout) ascend_metrics = { "ai_core_usage": ai_core_usage, "temperature": temperature, "power": power, "timestamp": ms.get_context("profiler_timestamp") } # 保存到文件 with open(f"{self.output_path}/ascend_metrics.json", "w") as f: json.dump(ascend_metrics, f, indent=2) except Exception as e: print(f"收集昇腾指标失败: {e}") def _generate_report(self): """生成性能报告""" report_path = f"{self.output_path}/summary.json" if os.path.exists(report_path): with open(report_path, "r") as f: report = json.load(f) # 关键性能指标提取 key_metrics = { "total_time": report.get("total_time", 0), "iteration_time": report.get("iteration_time", {}), "operator_performance": report.get("op_performance", {}), "memory_usage": report.get("memory_usage", {}), "communication": report.get("communication", {}), "bottlenecks": self._identify_bottlenecks(report) } return key_metrics return {} def _identify_bottlenecks(self, report): """识别性能瓶颈""" bottlenecks = [] # 1. 算子性能分析 op_perf = report.get("op_performance", {}) if op_perf: slow_ops = [op for op, time in op_perf.items() if time > 100] # 假设100ms为慢算子 if slow_ops: bottlenecks.append({ "type": "slow_operators", "operators": slow_ops, "suggestion": "考虑使用融合算子或TBE自定义算子优化" }) # 2. 内存瓶颈 memory = report.get("memory_usage", {}) if memory.get("peak_usage", 0) > 0.8 * memory.get("total", 1): # 使用超过80% bottlenecks.append({ "type": "memory_bottleneck", "peak_usage_gb": memory.get("peak_usage", 0) / 1024**3, "suggestion": "启用梯度检查点、混合精度训练、优化器分片" }) # 3. 通信瓶颈 comm = report.get("communication", {}) if comm.get("wait_time", 0) > comm.get("compute_time", 0): bottlenecks.append({ "type": "communication_bottleneck", "wait_time_ms": comm.get("wait_time", 0), "suggestion": "启用梯度融合、异步通信、计算通信重叠" }) return bottlenecks
1.2 实时性能监控仪表板
# performance_dashboard.py
"""Real-time training dashboard: loss, step time, memory, AI Core utilization."""
import time
import json  # BUG FIX: json was used in _generate_final_report without import
import threading
import subprocess
from dataclasses import dataclass
from typing import Dict, List

import matplotlib.pyplot as plt
import numpy as np


@dataclass
class PerformanceMetrics:
    """One monitoring sample of a training iteration."""
    timestamp: float
    iteration: int
    loss: float
    iteration_time: float
    memory_used: float
    memory_total: float
    ai_core_utilization: float
    data_loading_time: float
    # BUG FIX: this field had no default but was never supplied by
    # _collect_metrics(), so every sample construction raised TypeError.
    communication_time: float = 0.0


class RealTimeDashboard:
    """Background monitor that samples training metrics and plots them live."""

    def __init__(self, update_interval: int = 5):  # refresh every 5 seconds
        self.update_interval = update_interval
        self.metrics_history: List[PerformanceMetrics] = []
        self.lock = threading.Lock()
        self.is_running = False

        # BUG FIX: the original unpacked the axes into *locals* (ax1..ax4),
        # but _update_dashboard() reads self.ax1..self.ax4 -- bind them here.
        self.fig, ((self.ax1, self.ax2), (self.ax3, self.ax4)) = \
            plt.subplots(2, 2, figsize=(12, 8))
        plt.ion()  # interactive mode

    def start_monitoring(self, model, dataloader):
        """Start the sampling thread, then run the (blocking) UI update loop."""
        self.is_running = True
        monitor_thread = threading.Thread(
            target=self._monitor_loop,
            args=(model, dataloader)
        )
        monitor_thread.daemon = True
        monitor_thread.start()

        # UI refresh loop (runs on the calling thread).
        self._update_dashboard()

    def _monitor_loop(self, model, dataloader):
        """Sampling loop: one training step per iteration, up to a safety cap."""
        iteration = 0
        # NOTE(review): next() is used below, so `dataloader` must be an
        # iterator (pass iter(dataset), not a dataset object) -- confirm callers.
        while self.is_running and iteration < 1000:  # safety limit
            iteration_start = time.time()
            try:
                # Fetch one batch, timing data loading separately.
                data_loading_start = time.time()
                batch = next(dataloader)
                data_loading_time = time.time() - data_loading_start

                # Execute the training step.
                loss = model(*batch)

                metrics = self._collect_metrics(
                    iteration=iteration,
                    loss=loss,
                    iteration_time=time.time() - iteration_start,
                    data_loading_time=data_loading_time,
                    # BUG FIX: the original passed compute_time=..., which is
                    # not a PerformanceMetrics field (TypeError).  No real
                    # communication measurement exists here yet.
                    communication_time=0.0
                )

                with self.lock:
                    self.metrics_history.append(metrics)

                iteration += 1
                time.sleep(0.1)  # throttle sampling rate
            except StopIteration:
                break

    def _collect_metrics(self, **kwargs) -> PerformanceMetrics:
        """Augment per-step data with host memory and AI Core utilization."""
        # Host memory usage via psutil, best-effort.
        try:
            import psutil
            process = psutil.Process()
            memory_info = process.memory_info()
            memory_used = memory_info.rss
            memory_total = psutil.virtual_memory().total
        except Exception:  # psutil missing or sampling failed
            memory_used = memory_total = 0

        # Ascend AI Core utilization via npu-smi, best-effort.
        try:
            cmd = ["npu-smi", "info", "-t", "usages", "-i", "0"]
            result = subprocess.run(cmd, capture_output=True, text=True)
            ai_core_util = self._parse_utilization(result.stdout)
        except Exception:  # no npu-smi on this host
            ai_core_util = 0.0

        return PerformanceMetrics(
            timestamp=time.time(),
            memory_used=memory_used,
            memory_total=memory_total,
            ai_core_utilization=ai_core_util,
            **kwargs
        )

    @staticmethod
    def _parse_utilization(output: str) -> float:
        """Best-effort parse of npu-smi usage output (added: it was referenced
        above but never defined, which raised AttributeError)."""
        for token in output.replace("%", " ").split():
            try:
                return float(token)
            except ValueError:
                continue
        return 0.0

    def _update_dashboard(self):
        """Blocking UI loop: redraw the four charts every update_interval."""
        while self.is_running:
            time.sleep(self.update_interval)
            if not self.metrics_history:
                continue

            with self.lock:
                recent_metrics = self.metrics_history[-100:]  # last 100 points
            if not recent_metrics:
                continue

            # Prepare series.
            iterations = [m.iteration for m in recent_metrics]
            losses = [m.loss for m in recent_metrics]
            times = [m.iteration_time for m in recent_metrics]
            memory_usage = [m.memory_used / 1024**3 for m in recent_metrics]  # GB
            ai_core_util = [m.ai_core_utilization for m in recent_metrics]

            # Redraw all four panels.
            self.ax1.clear()
            self.ax1.plot(iterations, losses, 'b-', linewidth=2)
            self.ax1.set_xlabel('Iteration')
            self.ax1.set_ylabel('Loss')
            self.ax1.set_title('Training Loss')
            self.ax1.grid(True, alpha=0.3)

            self.ax2.clear()
            self.ax2.plot(iterations, times, 'r-', linewidth=2)
            self.ax2.set_xlabel('Iteration')
            self.ax2.set_ylabel('Time (s)')
            self.ax2.set_title('Iteration Time')
            self.ax2.grid(True, alpha=0.3)

            self.ax3.clear()
            self.ax3.plot(iterations, memory_usage, 'g-', linewidth=2)
            self.ax3.set_xlabel('Iteration')
            self.ax3.set_ylabel('Memory (GB)')
            self.ax3.set_title('Memory Usage')
            self.ax3.grid(True, alpha=0.3)

            self.ax4.clear()
            self.ax4.plot(iterations, ai_core_util, 'm-', linewidth=2)
            self.ax4.set_xlabel('Iteration')
            self.ax4.set_ylabel('Utilization (%)')
            self.ax4.set_title('AI Core Utilization')
            self.ax4.grid(True, alpha=0.3)

            plt.tight_layout()
            plt.pause(0.01)

    def stop(self):
        """Stop monitoring and write the final report + dashboard image."""
        self.is_running = False
        plt.ioff()
        self._generate_final_report()

    def _analyze_bottlenecks(self) -> List[str]:
        """Heuristic bottleneck hints (added: it was referenced below but
        never defined, which raised AttributeError)."""
        notes = []
        if not self.metrics_history:
            return notes
        avg_load = np.mean([m.data_loading_time for m in self.metrics_history])
        avg_iter = np.mean([m.iteration_time for m in self.metrics_history])
        # Flag when data loading dominates the step time.
        if avg_iter > 0 and avg_load / avg_iter > 0.3:
            notes.append("数据加载占比过高,建议增加预取或并行度")
        return notes

    def _generate_final_report(self):
        """Aggregate all samples into performance_report.json and a PNG."""
        if not self.metrics_history:
            return

        iterations = len(self.metrics_history)
        avg_loss = np.mean([m.loss for m in self.metrics_history])
        avg_iteration_time = np.mean([m.iteration_time for m in self.metrics_history])
        avg_memory = np.mean([m.memory_used for m in self.metrics_history])
        avg_ai_core = np.mean([m.ai_core_utilization for m in self.metrics_history])
        total_time = sum(m.iteration_time for m in self.metrics_history)

        report = {
            "total_iterations": iterations,
            "average_loss": float(avg_loss),
            "average_iteration_time_seconds": float(avg_iteration_time),
            "average_memory_usage_gb": float(avg_memory / 1024**3),
            "average_ai_core_utilization_percent": float(avg_ai_core),
            # NOTE(review): this is iterations/second, not samples/second --
            # the batch size is unknown here.  Guard against zero total time.
            "throughput_samples_per_second":
                iterations / total_time if total_time else 0.0,
            "bottleneck_analysis": self._analyze_bottlenecks()
        }

        with open("performance_report.json", "w") as f:
            json.dump(report, f, indent=2)

        print("=" * 60)
        print("性能报告已生成: performance_report.json")
        print("=" * 60)

        # Persist the last rendered dashboard.
        self.fig.savefig("performance_dashboard.png", dpi=150, bbox_inches='tight')
二、Dataset流水线深度优化
2.1 高级流水线优化技术
# dataset_optimizer.py
"""Highly optimized MindSpore data pipeline with Ascend-specific tuning."""
import os      # BUG FIX: os.path.exists was used below without importing os
import time    # BUG FIX: time.time was used below without importing time
import multiprocessing
from typing import Optional, List, Callable

import numpy as np
import mindspore as ms  # BUG FIX: ms.Tensor was used without importing mindspore
import mindspore.dataset as ds
import mindspore.dataset.vision as vision
import mindspore.dataset.transforms as transforms
from mindspore.dataset.vision import Inter


class OptimizedDatasetPipeline:
    """Builds cached/sharded, heavily parallel MindSpore dataset pipelines."""

    def __init__(self, data_dir: str,
                 batch_size: int = 32,
                 image_size: tuple = (224, 224),
                 num_parallel_workers: Optional[int] = None,
                 device_target: str = "Ascend"):
        self.data_dir = data_dir
        self.batch_size = batch_size
        self.image_size = image_size
        self.device_target = device_target

        # Auto-size the worker pool from the CPU count.
        if num_parallel_workers is None:
            cpu_count = multiprocessing.cpu_count()
            if device_target == "Ascend":
                # On Ascend, over-parallel host preprocessing hurts; cap at 8.
                self.num_parallel_workers = min(cpu_count // 2, 8)
            else:
                self.num_parallel_workers = min(cpu_count, 16)
        else:
            self.num_parallel_workers = num_parallel_workers

        # Per-stage timing accumulator (filled by monitoring hooks).
        self.stage_times = {}

    @staticmethod
    def _get_num_shards() -> int:
        """Device count for sharding (added: it was referenced but undefined).
        Reads RANK_SIZE, the env var set by MindSpore distributed launch --
        TODO confirm this matches the cluster's launch tooling."""
        return int(os.environ.get("RANK_SIZE", "1"))

    @staticmethod
    def _get_shard_id() -> int:
        """This process's shard id (added: it was referenced but undefined)."""
        return int(os.environ.get("RANK_ID", "0"))

    def create_optimized_pipeline(self, is_training: bool = True,
                                  cache: bool = True,
                                  shard: bool = True) -> ds.Dataset:
        """
        Build the optimized data pipeline.

        Strategies:
        1. Parallelize every stage.
        2. Fuse operations to cut data copies.
        3. Memory-map / cache the source.
        4. Prefetch to overlap with compute.
        """
        # 1. Source dataset (optionally cached + sharded).
        if cache and shard:
            dataset = self._create_sharded_cached_dataset()
        else:
            dataset = ds.ImageFolderDataset(
                self.data_dir,
                num_parallel_workers=self.num_parallel_workers,
                shuffle=is_training,
                num_shards=self._get_num_shards() if shard else 1,
                shard_id=self._get_shard_id() if shard else 0
            )

        # 2. Per-sample transforms (augmented for training, plain for eval).
        if is_training:
            dataset = self._apply_training_augmentations(dataset)
        else:
            dataset = self._apply_validation_transforms(dataset)

        # 3. Batching / prefetch / device queue.
        dataset = self._apply_batch_optimizations(dataset)

        # 4. Performance-monitoring hooks.
        dataset = self._add_performance_monitoring(dataset)
        return dataset

    def _create_sharded_cached_dataset(self) -> ds.Dataset:
        """Load via a MindRecord cache to cut IO; fall back to ImageFolder."""
        try:
            mindrecord_file = f"{self.data_dir}/cached.mindrecord"
            if not os.path.exists(mindrecord_file):
                # NOTE(review): MindDataset.convert_dataset is not a public
                # MindSpore API -- the documented conversion route is
                # mindspore.mindrecord.FileWriter.  Confirm before relying
                # on this branch.
                dataset = ds.ImageFolderDataset(self.data_dir)
                ds.MindDataset.convert_dataset(
                    dataset, mindrecord_file, ["image", "label"]
                )

            # MindRecord is memory-mapped and efficient to load.
            dataset = ds.MindDataset(
                mindrecord_file,
                num_parallel_workers=self.num_parallel_workers,
                shuffle=True,
                num_shards=self._get_num_shards(),
                shard_id=self._get_shard_id()
            )
            print(f"使用MindRecord缓存: {mindrecord_file}")
            return dataset
        except Exception as e:
            print(f"MindRecord缓存失败,使用普通数据集: {e}")
            return ds.ImageFolderDataset(
                self.data_dir,
                num_parallel_workers=self.num_parallel_workers,
                shuffle=True,
                num_shards=self._get_num_shards(),
                shard_id=self._get_shard_id()
            )

    def _apply_training_augmentations(self, dataset: ds.Dataset) -> ds.Dataset:
        """Training-time augmentation map, with an Ascend-accelerated decode path."""
        fused_ops = []

        # Stage 1: decode + initial resize.
        if self.device_target == "Ascend":
            # BUG FIX: RandomCropDecodeResize consumes *raw JPEG bytes* and
            # performs the decode itself; the original also prepended
            # vision.Decode(True), which would hand it already-decoded arrays.
            fused_ops.append(
                vision.RandomCropDecodeResize(
                    self.image_size,
                    scale=(0.08, 1.0),
                    ratio=(0.75, 1.333),
                    interpolation=Inter.BICUBIC,
                    max_attempts=10
                )
            )
        else:
            # Generic path: decode then random-resized-crop.
            fused_ops.append(vision.Decode())
            fused_ops.append(
                vision.RandomResizedCrop(
                    self.image_size,
                    scale=(0.08, 1.0),
                    ratio=(0.75, 1.333),
                    interpolation=Inter.BICUBIC
                )
            )

        # Stage 2: color jitter.
        color_ops = [
            vision.RandomColorAdjust(
                brightness=0.4,
                contrast=0.4,
                saturation=0.4,
                hue=0.1
            ),
            vision.RandomSharpness(degrees=(0.1, 1.9)),
            vision.RandomAutoContrast()
        ]
        # NOTE(review): RandomApply applies *all* of color_ops with prob 0.8;
        # the original comment claimed "randomly pick 2", which RandomApply
        # does not do (that would be RandomChoice / RandomSelectSubpolicy).
        fused_ops.append(
            transforms.RandomApply(color_ops, prob=0.8)
        )

        # Stage 3: geometric transform.
        fused_ops.append(
            vision.RandomHorizontalFlip(prob=0.5)
        )

        # Stage 4: normalization + NCHW layout (Ascend expects CHW).
        fused_ops.extend([
            vision.Normalize(
                mean=[0.485, 0.456, 0.406],
                std=[0.229, 0.224, 0.225],
                is_hwc=True
            ),
            vision.HWC2CHW()
        ])

        # Apply everything in one map() so the C++ pipeline can fuse it.
        dataset = dataset.map(
            operations=fused_ops,
            input_columns="image",
            num_parallel_workers=max(1, self.num_parallel_workers // 2),
            python_multiprocessing=True,  # speed up Python ops via processes
            max_rowsize=64                # cap shared-memory row size
        )
        return dataset

    def _apply_validation_transforms(self, dataset: ds.Dataset) -> ds.Dataset:
        """Deterministic eval transforms (added: it was referenced in
        create_optimized_pipeline but never defined -- AttributeError)."""
        ops = [
            vision.Decode(),
            vision.Resize(256, interpolation=Inter.BICUBIC),
            vision.CenterCrop(self.image_size),
            vision.Normalize(
                mean=[0.485, 0.456, 0.406],
                std=[0.229, 0.224, 0.225],
                is_hwc=True
            ),
            vision.HWC2CHW()
        ]
        return dataset.map(
            operations=ops,
            input_columns="image",
            num_parallel_workers=max(1, self.num_parallel_workers // 2)
        )

    def _apply_batch_optimizations(self, dataset: ds.Dataset) -> ds.Dataset:
        """Batching, prefetching and (on Ascend) device-queue transfer."""
        # 1. Batch configuration.
        per_batch_map = None
        if self.device_target == "Ascend":
            per_batch_map = self._ascend_per_batch_optimization

        dataset = dataset.batch(
            batch_size=self.batch_size,
            drop_remainder=True,  # drop ragged last batch for stable shapes
            num_parallel_workers=max(1, self.num_parallel_workers // 4),
            per_batch_map=per_batch_map,
            input_columns=None,   # inferred
            output_columns=None,
            pad_info=None
        )

        # 2. Prefetch: at least 2 batches, bounded by available memory.
        prefetch_size = self._calculate_optimal_prefetch()
        # NOTE(review): Dataset.prefetch() is not part of the documented
        # MindSpore dataset API (prefetch depth is configured globally via
        # ds.config.set_prefetch_size) -- confirm before shipping.
        dataset = dataset.prefetch(
            buffer_size=prefetch_size,
            num_parallel_workers=1  # dedicated prefetch thread
        )

        # 3. Data-sink device queue (Ascend-specific).
        if self.device_target == "Ascend":
            dataset = dataset.device_que()
        return dataset

    def _ascend_per_batch_optimization(self, images, labels):
        """Batch-level tweaks for Ascend: random erasing, contiguity, dtype."""
        # NOTE(review): MindSpore passes numpy arrays into per_batch_map;
        # tolerate Tensors too so the method also works when called directly.
        images_np = images.asnumpy() if hasattr(images, "asnumpy") else np.asarray(images)

        # Batch-level random erasing (cheaper than per-sample).
        if np.random.rand() > 0.5:
            images_np = self._batch_random_erasing(images_np)

        # Ascend-friendly contiguous memory layout.
        images_np = np.ascontiguousarray(images_np)

        # Ensure float32.
        if images_np.dtype != np.float32:
            images_np = images_np.astype(np.float32)
        return ms.Tensor(images_np), labels

    @staticmethod
    def _batch_random_erasing(images_np, max_frac: float = 0.25):
        """Zero one random rectangle per image (added: it was referenced but
        never defined).  Assumes NCHW layout after HWC2CHW -- TODO confirm."""
        out = images_np.copy()
        n, _, h, w = out.shape
        for i in range(n):
            eh = np.random.randint(1, max(2, int(h * max_frac)))
            ew = np.random.randint(1, max(2, int(w * max_frac)))
            y = np.random.randint(0, h - eh + 1)
            x = np.random.randint(0, w - ew + 1)
            out[i, :, y:y + eh, x:x + ew] = 0.0
        return out

    def _calculate_optimal_prefetch(self) -> int:
        """Pick a prefetch depth from available host memory (2..16 batches)."""
        try:
            import psutil
            available_memory = psutil.virtual_memory().available
            # Rough per-batch memory estimate: RGB float32 at 224x224.
            batch_memory_estimate = self.batch_size * 3 * 224 * 224 * 4
            # Use at most 50% of available memory for prefetched batches.
            max_prefetch = int((available_memory * 0.5) // batch_memory_estimate)
            return max(2, min(16, max_prefetch))
        except Exception:  # psutil missing or query failed
            return 4

    def _add_performance_monitoring(self, dataset: ds.Dataset) -> ds.Dataset:
        """Attach a (currently inert) per-batch timing callback.

        NOTE(review): the callback below is never registered -- wiring it up
        requires subclassing the dataset; this is a placeholder."""
        class PerformanceCallback:
            def __init__(self):
                self.start_time = None
                self.batch_times = []
                self.stage_times = {}

            def __call__(self, get_next_info):
                current_time = time.time()
                if self.start_time is None:
                    self.start_time = current_time

                # Record per-batch duration when the info object exposes it.
                if hasattr(get_next_info, 'end_time'):
                    batch_time = get_next_info.end_time - get_next_info.start_time
                    self.batch_times.append(batch_time)

                    # Report every 100 batches.
                    if len(self.batch_times) % 100 == 0 and len(self.batch_times) > 0:
                        avg_time = np.mean(self.batch_times[-100:])
                        print(f"最近100批次平均时间: {avg_time*1000:.2f}ms")
                        if avg_time > 0.1:  # > 100 ms per batch
                            print("警告: 数据加载可能成为瓶颈")
                            print("建议: 增加prefetch大小或使用缓存")

        return dataset

    def benchmark_pipeline(self, dataset: ds.Dataset, num_batches: int = 100):
        """Benchmark the pipeline: per-batch timings, throughput, bottlenecks."""
        print("=" * 60)
        print("开始数据流水线基准测试")
        print("=" * 60)

        start_time = time.time()
        batch_times = []

        # Warm-up: 10 batches.
        for i, batch in enumerate(dataset.create_dict_iterator()):
            if i >= 10:
                break

        # BUG FIX: the original recorded only the *first* batch's time
        # (batch_time was forced to 0 for i > 0 and test_start was never
        # reset); time every batch instead.
        prev = time.time()
        for i, batch in enumerate(dataset.create_dict_iterator()):
            if i >= num_batches:
                break
            now = time.time()
            batch_times.append(now - prev)
            prev = now

        total_time = time.time() - start_time

        # Guard the empty case (np.min/np.max raise on empty input).
        stats = {
            "total_batches": len(batch_times),
            "total_time_seconds": total_time,
            "average_batch_time_ms": float(np.mean(batch_times)) * 1000 if batch_times else 0.0,
            "std_batch_time_ms": float(np.std(batch_times)) * 1000 if batch_times else 0.0,
            "min_batch_time_ms": float(np.min(batch_times)) * 1000 if batch_times else 0.0,
            "max_batch_time_ms": float(np.max(batch_times)) * 1000 if batch_times else 0.0,
            "throughput_samples_per_second":
                len(batch_times) * self.batch_size / total_time if total_time else 0.0,
            "bottleneck_analysis": self._analyze_pipeline_bottlenecks(batch_times)
        }

        print("\n流水线性能报告:")
        for key, value in stats.items():
            if isinstance(value, float):
                print(f"  {key}: {value:.2f}")
            else:
                print(f"  {key}: {value}")
        return stats

    def _analyze_pipeline_bottlenecks(self, batch_times):
        """Heuristic pipeline bottleneck analysis from per-batch timings."""
        if not batch_times:  # guard: avoid nan statistics on empty input
            return []

        avg_time = np.mean(batch_times)
        std_time = np.std(batch_times)
        bottlenecks = []

        # High variance (> 50% of the mean) suggests IO jitter.
        if avg_time > 0 and std_time / avg_time > 0.5:
            bottlenecks.append("流水线不稳定,可能有IO瓶颈")

        # Absolute time too high (> 200 ms per batch).
        if avg_time > 0.2:
            bottlenecks.append("批次处理时间过长")
            if self.num_parallel_workers <= 2:
                bottlenecks.append("建议增加num_parallel_workers")
        return bottlenecks
2.2 智能数据预加载器
# smart_preloader.py
"""Adaptive background data preloader with a self-tuning prefetch queue."""
import time
import queue
import threading
from collections import deque

import numpy as np  # BUG FIX: np.mean was used below without importing numpy


class SmartDataPreloader:
    """Background producer that keeps a batch queue filled; adapts its depth."""

    def __init__(self, dataset: "ds.Dataset",
                 initial_prefetch: int = 4,
                 max_prefetch: int = 16,
                 adaptive: bool = True):
        # NOTE(review): `dataset` only needs to expose create_dict_iterator()
        # (mindspore.dataset.Dataset-like).  The annotation is quoted because
        # `ds` is not imported in this module (the original unquoted
        # annotation raised NameError at class-definition time).
        self.dataset = dataset
        self.iterator = dataset.create_dict_iterator()
        self.adaptive = adaptive

        # Bounded prefetch queue.
        self.prefetch_queue = queue.Queue(maxsize=max_prefetch)
        self.max_prefetch = max_prefetch
        self.current_prefetch = initial_prefetch

        # Rolling performance windows (last 100 samples each).
        self.consumption_times = deque(maxlen=100)
        self.production_times = deque(maxlen=100)
        self.queue_sizes = deque(maxlen=100)

        # Producer-thread control.
        self.producer_thread = None
        self.is_running = False

        # Start producing immediately.
        self.start()

    def start(self):
        """Start the producer thread and seed the queue."""
        self.is_running = True
        self.producer_thread = threading.Thread(
            target=self._producer_loop,
            daemon=True
        )
        self.producer_thread.start()
        self._fill_queue(self.current_prefetch)

    def _producer_loop(self):
        """Producer thread: fetch batches from the iterator into the queue."""
        while self.is_running:
            try:
                start_time = time.time()
                batch = next(self.iterator)
                production_time = time.time() - start_time

                # BUG FIX: on queue.Full the original dropped the batch it
                # had already pulled from the iterator and fetched a new one,
                # silently losing data.  Retry putting the *same* batch.
                while self.is_running:
                    try:
                        self.prefetch_queue.put(batch, timeout=1.0)
                        break
                    except queue.Full:
                        time.sleep(0.001)

                self.production_times.append(production_time)
                self.queue_sizes.append(self.prefetch_queue.qsize())

                # Adjust the prefetch depth on the fly.
                if self.adaptive:
                    self._adaptive_adjust()
            except StopIteration:
                # Dataset exhausted.
                break
            except Exception as e:
                print(f"预加载生产者错误: {e}")
                time.sleep(0.1)

    def __iter__(self):
        """Iterator protocol."""
        return self

    def __next__(self):
        """Return the next batch; StopIteration once the producer is done."""
        if not self.is_running:
            self.start()

        # BUG FIX: the original retried via unbounded recursion
        # (`return self.__next__()`); loop instead.  A short get() timeout
        # also avoids the original's 30 s hang at end of data.
        while True:
            start_time = time.time()
            try:
                batch = self.prefetch_queue.get(timeout=1.0)
            except queue.Empty:
                if not self.producer_thread.is_alive():
                    # Producer finished and the queue is drained.
                    raise StopIteration
                print("警告: 数据预取队列为空,可能生产速度跟不上")
                time.sleep(0.01)
                continue

            self.consumption_times.append(time.time() - start_time)
            self._trigger_refill()
            return batch

    def _adaptive_adjust(self):
        """Grow or shrink the target prefetch depth from observed timings."""
        if len(self.consumption_times) < 10 or len(self.production_times) < 10:
            return

        avg_consume = np.mean(self.consumption_times)
        avg_produce = np.mean(self.production_times)
        current_qsize = self.prefetch_queue.qsize()

        # Consumers outrun producers and the queue is nearly empty: grow.
        if avg_consume < avg_produce and current_qsize < 2:
            self.current_prefetch = min(
                self.max_prefetch,
                int(self.current_prefetch * 1.5)
            )
            print(f"增加预取大小到: {self.current_prefetch}")
        # Queue is persistently near full: shrink to save memory.
        elif current_qsize > self.max_prefetch * 0.8:
            self.current_prefetch = max(
                2,
                int(self.current_prefetch * 0.8)
            )
            print(f"减少预取大小到: {self.current_prefetch}")

    def _fill_queue(self, num_batches: int):
        """Top the queue up to num_batches (direct fill if producer is dead)."""
        current_size = self.prefetch_queue.qsize()
        to_add = max(0, num_batches - current_size)
        for _ in range(to_add):
            if self.producer_thread.is_alive():
                # Live producer fills the queue by itself.
                pass
            else:
                try:
                    batch = next(self.iterator)
                    self.prefetch_queue.put(batch, block=False)
                except (StopIteration, queue.Full):
                    break

    def _trigger_refill(self):
        """Hook for nudging the producer when the queue runs low (no-op)."""
        if self.prefetch_queue.qsize() < self.current_prefetch // 2:
            # Placeholder: a condition variable could wake the producer here.
            pass

    def stop(self):
        """Stop the producer thread."""
        self.is_running = False
        if self.producer_thread:
            self.producer_thread.join(timeout=5.0)

    def get_stats(self):
        """Snapshot of queue depth, prefetch target and timing averages."""
        return {
            "queue_size": self.prefetch_queue.qsize(),
            "prefetch_size": self.current_prefetch,
            "avg_consumption_time":
                np.mean(self.consumption_times) if self.consumption_times else 0,
            "avg_production_time":
                np.mean(self.production_times) if self.production_times else 0,
            "throughput_estimate": self._estimate_throughput()
        }

    def _estimate_throughput(self):
        """Batches per second implied by the average consumption wait."""
        if not self.consumption_times:
            return 0
        avg_consume = np.mean(self.consumption_times)
        if avg_consume > 0:
            return 1.0 / avg_consume
        return 0