深度学习:卷积神经网络微小特征提取与逻辑回归风控建模
卷积神经网络在计算机视觉领域的成功为金融风控领域带来了新的可能性。用户上传的身份证照片、营业执照扫描件、财务报表截图等图像数据中蕴含着丰富的风险信号,CNN可以有效提取这些图像中的微小特征,为风控决策提供输入。与此同时,逻辑回归作为风控领域的经典模型,以其高可解释性和成熟的监管合规性,仍然是风险评分卡的核心算法。
本文将探讨如何将CNN提取的图像微小特征与逻辑回归风控模型有机结合,重点分析CNN卷积核尺寸设计对特征提取质量的影响,以及逻辑回归在风控场景中的建模细节。
一、CNN与逻辑回归的融合架构
在风控场景中,CNN负责从图像数据中提取结构化特征,逻辑回归则基于这些特征和传统的结构化字段进行风险预测。这种融合架构既利用了深度学习在图像理解方面的优势,又保留了逻辑回归在可解释性方面的优点。
融合架构的关键在于CNN输出的特征向量需要与逻辑回归的输入层对齐。CNN的最后一层全连接层输出的特征向量,与用户的基础信息特征拼接后,输入到逻辑回归分类器中进行最终的风险评估。
import warnings

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.metrics import roc_auc_score, roc_curve, classification_report
from sklearn.metrics import confusion_matrix, precision_recall_curve, auc

# Silence library warnings and pin both RNGs so the demo output is repeatable.
warnings.filterwarnings('ignore')
np.random.seed(42)
torch.manual_seed(42)


