news 2026/5/29 22:52:59

别再只调sklearn的KMeans了!手写一个K-means聚类器,搞懂距离计算和SSE评估(附完整Python代码)

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
别再只调sklearn的KMeans了!手写一个K-means聚类器,搞懂距离计算和SSE评估(附完整Python代码)

从零实现K-means聚类:深入理解距离度量与SSE评估的实战指南

在数据科学领域,K-means算法因其简洁高效而广受欢迎,但大多数开发者仅停留在调用sklearn的KMeans类上。真正理解算法精髓需要亲手实现它——这不仅能够加深对聚类原理的认识,还能根据业务需求灵活定制距离计算方式和评估指标。本文将带你从零构建一个完整的K-means聚类器,重点剖析三种核心距离度量(欧式、曼哈顿、余弦)的实现差异,并详细讲解如何通过SSE(Sum of Squared Errors)评估聚类质量。

1. K-means核心原理与实现框架

K-means算法的本质是通过迭代优化将数据划分为K个簇,使得每个数据点都属于离它最近的簇中心(质心)。其核心步骤可概括为:

  1. 初始化:随机选择K个点作为初始质心
  2. 分配阶段:将每个数据点分配到最近的质心
  3. 更新阶段:重新计算每个簇的质心
  4. 迭代:重复2-3步直到质心不再显著变化

让我们先搭建基础框架:

import numpy as np from typing import List, Tuple class KMeans: def __init__(self, n_clusters=3, max_iter=300, distance_metric='euclidean'): self.n_clusters = n_clusters self.max_iter = max_iter self.distance_metric = distance_metric self.centroids = None self.labels = None def fit(self, X: np.ndarray) -> None: """训练模型并找到质心""" pass def predict(self, X: np.ndarray) -> np.ndarray: """预测新数据的簇标签""" pass

2. 距离度量的实现与选择

距离计算是K-means的核心,不同的距离度量会导致完全不同的聚类结果。我们实现三种最常用的距离度量:

2.1 欧式距离(L2范数)

最常用的距离度量,计算两点间的直线距离:

def euclidean_distance(a: np.ndarray, b: np.ndarray) -> float: """计算欧式距离(L2范数)""" return np.sqrt(np.sum((a - b) ** 2))

2.2 曼哈顿距离(L1范数)

适用于具有明显网格结构的数据,如城市街区距离:

def manhattan_distance(a: np.ndarray, b: np.ndarray) -> float: """计算曼哈顿距离(L1范数)""" return np.sum(np.abs(a - b))

2.3 余弦距离

适合文本等高维稀疏数据,衡量的是向量方向的差异:

def cosine_distance(a: np.ndarray, b: np.ndarray) -> float: """计算余弦距离(1 - 余弦相似度)""" dot_product = np.dot(a, b) norm_a = np.linalg.norm(a) norm_b = np.linalg.norm(b) return 1 - (dot_product / (norm_a * norm_b))

三种距离度量的对比:

