"""Multi-label ECG classification: load raw recordings, extract per-channel
features, train a KNN classifier, and write train/dev/eval predictions to CSV.

Expects ./set_15/lists/ to contain *.csv annotation files (one header row,
then one integer multi-label row per recording) and *.list files (one raw
.dat path per line; each .dat holds 8 int16 channels of 2200 samples).
"""

import csv
import time
from multiprocessing import Pool  # NOTE(review): unused in this script; kept for compatibility

import numpy as np
import pywt
import scipy.stats as stats  # NOTE(review): unused in this script; kept for compatibility
from skmultilearn.adapt import MLkNN  # NOTE(review): unused in this script; kept for compatibility
from sklearn.metrics import accuracy_score
from sklearn.neighbors import KNeighborsClassifier as KNN

# Multi-label target column names, one per cardiac condition.
CSV_HEADER = ["1dAVb", "RBBB", "LBBB", "SB", "AF", "ST"]


class DataProcessor:
    """Loads labelled recordings and extracts per-channel signal features.

    Attributes:
        anno_path: path to the annotation CSV (header row + label rows).
        dat_list_path: path to a text file listing one .dat path per line.
        X: list of (8, n_samples) int16 arrays, one per recording.
        y: list of integer label vectors, parallel to X.
        features: list of flat per-recording feature vectors.
    """

    def __init__(self):
        self.anno_path = ""
        self.dat_list_path = ""
        self.X = []          # raw signal matrices
        self.y = []          # multi-label targets
        self.features = []   # extracted feature vectors

    def process_data(self):
        """Read the annotation CSV and raw files, appending to self.X / self.y.

        May be called repeatedly (with new paths) to accumulate datasets.
        """
        print("Now loading files:", self.anno_path, " ", self.dat_list_path)
        with open(self.anno_path, "r") as csv_file, \
                open(self.dat_list_path, "r") as list_file:
            csv_reader = csv.reader(csv_file)
            next(csv_reader)  # skip the header row in the CSV
            # zip pairs each label row with the corresponding .dat path.
            for row, file_path in zip(csv_reader, list_file):
                file_path = file_path.rstrip("\n")
                # Each .dat file holds 8 int16 channels; reshape infers length.
                signal = np.fromfile(file_path, dtype=np.int16).reshape((8, -1))
                self.y.append([int(value) for value in row])
                self.X.append(signal)

    def extract_features(self):
        """Compute per-channel features and append one flat vector per recording.

        Per channel: mean, std, variance, mean sign-change (zero-crossing
        proxy), peak-to-peak range, dominant FFT bin, top-3 FFT bins, and
        level-1 Haar wavelet energy.
        """
        x = np.array(self.X)
        n_files, n_channels, n_samples = x.shape
        print("Now starting feature extraction")
        for i in range(n_files):
            feature_list = []
            for n in range(n_channels):
                channel = x[i][n]
                feature_list.append(np.mean(channel))
                feature_list.append(np.std(channel))
                feature_list.append(np.var(channel))
                # Mean of sign differences -- a zero-crossing-rate proxy.
                feature_list.append(np.mean(np.diff(np.sign(channel))))
                # Peak-to-peak amplitude.
                feature_list.append(np.max(channel) - np.min(channel))
                # Dominant frequency bin from the power spectrum.
                # NOTE(review): spectrum of a real signal is mirrored and bin 0
                # is DC, so argmax may pick DC/negative-frequency bins — confirm
                # this is intended before changing.
                fft_result = np.fft.fft(channel)
                power_spectrum = np.abs(fft_result ** 2 / n_samples)
                feature_list.append(np.argmax(power_spectrum))
                # Top-3 bins by power (includes the dominant bin again).
                for freq_index in np.argsort(power_spectrum)[::-1][:3]:
                    feature_list.append(freq_index)
                # Level-1 Haar decomposition energy. pywt.dwt returns the
                # (approximation, detail) pair; the original code relied on
                # NumPy implicitly stacking that tuple — unpack explicitly.
                approx, detail = pywt.dwt(channel, "db1")
                wavelet_energy = (np.sum(np.abs(approx) ** 2)
                                  + np.sum(np.abs(detail) ** 2))
                feature_list.append(wavelet_energy)
            self.features.append(feature_list)


