news 2026/5/31 3:28:51

Day27 机器学习流水线

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Day27 机器学习流水线

@浙大疏锦行

作业:尝试制作出机器学习通用的pipeline

import pandas as pd
import numpy as np
import time
import warnings
import matplotlib.pyplot as plt
import seaborn as sns
from typing import Dict, List, Union, Optional, Tuple

from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import (
    OrdinalEncoder,
    OneHotEncoder,
    StandardScaler,
    MinMaxScaler,
    RobustScaler,
)
from sklearn.impute import SimpleImputer
from sklearn.model_selection import train_test_split
from sklearn.base import BaseEstimator, ClassifierMixin, RegressorMixin
from sklearn.metrics import (
    # Classification metrics
    accuracy_score,
    precision_score,
    recall_score,
    f1_score,
    classification_report,
    confusion_matrix,
    # Regression metrics
    mean_absolute_error,
    mean_squared_error,
    r2_score,
    mean_absolute_percentage_error,
)

# Global configuration: silence warnings and let matplotlib render CJK labels.
warnings.filterwarnings("ignore")
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False


class MLGeneralPipeline:
    """General-purpose preprocessing + model pipeline for tabular data.

    Wraps a scikit-learn ``ColumnTransformer`` (ordinal / nominal / continuous
    feature handling) and an arbitrary estimator in a single ``Pipeline``, and
    provides a one-call train/evaluate routine for classification or
    regression.
    """

    def __init__(
        self,
        model: Union[ClassifierMixin, RegressorMixin],
        task_type: str = "classification",
        ordinal_features: Optional[List[str]] = None,
        ordinal_categories: Optional[List[List[str]]] = None,
        nominal_features: Optional[List[str]] = None,
        continuous_features: Optional[List[str]] = None,
        missing_strategy: Optional[Dict[str, str]] = None,
        scaler_type: Optional[str] = "standard",
        handle_unknown: str = "ignore",
    ):
        """Store the configuration; nothing is fitted here.

        Args:
            model: Estimator instance, e.g.
                ``RandomForestClassifier(random_state=42)``.
            task_type: ``"classification"`` or ``"regression"``
                (case-insensitive).
            ordinal_features: Names of ordered categorical columns.
            ordinal_categories: Category order for each ordinal feature, in
                the same order as ``ordinal_features``. When empty the encoder
                infers categories (``'auto'``).
            nominal_features: Names of unordered categorical columns
                (one-hot encoded).
            continuous_features: Names of continuous columns. When empty or
                ``None`` they are derived from the data in
                ``_build_preprocessor``.
            missing_strategy: ``SimpleImputer`` strategy per feature group,
                keyed by ``'ordinal'``, ``'nominal'`` and ``'continuous'``.
                Missing keys fall back to the defaults
                (``most_frequent`` / ``most_frequent`` / ``mean``).
            scaler_type: ``"standard"``, ``"minmax"``, ``"robust"`` or
                ``None`` (no scaling of continuous features).
            handle_unknown: How encoders treat categories unseen during fit.
                ``"ignore"`` and ``"use_encoded_value"`` both make the ordinal
                encoder emit ``-1`` and the one-hot encoder ignore the value;
                any other value is forwarded to both encoders unchanged.

        Raises:
            KeyError: If ``scaler_type`` is not one of the supported keys.
            ValueError: If ``task_type`` is not supported.
        """
        self.model = model
        self.task_type = task_type.lower()
        self.ordinal_features = ordinal_features or []
        self.ordinal_categories = ordinal_categories or []
        self.nominal_features = nominal_features or []
        self.continuous_features = continuous_features
        self.handle_unknown = handle_unknown

        # Merge user overrides on top of the defaults so that a partial dict
        # (e.g. only 'continuous') cannot cause a KeyError for other groups.
        self.missing_strategy = {
            'ordinal': 'most_frequent',
            'nominal': 'most_frequent',
            'continuous': 'mean',
            **(missing_strategy or {}),
        }

        # Scaler lookup; None maps to the "passthrough" sentinel (no scaling).
        self.scaler_map = {
            "standard": StandardScaler(),
            "minmax": MinMaxScaler(),
            "robust": RobustScaler(),
            None: "passthrough",
        }
        self.scaler = self.scaler_map[scaler_type]

        # Populated by build_pipeline().
        self.preprocessor = None
        self.pipeline = None

        if self.task_type not in ["classification", "regression"]:
            raise ValueError("task_type必须是 'classification' 或 'regression'")

    def _build_preprocessor(self, X: pd.DataFrame) -> ColumnTransformer:
        """Build the ``ColumnTransformer`` that preprocesses the features.

        Args:
            X: Feature frame; only used to auto-detect continuous columns
                when none were configured.

        Returns:
            The (unfitted) ``ColumnTransformer``, also stored on
            ``self.preprocessor``.
        """
        transformers = []

        # 1. Ordered categorical features: impute, then ordinal-encode.
        if self.ordinal_features:
            # unknown_value may only be passed together with
            # handle_unknown='use_encoded_value'; any other mode (e.g. 'error')
            # must be forwarded without it or the encoder rejects the config.
            if self.handle_unknown in ("ignore", "use_encoded_value"):
                unknown_kwargs = {
                    "handle_unknown": "use_encoded_value",
                    "unknown_value": -1,
                }
            else:
                unknown_kwargs = {"handle_unknown": self.handle_unknown}

            ordinal_transformer = Pipeline(steps=[
                ("imputer", SimpleImputer(strategy=self.missing_strategy['ordinal'])),
                ("encoder", OrdinalEncoder(
                    categories=self.ordinal_categories or 'auto',
                    **unknown_kwargs,
                )),
            ])
            transformers.append(("ordinal", ordinal_transformer, self.ordinal_features))

        # 2. Unordered categorical features: impute, then one-hot encode.
        if self.nominal_features:
            # OneHotEncoder has no 'use_encoded_value' mode; its closest
            # equivalent is 'ignore' (unknown category -> all-zero row).
            if self.handle_unknown == "use_encoded_value":
                onehot_unknown = "ignore"
            else:
                onehot_unknown = self.handle_unknown

            nominal_transformer = Pipeline(steps=[
                ("imputer", SimpleImputer(strategy=self.missing_strategy['nominal'])),
                ("onehot", OneHotEncoder(
                    handle_unknown=onehot_unknown,
                    sparse_output=False,
                )),
            ])
            transformers.append(("nominal", nominal_transformer, self.nominal_features))

        # 3. Continuous features: configured explicitly or auto-detected as the
        #    int64/float64 columns that are not declared categorical.
        if not self.continuous_features:
            all_categorical = self.ordinal_features + self.nominal_features
            self.continuous_features = (
                X.select_dtypes(include=['int64', 'float64'])
                .columns.difference(all_categorical)
                .tolist()
            )

        if self.continuous_features:
            continuous_steps = [
                ("imputer", SimpleImputer(strategy=self.missing_strategy['continuous']))
            ]
            if self.scaler != "passthrough":
                continuous_steps.append(("scaler", self.scaler))
            continuous_transformer = Pipeline(steps=continuous_steps)
            transformers.append(("continuous", continuous_transformer, self.continuous_features))

        # Columns not claimed by any transformer are kept as-is; switch
        # remainder to "drop" to discard them instead.
        self.preprocessor = ColumnTransformer(
            transformers=transformers,
            remainder="passthrough",
        )
        return self.preprocessor

    def build_pipeline(self, X: pd.DataFrame) -> Pipeline:
        """Build the full pipeline (preprocessor followed by the model).

        Args:
            X: Feature frame forwarded to ``_build_preprocessor``.

        Returns:
            The (unfitted) ``Pipeline``, also stored on ``self.pipeline``.
        """
        self._build_preprocessor(X)
        self.pipeline = Pipeline(steps=[
            ("preprocessor", self.preprocessor),
            ("model", self.model),
        ])
        return self.pipeline

    def train_evaluate(
        self,
        data: pd.DataFrame,
        target_col: str,
        test_size: float = 0.2,
        random_state: int = 42,
        verbose: bool = True,
    ) -> Dict[str, Union[float, np.ndarray, str]]:
        """Split the data, fit the pipeline and evaluate on the hold-out set.

        Args:
            data: Raw dataset including the target column.
            target_col: Name of the target column.
            test_size: Fraction of rows held out for testing.
            random_state: Seed for the train/test split.
            verbose: Whether to print the evaluation report.

        Returns:
            Metrics from ``_calculate_metrics`` plus ``train_time`` (seconds),
            ``y_test`` and ``y_pred``.
        """
        # 1. Separate features and target.
        X = data.drop(columns=[target_col])
        y = data[target_col]

        # 2. Split before any preprocessing is fitted to avoid data leakage;
        #    classification tasks use a stratified split.
        X_train, X_test, y_train, y_test = train_test_split(
            X,
            y,
            test_size=test_size,
            random_state=random_state,
            stratify=y if self.task_type == "classification" else None,
        )

        # 3. Build and fit the pipeline, timing only the fit call.
        self.build_pipeline(X_train)
        start_time = time.time()
        self.pipeline.fit(X_train, y_train)
        train_time = time.time() - start_time

        # 4. Predict on the hold-out set.
        y_pred = self.pipeline.predict(X_test)

        # 5. Evaluate. The training time is passed in so the verbose report
        #    can print it; it is only added to the dict afterwards.
        metrics = self._calculate_metrics(y_test, y_pred, verbose, train_time=train_time)
        metrics["train_time"] = train_time
        metrics["y_test"] = y_test
        metrics["y_pred"] = y_pred
        return metrics

    def _calculate_metrics(
        self,
        y_true: Union[pd.Series, np.ndarray],
        y_pred: Union[pd.Series, np.ndarray],
        verbose: bool = True,
        train_time: Optional[float] = None,
    ) -> Dict[str, Union[float, str, np.ndarray]]:
        """Compute evaluation metrics for the configured task type.

        Args:
            y_true: Ground-truth targets.
            y_pred: Predicted targets.
            verbose: Whether to print the report.
            train_time: Optional training duration in seconds; printed in the
                verbose report when provided, otherwise that line is skipped.

        Returns:
            Classification: ``accuracy``, ``precision``, ``recall``,
            ``f1_score`` (weighted averages), ``confusion_matrix`` and
            ``classification_report``. Regression: ``mae``, ``mse``, ``rmse``,
            ``r2`` and ``mape``.
        """
        metrics = {}

        if self.task_type == "classification":
            metrics["accuracy"] = accuracy_score(y_true, y_pred)
            metrics["precision"] = precision_score(y_true, y_pred, average='weighted')
            metrics["recall"] = recall_score(y_true, y_pred, average='weighted')
            metrics["f1_score"] = f1_score(y_true, y_pred, average='weighted')
            metrics["confusion_matrix"] = confusion_matrix(y_true, y_pred)
            metrics["classification_report"] = classification_report(y_true, y_pred)

            if verbose:
                print("=" * 50)
                print("【分类任务评估结果】")
                if train_time is not None:
                    print(f"训练耗时: {train_time:.4f} 秒")
                print(f"准确率(Accuracy): {metrics['accuracy']:.4f}")
                print(f"精准率(Precision): {metrics['precision']:.4f}")
                print(f"召回率(Recall): {metrics['recall']:.4f}")
                print(f"F1分数: {metrics['f1_score']:.4f}")
                print("\n混淆矩阵:")
                print(metrics["confusion_matrix"])
                print("\n分类报告:")
                print(metrics["classification_report"])
        else:
            metrics["mae"] = mean_absolute_error(y_true, y_pred)
            metrics["mse"] = mean_squared_error(y_true, y_pred)
            metrics["rmse"] = np.sqrt(metrics["mse"])
            metrics["r2"] = r2_score(y_true, y_pred)
            metrics["mape"] = mean_absolute_percentage_error(y_true, y_pred)

            if verbose:
                print("=" * 50)
                print("【回归任务评估结果】")
                if train_time is not None:
                    print(f"训练耗时: {train_time:.4f} 秒")
                print(f"平均绝对误差(MAE): {metrics['mae']:.4f}")
                print(f"均方误差(MSE): {metrics['mse']:.4f}")
                print(f"均方根误差(RMSE): {metrics['rmse']:.4f}")
                print(f"决定系数(R²): {metrics['r2']:.4f}")
                print(f"平均绝对百分比误差(MAPE): {metrics['mape']:.4f}")

        return metrics

    def predict(self, X_new: pd.DataFrame) -> np.ndarray:
        """Predict for new data using the stored pipeline.

        Raises:
            RuntimeError: If no pipeline has been built yet.
        """
        if self.pipeline is None:
            raise RuntimeError("请先调用 train_evaluate 训练Pipeline")
        return self.pipeline.predict(X_new)

    def get_feature_names_out(self) -> List[str]:
        """Return the feature names produced by the preprocessor.

        Raises:
            RuntimeError: If the preprocessor has not been built yet.
        """
        if self.preprocessor is None:
            raise RuntimeError("请先构建preprocessor")
        return self.preprocessor.get_feature_names_out().tolist()