class CNNFeatureExtractor(nn.Module):
    """Four-stage convolutional encoder that turns an image batch into feature vectors.

    Every stage is Conv2d -> BatchNorm2d -> ReLU -> MaxPool2d(2).  The stages
    use kernel sizes 3, 3, 5, 3 and widen the channels to 32, 64, 128, 256.
    The last feature map is globally average-pooled and projected by a linear
    layer.

    Args:
        in_channels: number of channels of the input images.
        feature_dim: length of the feature vector produced per image.
    """

    # (out_channels, kernel_size) of each stage, in execution order.
    _STAGES = ((32, 3), (64, 3), (128, 5), (256, 3))

    def __init__(self, in_channels=3, feature_dim=128):
        super().__init__()
        layers = []
        prev_width = in_channels
        for width, kernel in self._STAGES:
            layers += [
                # padding = kernel // 2 keeps the spatial size; only the pool halves it
                nn.Conv2d(prev_width, width, kernel_size=kernel, padding=kernel // 2),
                nn.BatchNorm2d(width),
                nn.ReLU(inplace=True),
                nn.MaxPool2d(2),
            ]
            prev_width = width
        self.features = nn.Sequential(*layers)
        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        self.fc = nn.Linear(prev_width, feature_dim)

    def forward(self, x):
        """Return the ``(batch, feature_dim)`` feature tensor for image batch ``x``."""
        pooled = self.avg_pool(self.features(x))
        flat = pooled.view(pooled.size(0), -1)
        return self.fc(flat)


# Smoke test: push a random batch through the extractor and report its size.
cnn_extractor = CNNFeatureExtractor(in_channels=3, feature_dim=128)
dummy_image = torch.randn(8, 3, 128, 128)
cnn_features = cnn_extractor(dummy_image)
n_cnn_params = sum(p.numel() for p in cnn_extractor.parameters())
print(f"CNN特征提取器输入: {dummy_image.shape}")
print(f"CNN特征提取器输出: {cnn_features.shape}")
print(f"CNN特征提取器参数量: {n_cnn_params:,}")

# 二、逻辑回归的数学原理与风控适配
逻辑回归通过Sigmoid函数将线性回归的输出映射到[0,1]区间,输出值可以解释为样本属于正类的概率。对于风控场景,这个概率值即为用户的违约概率。
逻辑回归的数学表达式为:
P(y=1|x) = 1 / (1 + exp(-(w^T * x + b)))
其中w是权重向量,b是偏置项,x是输入特征向量。模型通过最大化对数似然函数来学习参数。
在风控领域,逻辑回归的权重系数可以转化为评分卡中的分数。通过将每个特征的不同取值映射到对应的分数,最终累加得到用户的总评分。
def sigmoid(z):
    """Logistic function; the input is clipped to [-100, 100] so ``exp`` cannot overflow."""
    return 1.0 / (1.0 + np.exp(-np.clip(z, -100, 100)))


class LogisticRegressionRiskModel:
    """Binary logistic regression trained with full-batch gradient descent.

    Args:
        learning_rate: step size of each gradient update.
        max_iter: number of gradient steps performed by ``fit``.
        C: inverse regularisation strength (used when ``penalty == 'l2'``).
        penalty: ``'l2'`` adds a ridge term; any other value disables it.
    """

    def __init__(self, learning_rate=0.01, max_iter=1000, C=1.0, penalty='l2'):
        self.learning_rate = learning_rate
        self.max_iter = max_iter
        self.C = C
        self.penalty = penalty
        self.weights = None
        self.bias = None
        self.loss_history = []

    def fit(self, X, y):
        """Fit weights and bias on ``X`` (n_samples, n_features) and 0/1 labels ``y``.

        The loss of every iteration is appended to ``loss_history`` and printed
        every 100 iterations.  Returns ``self``.
        """
        X = np.asarray(X, dtype=float)
        y = np.asarray(y, dtype=float)
        n_samples, n_features = X.shape
        self.weights = np.zeros(n_features)
        self.bias = 0.0
        # Parameters restart from zero on every fit, so the history restarts too.
        self.loss_history = []
        use_l2 = self.penalty == 'l2'

        for iteration in range(self.max_iter):
            y_predicted = sigmoid(X @ self.weights + self.bias)
            residual = y_predicted - y

            # Evaluate the loss BEFORE the update so the data term and the
            # L2 term both describe the same parameter vector.
            loss = -np.mean(
                y * np.log(y_predicted + 1e-8)
                + (1 - y) * np.log(1 - y_predicted + 1e-8)
            )
            if use_l2:
                loss += 0.5 * np.sum(self.weights ** 2) / (self.C * n_samples)
            self.loss_history.append(loss)

            dw = X.T @ residual / n_samples
            db = np.sum(residual) / n_samples
            if use_l2:
                dw += self.weights / (self.C * n_samples)

            self.weights -= self.learning_rate * dw
            self.bias -= self.learning_rate * db

            if iteration % 100 == 0:
                print(f"Iteration {iteration}: Loss = {loss:.6f}")
        return self

    def predict_proba(self, X):
        """Return the predicted probability of class 1 for every row of ``X``."""
        return sigmoid(np.dot(X, self.weights) + self.bias)

    def predict(self, X, threshold=0.5):
        """Return 0/1 predictions: 1 where the probability is at least ``threshold``."""
        return (self.predict_proba(X) >= threshold).astype(int)

    def score_to_credit(self, base_score=600, pdo=50, odds=50):
        """Return ``offset - factor * w`` for every learned weight ``w``.

        ``factor = pdo / ln(2)`` and ``offset = base_score - factor * ln(odds)``.
        """
        factor = pdo / np.log(2)
        offset = base_score - factor * np.log(odds)
        return np.array([offset - factor * w for w in self.weights])

    def feature_importance(self, feature_names):
        """Return a DataFrame of features ordered by decreasing ``|weight|``."""
        importance = np.abs(self.weights)
        order = np.argsort(importance)[::-1]
        return pd.DataFrame({
            '特征名称': [feature_names[i] for i in order],
            '权重系数': [self.weights[i] for i in order],
            '绝对重要性': [importance[i] for i in order]
        })


def generate_risk_data(n_samples=5000):
    """Generate a synthetic credit-risk data set.

    Returns:
        ``(df, labels, proba)``: the feature DataFrame, 0/1 default labels and
        the default probability each label was drawn from.
    """
    np.random.seed(42)
    data = {
        'age': np.random.randint(22, 65, n_samples),
        'income': np.random.lognormal(mean=10, sigma=0.6, size=n_samples),
        'credit_history_length': np.random.randint(0, 20, n_samples),
        'existing_loans': np.random.randint(0, 8, n_samples),
        'debt_ratio': np.random.beta(2, 5, n_samples),
        'late_payment_ratio': np.random.beta(1, 10, n_samples),
        'credit_card_utilization': np.random.beta(2, 3, n_samples),
        'inquiry_count_6m': np.random.poisson(2, n_samples),
        'income_to_debt_ratio': np.random.lognormal(mean=1, sigma=0.5, size=n_samples),
        'employment_length': np.random.randint(0, 30, n_samples),
        'residential_stability': np.random.randint(1, 10, n_samples),
        'total_assets': np.random.lognormal(mean=12, sigma=0.8, size=n_samples),
        'mobile_usage_years': np.random.randint(0, 15, n_samples),
        'social_network_score': np.random.uniform(0, 100, n_samples),
        'education_level': np.random.randint(1, 6, n_samples)
    }
    df = pd.DataFrame(data)

    risk_score = (
        -0.15 * np.log(df['income'] + 1)
        + 0.20 * df['debt_ratio']
        + 0.25 * df['late_payment_ratio']
        + 0.15 * df['credit_card_utilization']
        + 0.10 * np.log(df['inquiry_count_6m'] + 1)
        + -0.10 * df['credit_history_length'] / 20
        + -0.05 * df['age'] / 65
        + -0.10 * np.log(df['total_assets'] + 1)
        + 0.05 * df['existing_loans'] / 8
        + -0.05 * df['residential_stability'] / 10
        + -0.03 * df['employment_length'] / 30
        + -0.03 * df['education_level'] / 5
        + np.random.normal(0, 0.15, n_samples)
    )
    proba = 1 / (1 + np.exp(-risk_score))

    # The score is dominated by the negative log-income / log-assets terms, so
    # thresholding proba at 0.5 marks essentially every row as good (label 0)
    # and leaves no bad samples to learn from.  Draw each label as a Bernoulli
    # trial with its own default probability instead.
    labels = np.random.binomial(1, proba.to_numpy()).astype(int)
    return df, labels, proba


df_risk, y_risk, proba_risk = generate_risk_data(5000)
print(f"风控数据样本量: {len(df_risk)}")
print(f"坏样本率: {y_risk.mean():.2%}")
print(f"特征维度: {df_risk.shape[1]}")

feature_names = list(df_risk.columns)
X_risk = df_risk.values
scaler_risk = StandardScaler()
X_risk_scaled = scaler_risk.fit_transform(X_risk)

X_train_r, X_test_r, y_train_r, y_test_r = train_test_split(
    X_risk_scaled, y_risk, test_size=0.25, random_state=42, stratify=y_risk
)

# 三、CNN卷积核尺寸对风控特征提取的影响
在风控场景中,CNN处理的图像通常包括身份证照片、手持照片、营业执照、银行流水截图等。这些图像中的微小特征可能包括:身份证上的防伪标记、照片中的细微篡改痕迹、营业执照上的印章细节等。
卷积核尺寸的选择直接影响这些微小特征的提取效果。针对风控图像的特点,我们设计了不同卷积核尺寸的CNN模型进行对比。
class RiskCNNFeatureExtractor(nn.Module):
    """CNN feature extractor whose convolution kernel sizes are selectable.

    Every stage is Conv2d -> BatchNorm2d -> ReLU -> MaxPool2d(2); stage ``i``
    outputs 32, 64, 128 channels respectively.  ``kernel_config`` picks the
    kernel size of each stage:

    * ``'small'``  - 3, 3, 3
    * ``'medium'`` - 5, 5, 5
    * ``'large'``  - 7, 7 (two stages only)
    * ``'mixed'``  - 3, 5, 3

    Args:
        in_channels: number of channels of the input images.
        feature_dim: length of the feature vector produced per image.
        kernel_config: one of the configuration names listed above.

    Raises:
        ValueError: if ``kernel_config`` is not a known configuration.
    """

    _KERNEL_CONFIGS = {
        'small': (3, 3, 3),
        'medium': (5, 5, 5),
        'large': (7, 7),
        'mixed': (3, 5, 3),
    }
    _STAGE_CHANNELS = (32, 64, 128)

    def __init__(self, in_channels=3, feature_dim=64, kernel_config='mixed'):
        super().__init__()
        if kernel_config not in self._KERNEL_CONFIGS:
            raise ValueError(
                f"unknown kernel_config {kernel_config!r}; "
                f"expected one of {sorted(self._KERNEL_CONFIGS)}"
            )
        self.kernel_config = kernel_config

        layers = []
        prev_width = in_channels
        kernels = self._KERNEL_CONFIGS[kernel_config]
        for width, kernel in zip(self._STAGE_CHANNELS, kernels):
            layers += [
                # padding = kernel // 2 keeps the spatial size; only the pool halves it
                nn.Conv2d(prev_width, width, kernel, padding=kernel // 2),
                nn.BatchNorm2d(width),
                nn.ReLU(),
                nn.MaxPool2d(2),
            ]
            prev_width = width
        self.backbone = nn.Sequential(*layers)
        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        # Size the head from the backbone's real output width: 'large' stops at
        # 64 channels, so a hard-coded 128 makes its forward pass fail.
        self.fc = nn.Linear(prev_width, feature_dim)

    def forward(self, x):
        """Return the ``(batch, feature_dim)`` feature tensor for image batch ``x``."""
        x = self.avg_pool(self.backbone(x))
        x = x.view(x.size(0), -1)
        return self.fc(x)


# Compare the parameter count of every kernel configuration.
for config in ['small', 'medium', 'large', 'mixed']:
    model = RiskCNNFeatureExtractor(in_channels=3, feature_dim=64, kernel_config=config)
    params = sum(p.numel() for p in model.parameters())
    print(f"卷积核配置 {config}: 参数量 {params:,}")

# 四、逻辑回归的特征工程与分箱处理
在风控建模中,连续特征通常需要经过分箱处理后再进行WOE编码。分箱的目的是捕捉特征与目标变量之间的非线性关系,同时减少异常值的影响。
WOE编码的计算公式为:
WOE_i = ln(Distr_Good_i / Distr_Bad_i)
其中Distr_Good_i是第i个分箱中的好样本数占全部好样本的比例,Distr_Bad_i是第i个分箱中的坏样本数占全部坏样本的比例。
IV值用于衡量特征对目标变量的区分能力:
# IV = ∑(Distr_Good_i - Distr_Bad_i) * WOE_i


class RiskFeatureEngineer:
    """Bin numeric features and replace each value with its bin's WOE.

    Args:
        n_bins: maximum number of bins requested per feature in ``fit``.

    After ``fit`` the per-feature bin edges, bin-index -> WOE maps and IV values
    are available in ``bin_edges``, ``woe_maps`` and ``iv_values``.
    """

    def __init__(self, n_bins=10):
        self.n_bins = n_bins
        self.bin_edges = {}
        self.woe_maps = {}
        self.iv_values = {}

    def optimal_binning(self, feature, target, max_bins=10):
        """Bin one feature and compute the WOE of every bin plus the total IV.

        Features with at most ``max_bins`` distinct values use those values as
        edges; otherwise the edges are equally spaced percentiles.

        Returns:
            ``(bins, woe_map, iv)`` where ``woe_map`` maps a bin index (as
            produced by ``np.digitize(..., right=True)``) to its WOE.
        """
        feature = np.asarray(feature, dtype=float)
        target = np.asarray(target)

        unique_vals = np.unique(feature)
        if len(unique_vals) <= max_bins:
            bins = unique_vals
        else:
            edges = np.percentile(feature, np.linspace(0, 100, max_bins + 1))
            bins = np.unique(edges)

        bin_indices = np.digitize(feature, bins, right=True)
        is_good = target == 0
        is_bad = target == 1
        total_good = np.sum(is_good)
        total_bad = np.sum(is_bad)
        n_edges = len(bins)

        woe_map = {}
        iv = 0.0
        for i in range(n_edges):
            in_bin = bin_indices == i
            if not in_bin.any():
                continue
            # +0.5 smoothing keeps the ratio finite when a bin holds only one class
            distr_good = (np.sum(is_good & in_bin) + 0.5) / (total_good + 0.5 * n_edges)
            distr_bad = (np.sum(is_bad & in_bin) + 0.5) / (total_bad + 0.5 * n_edges)
            woe = np.log(distr_good / distr_bad)
            woe_map[i] = woe
            iv += (distr_good - distr_bad) * woe
        return bins, woe_map, iv

    def fit(self, X, y, feature_names):
        """Learn bins, WOE maps and IV for every column of ``X``.

        Prints the ten features with the highest IV and returns the full IV
        table sorted in descending order.
        """
        X = np.asarray(X)
        for i, name in enumerate(feature_names):
            bins, woe_map, iv = self.optimal_binning(X[:, i], y, self.n_bins)
            self.bin_edges[name] = bins
            self.woe_maps[name] = woe_map
            self.iv_values[name] = iv

        iv_df = pd.DataFrame({
            '特征名称': list(self.iv_values.keys()),
            'IV值': list(self.iv_values.values())
        }).sort_values('IV值', ascending=False)
        print("\n=== 特征IV值排序(前10) ===")
        print(iv_df.head(10).to_string(index=False))
        return iv_df

    def transform(self, X, feature_names):
        """Return a float array shaped like ``X`` holding each value's bin WOE.

        Bins that were empty during ``fit`` map to 0.
        """
        X = np.asarray(X)
        # Explicit float output: zeros_like(X) would truncate WOE for integer X.
        X_woe = np.zeros(X.shape, dtype=float)
        for i, name in enumerate(feature_names):
            bins = self.bin_edges[name]
            lookup = np.zeros(len(bins), dtype=float)
            for bin_idx, woe in self.woe_maps[name].items():
                lookup[bin_idx] = woe
            bin_indices = np.digitize(X[:, i], bins, right=True)
            # Values above the largest training edge get index len(bins), which
            # has no WOE; fold them into the top bin instead of a silent 0.
            bin_indices = np.minimum(bin_indices, len(bins) - 1)
            X_woe[:, i] = lookup[bin_indices]
        return X_woe


engineer = RiskFeatureEngineer(n_bins=8)
# X_risk is already an array (df_risk.values), so calling `.values` on it fails;
# np.asarray accepts either an ndarray or a DataFrame.
X_risk_raw = np.asarray(X_risk)
# NOTE(review): WOE bins and the scaler are fitted on the full data set before
# the train/test split, which leaks test-set label information - confirm
# whether fitting on the training rows only is wanted here.
iv_df = engineer.fit(X_risk_raw, y_risk, feature_names)
X_risk_woe = engineer.transform(X_risk_raw, feature_names)

scaler_woe = StandardScaler()
X_train_woe, X_test_woe, y_train_r, y_test_r = train_test_split(
    scaler_woe.fit_transform(X_risk_woe), y_risk,
    test_size=0.25, random_state=42, stratify=y_risk
)
print(f"WOE编码后训练集: {X_train_woe.shape}")
print(f"WOE编码后测试集: {X_test_woe.shape}")

# 五、不平衡样本处理策略
风控数据中坏样本比例通常很低(1%-5%),直接训练会导致模型倾向于预测所有样本为好样本。常用的处理方法包括过采样、欠采样和代价敏感学习。
SMOTE过采样通过在少数类样本之间进行插值生成新样本,可以有效缓解类别不平衡问题。
class SMOTE:
    """Minimal SMOTE over-sampler for binary labels where class 1 is the minority.

    Args:
        k_neighbors: number of nearest minority neighbours to interpolate with.
        random_state: seed of the private random generator used by
            ``fit_resample``.
    """

    def __init__(self, k_neighbors=5, random_state=42):
        self.k_neighbors = k_neighbors
        self.random_state = random_state
        self.synthetic_samples = []

    def fit_resample(self, X, y):
        """Append synthetic class-1 rows until both classes have equal size.

        Each synthetic row lies on the segment between a random minority sample
        and one of its nearest minority neighbours.

        Returns:
            ``(X_resampled, y_resampled)``.  The inputs are returned unchanged
            when class 1 is not the smaller class, or when it has fewer than
            two samples (no neighbour to interpolate with).
        """
        X = np.asarray(X)
        y = np.asarray(y)
        # Private generator: same draws as seeding the global one, without
        # resetting the global NumPy random state as a side effect.
        rng = np.random.RandomState(self.random_state)

        X_minority = X[y == 1]
        n_minority = len(X_minority)
        n_majority = int(np.sum(y == 0))
        n_synthetic = n_majority - n_minority

        # A sample cannot have more neighbours than the other minority rows.
        k = min(self.k_neighbors, n_minority - 1)
        if n_synthetic <= 0 or k < 1:
            return X, y

        neighbour_cache = {}  # minority row index -> indices of its k nearest rows
        synthetic = np.empty((n_synthetic, X.shape[1]), dtype=float)
        for row in range(n_synthetic):
            idx = rng.randint(0, n_minority)
            sample = X_minority[idx]
            neighbours = neighbour_cache.get(idx)
            if neighbours is None:
                distances = np.linalg.norm(X_minority - sample, axis=1)
                # position 0 of the sort is the sample itself (distance 0)
                neighbours = np.argsort(distances)[1:k + 1]
                neighbour_cache[idx] = neighbours
            neighbour = X_minority[neighbours[rng.randint(0, k)]]
            alpha = rng.random_sample()
            synthetic[row] = sample + alpha * (neighbour - sample)

        X_resampled = np.vstack([X, synthetic])
        # keep the caller's label dtype instead of silently promoting to float
        y_resampled = np.concatenate([y, np.ones(n_synthetic, dtype=y.dtype)])
        print(f"SMOTE过采样: 原始样本数 {len(X)}, 合成样本数 {n_synthetic}, "
              f"最终样本数 {len(X_resampled)}")
        return X_resampled, y_resampled


smote = SMOTE(k_neighbors=5, random_state=42)
X_resampled, y_resampled = smote.fit_resample(X_train_r, y_train_r)
print(f"过采样后正样本数: {y_resampled.sum():.0f}")
print(f"过采样后负样本数: {(1 - y_resampled).sum():.0f}")

# 六、评分卡构建与分数校准
风控评分卡将逻辑回归的预测概率转换为直观的分数。分数越高,代表用户的信用越好,违约风险越低。
评分卡的转换公式为:
Score = offset + factor * ln(odds)
其中odds = P(good)/P(bad),offset和factor通过预设的基准分数和PDO(Points to Double the Odds)计算得出:factor = PDO / ln(2),offset = base_score - factor * ln(base_odds)。
class CreditScoreCard:
    """Credit scorecard on top of a scikit-learn logistic regression.

    Scores follow ``score = offset + factor * ln(odds)`` with
    ``odds = P(good) / P(bad)``, ``factor = pdo / ln(2)`` and
    ``offset = base_score - factor * ln(base_odds)``, so a higher score means
    a lower predicted default risk.

    Args:
        base_score: score assigned when the good/bad odds equal ``base_odds``.
        pdo: number of points that doubles the odds.
        base_odds: good/bad odds anchored at ``base_score``.
    """

    def __init__(self, base_score=600, pdo=50, base_odds=50):
        self.base_score = base_score
        self.pdo = pdo
        self.base_odds = base_odds
        self.factor = pdo / np.log(2)
        self.offset = base_score - self.factor * np.log(base_odds)
        self.model = None
        self.feature_names = None
        self.feature_scores = {}

    def fit(self, X, y, feature_names):
        """Train the logistic regression (label 1 = bad) and derive per-feature points.

        Prints the scorecard parameters and returns ``self``.
        """
        self.model = LogisticRegression(C=0.1, penalty='l2', solver='liblinear',
                                        class_weight='balanced', random_state=42)
        self.model.fit(X, y)
        self.feature_names = feature_names

        intercept = self.model.intercept_[0]
        coefficients = self.model.coef_[0]
        base_score_contribution = self.offset - self.factor * intercept
        for name, coef in zip(feature_names, coefficients):
            self.feature_scores[name] = -self.factor * coef

        print(f"\n=== 评分卡参数 ===")
        print(f"基础分数: {self.base_score}")
        print(f"PDO(翻倍周期): {self.pdo}")
        print(f"Factor: {self.factor:.4f}")
        print(f"Offset: {self.offset:.4f}")
        print(f"截距项默认分数贡献: {base_score_contribution:.4f}")
        return self

    def predict_score(self, X):
        """Return the credit score of every row of ``X``, clipped to [300, 900].

        Raises:
            ValueError: if the model has not been fitted.
        """
        if self.model is None:
            raise ValueError("模型未训练")
        # decision_function is ln(P(bad) / P(good)), so ln(odds) with
        # odds = P(good) / P(bad) is its negative.  Subtracting it makes good
        # customers score HIGHER, consistent with `offset` and score_breakdown.
        log_odds_bad = self.model.decision_function(X)
        scores = self.offset - self.factor * log_odds_bad
        return np.clip(scores, 300, 900)

    def get_score_distribution(self, X, y=None):
        """Print summary statistics of the scores of ``X`` and return the scores.

        When ``y`` is given, the mean score of good (0) and bad (1) samples is
        printed as well.
        """
        scores = self.predict_score(X)
        print(f"\n=== 评分分布 ===")
        print(f"平均分: {scores.mean():.1f}")
        print(f"标准差: {scores.std():.1f}")
        print(f"最低分: {scores.min():.1f}")
        print(f"最高分: {scores.max():.1f}")
        for p in (5, 10, 25, 50, 75, 90, 95):
            print(f"第{p}百分位: {np.percentile(scores, p):.1f}")
        if y is not None:
            y = np.asarray(y)
            good_scores = scores[y == 0]
            bad_scores = scores[y == 1]
            print(f"\n好样本平均分: {good_scores.mean():.1f}")
            print(f"坏样本平均分: {bad_scores.mean():.1f}")
        return scores

    def score_breakdown(self, X, sample_idx=0):
        """Print the total score of one sample and each feature's point contribution.

        Raises:
            ValueError: if the model has not been fitted.
        """
        if self.model is None:
            raise ValueError("模型未训练")
        scores = self.predict_score(X)
        print(f"\n=== 样本{sample_idx}评分明细 ===")
        print(f"总评分: {scores[sample_idx]:.1f}")
        coefficients = self.model.coef_[0]
        for i, name in enumerate(self.feature_names):
            feature_score = -self.factor * coefficients[i] * X[sample_idx, i]
            print(f" {name}: {feature_score:+.4f}")


score_card = CreditScoreCard(base_score=600, pdo=50, base_odds=50)
score_card.fit(X_train_r, y_train_r, feature_names)
train_scores = score_card.get_score_distribution(X_train_r, y_train_r)
test_scores = score_card.get_score_distribution(X_test_r, y_test_r)
score_card.score_breakdown(X_test_r, sample_idx=0)

# 七、模型评估与风控指标
风控模型的评估不仅关注准确率,更关注模型对坏样本的识别能力。常用的评估指标包括KS值、AUC、Gini系数、Lift值等。
KS值衡量的是好样本和坏样本累积分布之间的最大差异,是风控领域最核心的评估指标之一。AUC衡量模型对正负样本的排序能力。Gini系数是AUC的线性变换:Gini = 2*AUC - 1。
class RiskModelEvaluator: def __init__(self): self.metrics = {} def calculate_ks(self, y_true, y_score): n_good = np.sum(y_true == 0) n_bad = np.sum(y_true == 1) sorted_idx = np.argsort(y_score) y_true_sorted = y_true[sorted_idx] cum_good = np.cumsum(y_true_sorted == 0) / n_good cum_bad = np.cumsum(y_true_sorted == 1) / n_bad ks = np.max(np.abs(cum_good - cum_bad)) ks_threshold = y_score[sorted_idx][np.argmax(np.abs(cum_good - cum_bad))] return ks, ks_threshold def calculate_lift(self, y_true, y_score, n_deciles=10): sorted_idx = np.argsort(y_score)[::-1] y_sorted = y_true[sorted_idx] total_bad_rate = y_true.mean() lift_values = [] decile_size = len(y_true) // n_deciles for i in range(n_deciles): start = i * decile_size end = min((i+1) * decile_size, len(y_true)) decile_bad_rate = y_sorted[start:end].mean() lift = decile_bad_rate / total_bad_rate if total_bad_rate > 0 else 0 lift_values.append(lift) return lift_values def evaluate(self, y_true, y_score, model_name='Model'): y_pred = (y_score >= 0.5).astype(int) accuracy = np.mean(y_pred == y_true) precision = np.sum((y_pred == 1) & (y_true == 1)) / (np.sum(y_pred == 1) + 1e-8) recall = np.sum((y_pred == 1) & (y_true == 1)) / (np.sum(y_true == 1) + 1e-8) f1 = 2 * precision * recall / (precision + recall + 1e-8) ks, ks_threshold = self.calculate_