# Copyright (c) 2022 Panagiotis Anagnostou
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
"""
Implementation of the clustering algorithms, members of the HiPart package.
@author Panagiotis Anagnostou
@author Nicos Pavlidis
"""
import HiPart.__utility_functions as util
import numpy as np
import statsmodels.api as sm
from HiPart.__partition_class import Partition
from KDEpy import FFTKDE
from scipy import stats
from sklearn.cluster import KMeans
from treelib import Tree
[docs]
class DePDDP(Partition):
"""
Class dePDDP. It executes the dePDDP algorithm.
References
----------
Tasoulis, S. K., Tasoulis, D. K., & Plagianakos, V. P. (2010). Enhancing
principal direction divisive clustering. Pattern Recognition, 43(10), 3391-
3411.
Parameters
----------
decomposition_method : str, (optional)
One of the ('pca', 'kpca', 'ica', 'tsne') supported decomposition
methods used as kernel for the dePDDP algorithm.
max_clusters_number : int, (optional)
Desired maximum number of clusters to find the dePDDP algorithm.
bandwidth_scale : float, (optional)
Standard deviation scaler for the density approximation.
percentile : float, (optional)
The percentile distance from the dataset's edge in which a split can
not occur. [0,0.5) values are allowed.
min_sample_split : int, (optional)
The minimum number of points needed in a cluster for a split to occur.
visualization_utility : bool, (optional)
If (True) generate the data needed by the visualization utilities of
the package otherwise, if false the split_visualization and
interactive_visualization of the package can not be created. For the
'tsne' decomposition method does not support visualization because it
affects the correct execution of the dePDDP algorithm.
distance_matrix : bool, (optional)
If (True) the input data are considered as a distance matrix and not as
a data matrix. The distance matrix is a square matrix with the samples
on the rows and the variables on the columns. The distance matrix is
used only in conjunction with the 'mds' decomposition method and no
other from the supported decomposition methods.
**decomposition_args :
Arguments for each of the decomposition methods ("decomposition.PCA" as
"pca", "decomposition.KernelPCA" as "kpca", "decomposition.FastICA" as
"ica", "manifold.TSNE" as "tsne") utilized by the HiPart package, as
documented in the scikit-learn package, from which they are implemented.
Attributes
----------
output_matrix : numpy.ndarray
Model's step by step execution output.
labels_ : numpy.ndarray
Extracted clusters from the algorithm.
tree : treelib.Tree
The object which contains all the information about the execution of
the dePDDP algorithm.
samples_number : int
The number of samples contained in the data.
fit_predict(X) :
Returns the results of the fit method in the form of the labels of the
predicted clustering labels.
Parameters
----------
X : numpy.ndarray
Data matrix with the samples on the rows and the variables on the
columns. If the distance_matrix is True then X should be a square
distance matrix.
Returns
-------
labels_ : numpy.ndarray
Extracted clusters from the algorithm.
"""
decreasing = False
def __init__(
self,
decomposition_method="pca",
max_clusters_number=100,
bandwidth_scale=0.5,
percentile=0.1,
min_sample_split=5,
visualization_utility=True,
distance_matrix=False,
**decomposition_args,
):
super().__init__(
decomposition_method,
max_clusters_number,
min_sample_split,
visualization_utility,
distance_matrix,
**decomposition_args,
)
self.bandwidth_scale = bandwidth_scale
self.percentile = percentile
[docs]
def fit(self, X):
"""
Execute the dePDDP algorithm and return all the execution data in the
form of a dePDDP class object.
Parameters
----------
X : numpy.ndarray
Data matrix with the samples on the rows and the variables on the
columns. If the distance_matrix is True then X should be a square
distance matrix.
Returns
-------
self
A dePDDP class type object, with complete results on the
algorithm's analysis.
"""
self.X = X
self.samples_number = np.size(X, 0)
if self.distance_matrix:
if X.shape[0] != X.shape[1]:
raise ValueError("dePDDP: distance_matrix: Should be a square matrix")
# create an id vector for the samples of X
indices = np.array([int(i) for i in range(self.samples_number)])
# initialize the tree and root node # step (0)
den_tree = Tree()
# nodes unique IDs indicator
self.node_ids = 0
# nodes next color indicator (necessary for visualization purposes)
self.cluster_color = 0
den_tree.create_node(
tag="cl_" + str(self.node_ids),
identifier=self.node_ids,
data=self.calculate_node_data(indices, self.cluster_color),
)
# indicator for the next node to split
selected_node = 0
# if no possibility of split exists on the data # (ST2)
if not den_tree.get_node(0).data["split_permission"]:
raise RuntimeError("dePDDP cannot split the data at all!!!")
# Initialize the ST1 stopping criterion counter that count the number
# of clusters # (ST1)
found_clusters = 1
while (found_clusters < self.max_clusters_number) and (
selected_node is not None
): # (ST1) or (ST2)
self.split_function(den_tree, selected_node) # step (1, 2)
# select the next kid for split based on the local minimum density
selected_node = self.select_kid(
den_tree.leaves(), self.decreasing
) # step (3)
found_clusters = found_clusters + 1 # (ST1)
self.tree = den_tree
return self
[docs]
def calculate_node_data(self, indices, key):
"""
Calculation of the projections onto the Principal Components with the
utilization of the "Principal Components Analysis" or the "Kernel
Principal Components Analysis" or the "Independent Component Analysis"
or "t-SNE" methods.
With the incorporation of the "Multi-Dimensional Scaling" method the
function can also be used for distance matrices. The distance matrix is
used only in correlation with the "mds" decomposition method. This makes
us check for the correct configuration of the parameters each time the
function is executed.
Determination of the projection's density and search for its local
minima. The lowest local minimum point within the allowed sample
percentiles of the projection's density representation is selected
as the split point.
This function leads to the second Stopping criterion 2 of the
algorithm.
Parameters
----------
indices : numpy.ndarray
The index of the samples in the original data matrix.
key : int
The value of the color for each node.
Returns
-------
data : dict
The necessary data for each node which are splitting point.
"""
method = None # (ST2)
proj = None # (ST2)
splitpoint = None # (ST2)
split_criterion = None # (ST2)
flag = False # (ST2)
# if the number of samples
if indices.shape[0] > self.min_sample_split:
# Apply the decomposition method on the data matrix
if self.distance_matrix:
if self.decomposition_method != "mds":
raise ValueError(
"dePDDP: decomposition_method: Should be 'mds' for distance_matrix"
)
method = util.execute_decomposition_method(
data_matrix=util.select_from_distance_matrix(self.X, indices),
decomposition_method=self.decomposition_method,
two_dimentions=self.visualization_utility,
decomposition_args=self.decomposition_args,
)
proj = method.fit_transform(
util.select_from_distance_matrix(self.X, indices)
)
else:
method = util.execute_decomposition_method(
data_matrix=self.X[indices, :],
decomposition_method=self.decomposition_method,
two_dimentions=self.visualization_utility,
decomposition_args=self.decomposition_args,
)
proj = method.fit_transform(self.X[indices, :])
one_dimension = proj[:, 0]
# calculate the standard deviation of the data
bandwidth = sm.nonparametric.bandwidths.select_bandwidth(
one_dimension, "silverman", kernel=None
)
# calculate the density function on the 1st Principal Component
# x_ticks: projection points on the 1st PC
# evaluation: the density of the projections on the 1st PC
x_ticks, evaluation = (
FFTKDE(kernel="gaussian", bw=self.bandwidth_scale * bandwidth)
.fit(one_dimension)
.evaluate()
)
# calculate all the local minima
minimum_indices = np.where(
np.diff((np.diff(evaluation) > 0).astype("int8")) == 1
)[0]
# Find the location of the local minima and make sure they are
# with in the given percentile limits
quantile_value = np.quantile(
one_dimension, (self.percentile, (1 - self.percentile))
)
local_minimum_index = np.where(
np.logical_and(
x_ticks[minimum_indices] > quantile_value[0],
x_ticks[minimum_indices] < quantile_value[1],
)
)
# List all the numbers for the local minima (ee) and their
# respective position (ss) on the 1st PC.
ss = x_ticks[minimum_indices][local_minimum_index]
ee = evaluation[minimum_indices][local_minimum_index]
# if there is at least one local minima split the data
if ss.size > 0:
minimum_location = np.argmin(ee)
splitpoint = ss[minimum_location]
split_criterion = ee[minimum_location]
flag = True
return {
"indices": indices,
"projection": proj,
"projection_vectors": method,
"splitpoint": splitpoint,
"split_criterion": split_criterion,
"split_permission": flag,
"color_key": key,
"dendrogram_check": False,
}
@property
def bandwidth_scale(self):
return self._bandwidth_scale
@bandwidth_scale.setter
def bandwidth_scale(self, v):
if v <= 0:
raise ValueError("DePDDP: bandwidth_scale: Should be > 0")
self._bandwidth_scale = v
@property
def percentile(self):
return self._percentile
@percentile.setter
def percentile(self, v):
if v >= 0.5 or v < 0:
raise ValueError("DePDDP: percentile: Should be between [0,0.5) interval")
self._percentile = v
[docs]
class IPDDP(Partition):
"""
Class IPDDP. It executes the iPDDP algorithm.
References
----------
Tasoulis, S. K., Tasoulis, D. K., & Plagianakos, V. P. (2010). Enhancing
principal direction divisive clustering. Pattern Recognition, 43(10), 3391-
3411.
Parameters
----------
decomposition_method : str, (optional)
One of the ('pca', 'kpca', 'ica', 'tsne') supported decomposition
methods used as kernel for the iPDDP algorithm.
max_clusters_number : int, (optional)
Desired maximum number of clusters for the algorithm.
percentile : float, (optional)
The percentile distance from the dataset's edge in which a split can
not occur. [0,0.5) values are allowed.
min_sample_split : int, (optional)
The minimum number of points needed in a cluster for a split to occur.
visualization_utility : bool, (optional)
If (True) generate the data needed by the visualization utilities of
the package otherwise, if false the split_visualization and
interactive_visualization of the package can not be created. For the
'tsne' decomposition method does not support visualization because it
affects the correct execution of the iPDDP algorithm.
distance_matrix : bool, (optional)
If (True) the input data are considered as a distance matrix and not as
a data matrix. The distance matrix is a square matrix with the samples
on the rows and the variables on the columns. The distance matrix is
used only in conjunction with the 'mds' decomposition method and no
other from the supported decomposition methods.
**decomposition_args :
Arguments for each of the decomposition methods ("decomposition.PCA" as
"pca", "decomposition.KernelPCA" as "kpca", "decomposition.FastICA" as
"ica", "manifold.TSNE" as "tsne") utilized by the HiPart package, as
documented in the scikit-learn package, from which they are implemented.
Attributes
----------
output_matrix : numpy.ndarray
Model's step by step execution output.
labels_ :
Extracted clusters from the algorithm.
tree : treelib.Tree
The object which contains all the information about the execution of
the iPDDP algorithm.
samples_number : int
The number of samples contained in the data.
fit_predict(X) :
Returns the results of the fit method in the form of the labels of the
predicted clustering labels.
Parameters
----------
X : numpy.ndarray
Data matrix with the samples on the rows and the variables on the
columns. If the distance_matrix is True then X should be a square
distance matrix.
Returns
-------
labels_ : numpy.ndarray
Extracted clusters from the algorithm.
"""
decreasing = True
def __init__(
self,
decomposition_method="pca",
max_clusters_number=100,
percentile=0.1,
min_sample_split=5,
visualization_utility=True,
distance_matrix=False,
**decomposition_args,
):
super().__init__(
decomposition_method,
max_clusters_number,
min_sample_split,
visualization_utility,
distance_matrix,
**decomposition_args,
)
self.percentile = percentile
[docs]
def fit(self, X):
"""
Execute the iPDDP algorithm and return all the execution data in the
form of a IPDDP class object.
Parameters
----------
X : numpy.ndarray
Data matrix with the samples on the rows and the variables on the
columns.
Returns
-------
self
A iPDDP class type object, with complete results on the
algorithm's analysis.
"""
self.X = X
self.samples_number = X.shape[0]
# check for the correct form of the input data matrix
if self.distance_matrix:
if X.shape[0] != X.shape[1]:
raise ValueError("dePDDP: distance_matrix: Should be a square matrix")
# create an id vector for the samples of X
indices = np.array([int(i) for i in range(X.shape[0])])
# initialize tree and root node
proj_tree = Tree()
# nodes unique IDs indicator
self.node_ids = 0
# nodes next color indicator (necessary for visualization purposes)
self.cluster_color = 0
# Root initialization
proj_tree.create_node( # step (0)
tag="cl_" + str(self.node_ids),
identifier=self.node_ids,
data=self.calculate_node_data(indices, self.cluster_color),
)
# indicator for the next node to split
selected_node = 0
# if no possibility of split exists on the data
if not proj_tree.get_node(0).data["split_permission"]:
raise RuntimeError("iPDDP: cannot split the data at all!!!")
# Initialize the stopping criterion counter that counts the number
# of clusters
splits = 1
while (selected_node is not None) and (splits < self.max_clusters_number):
self.split_function(proj_tree, selected_node) # step (1 ,2)
# select the next kid for split based on the local minimum density
selected_node = self.select_kid(
proj_tree.leaves(), decreasing=self.decreasing
) # step (3)
splits = splits + 1
self.tree = proj_tree
return self
[docs]
def calculate_node_data(self, indices, key):
"""
Calculation of the projections onto the Principal Components with the
utilization of the "Principal Components Analysis" or the "Kernel
Principal Components Analysis" or the "Independent Component Analysis"
or "t-SNE" methods.
Determination of the projection's maximum distance between to
consecutive points and chooses it as the split-point for this node.
This function leads to the second Stopping criterion 2 of the
algorithm.
Parameters
----------
indices : ndarray of shape (n_samples,)
The index of the samples in the original data matrix.
key : int
The value of the color for each node.
Returns
-------
data : dict
The necessary data for each node which are splitting point.
"""
# Application of the minimum sample number split
# =========================
if indices.shape[0] > self.min_sample_split:
# Apply the decomposition method on the data matrix
if self.distance_matrix:
if self.decomposition_method != "mds":
raise ValueError(
"iPDDP: decomposition_method: Should be 'mds' for distance_matrix"
)
proj_vectors = util.execute_decomposition_method(
data_matrix=util.select_from_distance_matrix(self.X, indices),
decomposition_method=self.decomposition_method,
two_dimentions=self.visualization_utility,
decomposition_args=self.decomposition_args,
)
projection = proj_vectors.fit_transform(
util.select_from_distance_matrix(self.X, indices)
)
else:
proj_vectors = util.execute_decomposition_method(
data_matrix=self.X[indices, :],
decomposition_method=self.decomposition_method,
two_dimentions=self.visualization_utility,
decomposition_args=self.decomposition_args,
)
projection = proj_vectors.fit_transform(self.X[indices, :])
one_dimension = projection[:, 0]
sort_indices = np.argsort(one_dimension)
projection = projection[sort_indices, :]
one_dimension = projection[:, 0]
indices = indices[sort_indices]
quantile_value = np.quantile(
one_dimension, (self.percentile, (1 - self.percentile))
)
within_limits = np.where(
np.logical_and(
one_dimension > quantile_value[0],
one_dimension < quantile_value[1],
)
)[0]
# if there is at least one split the allowed percentile of the data
if within_limits.size > 0:
distances = np.diff(one_dimension[within_limits])
loc = np.where(np.isclose(distances, np.max(distances)))[0][0]
splitpoint = one_dimension[within_limits][loc] + distances[loc] / 2
split_criterion = distances[loc]
flag = True
else:
splitpoint = None
split_criterion = None
flag = False
# =========================
else:
proj_vectors = None
projection = None
splitpoint = None
split_criterion = None
flag = False
return {
"indices": indices,
"projection": projection,
"projection_vectors": proj_vectors,
"splitpoint": splitpoint,
"split_criterion": split_criterion,
"split_permission": flag,
"color_key": key,
"dendrogram_check": False,
}
@property
def percentile(self):
return self._percentile
@percentile.setter
def percentile(self, v):
if v >= 0.5 or v < 0:
raise ValueError("IPDDP: percentile: Should be between [0,0.5) interval")
self._percentile = v
[docs]
class KMPDDP(Partition):
"""
Class KMPDDP. It executes the kMeans-PDDP algorithm.
References
----------
Zeimpekis, D., & Gallopoulos, E. (2008). Principal direction divisive
Partition with kernels and k-means steering. In Survey of Text Mining
II (pp. 45-64). Springer, London.
Parameters
----------
decomposition_method : str, (optional)
One of the ('pca', 'kpca', 'ica', 'tsne') supported decomposition
methods used as kernel for the kMeans-PDDP algorithm.
max_clusters_number : int, (optional)
Desired maximum number of clusters for the algorithm.
min_sample_split : int, (optional)
The minimum number of points needed in a cluster for a split to occur.
visualization_utility : bool, (optional)
If (True) generate the data needed by the visualization utilities of
the package otherwise, if false the split_visualization and
interactive_visualization of the package can not be created. For the
'tsne' decomposition method does not support visualization because it
affects the correct execution of the kMeans-PDDP algorithm.
distance_matrix : bool, (optional)
If (True) the input data are considered as a distance matrix and not as
a data matrix. The distance matrix is a square matrix with the samples
on the rows and the variables on the columns. The distance matrix is
used only in conjunction with the 'mds' decomposition method and no
other from the supported decomposition methods.
random_state : int, (optional)
The random seed fed in the k-Means algorithm
**decomposition_args :
Arguments for each of the decomposition methods ("decomposition.PCA" as
"pca", "decomposition.KernelPCA" as "kpca", "decomposition.FastICA" as
"ica", "manifold.TSNE" as "tsne") utilized by the HiPart package, as
documented in the scikit-learn package, from which they are implemented.
Attributes
----------
output_matrix : numpy.ndarray
Model's step by step execution output.
labels_ :
Extracted clusters from the algorithm.
tree : treelib.Tree
The object which contains all the information about the execution of
the iPDDP algorithm.
samples_number : int
The number of samples contained in the data.
fit_predict(X) :
Returns the results of the fit method in the form of the labels of the
predicted clustering labels.
Parameters
----------
X : numpy.ndarray
Data matrix with the samples on the rows and the variables on the
columns. If the distance_matrix is True then X should be a square
distance matrix.
Returns
-------
labels_ : numpy.ndarray
Extracted clusters from the algorithm.
"""
decreasing = True
def __init__(
self,
decomposition_method="pca",
max_clusters_number=100,
min_sample_split=15,
visualization_utility=True,
distance_matrix=False,
random_state=None,
**decomposition_args,
):
super().__init__(
decomposition_method,
max_clusters_number,
min_sample_split,
visualization_utility,
distance_matrix,
**decomposition_args,
)
self.random_state = random_state
[docs]
def fit(self, X):
"""
Execute the kM-PDDP algorithm and return all the execution data in the
form of a kM_PDDP class object.
Parameters
----------
X: numpy.ndarray
Data matrix with the samples on the rows and the variables on the
columns.
Returns
-------
self : object
A kM-PDDP class type object, with complete results on the
algorithm's analysis.
"""
self.X = X
self.samples_number = np.size(X, 0)
if self.distance_matrix:
if X.shape[0] != X.shape[1]:
raise ValueError("dePDDP: distance_matrix: Should be a square matrix")
# create an id vector for the samples of X
indices = np.arange(self.samples_number)
# Variable initializations
bk_tree = Tree()
self.node_ids = 0
self.cluster_color = 0
selected_node = 0
found = 1
# Root node initialization
bk_tree.create_node(
tag="cl_" + str(self.node_ids),
identifier=self.node_ids,
data=self.calculate_node_data(indices, self.cluster_color),
)
# if no possibility of split exists on the data
if not bk_tree.get_node(0).data["split_permission"]:
raise RuntimeError("KMPDDP: cannot split the data at all!!!")
while (selected_node is not None) and (found < self.max_clusters_number):
# Split the selected node in two parts
self.split_function(bk_tree, selected_node)
# select the next kid for split based on the local minimum density
selected_node = self.select_kid(
bk_tree.leaves(), decreasing=self.decreasing
)
# every split adds one new cluster
found = found + 1
self.tree = bk_tree
return self
[docs]
def calculate_node_data(self, indices, key):
"""
Calculation of the projections onto the Principal Components with the
utilization of the "Principal Components Analysis" or the "Kernel
Principal Components Analysis" or the "Independent Component Analysis"
or "t-SNE" methods.
Determination of the projection's clusters by utilizing the binary
k-means clustering algorithm.
This function leads to the second Stopping criterion 2 of the
algorithm.
Parameters
----------
indices : numpy.ndarray
The index of the samples in the original data matrix.
key : int
The value of the color for each node.
Returns
-------
data : dict
The necessary data for each node which are splitting point
"""
# if the number of samples
if indices.shape[0] > self.min_sample_split:
# Apply the decomposition method on the data matrix
if self.distance_matrix:
if self.decomposition_method != "mds":
raise ValueError(
"KMPDDP: decomposition_method: Should be 'mds' for distance_matrix"
)
method = util.execute_decomposition_method(
data_matrix=util.select_from_distance_matrix(self.X, indices),
decomposition_method=self.decomposition_method,
two_dimentions=self.visualization_utility,
decomposition_args=self.decomposition_args,
)
projection = method.fit_transform(
util.select_from_distance_matrix(self.X, indices)
)
split_criterion = np.linalg.norm(projection, ord="fro")
else:
method = util.execute_decomposition_method(
data_matrix=self.X[indices, :],
decomposition_method=self.decomposition_method,
two_dimentions=self.visualization_utility,
decomposition_args=self.decomposition_args,
)
projection = method.fit_transform(self.X[indices, :])
# Total scatter value calculation for the selection of the next
# cluster to split.
centered = util.center_data(self.X[indices, :])
split_criterion = np.linalg.norm(centered, ord="fro")
one_dimension = np.array([[i] for i in projection[:, 0]])
model = KMeans(n_clusters=2, n_init=10, random_state=self.random_state)
labels = model.fit_predict(one_dimension)
centers = model.cluster_centers_
# Labels for the split selection
label_zero = np.where(labels == 0)
label_one = np.where(labels == 1)
# The indices of the
left_child = indices[label_zero]
right_child = indices[label_one]
right_min = np.min(one_dimension[label_one])
left_max = np.max(one_dimension[label_zero])
if left_max > right_min:
right_min = np.min(one_dimension[label_zero])
left_max = np.max(one_dimension[label_one])
splitpoint = left_max + ((right_min - left_max) / 2)
flag = True
# =========================
else:
left_child = None # (ST2)
right_child = None # (ST2)
projection = None # (ST2)
method = None # (ST2)
centers = None # (ST2)
splitpoint = None # (ST2)
split_criterion = None # (ST2)
flag = False # (ST2)
return {
"indices": indices,
"left_indices": left_child,
"right_indices": right_child,
"projection": projection,
"projection_vectors": method,
"centers": centers,
"splitpoint": splitpoint,
"split_criterion": split_criterion,
"split_permission": flag,
"color_key": key,
"dendrogram_check": False,
}
@property
def random_state(self):
return self._random_seed
@random_state.setter
def random_state(self, v):
if v is not None and (not isinstance(v, int)):
raise ValueError(
"KMPDDP: min_sample_split: Invalid value it should be int and > 1"
)
self._random_seed = v
[docs]
class PDDP(Partition):
"""
Class PDDP. It executes the PDDP algorithm.
References
----------
Boley, D. (1998). Principal direction divisive Partition. Data mining
and knowledge discovery, 2(4), 325-344.
Parameters
----------
decomposition_method : str, (optional)
One of the ('pca', 'kpca', 'ica', 'tsne') supported decomposition
methods used as kernel for the PDDP algorithm.
max_clusters_number : int, (optional)
Desired maximum number of clusters for the algorithm.
min_sample_split : int, (optional)
The minimum number of points needed in a cluster for a split to occur.
visualization_utility : bool, (optional)
If (True) generate the data needed by the visualization utilities of
the package otherwise, if false the split_visualization and
interactive_visualization of the package can not be created. For the
'tsne' decomposition method does not support visualization because it
affects the correct execution of the PDDP algorithm.
distance_matrix : bool, (optional)
If (True) the input data are considered as a distance matrix and not as
a data matrix. The distance matrix is a square matrix with the samples
on the rows and the variables on the columns. The distance matrix is
used only in conjunction with the 'mds' decomposition method and no
other from the supported decomposition methods.
**decomposition_args :
Arguments for each of the decomposition methods ("decomposition.PCA" as
"pca", "decomposition.KernelPCA" as "kpca", "decomposition.FastICA" as
"ica", "manifold.TSNE" as "tsne") utilized by the HiPart package, as
documented in the scikit-learn package, from which they are implemented.
Attributes
----------
output_matrix : numpy.ndarray
Model's step by step execution output.
labels_ :
Extracted clusters from the algorithm.
tree : treelib.Tree
The object which contains all the information about the execution of
the iPDDP algorithm.
samples_number : int
The number of samples contained in the data.
fit_predict(X) :
Returns the results of the fit method in the form of the labels of the
predicted clustering labels.
Parameters
----------
X : numpy.ndarray
Data matrix with the samples on the rows and the variables on the
columns. If the distance_matrix is True then X should be a square
distance matrix.
Returns
-------
labels_ : numpy.ndarray
Extracted clusters from the algorithm.
"""
decreasing = True
def __init__(
self,
decomposition_method="pca",
max_clusters_number=100,
min_sample_split=5,
visualization_utility=True,
distance_matrix=False,
**decomposition_args,
):
super().__init__(
decomposition_method,
max_clusters_number,
min_sample_split,
visualization_utility,
distance_matrix,
**decomposition_args,
)
[docs]
def fit(self, X):
"""
Execute the PDDP algorithm and return all the execution data in the
form of a PDDP class object.
Parameters
----------
X: numpy.ndarray
Data matrix (must check and return an error if not).
Returns
-------
self
A PDDP class type object, with complete results on the algorithm's
analysis.
"""
self.X = X
self.samples_number = X.shape[0]
if self.distance_matrix:
if X.shape[0] != X.shape[1]:
raise ValueError("dePDDP: distance_matrix: Should be a square matrix")
# create an id vector for the samples of X
indices = np.arange(X.shape[0])
# initialize tree and root node # step (0)
tree = Tree()
# nodes unique IDs indicator
self.node_ids = 0
# nodes next color indicator (necessary for visualization purposes)
self.cluster_color = 0
tree.create_node(
tag="cl_" + str(self.node_ids),
identifier=self.node_ids,
data=self.calculate_node_data(indices, self.cluster_color),
)
# indicator for the next node to split
selected = 0
# if no possibility of split exists on the data # (ST2)
if not tree.get_node(0).data["split_permission"]:
raise RuntimeError("PDDP: cannot split the data at all!!!")
# Initialize the ST1 stopping criterion counter that count the number
# of clusters # (ST1)
counter = 1
while (selected is not None) and (
counter < self.max_clusters_number
): # (ST1) or (ST2)
self.split_function(tree, selected) # step (1)
# select the next kid for split based on the local minimum density
selected = self.select_kid(
tree.leaves(), decreasing=self.decreasing
) # step (2)
counter = counter + 1 # (ST1)
self.tree = tree
return self
[docs]
def calculate_node_data(self, indices, key):
"""
Calculation of the projections onto the Principal Components with the
utilization of the "Principal Components Analysis" or the "Kernel
Principal Components Analysis" or the "Independent Component Analysis"
or "t-SNE" methods.
The projection's clusters are split on the median pf the projected
data.
This function leads to the second Stopping criterion 2 of the
algorithm.
Parameters
----------
indices : numpy.ndarray
The index of the samples in the original data matrix.
key : int
The value of the color for each node.
Returns
-------
data : dictionary
The necessary data for each node which are splitting point.
"""
projection_vectors = None
projection = None
splitpoint = None
split_criterion = None
flag = False
# if the number of samples
if indices.shape[0] > self.min_sample_split:
# Apply the decomposition method on the data matrix
if self.distance_matrix:
if self.decomposition_method != "mds":
raise ValueError(
"PDDP: decomposition_method: Should be 'mds' for distance_matrix"
)
projection_vectors = util.execute_decomposition_method(
data_matrix=util.select_from_distance_matrix(self.X, indices),
decomposition_method=self.decomposition_method,
two_dimentions=self.visualization_utility,
decomposition_args=self.decomposition_args,
)
projection = projection_vectors.fit_transform(
util.select_from_distance_matrix(self.X, indices)
)
# Total scatter value calculation for the selection of the next
# cluster to split.
scat = np.linalg.norm(projection, ord="fro")
else:
centered = util.center_data(self.X[indices, :])
# execute pca on the data matrix
projection_vectors = util.execute_decomposition_method(
data_matrix=centered,
decomposition_method=self.decomposition_method,
two_dimentions=self.visualization_utility,
decomposition_args=self.decomposition_args,
)
projection = projection_vectors.fit_transform(centered)
# Total scatter value calculation for the selection of the next
# cluster to split.
scat = np.linalg.norm(centered, ord="fro")
if not np.allclose(scat, 0):
splitpoint = 0.0
split_criterion = scat
flag = True
return {
"indices": indices,
"projection": projection,
"projection_vectors": projection_vectors,
"splitpoint": splitpoint,
"split_criterion": split_criterion,
"split_permission": flag,
"color_key": key,
"dendrogram_check": False,
}
[docs]
class BisectingKmeans(Partition):
"""
Class BisectingKmeans. It executes the bisecting k-Means algorithm.
References
----------
Savaresi, S. M., & Boley, D. L. (2001, April). On the performance of
bisecting K-means and PDDP. In Proceedings of the 2001 SIAM International
Conference on Data Mining (pp. 1-14). Society for Industrial and Applied
Mathematics.
Parameters
----------
max_clusters_number : int, (optional)
Desired maximum number of clusters for the algorithm.
min_sample_split : int, (optional)
The minimum number of points needed in a cluster for a split to occur.
random_state : int, (optional)
The random seed fed in the k-Means algorithm.
Attributes
----------
output_matrix : numpy.ndarray
Model's step by step execution output.
labels_ : numpy.ndarray
Extracted clusters from the algorithm.
tree : treelib.Tree
The object which contains all the information about the execution of
the bisecting k-Means algorithm.
samples_number : int
The number of samples contained in the data.
fit_predict(X) :
Returns the results of the fit method in the form of the labels of the
predicted clustering labels.
Parameters
----------
X : numpy.ndarray
Data matrix with the samples on the rows and the variables on the
columns. If the distance_matrix is True then X should be a square
distance matrix.
Returns
-------
labels_ : numpy.ndarray
Extracted clusters from the algorithm.
"""
decreasing = True
def __init__(self, max_clusters_number=100, min_sample_split=5, random_state=None):
super().__init__(
max_clusters_number=max_clusters_number,
min_sample_split=min_sample_split,
)
self.random_state = random_state
[docs]
def fit(self, X):
"""
Execute the BisectingKmeans algorithm and return all the execution
data in the form of a BisectingKmeans class object.
Parameters
----------
X : numpy.ndarray
Data matrix with the samples on the rows and the variables on the
columns.
Returns
-------
self
A BisectingKmeans class type object, with complete results on the
algorithm's analysis.
"""
self.X = X
self.samples_number = X.shape[0]
# create an id vector for the samples of X
indices = np.array([int(i) for i in range(np.size(self.X, 0))])
# initialize tree and root node # step (0)
tree = Tree()
# nodes` unique IDs indicator
self.node_ids = 0
# nodes` next color indicator (necessary for visualization purposes)
self.cluster_color = 0
tree.create_node(
tag="cl_" + str(self.node_ids),
identifier=self.node_ids,
data=self.calculate_node_data(indices, self.cluster_color),
)
# indicator for the next node to split
selected_node = 0
# if no possibility of split exists on the data
if not tree.get_node(0).data["split_permission"]:
print("cannot split at all")
return self
# Initialize the ST1 stopping criterion counter that count the number
# of clusters.
found_clusters = 1
while (selected_node is not None) and (
found_clusters < self.max_clusters_number
):
self.split_function(tree, selected_node) # step (1)
# select the next kid for split based on the local minimum density
selected_node = self.select_kid(
tree.leaves(), decreasing=self.decreasing
) # step (2)
found_clusters = found_clusters + 1 # (ST1)
self.tree = tree
return self
[docs]
def split_function(self, tree, selected_node):
"""
Split the indicated node by clustering the data with a binary k-means
clustering algorithm.
Because python passes by reference data this function doesn't need a
return statement.
Parameters
----------
tree : treelib.tree.Tree
The tree build by the BisectingKmeans algorithm, in order to
cluster the input data.
selected_node : int
The numerical identifier for the tree node that i about to be split.
Returns
-------
There is no returns in this function. The results of this function
pass to execution by utilizing the python's pass-by-reference
nature.
"""
node = tree.get_node(selected_node)
node.data["split_permission"] = False
# left child indices extracted from the nodes split-point and the
# indices included in the parent node
left_index = node.data["left_indices"]
# right child indices
right_index = node.data["right_indices"]
# Nodes and data creation for the children
# Uses the calculate_node_data function to create the data for the node
tree.create_node(
tag="cl" + str(self.node_ids + 1),
identifier=self.node_ids + 1,
parent=node.identifier,
data=self.calculate_node_data(
left_index,
node.data["color_key"],
),
)
tree.create_node(
tag="cl" + str(self.node_ids + 2),
identifier=self.node_ids + 2,
parent=node.identifier,
data=self.calculate_node_data(
right_index,
self.cluster_color + 1,
),
)
self.cluster_color += 1
self.node_ids += 2
[docs]
def calculate_node_data(self, indices, key):
"""
Execution of the binary k-Means algorithm on the samples presented by
the data_matrix. The two resulted clusters are the two new clusters if
the leaf is chosen to be split. And calculation of the splitting
criterion.
Parameters
----------
indices : numpy.ndarray
The index of the samples in the original data matrix.
data_matrix : numpy.ndarray
The data matrix containing all the data for the samples.
key : int
The value of the color for each node.
Returns
-------
data : dict
The necessary data for each node which are splitting point.
"""
# if the number of samples
if indices.shape[0] > self.min_sample_split:
model = KMeans(n_clusters=2, n_init=10, random_state=self.random_state)
labels = model.fit_predict(self.X[indices, :])
centers = model.cluster_centers_
left_child = indices[np.where(labels == 0)]
right_child = indices[np.where(labels == 1)]
centers = centers
centered = util.center_data(self.X[indices, :])
# Total scatter value calculation for the selection of the next
# cluster to split.
scat = np.linalg.norm(centered, ord="fro")
split_criterion = scat
flag = True
# =========================
else:
left_child = None # (ST2)
right_child = None # (ST2)
centers = None # (ST2)
split_criterion = None # (ST2)
flag = False # (ST2)
return {
"indices": indices,
"left_indices": left_child,
"right_indices": right_child,
"centers": centers,
"split_criterion": split_criterion,
"split_permission": flag,
"color_key": key,
"dendrogram_check": False,
}
@property
def random_state(self):
return self._random_seed
@random_state.setter
def random_state(self, v):
if v is not None and (not isinstance(v, int)):
raise ValueError(
"BisectingKmeans: min_sample_split: Invalid value it should be int and > 1"
)
np.random.seed(v)
self._random_seed = v
[docs]
class MDH(Partition):
"""
Class MDH. It executes the MDH algorithm.
References
----------
Pavlidis, N. G., Hofmeyr, D. P., & Tasoulis, S. K. (2016). Minimum density
hyperplanes. Journal of Machine Learning Research, 17 (156), 1-33.
Parameters
----------
max_clusters_number : int, optional
Desired maximum number of clusters to find the MDH algorithm.
max_iterations : int, optional
Maximum number of iterations on the search for the minimum density
hyperplane.
k : float, optional
The multiples of the standard deviation which the existence of a
splitting hyperplane is allowed. The default value is 2.3.
percentile : float, optional
The percentile distance from the dataset's edge in which a split can
not occur. [0,0.5) values are allowed.
min_sample_split : int, optional
The minimum number of points needed in a cluster for a split to occur.
random_state : int, optional
The random seed to be used in the algorithm's execution.
Attributes
----------
output_matrix : numpy.ndarray
Model's step by step execution output.
labels_ : numpy.ndarray
Extracted clusters from the algorithm.
tree : treelib.Tree
The object which contains all the information about the execution of
the MDH algorithm.
samples_number : int
The number of samples contained in the data.
fit_predict(X) :
Returns the results of the fit method in the form of the labels of the
predicted clustering labels.
Parameters
----------
X : numpy.ndarray
Data matrix with the samples on the rows and the variables on the
columns. If the distance_matrix is True then X should be a square
distance matrix.
Returns
-------
labels_ : numpy.ndarray
Extracted clusters from the algorithm.
"""
decreasing = True
def __init__(
self,
max_clusters_number=100,
max_iterations=10,
k=2.3,
percentile=0.1,
min_sample_split=5,
random_state=None,
):
super().__init__(
max_clusters_number=max_clusters_number,
min_sample_split=min_sample_split,
)
self.k = k
self.max_iterations = max_iterations
self.percentile = percentile
self.random_state = random_state
[docs]
def fit(self, X):
"""
Execute the MDH algorithm and return all the execution data in the form
of a MDH class object.
Parameters
----------
X : numpy.ndarray
Data matrix with the samples on the rows and the variables on the
columns.
Returns
-------
self
A MDH class type object, with complete results on the algorithm's
analysis.
"""
# initialize the random seed
np.random.seed(self.random_state)
# initialize the data matrix and the number of samples
self.X = X
self.samples_number = np.size(X, 0)
# create an id vector for the samples of X
indices = np.array([int(i) for i in range(self.samples_number)])
# initialize the tree and root node # step (0)
den_tree = Tree()
# nodes unique IDs indicator
self.node_ids = 0
# nodes next color indicator (necessary for visualization purposes)
self.cluster_color = 0
den_tree.create_node(
tag="cl_" + str(self.node_ids),
identifier=self.node_ids,
data=self.calculate_node_data(indices, self.cluster_color),
)
# indicator for the next node to split
selected_node = 0
# if no possibility of split exists on the data # (ST2)
if not den_tree.get_node(0).data["split_permission"]:
raise RuntimeError("MDH: cannot split the data at all!!!")
# Initialize the ST1 stopping criterion counter that count the number
# of clusters # (ST1)
found_clusters = 1
while (found_clusters < self.max_clusters_number) and (
selected_node is not None
): # (ST1) or (ST2)
self.split_function(den_tree, selected_node) # step (1, 2)
# select the next kid for split based on the local minimum density
selected_node = self.select_kid(
den_tree.leaves(), decreasing=self.decreasing
) # step (3)
found_clusters = found_clusters + 1 # (ST1)
self.tree = den_tree
return self
[docs]
def calculate_node_data(self, indices, key):
"""
Find a minimum density hyperplane to bisect the data. The determination
of the minimum density hyperplane is based on the minimization is found
by minimizing the first derivative of the density function. This is made
possible through the use of "Sequential Quadratic Programming" (SQP)
method, which is used to simultaneously find the optimal projection
vector v and minimum density point b.
This function leads to the second Stopping criterion 2 of the
algorithm.
Parameters
----------
indices : numpy.ndarray
The index of the samples in the original data matrix.
key : int
The value of the color for each node.
Returns
-------
data : dict
The necessary data for each node which are splitting point.
"""
# Initialization of the return variables
projection = None
splitpoint = None
split_criterion = None
flag = False
split_vector = None
# If the number of samples in the node is greater than the minimum allowed for split
if indices.shape[0] > self.min_sample_split:
node_data = self.X[indices, :]
node_size = node_data.shape[0]
# Normalize the data of the node to zero mean and unit standard deviation
node_data = (node_data - np.mean(node_data, 0)) / np.std(node_data, 0)
minC = (
100
if node_size * self.percentile > 100
else node_size * self.percentile
)
solutions = []
for _ in range(0, self.max_iterations):
# Generate a random vector in the space of the node's data and
# normalize it to unit length
# v_n_b: vector v and point b
initial_v_n_b = stats.norm.rvs(size=np.shape(node_data)[1])
initial_v_n_b = initial_v_n_b / np.linalg.norm(initial_v_n_b)
initial_v_n_b = np.append(initial_v_n_b, 0)
# Find the minimum density point of the data on the projection
# direction
minimum_b = util.initialize_b(initial_v_n_b, node_data, depth_init=True)
if minimum_b:
initial_v_n_b[-1] = minimum_b
# res has the following fields that are of interest:
# 1. success (whether algorithm terminated successfully)
# 2. x (solution)
# 3. nfev (number of function evaluations)
# 4. njev (number of jacobian/ gradient evaluations)
results, depth = util.md_sqp(initial_v_n_b, node_data, self.k)
# If the algorithm terminated successfully try to append the solution
if results.success:
v = results.x[:-1] / np.linalg.norm(results.x[:-1])
projection = np.dot(node_data, v).reshape(-1, 1)
b = results.x[-1]
c0 = np.sum(projection > b)
# Solutions in the edges of the projection are not acceptable
if min(c0, node_size - c0) >= minC:
solutions.append((v, b, depth))
# Find the solution with the minimum depth
if solutions:
split = min(solutions, key=lambda x: x[2])
if split:
splitpoint = split[1]
projection = np.dot(node_data, split[0]).reshape(-1, 1)
split_criterion = indices.shape[0]
flag = True
split_vector = split[0]
return {
"indices": indices,
"projection": projection,
"projection_vectors": split_vector,
"splitpoint": splitpoint,
"split_criterion": split_criterion,
"split_permission": flag,
"color_key": key,
"dendrogram_check": False,
}
@property
def max_iterations(self):
return self._max_iterations
@max_iterations.setter
def max_iterations(self, v):
if v < 0 or (not isinstance(v, int)):
raise ValueError(
"MDH: max_iteration: Invalid value it should be int and > 1"
)
self._max_iterations = v
@property
def k(self):
return self._k
@k.setter
def k(self, v):
if v < 0 or (not isinstance(v, float)):
raise ValueError("MDH: k: Invalid value it should be float and > 1")
self._k = v
@property
def percentile(self):
return self._percentile
@percentile.setter
def percentile(self, v):
if v >= 0.5 or v < 0:
raise ValueError("MDH: percentile: Should be between [0,0.5) interval")
self._percentile = v
@property
def random_state(self):
return self._random_state
@random_state.setter
def random_state(self, v):
if v is not None and (not isinstance(v, int)):
raise ValueError(
"MDH: min_sample_split: Invalid value it should be int and > 1"
)
np.random.seed(v)
self._random_state = v