ottertune/server/analysis/cluster.py

674 lines
19 KiB
Python
Raw Normal View History

2019-08-23 08:47:19 -07:00
#
# OtterTune - cluster.py
#
# Copyright (c) 2017-18, Carnegie Mellon University Database Group
#
from abc import ABCMeta, abstractproperty
from collections import OrderedDict
import os
import json
import copy
import numpy as np
from scipy.spatial.distance import cdist
from sklearn.metrics import silhouette_score
from sklearn.cluster import KMeans as SklearnKMeans
from celery.utils.log import get_task_logger
from .base import ModelBase
# Log debug messages
LOGGER = get_task_logger(__name__)
class KMeans(ModelBase):
"""
KMeans:
Fits an Sklearn KMeans model to X.
See also
--------
http://scikit-learn.org/stable/modules/generated/sklearn.cluster.KMeans.html
Attributes
----------
n_clusters_ : int
The number of clusters, K
cluster_inertia_ : float
Sum of squared distances of samples to their closest cluster center
cluster_labels_ : array, [n_clusters_]
Labels indicating the membership of each point
cluster_centers_ : array, [n_clusters, n_features]
Coordinates of cluster centers
sample_labels_ : array, [n_samples]
Labels for each of the samples in X
sample_distances_ : array, [n_samples]
The distance between each sample point and its cluster's center
Constants
---------
SAMPLE_CUTOFF_ : int
If n_samples > SAMPLE_CUTOFF_ then sample distances
are NOT recorded
"""
SAMPLE_CUTOFF_ = 1000
def __init__(self):
self.model_ = None
self.n_clusters_ = None
self.sample_labels_ = None
self.sample_distances_ = None
@property
def cluster_inertia_(self):
# Sum of squared distances of samples to their closest cluster center
return None if self.model_ is None else \
self.model_.inertia_
@property
def cluster_labels_(self):
# Cluster membership labels for each point
return None if self.model_ is None else \
copy.deepcopy(self.model_.labels_)
@property
def cluster_centers_(self):
# Coordinates of the cluster centers
return None if self.model_ is None else \
copy.deepcopy(self.model_.cluster_centers_)
def _reset(self):
"""Resets all attributes (erases the model)"""
self.model_ = None
self.n_clusters_ = None
self.sample_labels_ = None
self.sample_distances_ = None
def fit(self, X, K, sample_labels=None, estimator_params=None):
"""Fits a Sklearn KMeans model to X.
Parameters
----------
X : array-like, shape (n_samples, n_features)
Training data.
K : int
The number of clusters.
sample_labels : array-like, shape (n_samples), optional
Labels for each of the samples in X.
estimator_params : dict, optional
The parameters to pass to the KMeans estimators.
Returns
-------
self
"""
self._reset()
# Note: previously set n_init=50
self.model_ = SklearnKMeans(K)
if estimator_params is not None:
assert isinstance(estimator_params, dict)
self.model_.set_params(**estimator_params)
# Compute Kmeans model
self.model_.fit(X)
if sample_labels is None:
sample_labels = ["sample_{}".format(i) for i in range(X.shape[0])]
assert len(sample_labels) == X.shape[0]
self.sample_labels_ = np.array(sample_labels)
self.n_clusters_ = K
# Record sample label/distance from its cluster center
self.sample_distances_ = OrderedDict()
for cluster_label in range(self.n_clusters_):
assert cluster_label not in self.sample_distances_
member_rows = X[self.cluster_labels_ == cluster_label, :]
member_labels = self.sample_labels_[self.cluster_labels_ == cluster_label]
centroid = np.expand_dims(self.cluster_centers_[cluster_label], axis=0)
# "All clusters must have at least 1 member!"
if member_rows.shape[0] == 0:
return None
# Calculate distance between each member row and the current cluster
dists = np.empty(member_rows.shape[0])
dist_labels = []
for j, (row, label) in enumerate(zip(member_rows, member_labels)):
dists[j] = cdist(np.expand_dims(row, axis=0), centroid, "euclidean").squeeze()
dist_labels.append(label)
# Sort the distances/labels in ascending order
sort_order = np.argsort(dists)
dists = dists[sort_order]
dist_labels = np.array(dist_labels)[sort_order]
self.sample_distances_[cluster_label] = {
"sample_labels": dist_labels,
"distances": dists,
}
return self
def get_closest_samples(self):
"""Returns a list of the labels of the samples that are located closest
to their cluster's center.
Returns
----------
closest_samples : list
A list of the sample labels that are located the closest to
their cluster's center.
"""
if self.sample_distances_ is None:
raise Exception("No model has been fit yet!")
return [samples['sample_labels'][0] for samples in list(self.sample_distances_.values())]
def get_memberships(self):
'''
Return the memberships in each cluster
'''
memberships = OrderedDict()
for cluster_label, samples in list(self.sample_distances_.items()):
memberships[cluster_label] = OrderedDict(
[(l, d) for l, d in zip(samples["sample_labels"], samples["distances"])])
return json.dumps(memberships, indent=4)
class KMeansClusters(ModelBase):
"""
KMeansClusters:
Fits a KMeans model to X for clusters in the range [min_cluster_, max_cluster_].
Attributes
----------
min_cluster_ : int
The minimum cluster size to fit a KMeans model to
max_cluster_ : int
The maximum cluster size to fit a KMeans model to
cluster_map_ : dict
A dictionary mapping the cluster size (K) to the KMeans
model fitted to X with K clusters
sample_labels_ : array, [n_samples]
Labels for each of the samples in X
"""
def __init__(self):
self.min_cluster_ = None
self.max_cluster_ = None
self.cluster_map_ = None
self.sample_labels_ = None
def _reset(self):
"""Resets all attributes (erases the model)"""
self.min_cluster_ = None
self.max_cluster_ = None
self.cluster_map_ = None
self.sample_labels_ = None
def fit(self, X, min_cluster, max_cluster, sample_labels=None, estimator_params=None):
"""Fits a KMeans model to X for each cluster in the range [min_cluster, max_cluster].
Parameters
----------
X : array-like, shape (n_samples, n_features)
Training data.
min_cluster : int
The minimum cluster size to fit a KMeans model to.
max_cluster : int
The maximum cluster size to fit a KMeans model to.
sample_labels : array-like, shape (n_samples), optional
Labels for each of the samples in X.
estimator_params : dict, optional
The parameters to pass to the KMeans estimators.
Returns
-------
self
"""
self._reset()
self.min_cluster_ = min_cluster
self.max_cluster_ = max_cluster
self.cluster_map_ = {}
if sample_labels is None:
sample_labels = ["sample_{}".format(i) for i in range(X.shape[1])]
self.sample_labels_ = sample_labels
for K in range(self.min_cluster_, self.max_cluster_ + 1):
tmp = KMeans().fit(X, K, self.sample_labels_, estimator_params)
if tmp is None: # Set maximum cluster
assert K > min_cluster, "min_cluster is too large for the model"
self.max_cluster_ = K - 1
break
else:
self.cluster_map_[K] = tmp
return self
class KSelection(ModelBase, metaclass=ABCMeta):
"""KSelection:
Abstract class for techniques that approximate the optimal
number of clusters (K).
Attributes
----------
optimal_num_clusters_ : int
An estimation of the optimal number of clusters K for
a KMeans model fit to X
clusters_ : array, [n_clusters]
The sizes of the clusters
name_ : string
The name of this technique
"""
NAME_ = None
def __init__(self):
self.optimal_num_clusters_ = None
self.clusters_ = None
def _reset(self):
"""Resets all attributes (erases the model)"""
self.optimal_num_clusters_ = None
self.clusters_ = None
@abstractproperty
def name_(self):
pass
def save(self, savedir):
"""Saves the estimation of the optimal # of clusters.
Parameters
----------
savedir : string
Path to the directory to save the results in.
"""
if self.optimal_num_clusters_ is None:
raise Exception("Optimal number of clusters has not been computed!")
# Save the computed optimal number of clusters
savepath = os.path.join(savedir, self.name_ + "_optimal_num_clusters.txt")
with open(savepath, "w") as f:
f.write(str(self.optimal_num_clusters_))
class GapStatistic(KSelection):
"""GapStatistic:
Approximates the optimal number of clusters (K).
References
----------
https://web.stanford.edu/~hastie/Papers/gap.pdf
Attributes
----------
optimal_num_clusters_ : int
An estimation of the optimal number of clusters K for
a KMeans model fit to X
clusters_ : array, [n_clusters]
The sizes of the clusters
name_ : string
The name of this technique
log_wks_ : array, [n_clusters]
The within-dispersion measures of X (log)
log_wkbs_ : array, [n_clusters]
The within-dispersion measures of the generated
reference data sets
khats_ : array, [n_clusters]
The gap-statistic for each cluster
"""
NAME_ = "gap-statistic"
def __init__(self):
super(GapStatistic, self).__init__()
self.log_wks_ = None
self.log_wkbs_ = None
self.khats_ = None
@property
def name_(self):
return self.NAME_
def _reset(self):
"""Resets all attributes (erases the model)"""
super(GapStatistic, self)._reset()
self.log_wks_ = None
self.log_wkbs_ = None
self.khats_ = None
def fit(self, X, cluster_map, n_b=50):
"""Estimates the optimal number of clusters (K) for a
KMeans model trained on X.
Parameters
----------
X : array-like, shape (n_samples, n_features)
Training data.
cluster_map_ : dict
A dictionary mapping each cluster size (K) to the KMeans
model fitted to X with K clusters
n_B : int
The number of reference data sets to generate
Returns
-------
self
"""
self._reset()
mins, maxs = GapStatistic.bounding_box(X)
n_clusters = len(cluster_map)
# Dispersion for real distribution
log_wks = np.zeros(n_clusters)
log_wkbs = np.zeros(n_clusters)
sk = np.zeros(n_clusters)
for indk, (K, model) in enumerate(sorted(cluster_map.items())):
# Computes Wk: the within-dispersion of each cluster size (k)
log_wks[indk] = np.log(model.cluster_inertia_ / (2.0 * K))
# Create B reference datasets
log_bwkbs = np.zeros(n_b)
for i in range(n_b):
Xb = np.empty_like(X)
for j in range(X.shape[1]):
Xb[:, j] = np.random.uniform(mins[j], maxs[j], size=X.shape[0])
Xb_model = KMeans().fit(Xb, K)
log_bwkbs[i] = np.log(Xb_model.cluster_inertia_ / (2.0 * K))
log_wkbs[indk] = sum(log_bwkbs) / n_b
sk[indk] = np.sqrt(sum((log_bwkbs - log_wkbs[indk]) ** 2) / n_b)
sk = sk * np.sqrt(1 + 1.0 / n_b)
khats = np.zeros(n_clusters)
gaps = log_wkbs - log_wks
gsks = gaps - sk
khats[1:] = gaps[0:-1] - gsks[1:]
self.clusters_ = np.array(sorted(cluster_map.keys()))
for i in range(1, n_clusters):
if gaps[i - 1] >= gsks[i]:
self.optimal_num_clusters_ = self.clusters_[i - 1]
break
if self.optimal_num_clusters_ is None:
LOGGER.info("GapStatistic NOT found the optimal k, \
use the last(maximum) k instead ")
self.optimal_num_clusters_ = self.clusters_[-1]
self.log_wks_ = log_wks
self.log_wkbs_ = log_wkbs
self.khats_ = khats
return self
@staticmethod
def bounding_box(X):
"""Computes the box that tightly bounds X
Parameters
----------
X : array-like, shape (n_samples, n_features)
Training data.
Returns
-------
The mins and maxs that make up the bounding box
"""
mins = np.min(X, axis=0)
maxs = np.max(X, axis=0)
return mins, maxs
@staticmethod
def Wk(X, mu, cluster_labels):
"""Computes the within-dispersion of each cluster size (k)
Parameters
----------
X : array-like, shape (n_samples, n_features)
Training data.
mu : array-like, shape (n_clusters, n_features)
Coordinates of cluster centers
cluster_labels: array-like, shape (n_samples)
Labels for each of the samples in X.
Returns
-------
The within-dispersion of each cluster (K)
"""
K = len(mu)
return sum([np.linalg.norm(mu[i] - x) ** 2 / (2.0 * K)
for i in range(K)
for x in X[cluster_labels == i]])
class DetK(KSelection):
"""DetK:
Approximates the optimal number of clusters (K).
References
----------
https://www.ee.columbia.edu/~dpwe/papers/PhamDN05-kmeans.pdf
Attributes
----------
optimal_num_clusters_ : int
An estimation of the optimal number of clusters K for
KMeans models fit to X
clusters_ : array, [n_clusters]
The sizes of the clusters
name_ : string
The name of this technique
fs_ : array, [n_clusters]
The computed evaluation functions F(K) for each cluster size K
"""
NAME_ = "det-k"
def __init__(self):
super(DetK, self).__init__()
self.fs_ = None
@property
def name_(self):
return DetK.NAME_
def _reset(self):
"""Resets all attributes (erases the model)"""
super(DetK, self)._reset()
self.fs_ = None
def fit(self, X, cluster_map):
"""Estimates the optimal number of clusters (K) for a
KMeans model trained on X.
Parameters
----------
X : array-like, shape (n_samples, n_features)
Training data.
cluster_map_ : dict
A dictionary mapping each cluster size (K) to the KMeans
model fitted to X with K clusters
Returns
-------
self
"""
self._reset()
n_clusters = len(cluster_map)
nd = X.shape[1]
fs = np.empty(n_clusters)
sks = np.empty(n_clusters)
alpha = {}
# K from 1 to maximum_cluster_
for i, (K, model) \
in enumerate(sorted(cluster_map.items())):
# Compute alpha(K, nd) (i.e. alpha[K])
if K == 2:
alpha[K] = 1 - 3.0 / (4 * nd)
elif K > 2:
alpha[K] = alpha[K - 1] + (1 - alpha[K - 1]) / 6.0
sks[i] = model.cluster_inertia_
if K == 1:
fs[i] = 1
elif sks[i - 1] == 0:
fs[i] = 1
else:
fs[i] = sks[i] / (alpha[K] * sks[i - 1])
self.clusters_ = np.array(sorted(cluster_map.keys()))
self.optimal_num_clusters_ = self.clusters_[np.argmin(fs)]
self.fs_ = fs
return self
class Silhouette(KSelection):
"""Det:
Approximates the optimal number of clusters (K).
References
----------
http://scikit-learn.org/stable/modules/generated/sklearn.metrics.silhouette_score.html
Attributes
----------
optimal_num_clusters_ : int
An estimation of the optimal number of clusters K for
KMeans models fit to X
clusters_ : array, [n_clusters]
The sizes of the clusters
name_ : string
The name of this technique
Score_ : array, [n_clusters]
The mean Silhouette Coefficient for each cluster size K
"""
# short for Silhouette score
NAME_ = "s-score"
def __init__(self):
super(Silhouette, self).__init__()
self.scores_ = None
@property
def name_(self):
return Silhouette.NAME_
def _reset(self):
"""Resets all attributes (erases the model)"""
super(Silhouette, self)._reset()
self.scores_ = None
def fit(self, X, cluster_map):
"""Estimates the optimal number of clusters (K) for a
KMeans model trained on X.
Parameters
----------
X : array-like, shape (n_samples, n_features)
Training data.
cluster_map_ : dict
A dictionary mapping each cluster size (K) to the KMeans
model fitted to X with K clusters
Returns
-------
self
"""
self._reset()
n_clusters = len(cluster_map)
# scores = np.empty(n_clusters)
scores = np.zeros(n_clusters)
for i, (K, model) \
in enumerate(sorted(cluster_map.items())):
if K <= 1: # K >= 2
continue
scores[i] = silhouette_score(X, model.cluster_labels_)
self.clusters_ = np.array(sorted(cluster_map.keys()))
self.optimal_num_clusters_ = self.clusters_[np.argmax(scores)]
self.scores_ = scores
return self
def create_kselection_model(model_name):
"""Constructs the KSelection model object with the given name
Parameters
----------
model_name : string
Name of the KSelection model.
One of ['gap-statistic', 'det-k', 's-score']
Returns
-------
The constructed model object
"""
kselection_map = {
DetK.NAME_: DetK,
GapStatistic.NAME_: GapStatistic,
Silhouette.NAME_: Silhouette
}
if model_name not in kselection_map:
raise Exception("KSelection model {} not supported!".format(model_name))
else:
return kselection_map[model_name]()