目录
一、高级数据加载与预处理
1.1 高效读取大文件
1.2 处理缺失值的进阶技巧
二、高效数据转换与计算
2.1 向量化操作与性能优化
2.2 高级分组与聚合
三、时间序列数据处理
3.1 高级时间序列操作
一、高级数据加载与预处理
1.1 高效读取大文件
python
import pandas as pd import numpy as np from pathlib import Path # 分块读取大文件 def read_large_file_in_chunks(filepath, chunk_size=10000): """分块读取大文件,节省内存""" chunks = [] for chunk in pd.read_csv(filepath, chunksize=chunk_size, low_memory=False): # 在读取时进行初步处理 chunk = chunk.dropna(subset=['important_column']) chunks.append(chunk) return pd.concat(chunks, ignore_index=True) # 指定数据类型以减少内存使用 dtype_mapping = { 'user_id': 'int32', 'price': 'float32', 'category': 'category', 'date': 'str' } df = pd.read_csv('large_data.csv', dtype=dtype_mapping, parse_dates=['date'])1.2 处理缺失值的进阶技巧
python
# 创建示例数据 df = pd.DataFrame({ 'A': [1, 2, np.nan, 4, 5], 'B': [np.nan, 2, 3, np.nan, 5], 'C': [1, np.nan, np.nan, np.nan, 5], 'D': ['a', 'b', None, 'd', 'e'] }) # 1. 基于统计的填充 df['A_filled'] = df['A'].fillna(df['A'].mean()) df['B_filled'] = df['B'].fillna(df['B'].median()) # 2. 前向/后向填充(时间序列) df['C_ffill'] = df['C'].ffill() # 前向填充 df['C_bfill'] = df['C'].bfill() # 后向填充 # 3. 插值法填充 df['A_interpolated'] = df['A'].interpolate(method='linear') # 4. 使用模型预测填充(简单示例) from sklearn.ensemble import RandomForestRegressor def model_based_imputation(df, target_col): """使用随机森林预测缺失值""" # 分离有值和无值的数据 df_train = df[df[target_col].notna()] df_missing = df[df[target_col].isna()] if len(df_missing) == 0: return df # 准备特征(排除目标列和其他高缺失率列) features = [col for col in df.columns if col != target_col and df[col].notna().sum() > len(df)*0.7] X_train = df_train[features] y_train = df_train[target_col] X_missing = df_missing[features] # 训练模型并预测 model = RandomForestRegressor(n_estimators=100, random_state=42) model.fit(X_train, y_train) predictions = model.predict(X_missing) # 填充预测值 df.loc[df[target_col].isna(), target_col] = predictions return df # 5. 创建缺失值指示器 df['A_is_missing'] = df['A'].isna().astype(int)二、高效数据转换与计算
2.1 向量化操作与性能优化
python
import pandas as pd import numpy as np from numba import jit import swifter # pip install swifter # 创建测试数据 np.random.seed(42) df = pd.DataFrame({ 'x': np.random.randn(1000000), 'y': np.random.randn(1000000), 'category': np.random.choice(['A', 'B', 'C'], 1000000) }) # 1. 避免循环,使用向量化操作 # ❌ 慢:使用循环 def slow_calculation(df): result = [] for i in range(len(df)): result.append(df['x'].iloc[i] ** 2 + df['y'].iloc[i] ** 2) return result # ✅ 快:使用向量化 df['vectorized'] = df['x'] ** 2 + df['y'] ** 2 # 2. 使用Numba加速(针对复杂计算) @jit(nopython=True) def numba_calc(x, y): n = len(x) result = np.zeros(n) for i in range(n): result[i] = np.sqrt(x[i]**2 + y[i]**2) return result df['numba_calc'] = numba_calc(df['x'].values, df['y'].values) # 3. 使用swifter自动选择最佳计算方式 df['swifter_calc'] = df.swifter.apply(lambda row: row['x'] * row['y'], axis=1) # 4. 内存优化技巧 def optimize_memory(df): """优化DataFrame内存使用""" start_mem = df.memory_usage().sum() / 1024**2 for col in df.columns: col_type = df[col].dtype if col_type != object: c_min = df[col].min() c_max = df[col].max() if str(col_type)[:3] == 'int': if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max: df[col] = df[col].astype(np.int8) elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max: df[col] = df[col].astype(np.int16) elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max: df[col] = df[col].astype(np.int32) else: df[col] = df[col].astype(np.int64) else: if c_min > np.finfo(np.float16).min and c_max < np.finfo(np.float16).max: df[col] = df[col].astype(np.float16) elif c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max: df[col] = df[col].astype(np.float32) else: df[col] = df[col].astype(np.float64) else: # 对象类型转换为分类数据 unique_count = df[col].nunique() total_count = len(df[col]) if unique_count / total_count < 0.5: df[col] = df[col].astype('category') end_mem = df.memory_usage().sum() / 1024**2 print(f'内存使用减少: {100 * (start_mem - end_mem) / start_mem:.1f}%') return df df = optimize_memory(df)2.2 高级分组与聚合
python
# 创建示例数据 sales_data = pd.DataFrame({ 'date': pd.date_range('2023-01-01', '2023-12-31', freq='D'), 'region': np.random.choice(['North', 'South', 'East', 'West'], 365), 'product': np.random.choice(['A', 'B', 'C', 'D'], 365), 'sales': np.random.randint(100, 1000, 365), 'customers': np.random.randint(10, 100, 365) }) # 1. 多级分组聚合 grouped = sales_data.groupby(['region', 'product']).agg({ 'sales': ['sum', 'mean', 'std', 'count'], 'customers': ['sum', lambda x: x.quantile(0.8)] # 自定义聚合 }) # 重命名列 grouped.columns = ['_'.join(col).strip() for col in grouped.columns.values] # 2. 使用transform进行组内标准化 sales_data['sales_normalized'] = sales_data.groupby('region')['sales'].transform( lambda x: (x - x.mean()) / x.std() ) # 3. 使用filter筛选分组 high_sales_regions = sales_data.groupby('region').filter( lambda x: x['sales'].sum() > 100000 ) # 4. 分组应用复杂函数 def calculate_metrics(group): """计算分组的多维度指标""" result = pd.Series({ 'total_sales': group['sales'].sum(), 'avg_sale_per_customer': group['sales'].sum() / group['customers'].sum(), 'peak_day': group.loc[group['sales'].idxmax(), 'date'], 'sales_growth': (group['sales'].iloc[-1] - group['sales'].iloc[0]) / group['sales'].iloc[0] if len(group) > 1 else 0, 'unique_products': group['product'].nunique() }) return result region_metrics = sales_data.groupby('region').apply(calculate_metrics) # 5. 滚动窗口分组计算 sales_data['rolling_7d_sales'] = sales_data.groupby('region')['sales'].transform( lambda x: x.rolling(window=7, min_periods=1).mean() )三、时间序列数据处理
3.1 高级时间序列操作
python
# 创建时间序列数据 date_rng = pd.date_range('2023-01-01', '2023-12-31', freq='H') ts_data = pd.DataFrame(date_rng, columns=['timestamp']) ts_data['value'] = np.random.randn(len(date_rng)) * 10 + 50 ts_data['category'] = np.random.choice(['A', 'B', 'C'], len(date_rng)) # 设置为索引 ts_data = ts_data.set_index('timestamp') # 1. 重采样与降采样 # 按天重采样,计算每天的平均值 daily_data = ts_data['value'].resample('D').agg(['mean', 'min', 'max', 'std']) # 按周重采样,计算每周总和 weekly_data = ts_data['value'].resample('W-MON').sum() # 2. 滚动窗口统计 ts_data['7d_rolling_mean'] = ts_data['value'].rolling(window=7*24).mean() # 7天滚动平均 ts_data['24h_rolling_std'] = ts_data['value'].rolling(window=24).std() # 24小时滚动标准差 # 3. 扩展窗口(累计)统计 ts_data['expanding_mean'] = ts_data['value'].expanding().mean() ts_data['expanding_max'] = ts_data['value'].expanding().max() # 4. 时间序列分解(趋势、季节性、残差) from statsmodels.tsa.seasonal import seasonal_decompose # 需要按天或按月的数据 daily_series = ts_data['value'].resample('D').mean() decomposition = seasonal_decompose(daily_series.dropna(), model='additive', period=30) # 5. 时间序列特征工程 def create_time_features(df, datetime_index): """创建时间相关特征""" df = df.copy() df['hour'] = datetime_index.hour df['dayofweek'] = datetime_index.dayofweek df['quarter'] = datetime_index.quarter df['month'] = datetime_index.month df['year'] = datetime_index.year df['dayofyear'] = datetime_index.dayofyear df['weekofyear'] = datetime_index.isocalendar().week # 是否是周末 df['is_weekend'] = datetime_index.dayofweek >= 5 # 时间周期特征 df['sin_hour'] = np.sin(2 * np.pi * df['hour']/24) df['cos_hour'] = np.cos(2 * np.pi * df['hour']/24) return df ts_data = create_time_features(ts_data, ts_data.index) # 6. 滞后特征(lag features) for lag in [1, 2, 3, 7, 30]: ts_data[f'lag_{lag}_hour'] = ts_data['value'].shift(lag) # 7. 滑动窗口统计特征 ts_data['rolling_mean_6h'] = ts_data['value'].rolling('6h').mean() ts_data['rolling_std_12h'] = ts_data['value'].rolling('12h').std() ts_data['rolling_max_24h'] = ts_data['value'].rolling('24h').max()最佳实践:
1、需要根据数据规模和使用对象选择合适的写出格式,小规模、通用性交换可选 CSV,大规模分析型数据应使用 Parquet,而涉及人工核对时才使用 Excel。
2、在写出前应明确数据的使用场景,并对关键字段、异常值和重复记录进行必要检查,避免将问题数据固化到生产系统中。同时,要特别注意索引的持久化、时间索引的时区信息保留以及缺失值的统一表示方式,确保数据在不同环境和系统中读取结果一致。
3、通过合理的命名、版本控制和元信息记录,保证数据结果可追溯、可复现,从而满足生产环境对数据“可上线、可维护、可回滚”的基本要求。