# ====================== Usage examples ======================
if __name__ == "__main__":
    # 1. Load the data (replace with your own path).
    data = pd.read_csv("data.csv")

    # ------------------- Example 1: classification (random forest) -------------------
    print("【示例1:分类任务 - 随机森林】")
    from sklearn.ensemble import RandomForestClassifier

    # Feature configuration for the credit-default dataset.
    ordinal_features = ['Home Ownership', 'Years in current job', 'Term']
    ordinal_categories = [
        ['Own Home', 'Rent', 'Have Mortgage', 'Home Mortgage'],
        ['< 1 year', '1 year', '2 years', '3 years', '4 years', '5 years',
         '6 years', '7 years', '8 years', '9 years', '10+ years'],
        ['Short Term', 'Long Term'],
    ]
    nominal_features = ['Purpose']

    cls_pipeline = MLGeneralPipeline(
        model=RandomForestClassifier(random_state=42),
        task_type="classification",
        ordinal_features=ordinal_features,
        ordinal_categories=ordinal_categories,
        nominal_features=nominal_features,
        missing_strategy={
            'ordinal': 'most_frequent',
            'nominal': 'most_frequent',
            'continuous': 'most_frequent',  # mode imputation for continuous columns
        },
        scaler_type="standard",
    )

    cls_metrics = cls_pipeline.train_evaluate(
        data=data,
        target_col="Credit Default",
        test_size=0.2,
        random_state=42,
    )

    # ------------------- Example 2: regression (random forest regressor) -------------------
    print("\n【示例2:回归任务 - 随机森林回归】")
    # Stand-in regression data (replace with your own dataset).
    from sklearn.ensemble import RandomForestRegressor
    from sklearn.datasets import load_diabetes

    diabetes = load_diabetes()
    reg_data = pd.DataFrame(diabetes.data, columns=diabetes.feature_names)
    reg_data["target"] = diabetes.target

    # No categorical features here; only continuous ones.
    reg_pipeline = MLGeneralPipeline(
        model=RandomForestRegressor(random_state=42),
        task_type="regression",
        scaler_type="standard",
        missing_strategy={'continuous': 'mean'},
    )

    reg_metrics = reg_pipeline.train_evaluate(
        data=reg_data,
        target_col="target",
        test_size=0.2,
        random_state=42,
    )

    # ------------------- Example 3: swapping the model (logistic regression) -------------------
    print("\n【示例3:替换模型 - 逻辑回归】")
    from sklearn.linear_model import LogisticRegression

    lr_pipeline = MLGeneralPipeline(
        model=LogisticRegression(random_state=42, max_iter=1000),
        task_type="classification",
        ordinal_features=ordinal_features,
        ordinal_categories=ordinal_categories,
        nominal_features=nominal_features,
        scaler_type="standard",
    )

    lr_metrics = lr_pipeline.train_evaluate(
        data=data,
        target_col="Credit Default",
        test_size=0.2,
        random_state=42,
    )
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/5/30 17:22:52