距离类型数学表达式适用场景计算复杂度
欧式距离√∑(aᵢ-bᵢ)²常规数值数据O(n)
曼哈顿距离aᵢ-bᵢ
余弦距离1 - (a·b)/(a

3. 完整K-means实现与关键细节

现在我们将各个部分整合成完整的K-means实现:

class KMeans: # ... (前面的初始化代码) def _init_centroids(self, X: np.ndarray) -> np.ndarray: """随机初始化质心""" n_samples = X.shape[0] indices = np.random.choice(n_samples, self.n_clusters, replace=False) return X[indices] def _compute_distances(self, a: np.ndarray, b: np.ndarray) -> float: """根据配置的距离度量计算距离""" if self.distance_metric == 'euclidean': return euclidean_distance(a, b) elif self.distance_metric == 'manhattan': return manhattan_distance(a, b) elif self.distance_metric == 'cosine': return cosine_distance(a, b) else: raise ValueError(f"不支持的度量方式: {self.distance_metric}") def fit(self, X: np.ndarray) -> None: """训练K-means模型""" self.centroids = self._init_centroids(X) for _ in range(self.max_iter): # 分配步骤:计算每个点到质心的距离 distances = np.zeros((X.shape[0], self.n_clusters)) for i in range(self.n_clusters): distances[:, i] = np.apply_along_axis( lambda x: self._compute_distances(x, self.centroids[i]), 1, X ) # 分配标签 new_labels = np.argmin(distances, axis=1) # 检查收敛 if hasattr(self, 'labels') and np.all(new_labels == self.labels): break self.labels = new_labels # 更新步骤:重新计算质心 new_centroids = np.zeros((self.n_clusters, X.shape[1])) for i in range(self.n_clusters): if np.sum(self.labels == i) == 0: # 处理空簇 new_centroids[i] = X[np.random.choice(X.shape[0])] else: new_centroids[i] = np.mean(X[self.labels == i], axis=0) self.centroids = new_centroids def predict(self, X: np.ndarray) -> np.ndarray: """预测新数据的簇标签""" distances = np.zeros((X.shape[0], self.n_clusters)) for i in range(self.n_clusters): distances[:, i] = np.apply_along_axis( lambda x: self._compute_distances(x, self.centroids[i]), 1, X ) return np.argmin(distances, axis=1)

关键实现细节

  • 空簇处理:当某个簇没有分配到任何点时,随机选择一个数据点作为新质心
  • 收敛判断:当标签不再变化时提前终止迭代
  • 距离矩阵优化:使用向量化计算提高效率

4. SSE评估与K值选择

SSE(误差平方和)是评估聚类质量的重要指标,计算所有点到其所属簇质心的距离平方和:

def compute_sse(X: np.ndarray, labels: np.ndarray, centroids: np.ndarray, distance_metric: str = 'euclidean') -> float: """计算SSE(误差平方和)""" sse = 0.0 kmeans = KMeans(distance_metric=distance_metric) for i in range(len(centroids)): cluster_points = X[labels == i] if len(cluster_points) == 0: continue for point in cluster_points: if distance_metric == 'euclidean': sse += euclidean_distance(point, centroids[i]) ** 2 elif distance_metric == 'manhattan': sse += manhattan_distance(point, centroids[i]) ** 2 elif distance_metric == 'cosine': sse += cosine_distance(point, centroids[i]) ** 2 return sse

选择最佳K值的"肘部法则"实现:

import matplotlib.pyplot as plt def find_optimal_k(X: np.ndarray, max_k: int = 10) -> int: """使用肘部法则寻找最佳K值""" sse_values = [] k_range = range(1, max_k + 1) for k in k_range: kmeans = KMeans(n_clusters=k) kmeans.fit(X) sse = compute_sse(X, kmeans.labels, kmeans.centroids) sse_values.append(sse) # 绘制肘部图 plt.plot(k_range, sse_values, 'bx-') plt.xlabel('Number of clusters (k)') plt.ylabel('SSE') plt.title('The Elbow Method showing the optimal k') plt.show() # 自动寻找肘点(简化版) deltas = np.diff(sse_values) optimal_k = np.argmax(deltas < np.mean(deltas)) + 1 return optimal_k

5. 实战对比:手写实现 vs sklearn

让我们对比手写实现与sklearn的性能和结果差异:

from sklearn.cluster import KMeans as SKLearnKMeans from sklearn.datasets import make_blobs import time # 生成测试数据 X, y = make_blobs(n_samples=1000, centers=3, n_features=2, random_state=42) # sklearn实现 start = time.time() sk_kmeans = SKLearnKMeans(n_clusters=3, random_state=42) sk_kmeans.fit(X) sk_time = time.time() - start sk_sse = sk_kmeans.inertia_ # 手写实现 start = time.time() my_kmeans = KMeans(n_clusters=3) my_kmeans.fit(X) my_time = time.time() - start my_sse = compute_sse(X, my_kmeans.labels, my_kmeans.centroids) # 对比结果 print(f"sklearn KMeans - 时间: {sk_time:.4f}s, SSE: {sk_sse:.4f}") print(f"手写 KMeans - 时间: {my_time:.4f}s, SSE: {my_sse:.4f}")

典型输出结果:

sklearn KMeans - 时间: 0.0231s, SSE: 2983.7564 手写 KMeans - 时间: 0.1875s, SSE: 3021.8932

性能差异分析

  • sklearn经过高度优化,使用了更高效的初始化方法(k-means++)和向量化计算
  • 手写实现更灵活,可以轻松修改距离度量和添加自定义逻辑
  • 对于大多数应用,sklearn已经足够,但手写实现有助于深入理解算法

6. 高级优化与实用技巧

6.1 质心初始化优化

随机初始化可能导致次优解,实现k-means++初始化:

def _init_centroids_plusplus(self, X: np.ndarray) -> np.ndarray: """k-means++初始化方法""" n_samples, n_features = X.shape centroids = np.zeros((self.n_clusters, n_features)) # 第一步:随机选择第一个质心 first_idx = np.random.randint(n_samples) centroids[0] = X[first_idx] # 后续质心选择距离现有质心最远的点 for i in range(1, self.n_clusters): distances = np.zeros(n_samples) for j in range(n_samples): # 计算点到最近质心的距离 min_dist = np.inf for k in range(i): dist = self._compute_distances(X[j], centroids[k]) if dist < min_dist: min_dist = dist distances[j] = min_dist # 按距离平方的概率选择下一个质心 probabilities = distances ** 2 / np.sum(distances ** 2) next_idx = np.random.choice(n_samples, p=probabilities) centroids[i] = X[next_idx] return centroids

6.2 并行计算加速

使用numpy的广播机制优化距离计算:

def _compute_distances_vectorized(self, X: np.ndarray) -> np.ndarray: """向量化距离计算""" if self.distance_metric == 'euclidean': # 利用广播计算所有点到所有质心的欧式距离 distances = np.sqrt(((X[:, np.newaxis, :] - self.centroids[np.newaxis, :, :]) ** 2).sum(axis=2)) elif self.distance_metric == 'manhattan': distances = np.abs(X[:, np.newaxis, :] - self.centroids[np.newaxis, :, :]).sum(axis=2) elif self.distance_metric == 'cosine': norm_X = np.linalg.norm(X, axis=1)[:, np.newaxis] norm_C = np.linalg.norm(self.centroids, axis=1)[np.newaxis, :] dot_products = np.dot(X, self.centroids.T) distances = 1 - dot_products / (norm_X * norm_C) else: raise ValueError(f"不支持的度量方式: {self.distance_metric}") return distances

6.3 处理不同尺度特征

当特征尺度差异大时,应先进行标准化:

from sklearn.preprocessing import StandardScaler # 数据标准化 scaler = StandardScaler() X_scaled = scaler.fit_transform(X) # 在标准化数据上训练 kmeans = KMeans(n_clusters=3) kmeans.fit(X_scaled)

7. 不同距离度量的实战对比

让我们在真实数据集上比较三种距离度量的表现:

from sklearn.datasets import load_iris # 加载Iris数据集 iris = load_iris() X = iris.data y = iris.target # 定义评估函数 def evaluate_kmeans(X, y, distance_metric): kmeans = KMeans(n_clusters=3, distance_metric=distance_metric) kmeans.fit(X) # 计算SSE和轮廓系数 sse = compute_sse(X, kmeans.labels, kmeans.centroids, distance_metric) # 简单的准确率评估(注意:聚类标签与真实标签可能不对应) from sklearn.metrics import adjusted_rand_score ari = adjusted_rand_score(y, kmeans.labels) return sse, ari # 对比三种距离度量 metrics = ['euclidean', 'manhattan', 'cosine'] results = [] for metric in metrics: sse, ari = evaluate_kmeans(X, y, metric) results.append({ 'metric': metric, 'SSE': sse, 'ARI': ari }) # 显示结果 import pandas as pd pd.DataFrame(results).set_index('metric')

典型输出结果:

metricSSEARI
euclidean78.85160.7302
manhattan112.45830.7161
cosine0.19250.6923

从结果可以看出:

  • 欧式距离在这个数据集上表现最好(ARI最高)
  • 余弦距离的SSE看起来最小,但这是因为它的距离范围不同(0-2)
  • 曼哈顿距离表现介于两者之间

8. 常见问题与解决方案

8.1 空簇问题

当某个簇失去所有成员时,我们的实现会随机选择一个点作为新质心。其他解决方案包括:

  1. 选择距离当前质心最远的点
  2. 从有最多成员的簇中分裂一个新质心
  3. 直接减少簇数量

8.2 局部最优解

K-means对初始质心敏感,可能陷入局部最优。解决方法:

  • 多次运行选择最佳结果
  • 使用k-means++初始化
  • 考虑更高级的算法如K-medoids

8.3 高维数据挑战

在高维空间中,距离度量可能失效(维度灾难)。应对策略:

  • 使用余弦距离等适合高维的度量
  • 先进行降维处理(PCA等)
  • 考虑子空间聚类方法

8.4 非球形簇问题

K-means假设簇是凸形且各向同性,对于复杂形状效果不佳。替代方案:

  • 使用谱聚类或DBSCAN
  • 基于密度的聚类方法
  • 核K-means(使用核技巧)

9. 扩展应用与变体

9.1 K-medoids实现

K-medoids使用实际数据点作为中心(medoid),对异常值更鲁棒:

def k_medoids(X: np.ndarray, k: int, max_iter: int = 300) -> Tuple[np.ndarray, np.ndarray]: """K-medoids聚类实现""" n_samples = X.shape[0] # 随机初始化medoids medoid_indices = np.random.choice(n_samples, k, replace=False) medoids = X[medoid_indices] labels = np.zeros(n_samples) for _ in range(max_iter): # 分配步骤 distances = np.sqrt(((X[:, np.newaxis, :] - medoids[np.newaxis, :, :]) ** 2).sum(axis=2)) new_labels = np.argmin(distances, axis=1) # 检查收敛 if np.all(new_labels == labels): break labels = new_labels # 更新步骤:选择簇内距离其他点总和最小的点作为新medoid new_medoids = np.zeros_like(medoids) for i in range(k): cluster_points = X[labels == i] if len(cluster_points) == 0: new_medoids[i] = X[np.random.choice(n_samples)] continue # 计算簇内所有点之间的距离矩阵 cluster_distances = np.sqrt(((cluster_points[:, np.newaxis, :] - cluster_points[np.newaxis, :, :]) ** 2).sum(axis=2)) total_distances = cluster_distances.sum(axis=1) medoid_idx = np.argmin(total_distances) new_medoids[i] = cluster_points[medoid_idx] medoids = new_medoids return labels, medoids

9.2 在线K-means

对于大数据集,可以实现在线版本的K-means:

class OnlineKMeans: def __init__(self, n_clusters=3, learning_rate=0.1): self.n_clusters = n_clusters self.learning_rate = learning_rate self.centroids = None self.counts = None def partial_fit(self, X: np.ndarray) -> None: """在线更新质心""" if self.centroids is None: # 初始化质心 self.centroids = X[np.random.choice(X.shape[0], self.n_clusters, replace=False)] self.counts = np.zeros(self.n_clusters) # 为每个点找到最近的质心 distances = np.sqrt(((X[:, np.newaxis, :] - self.centroids[np.newaxis, :, :]) ** 2).sum(axis=2)) closest = np.argmin(distances, axis=1) # 更新质心 for i in range(self.n_clusters): mask = (closest == i) if np.any(mask): points = X[mask] self.counts[i] += len(points) # 在线更新公式 self.centroids[i] += self.learning_rate * (np.mean(points, axis=0) - self.centroids[i]) def predict(self, X: np.ndarray) -> np.ndarray: """预测簇标签""" distances = np.sqrt(((X[:, np.newaxis, :] - self.centroids[np.newaxis, :, :]) ** 2).sum(axis=2)) return np.argmin(distances, axis=1)

10. 工程实践建议

  1. 数据预处理至关重要

    • 标准化/归一化数值特征
    • 处理缺失值和异常值
    • 考虑特征工程增强可分性
  2. 评估指标选择

    • 内部指标:SSE、轮廓系数
    • 外部指标(有真实标签时):调整兰德指数、互信息
  3. 可视化诊断

    def plot_clusters(X, labels, centroids=None): plt.scatter(X[:, 0], X[:, 1], c=labels, cmap='viridis', alpha=0.5) if centroids is not None: plt.scatter(centroids[:, 0], centroids[:, 1], c='red', marker='x', s=100) plt.xlabel('Feature 1') plt.ylabel('Feature 2') plt.title('Cluster Visualization') plt.show()
  4. 生产环境考虑

    • 使用更高效的实现(如sklearn或专用库)
    • 添加异常处理和日志记录
    • 考虑分布式实现处理大数据
  5. 参数调优策略

    • 使用肘部法则或轮廓系数选择K
    • 尝试不同初始化方法和距离度量
    • 多次运行取最佳结果

在实际项目中实现K-means时,我发现最耗时的部分往往是距离计算,特别是对于高维数据。通过使用向量化操作和适当的数据结构优化,可以显著提升性能。另一个常见陷阱是忽视特征缩放,这会导致距离度量被某些大尺度特征主导。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/5/29 22:51:38

12种最常见的机器学习算法简介

近年来&#xff0c;由于对技术的高需求和进步&#xff0c;机器学习的普及已大大增加。机器学习可以从数据中创造价值的潜力使其吸引了许多不同行业的企业。大多数机器学习产品都是使用现成的机器学习算法进行设计和实现的&#xff0c;并且需要进行一些调整和细微更改。机器学习…

作者头像 李华
网站建设 2026/5/29 22:49:33

Figure 03 实测 200 小时稳定作业,人形机器人商业化落地提速

近期&#xff0c;人形机器人产业迎来实质性落地进展&#xff0c;AI技术与机器人硬件的融合应用愈发成熟&#xff0c;也让长期处于试验阶段的人形机器人产业&#xff0c;迎来落地新进展。Figure AI旗下Figure 03人形机器人完成一项长时作业实测&#xff0c;通过200小时全程直播无…

作者头像 李华
网站建设 2026/5/29 22:41:54

基于Arduino HID与超声波传感器打造免接触桌面媒体控制器

1. 项目概述与核心价值如果你和我一样&#xff0c;经常在电脑前工作或娱乐&#xff0c;肯定遇到过这样的场景&#xff1a;正全神贯注地敲代码、写文档&#xff0c;或者沉浸在游戏世界里&#xff0c;突然想调一下音量、切首歌&#xff0c;手不得不离开鼠标键盘&#xff0c;去摸那…

作者头像 李华