import wfdb
import os
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
import csv
import joblib


def read_ecg_signal(file_path):
    """Read an ECG record with wfdb and return the signal matrix."""
    signals, fields = wfdb.rdsamp(file_path)
    return signals


def preprocess_data(input_files):
    """Read the WFDB records listed in each list file and flatten every
    record's channels into a single feature row."""
    all_flattened_channels = []
    for file_path in input_files:
        with open(file_path, 'r') as f:
            paths = f.readlines()
        # Iterate over each record path in the list file
        for path in paths:
            # Remove trailing newline character
            path = path.strip()
            ecg_signals = read_ecg_signal(path)
            # Transpose to (channels x samples), then concatenate the
            # channels into one long row per record
            ecg_signals_T = ecg_signals.T
            flattened_channels = np.concatenate(ecg_signals_T, axis=0)
            # Append the flattened channels to the list
            all_flattened_channels.append(flattened_channels.tolist())
    return all_flattened_channels


def preprocess_test_data(input_files):
    """Read the raw int16 ECG dumps listed in each list file and return one
    flattened feature row per record."""
    all_signals = []
    for file_path in input_files:
        with open(file_path, 'r') as f:
            paths = f.readlines()
        for path in paths:
            path = path.strip()
            data = np.fromfile(path, dtype=np.int16)
            # Rescale raw ADC values
            data = data.astype(np.float32) / 200
            # Each record becomes a single flattened feature row
            all_signals.append(data)
    return np.array(all_signals)


def read_annotations(annotation_files):
    """Read the annotation CSV files into one list of labels per class."""
    labels = ["1dAVb", "RBBB", "LBBB", "SB", "AF", "ST"]
    annotations = {label: [] for label in labels}
    for annotation_file in annotation_files:
        with open(annotation_file, 'r') as f:
            next(f)  # Skip the header
            for line in f:
                # Split the line by comma to get the annotation for each column
                values = line.strip().split(',')
                for i, label in enumerate(labels):
                    annotations[label].append(int(values[i]))
    return annotations


def write_predictions_to_csv(filename, predictions):
    with open(filename, 'w', newline='') as csvfile:
        writer = csv.writer(csvfile)
        # Write the header
        writer.writerow(["1dAVb", "RBBB", "LBBB", "SB", "AF", "ST"])
        # Write the predictions row by row (one column per label)
        for row in zip(*predictions):
            writer.writerow(row)


def main():
    # List files containing the training/dev record paths
    train_input_files = ["./data/train/data_train_healthy_records.list",
                         "./data/train/data_train_unhealthy_records.list",
                         "./data/dev/data_dev_healthy_records.list",
                         "./data/dev/data_dev_unhealthy_records.list"]
    # Annotation files matching the training/dev records
    train_annotation_files = ["./data/train/data_train_healthy.csv",
                              "./data/train/data_train_unhealthy.csv",
                              "./data/dev/data_dev_healthy.csv",
                              "./data/dev/data_dev_unhealthy.csv"]
    # List file containing the record paths for the evaluation data
    test_input_files = ["./data/eval/data_eval.list"]

    # Preprocess the training data
    print("Starting training data processing")
    train_data = preprocess_test_data(train_input_files)
    print(train_data.shape)
    print("Finished training data processing")

    # Read training annotations
    print("Reading training annotations")
    train_annotations = read_annotations(train_annotation_files)
    print("Finished reading training annotations")

    # Preprocess the testing data
    print("Starting testing data processing")
    test_data = preprocess_test_data(test_input_files)
    print("Finished testing data processing")

    classifiers = {}
    predictions = {}

    # Train one classifier per label
    for label in train_annotations:
        print(f"Beginning training for {label}")
        # Initialize the random forest classifier
        classifiers[label] = RandomForestClassifier(n_estimators=100, random_state=42)
        # Train the classifier
        classifiers[label].fit(train_data, train_annotations[label])
        # Save the trained model
        model_filename = f"{label}_model.pkl"
        joblib.dump(classifiers[label], model_filename)
        print(f"Model saved as: {model_filename}")

    # After the training loop completes, make predictions for each label
    for label in train_annotations:
        print(f"Making predictions for {label}")
        # Predict on the testing set
        predictions[label] = classifiers[label].predict(test_data)

    # Write predictions to CSV file
    write_predictions_to_csv("hyp_rnf_eval.csv", list(predictions.values()))
    print("Predictions written to hyp_rnf_eval.csv")


if __name__ == "__main__":
    main()