class DataProcessor2(DataProcessor):
    """DataProcessor variant for unlabelled evaluation data (no annotation CSV).

    Inherits extract_features unchanged; only the loading step differs.
    """

    def process_data(self):
        """Read raw files listed in dat_list_path, appending to self.X only."""
        print("Now loading files:", self.anno_path, " ", self.dat_list_path)
        with open(self.dat_list_path, "r") as list_file:
            for file_path in list_file:
                file_path = file_path.rstrip("\n")
                self.X.append(
                    np.fromfile(file_path, dtype=np.int16).reshape((8, -1)))


def _write_predictions(path, predictions):
    """Write one prediction row per recording to *path*, preceded by a header.

    BUG FIX: the original passed the header as a single string, which
    csv.writer iterates character-by-character, producing 25 one-char columns.
    """
    with open(path, mode="w", newline="") as out_file:
        writer = csv.writer(out_file)
        writer.writerow(CSV_HEADER)
        for prediction in predictions:
            writer.writerow(prediction)


def main():
    # --- Training data (healthy + unhealthy, accumulated in one processor) ---
    dataproc = DataProcessor()
    dataproc.anno_path = './set_15/lists/data_train_healthy.csv'
    dataproc.dat_list_path = 'set_15/lists/data_train_healthy.list'
    dataproc.process_data()
    dataproc.anno_path = './set_15/lists/data_train_unhealthy.csv'
    dataproc.dat_list_path = 'set_15/lists/data_train_unhealthy.list'
    dataproc.process_data()
    dataproc.extract_features()
    X_train = np.array(dataproc.features)
    y_train = np.array(dataproc.y)
    print(X_train.shape)

    # --- Development data ---
    dataproc2 = DataProcessor()
    dataproc2.anno_path = './set_15/lists/data_dev_healthy.csv'
    dataproc2.dat_list_path = 'set_15/lists/data_dev_healthy.list'
    dataproc2.process_data()
    dataproc2.anno_path = './set_15/lists/data_dev_unhealthy.csv'
    dataproc2.dat_list_path = 'set_15/lists/data_dev_unhealthy.list'
    dataproc2.process_data()
    dataproc2.extract_features()
    X_dev = np.array(dataproc2.features)
    y_dev = np.array(dataproc2.y)
    print(X_dev.shape)

    # --- Evaluation data (unlabelled) ---
    dataproceval = DataProcessor2()
    dataproceval.dat_list_path = 'set_15/lists/data_eval.list'
    dataproceval.process_data()
    dataproceval.extract_features()
    X_eval = np.array(dataproceval.features)
    print(X_eval.shape)

    # Loop kept for easy k-sweeps; currently evaluates only k = 6.
    for k in range(6, 7):
        print("Using k =", k)
        classifier = KNN(n_neighbors=k)
        classifier.fit(X_train, y_train)

        print("Predicting results for training set...")
        predictions_train = classifier.predict(X_train)
        predictions_eval = classifier.predict(X_eval)
        predictions_dev = classifier.predict(X_dev)

        _write_predictions('predictions_train_k{}.csv'.format(k),
                           predictions_train)
        _write_predictions('predictions_dev_k{}.csv'.format(k),
                           predictions_dev)
        _write_predictions('predictions_eval_k{}.csv'.format(k),
                           predictions_eval)
        print("Predictions for training set have been written to "
              "predictions_train_k{}.csv".format(k))

        # accuracy_score on multi-label targets is subset (exact-match) accuracy.
        accuracy_train = accuracy_score(y_train, predictions_train)
        print("Accuracy on training set:", accuracy_train)

        print("Predicting results for development set...")
        acc_dev = accuracy_score(y_dev, predictions_dev)
        print("Accuracy on development set: ", acc_dev)


if __name__ == "__main__":
    main()