#!/usr/bin/env python
#
# file: $ISIP_EXP/demos/exp_0007/v0.0.6/imld_alg_kmeans.py
#
# revision history:
#  20200811 (LV): standardization, completion
#  20200505 (SJ): initial version
#
# This file contains the AlgorithmKMeans class which implements the K-Means
# machine learning algorithm for the ISIP Machine Learning Demo software
#------------------------------------------------------------------------------

# import system modules
#
import numpy as np

# import KMeans tools
#
from sklearn.cluster import KMeans

# import PyQt modules for the UI
#
from PyQt5 import QtCore, QtGui, QtWidgets

# class: AlgorithmKMeans
#
# This class performs the K-Means algorithm on the stored training and
# evaluation data.
#
class AlgorithmKMeans():

    # method: AlgorithmKMeans::constructor
    #
    # arguments:
    #  input_display: the display holding the training data
    #  eval_display: the display holding the evaluation data
    #  process_box: the process description text box
    #
    def __init__(self, input_display, eval_display, process_box):

        # store references to the UI widgets
        #
        self.input = input_display
        self.eval = eval_display
        self.process = process_box

        # algorithm parameters: iteration counter, iteration limit and
        # number of clusters
        #
        self.iter_count = 1
        self.iteration = 8
        self.clusters = 4
        self.kmeans = KMeans()

        # data containers and state variables
        #
        self.class_num = np.empty((0, 0))
        self.X = np.empty((0, 0))
        self.classes = 0
        self.mu = np.empty((0, 0))
        self.support_region = np.empty((0, 0))
        self.step_index = 0
        self.trans_matrix = 0
        self.data = False
        self.Z = None

    # method: AlgorithmKMeans::run
    #
    # This method checks which step the process is currently at and
    # executes that step.
    #
    def run(self):
        if self.step_index == 1:
            self.step1()
        elif self.step_index == 2:
            self.step2()
        elif self.step_index == 3:
            self.step3()
            self.process.append(" *** Algorithm Complete ***\n")

    # method: AlgorithmKMeans::get_step_index
    #
    # arguments:
    #  self: class instance
    #
    # return:
    #  finished: boolean value to indicate status
    #
    # This method checks whether the algorithm has finished and returns
    # this status.
    #
    def get_step_index(self):
        if self.step_index > 2:
            finished = True
        else:
            finished = False
        return finished

    # method: AlgorithmKMeans::increment_step
    #
    # This method increments the step index.
    #
    def increment_step(self):
        self.step_index = self.step_index + 1
    #
    # end of method

    # method: AlgorithmKMeans::reset_step
    #
    # This method resets the step index to zero.
    #
    def reset_step(self):
        self.step_index = 0

    # method: AlgorithmKMeans::initialize
    #
    # This method announces the algorithm and resets its parameters,
    # data containers and the underlying KMeans model.
    #
    def initialize(self):
        text = "Algorithm: K-Means:"
        self.process.append(text)
        text = "\n *** Initializing the Algorithm ***\n"
        self.process.append(text)

        self.iter_count = 1
        self.iteration = 8
        self.clusters = 4
        self.kmeans = KMeans(n_clusters=self.clusters, max_iter=self.iteration)

        self.class_num = np.empty((0, 0))
        self.X = np.empty((0, 0))
        self.classes = 0
        self.mu = np.empty((0, 0))
        self.support_region = np.empty((0, 0))
        self.step_index = 0
        self.trans_matrix = 0
        self.data = False

    # method: AlgorithmKMeans::step1
    #
    # This method communicates with the user via the process description
    # and invokes the compute_mean method.
    #
    def step1(self):

        # display text in the process description window
        #
        text = "     1. Computing the Statistics.\n" +\
               "        Means:"
        self.process.append(text)

        self.compute_mean()
        self.input.canvas.draw_idle()

    # method: AlgorithmKMeans::step2
    #
    # This method steps through the K-Means iterations and reports the
    # cluster statistics after each one.
    #
    def step2(self):

        # display text in the process description window
        #
        if self.iter_count == 1:
            text = "     2. Stepping through iterations.\n"
            self.process.append(text)

        self.compute_cluster_mean()
        self.process.repaint()

        # repeat this step until the final iteration has been reached
        #
        if self.iter_count != self.iteration:
            self.step_index = 1

    # method: AlgorithmKMeans::step3
    #
    # This method draws the decision surfaces and computes the error rates.
    #
    def step3(self):

        # display text in the process description window
        #
        text = "     3. Drawing the Decision Surface(s).\n"
        self.process.append(text)

        self.plot_decision_regions(self.X, self.kmeans, self.input)

        # display text in the process description window
        #
        text = "     4. Computing Error Rates:\n"
        self.process.append(text)

        text = "        Train:"
        self.process.append(text)
        self.compute_errors(self.input)

        text = "        Eval:"
        self.process.append(text)
        #self.classify_eval()

        # reset the class collections on the input display
        #
        self.input.reset_classes()

    # method: AlgorithmKMeans::extract_data
    #
    # This method gathers the plotted training data and class labels from
    # the input display, computes the per-class means and fits the KMeans
    # model. It returns False if there is no data to extract.
    #
    def extract_data(self):

        self.classes = 0
        observed_classes = np.empty((0, 0))

        if len(self.input.canvas.axes.collections) == 0:
            return False
        else:
            self.data = True

        index = np.empty((0, 0), dtype="int64")
        collection_index = 0

        for collections in self.input.canvas.axes.collections:

            # retrieve the x,y and gid of the collection
            #
            try:
                current_gid = collections.get_gid()[0][0]
            except TypeError:
                current_gid = collections.get_gid()

            if current_gid is not None:
                data = np.array(collections.get_offsets())
                classes = np.array(collections.get_gid())
                self.class_num = np.append(self.class_num, classes)
                self.X = np.append(self.X, data)

                if current_gid not in observed_classes:
                    self.classes = self.classes + 1
                    observed_classes = np.append(observed_classes, current_gid)
                    index = np.append(index, collection_index)
                else:

                    # this class already has a collection: merge the new
                    # points into the existing collection
                    #
                    for collects in self.input.canvas.axes.collections:
                        if current_gid in collects.get_gid():
                            save_sets = collects.get_offsets()
                            current_set = collections.get_offsets()
                            final_set = np.vstack((save_sets, current_set))
                            collects.set_offsets(final_set)

                            # rebuild the per-sample gid array for the merged
                            # collection (set_gid takes a single argument)
                            #
                            collects.set_gid(
                                np.full((final_set.shape[0], 1), current_gid))
                            break

            collection_index = collection_index + 1

        # compute the mean of each class collection
        #
        for means in self.input.canvas.axes.collections:
            mu = np.mean(means.get_offsets(), axis=0)
            self.mu = np.append(self.mu, mu)
        self.mu = np.reshape(self.mu, (self.classes, 2))

        # keep only the collections that were recorded above
        #
        remove_collections = \
            np.array(self.input.canvas.axes.collections)[index].tolist()
        self.input.canvas.axes.collections = remove_collections

        # fit the KMeans model to the extracted data
        #
        self.X = np.reshape(self.X, (-1, 2))
        self.class_num = np.ndarray.flatten(self.class_num)
        self.kmeans.fit(self.X, self.class_num)

        return True

    # method: AlgorithmKMeans::compute_cluster_mean
    #
    # This method refits KMeans with the current iteration count as the
    # iteration limit and reports the mean and covariance of each cluster.
    #
    def compute_cluster_mean(self):

        # display text in the process description window
        #
        text = "        Iteration {}\n".format(self.iter_count)
        self.process.append(text)

        k_means_iter = KMeans(n_clusters=self.clusters,
                              max_iter=self.iter_count).fit(self.X,
                                                            self.class_num)

        for clusters in range(0, self.clusters):
            text = "        Mean for cluster {}: {}\n".format(
                clusters, k_means_iter.cluster_centers_[clusters])
            self.process.append(text)

            cov = np.cov(self.X[np.where(k_means_iter.labels_ == clusters)],
                         rowvar=False)

            # display text in the process description window
            #
            text = "        Covariance matrix:\n"
            self.process.append(text)

            a11 = str(round(cov[0][0], 2))
            a12 = str(round(cov[0][1], 2))
            a21 = str(round(cov[1][0], 2))
            a22 = str(round(cov[1][1], 2))
            text = ("          " + a11 + "  " + a12 + "\n" +
                    "          " + a21 + "  " + a22 + "\n")
            self.process.append(text)

        self.iter_count = self.iter_count + 1

    # method: AlgorithmKMeans::compute_mean
    #
    # This method reports the per-class means and plots them on the
    # input display.
    #
    def compute_mean(self):
        for means in range(self.classes):
            text = "        Class {}: {}".format(means + 1, self.mu[means])
            self.process.append(text)

        self.input.canvas.axes.scatter(self.mu[:, 0], self.mu[:, 1],
                                       facecolors='none', edgecolors='black',
                                       s=8)

    # method: AlgorithmKMeans::plot_decision_regions
    #
    # This method evaluates the classifier on a mesh grid covering the data
    # and draws the resulting decision regions on the given display.
    #
    def plot_decision_regions(self, X, clf, display):

        # compute the grid resolution from the width of the x-axis
        #
        res = (display.canvas.axes.get_xlim()[1] -
               display.canvas.axes.get_xlim()[0]) / 100

        x_min, x_max = X[:, 0].min() - 1, X[:, 0].max() + 1
        y_min, y_max = X[:, 1].min() - 1, X[:, 1].max() + 1
        xx, yy = np.meshgrid(np.arange(x_min, x_max, res),
                             np.arange(y_min, y_max, res))

        Z = clf.predict(np.c_[xx.ravel(), yy.ravel()])
        Z = Z.reshape(xx.shape)

        display.canvas.axes.contourf(xx, yy, Z, alpha=0.4)
        display.canvas.draw_idle()

    # method: AlgorithmKMeans::compute_errors
    #
    # This method builds a confusion matrix for the data on the given
    # display and reports the overall error rate.
    #
    def compute_errors(self, display):

        # initialize variables
        #
        samples = 0
        incorrect1 = 0
        incorrect2 = 0
        incorrect3 = 0
        incorrect4 = 0

        # count the total number of samples
        #
        for collection in range(self.classes):
            size = \
                display.canvas.axes.collections[collection].get_offsets().shape[0]
            samples = samples + size

        # display text in the process description window
        #
        text = "        Number of samples: {}".format(samples)
        self.process.append(text)

        # write the confusion matrix header
        #
        if self.classes == 4:
            text = "        Ref/Hyp:      {:10s}{:10s}{:10s}{:10s}".format(
                "Class 1", "Class 2", "Class 3", "Class 4")
        elif self.classes == 3:
            text = "        Ref/Hyp:      {:10s}{:10s}{:10s}".format(
                "Class 1", "Class 2", "Class 3")
        elif self.classes == 2:
            text = "        Ref/Hyp:      {:10s}{:10s}".format(
                "Class 1", "Class 2")
        elif self.classes == 1:
            text = "        Ref/Hyp:      Class 1"
        self.process.append(text)

        # computes the classification error for the first set
        #
        if self.classes > 0:
            data = np.array(display.canvas.axes.collections[0].get_offsets())
            size = data.shape[0]
            predicted = [0, 0, 0, 0]
            expected = display.canvas.axes.collections[0].get_gid()[0][0]

            for i in range(size):
                prediction = \
                    int(self.kmeans.predict(np.reshape(data[i], (1, -1)))[0])
                if prediction != expected:
                    predicted[prediction] = predicted[prediction] + 1
                    incorrect1 = incorrect1 + 1
                else:
                    predicted[prediction] = predicted[prediction] + 1

            text = "        Class 1:"
            for classes in range(self.classes):
                text = text + "{:12d}".format(predicted[classes])
            self.process.append(text)

        # computes the classification error for the second set
        #
        text = "        Class 2:"
        if self.classes > 1:
            data = np.array(display.canvas.axes.collections[1].get_offsets())
            size = data.shape[0]
            predicted = [0, 0, 0, 0]
            expected = display.canvas.axes.collections[1].get_gid()[0][0]

            for i in range(size):
                prediction = \
                    int(self.kmeans.predict(np.reshape(data[i], (1, -1)))[0])
                if prediction != expected:
                    predicted[prediction] = predicted[prediction] + 1
                    incorrect2 = incorrect2 + 1
                else:
                    predicted[prediction] = predicted[prediction] + 1

            for classes in range(self.classes):
                text = text + "{:12d}".format(predicted[classes])
            self.process.append(text)

        # computes the classification error for the third set
        #
        text = "        Class 3:"
        if self.classes > 2:
            data = np.array(display.canvas.axes.collections[2].get_offsets())
            size = data.shape[0]
            predicted = [0, 0, 0, 0]
            expected = display.canvas.axes.collections[2].get_gid()[0][0]

            for i in range(size):
                prediction = \
                    int(self.kmeans.predict(np.reshape(data[i], (1, -1)))[0])
                if prediction != expected:
                    predicted[prediction] = predicted[prediction] + 1
                    incorrect3 = incorrect3 + 1
                else:
                    predicted[prediction] = predicted[prediction] + 1

            for classes in range(self.classes):
                text = text + "{:12d}".format(predicted[classes])
            self.process.append(text)

        # computes the classification error for the fourth set
        #
        text = "        Class 4:"
        if self.classes > 3:
            data = np.array(display.canvas.axes.collections[3].get_offsets())
            size = data.shape[0]
            predicted = [0, 0, 0, 0]
            expected = display.canvas.axes.collections[3].get_gid()[0][0]

            for i in range(size):
                prediction = \
                    int(self.kmeans.predict(np.reshape(data[i], (1, -1)))[0])
                if prediction != expected:
                    predicted[prediction] = predicted[prediction] + 1
                    incorrect4 = incorrect4 + 1
                else:
                    predicted[prediction] = predicted[prediction] + 1

            for classes in range(self.classes):
                text = text + "{:12d}".format(predicted[classes])
            self.process.append(text)

        # compute and report the overall error rate
        #
        incorrect = incorrect1 + incorrect2 + incorrect3 + incorrect4
        error = (incorrect / samples) * 100
        text = "\n        Error Rate = {} / {} = {:.2f}%\n".format(
            incorrect, samples, error)
        self.process.append(text)
    # method: AlgorithmKMeans::classify_eval
    #
    # This method checks the evaluation data against the trained model,
    # draws the decision regions on the eval display and computes the
    # error rates.
    #
    def classify_eval(self):

        eval_classes = 0
        eval_data = np.empty((0, 0))

        # count the number of classes in the evaluation data
        #
        for collections in self.eval.canvas.axes.collections:
            if collections.get_gid() is not None:
                eval_classes = eval_classes + 1

        if eval_classes == 0:
            self.process.append(" There is no Eval Data to Classify.\n")
            return False
        elif eval_classes != self.classes:
            text = (" The number of classes in Eval Data ({}) does not "
                    "match train data ({}).\n").format(eval_classes,
                                                       self.classes)
            self.process.append(text)
            return False

        for collections in self.eval.canvas.axes.collections:

            # retrieve the x,y coordinates of the collection
            #
            data = np.array(collections.get_offsets())
            eval_data = np.append(eval_data, data)

        # classify the evaluation data and report the errors
        #
        eval_data = np.reshape(eval_data, (-1, 2))
        self.plot_decision_regions(eval_data, self.kmeans, self.eval)
        self.compute_errors(self.eval)
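# The demo below is not part of the IMLD application. It is a minimal,
# self-contained sketch (assuming only numpy and scikit-learn, both imported
# above) of the scikit-learn calls this class relies on: an initial fit as in
# extract_data, iteration-limited refits as in compute_cluster_mean, and a
# mesh-grid prediction as in plot_decision_regions. The synthetic data,
# cluster count and random_state are illustrative values only.
#
if __name__ == "__main__":

    # generate two synthetic 2-D clusters (illustrative data only)
    #
    rng = np.random.default_rng(0)
    X_demo = np.vstack((rng.normal(loc=(-1.0, -1.0), scale=0.3, size=(50, 2)),
                        rng.normal(loc=(1.0, 1.0), scale=0.3, size=(50, 2))))

    # fit the model once, as extract_data does after gathering the data
    #
    model = KMeans(n_clusters=2, max_iter=8, random_state=0).fit(X_demo)
    print("final cluster means:\n", model.cluster_centers_)

    # step through the iterations as compute_cluster_mean does: refit with an
    # increasing max_iter so the intermediate cluster means can be reported
    # (random_state is fixed here only to make the demo repeatable)
    #
    for iter_count in range(1, 4):
        step_model = KMeans(n_clusters=2, max_iter=iter_count,
                            random_state=0).fit(X_demo)
        print("iteration {}: means =\n{}".format(
            iter_count, step_model.cluster_centers_))

    # evaluate the model on a mesh grid as plot_decision_regions does; the
    # contourf call is omitted here since no display widget is available
    #
    res = 0.1
    xx, yy = np.meshgrid(np.arange(-2.0, 2.0, res), np.arange(-2.0, 2.0, res))
    Z = model.predict(np.c_[xx.ravel(), yy.ravel()]).reshape(xx.shape)
    print("decision-region grid shape:", Z.shape)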