L298N电机驱动原理图与单片机接口设计实战案例

从零构建电机控制系统：L298N驱动原理与实战设计全解析 你有没有遇到过这样的场景？单片机程序写得完美无缺，逻辑清晰、延时精准，结果一接上电机——小车原地“抽搐”，芯片发烫冒烟，甚至单片机莫名其妙重启。问…

作者头像 李华
网站建设 2026/5/28 18:30:00

科研党必备工具:Fun-ASR助力学术会议录音自动整理笔记

科研党必备工具：Fun-ASR助力学术会议录音自动整理笔记 在一次长达三小时的国际学术研讨会结束后，你面对的是手机里12段零散录音、几位专家夹杂中英文术语的发言，以及一份空白的笔记文档。手动回听、逐字记录？这不仅耗时数小时…

作者头像 李华
网站建设 2026/5/28 23:19:09

requirements.txt依赖列表说明:各库版本要求

Fun-ASR依赖库深度解析：从requirements.txt看现代语音识别系统的构建逻辑 在智能会议、远程办公和语音助手日益普及的今天，一个看似简单的“语音转文字”功能背后，往往隐藏着复杂的工程架构。当你打开 Fun-ASR 的 WebUI 界面，点击…

作者头像 李华
网站建设 2026/5/28 18:30:10

一人一句对话场景识别准确率已达70%

一人一句对话场景识别准确率已达70% 在企业会议结束后的工位上，你是否曾面对一段长达一小时的录音发愁？听着模糊的发言、夹杂着专业术语和数字表达，手动整理纪要不仅耗时费力，还容易遗漏关键信息。更不用说那些频繁出现的产品代号…

作者头像 李华
网站建设 2026/5/28 18:30:07

HTTPS加密传输支持:保护敏感语音数据

HTTPS加密传输支持：保护敏感语音数据 在企业级语音识别系统日益普及的今天，一个看似简单的问题却可能引发严重后果：当员工通过浏览器上传一段包含客户身份证号、银行账户或商业谈判细节的会议录音时，这段音频是否会在传输过程中被…

作者头像 李华
网站建设 2026/5/28 20:50:42

航天领域应用探索:火箭发射倒计时语音识别

航天领域应用探索：火箭发射倒计时语音识别 在酒泉卫星发射中心的指挥大厅里，每一秒都牵动人心。当倒计时进入最后十分钟，“推进剂加注完成”、“塔架解锁”、“T-10秒”等关键口令通过广播系统依次响起——这些声音不仅是任务节奏的节拍器…

作者头像 李华