#!/usr/bin/env python
#
# file: $(NEDC_NFC)/class/python/nedc_edf_tools/nedc_edf_tools.py
#
# revision history:
#
# 20240524 (DB): added write_edf and put_header
# 20230621 (AB): refactored code to the comment format
# 20220107 (JP): cleaned up some standards issues
# 20210809 (JP): added get_header_from_file
# 20210809 (JP): fixed a bug with the cleanup method
# 20200607 (JP): started over using our own code because the open source
#                versions of this software were not adequately robust
# 20200219 (NS): updated to python3
# 20170622 (NC): refactored code into /class/ with more ISIP standards
# 20160511 (MT): refactored code to comply with ISIP standards and to allow
#                input from the command line
# 20141212 (MG): modified to support edf files with non-eeg channels
# 20141020 (MG): changed load_edf to read an edf file between two specified
#                times (t1, t2)
# 20140812 (MG): initial version
#
# This file contains a Python implementation of the C++ class Edf. It uses a
# dictionary to encapsulate metadata when this data is passed as an argument.
# The metadata dictionary should contain the following key-value pairs
# (default means that the field is optional and will be replaced with the
# default value if null):
#
#  'id_patient': patient ID (string, max 16 characters, default: 'X')
#  'sex': patient gender (string, max 16 characters, default: 'X')
#  'date_of_birth': patient's DOB (string, max 16 characters, default: 'X')
#  'full_name': patient's full name (string, max 16 characters, default: 'X')
#  'age': patient's age (string, max 16 characters, default: 'X')
#  'date_exam_1': start date of the recording, formatted as DD-MMM-YYYY
#                 (e.g., 01-JAN-2000) (string, max 16 characters, default: '')
#  'id_exam': EEG ID (string, max 16 characters, default: 'X')
#  'lrci_tech': technician (string, max 16 characters, default: '')
#  'lrci_machine': machine (string, max 16 characters, default: '')
#  'date_exam_2': start date of the recording, formatted as DD.MM.YY
#                 (e.g., 01.01.00) (string, max 8 characters, default: '')
#  'start_time': start time of the recording, formatted as HH.MM.SS
#                (e.g., 12.00.00) (string, max 8 characters, default: '')
#  'ghdi_file_type': file type (string, max 5 characters, default: ' ')
#  'ghdi_reserved': reserved field (string, max 39 characters, default: ' ')
#  'num_rec': number of data records (integer)
#  'rec_dur': duration of each data record in seconds (float)
#  'sample_frequency': sampling frequency in Hz (float)
#  'chan_labels': list of channel labels (list of strings, max 16 characters
#                 each; the number of channel labels is used as the number of
#                 channels, which in turn is used to compute other values, so
#                 it is important that this list be correct)
#  'chan_trans_type': list of transducer types (list of strings,
#                     max 80 characters each, default: [''] * num_channels)
#  'chan_phys_dim': list of physical dimensions (list of strings,
#                   max 8 characters each, default: ['mV'] * num_channels)
#  'min_values': list of minimum physical values (list of floats,
#                default: [''] * num_channels)
#  'max_values': list of maximum physical values (list of floats,
#                default: [''] * num_channels)
#  'chan_dig_min': list of minimum digital values (list of strings,
#                  max 8 characters each,
#                  default: [str(-EDF_SIG_MAXVAL)] * num_channels)
#  'chan_dig_max': list of maximum digital values (list of strings,
#                  max 8 characters each,
#                  default: [str(EDF_SIG_MAXVAL)] * num_channels)
#  'chan_prefilt': list of prefiltering information (list of strings,
#                  max 80 characters each, default: [''] * num_channels)
#  'samples_per_channel': number of samples in each data record (integer)
#
# In an EDF file, the signal data is split into records of equal duration,
# specified by the 'rec_dur' field in the metadata dictionary. The number of
# records, specified by the 'num_rec' field, is calculated by dividing the
# total duration of the recording (total_samples / sample_frequency) by the
# record duration and rounding up to the nearest integer. Each record
# contains a fixed number of samples for each channel, determined by
# multiplying the record duration by the sampling frequency
# (rec_dur * sample_frequency).
#
# The signal argument is defined as follows:
#
#  the signal should be a 2D numpy array of integers with dimensions
#  [num_channels, total_samples], where:
#
#   num_channels is the number of channels in the recording
#   total_samples is the total number of samples per channel
#
# In the EDF format, the signal data is stored as unscaled integer values.
# The physical range of each channel is defined in the header by the
# 'phys_min' and 'phys_max' fields, while the corresponding digital range is
# defined by the 'dig_min' and 'dig_max' fields. When preparing the signal
# data for writing to an EDF file, you should ensure that the values in the
# 2D array are unscaled integers that fall within the digital range
# specified in the header.
#
# To convert the signal data from physical units (e.g., microvolts) to
# unscaled integer values, you can use the following formula:
#
#  DigVal = (PhysVal - PhysMin) * (DigMax - DigMin)
#           / (PhysMax - PhysMin) + DigMin
#
# The code in this library must parallel the C++ version of this code.
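#
# As an illustrative sketch only (not part of this library's interface), the
# conversion above can be written in Python as follows, where phys_val and
# the physical/digital limits are assumed to come from the metadata
# dictionary described above:
#
#  def phys_to_dig(phys_val, phys_min, phys_max, dig_min, dig_max):
#
#      # map the physical value onto the digital range and clip it so the
#      # result is a legal 16-bit EDF sample
#      #
#      dig_val = int(round((phys_val - phys_min) * (dig_max - dig_min) /
#                          (phys_max - phys_min) + dig_min))
#      return max(dig_min, min(dig_max, dig_val))
#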
#------------------------------------------------------------------------------ # import required system modules # from collections import OrderedDict import numpy as np import os import re import scipy.signal as signal import struct import sys # import NEDC modules # import nedc_debug_tools as ndt import nedc_file_tools as nft #------------------------------------------------------------------------------ # # global variables are listed here # #------------------------------------------------------------------------------ # set the filename using basename # __FILE__ = os.path.basename(__file__) #------------------------------------------------------------------------------ # # this special section defines an Edf header byte by byte # #------------------------------------------------------------------------------ # section (1): version information # EDF_VERS_NAME = "version" EDF_VERS_BSIZE = int(8) EDF_VERS = b"0 " # section (2): patient information # EDF_LPTI_BSIZE = int(80) EDF_LPTI_TSIZE = int(119) EDF_LPTI_PATIENT_ID_NAME = "ltpi_patient_id" EDF_LPTI_GENDER_NAME = "ltpi_gender" EDF_LPTI_DOB_NAME = "ltpi_dob" EDF_LPTI_FULL_NAME_NAME = "ltpi_full_name" EDF_LPTI_AGE_NAME = "ltpi_age" # section (3): local recording information # EDF_LRCI_BSIZE = int(80) EDF_LRCI_TSIZE = EDF_LPTI_TSIZE EDF_LRCI_RSIZE = EDF_LPTI_BSIZE EDF_LRCI_START_DATE_LABEL = "lrci_start_date_label" EDF_LRCI_START_DATE_LABEL_META = "Startdate" EDF_LRCI_START_DATE = "lrci_start_date" EDF_LRCI_EEG_ID = "lrci_eeg_id" EDF_LRCI_TECH = "lrci_tech" EDF_LRCI_MACHINE = "lrci_machine" # section (4): general header information # EDF_GHDI_BSIZE = int(8 + 8 + 8 + 5 + 39 + 8 + 8 + 4) EDF_GHDI_TSIZE = EDF_LPTI_TSIZE EDF_GHDI_START_DATE = "ghdi_start_date" EDF_GHDI_START_TIME = "ghdi_start_time" EDF_GHDI_HSIZE = "ghdi_hsize" EDF_GHDI_FILE_TYPE = "ghdi_file_type" EDF_GHDI_RESERVED = "ghdi_reserved" EDF_GHDI_NUM_RECS = "ghdi_num_recs" EDF_GHDI_DUR_REC = "ghdi_dur_rec" EDF_GHDI_NSIG_REC = "ghdi_nsig_rec" # section (5): channel-specific information # EDF_LABL_BSIZE = int(16) EDF_TRNT_BSIZE = int(80) EDF_PDIM_BSIZE = int( 8) EDF_PMIN_BSIZE = int( 8) EDF_PMAX_BSIZE = int( 8) EDF_DMIN_BSIZE = int( 8) EDF_DMAX_BSIZE = int( 8) EDF_PREF_BSIZE = EDF_TRNT_BSIZE EDF_RECS_BSIZE = int( 8) EDF_CHAN_LABELS = "chan_labels" EDF_CHAN_TRANS_TYPE = "chan_trans_type" EDF_CHAN_PHYS_DIM = "chan_phys_dim" EDF_CHAN_PHYS_MIN = "chan_phys_min" EDF_CHAN_PHYS_MAX = "chan_phys_max" EDF_CHAN_DIG_MIN = "chan_dig_min" EDF_CHAN_DIG_MAX = "chan_dig_max" EDF_CHAN_PREFILT = "chan_prefilt" EDF_CHAN_REC_SIZE = "chan_rec_size" # section (6): derived values # EDF_SAMPLE_FREQUENCY = "sample_frequency" EDF_NUM_CHANNELS_SIGNAL = "num_channel_signal" EDF_NUM_CHANNELS_ANNOTATION = "num_channels_annotation" # other important definitions # EDF_BSIZE = int(256) EDF_ANNOTATION_KEY = "ANNOTATION" EDF_FTYP_NAME = "ftype" EDF_FTYP_BSIZE = int(5) EDF_FTYP = "EDF " EDF_SIG_MAXVAL = int(32767) EDF_SIZEOF_SHORT = int(2) # define constants related to downsampling # EDF_DEF_AXIS = int(1) EDF_DEF_DOWSAMPLE_FILTER_ORDER = int(16) EDF_DEF_FTYPE = "fir" EDF_DEF_ZPHASE = True # define default values # EDF_DEF_CHAN = int(-1) EDF_DEF_DBG_NF = int(10) #------------------------------------------------------------------------------ # # functions are listed here # #------------------------------------------------------------------------------ def set_limits(f1, f2, fmax): """ function: set_limits arguments: long f1: desired first index (input) long f2: desired number of items (input) long fmax: maximum 
                number available (input)

     return: a tuple (n1, n2) where
             n1: first index (output)
             n2: last index (output)

     description: This method returns a range [n1, n2] that is clipped
                  based on the inputs.
    """

    # initialize the output to the max range
    #
    n1 = int(0)
    n2 = int(fmax)

    # clip n1
    #
    if f1 > int(0):
        n1 = min(f1, fmax - 1)

    # clip n2
    #
    if f2 == int(0):
        n2 = n1
        return (n1, n2)
    elif f2 > int(0):
        n2 = min(n1 + f2, n2)

    # exit gracefully
    #
    return (n1, n2)

#------------------------------------------------------------------------------
#
# classes are listed here
#
#------------------------------------------------------------------------------

# class: Edf
#
class Edf:
    """
    class: Edf

    arguments: none

    description: This class is a Python implementation of the C++ class Edf.
                 Its interface parallels that class.
    """

    # define static variables for debug and verbosity
    #
    dbgl_d = ndt.Dbgl()
    vrbl_d = ndt.Vrbl()

    # define a dictionary to hold header information
    #
    h_d = {}

    #--------------------------------------------------------------------------
    #
    # constructors are listed here
    #
    #--------------------------------------------------------------------------

    def __init__(self):
        """
        method: constructor

        arguments: none

        return: none

        description: This simple method initializes the class
        """

        # set the class name
        #
        Edf.__CLASS_NAME__ = self.__class__.__name__
    #
    # end of method

    #--------------------------------------------------------------------------
    #
    # print methods are listed here
    #
    #--------------------------------------------------------------------------

    def print_header(self, fp, prefix = nft.DELIM_TAB):
        """
        method: print_header

        arguments:
         fp: stream to be used for printing
         prefix: a prefix character to use for printing

        return: a boolean value indicating status

        description: This method assumes the header has been loaded
                     and prints it.
""" # display debug information # if self.dbgl_d > ndt.BRIEF: print("%s (line: %s) %s::%s: printing Edf header" % (__FILE__, ndt.__LINE__, Edf.__CLASS_NAME__, ndt.__NAME__)) # (1) version information # # note we conver this to a string to be compatible with the c++ # version of this code # fp.write("%sBlock 1: Version Information\n" % (prefix)) fp.write("%s version = [%s]\n\n" % (prefix, str(self.h_d[EDF_VERS_NAME], nft.DEF_CHAR_ENCODING))) # (2) local patient information # fp.write("%sBlock 2: Local Patient Information\n" % (prefix)) fp.write("%s lpti_patient_id = [%s]\n" % (prefix, self.h_d[EDF_LPTI_PATIENT_ID_NAME])) fp.write("%s lpti_gender = [%s]\n" % (prefix, self.h_d[EDF_LPTI_GENDER_NAME])) fp.write("%s lpti_dob = [%s]\n" % (prefix, self.h_d[EDF_LPTI_DOB_NAME])) fp.write("%s lpti_full_name = [%s]\n" % (prefix, self.h_d[EDF_LPTI_FULL_NAME_NAME])) fp.write("%s lpti_age = [%s]\n\n" % (prefix, self.h_d[EDF_LPTI_AGE_NAME])) # (3) local recording information # fp.write("%sBlock 3: Local Recording Information\n" % (prefix)) fp.write("%s lrci_start_date_label = [%s]\n" % (prefix, self.h_d[EDF_LRCI_START_DATE_LABEL])) fp.write("%s lrci_start_date = [%s]\n" % (prefix, self.h_d[EDF_LRCI_START_DATE])) fp.write("%s lrci_eeg_id = [%s]\n" % (prefix, self.h_d[EDF_LRCI_EEG_ID])) fp.write("%s lrci_tech = [%s]\n" % (prefix, self.h_d[EDF_LRCI_TECH])) fp.write("%s lrci_machine = [%s]\n\n" % (prefix, self.h_d[EDF_LRCI_MACHINE])) # (4) general header information # fp.write("%sBlock 4: General Header Information\n" % (prefix)) fp.write("%s ghdi_start_date = [%s]\n" % (prefix, self.h_d[EDF_GHDI_START_DATE])) fp.write("%s ghdi_start_time = [%s]\n" % (prefix, self.h_d[EDF_GHDI_START_TIME])) fp.write("%s ghdi_hsize = [%ld]\n" % (prefix, self.h_d[EDF_GHDI_HSIZE])) fp.write("%s ghdi_file_type = [%s]\n" % (prefix, self.h_d[EDF_GHDI_FILE_TYPE])) fp.write("%s ghdi_reserved = [%s]\n" % (prefix, self.h_d[EDF_GHDI_RESERVED])) fp.write("%s ghdi_num_recs = [%ld]\n" % (prefix, self.h_d[EDF_GHDI_NUM_RECS])) fp.write("%s ghdi_dur_rec = [%lf]\n" % (prefix, self.h_d[EDF_GHDI_DUR_REC])) fp.write("%s ghdi_nsig_rec = [%ld]\n\n" % (prefix, self.h_d[EDF_GHDI_NSIG_REC])) # (5) channel-specific information # fp.write("%sBlock 5: Channel-Specific Information\n" % (prefix)) fp.write("%s chan_labels (%ld) = " % (prefix, self.h_d[EDF_GHDI_NSIG_REC])) last_chan = self.h_d[EDF_GHDI_NSIG_REC] - 1 for i in range(0, last_chan): fp.write("[%s], " % (self.h_d[EDF_CHAN_LABELS][i])) fp.write("[%s]\n" % ((self.h_d[EDF_CHAN_LABELS])[last_chan])) fp.write("%s chan_trans_type (%ld) = " % (prefix, self.h_d[EDF_GHDI_NSIG_REC])) for i in range(0, last_chan): fp.write("[%s], " % (self.h_d[EDF_CHAN_TRANS_TYPE][i])) fp.write("[%s]\n" % (self.h_d[EDF_CHAN_TRANS_TYPE][last_chan])) fp.write("%s chan_phys_dim (%ld) = " % (prefix, self.h_d[EDF_GHDI_NSIG_REC])) for i in range(0, last_chan): fp.write("[%s], " % (self.h_d[EDF_CHAN_PHYS_DIM][i])) fp.write("[%s]\n" % (self.h_d[EDF_CHAN_PHYS_DIM][last_chan])) fp.write("%s chan_phys_min (%ld) = " % (prefix, self.h_d[EDF_GHDI_NSIG_REC])) for i in range(0, last_chan): fp.write("[%10.3f], " % (self.h_d[EDF_CHAN_PHYS_MIN][i])) fp.write("[%10.3f]\n" % (self.h_d[EDF_CHAN_PHYS_MIN][last_chan])) fp.write("%s chan_phys_max (%ld) = " % (prefix, self.h_d[EDF_GHDI_NSIG_REC])) for i in range(0, last_chan): fp.write("[%10.3f], " % (self.h_d[EDF_CHAN_PHYS_MAX][i])) fp.write("[%10.3f]\n" % (self.h_d[EDF_CHAN_PHYS_MAX][last_chan])) fp.write("%s chan_dig_min (%ld) = " % (prefix, self.h_d[EDF_GHDI_NSIG_REC])) for i in range(0, 
last_chan): fp.write("[%10ld], " % (self.h_d[EDF_CHAN_DIG_MIN][i])) fp.write("[%10ld]\n" % (self.h_d[EDF_CHAN_DIG_MIN][last_chan])) fp.write("%s chan_dig_max (%ld) = " % (prefix, self.h_d[EDF_GHDI_NSIG_REC])) for i in range(0, last_chan): fp.write("[%10ld], " % (self.h_d[EDF_CHAN_DIG_MAX][i])) fp.write("[%10ld]\n" % (self.h_d[EDF_CHAN_DIG_MAX][last_chan])) fp.write("%s chan_prefilt (%ld) = " % (prefix, self.h_d[EDF_GHDI_NSIG_REC])) for i in range(0, last_chan): fp.write("[%s], " % (self.h_d[EDF_CHAN_PREFILT][i])) fp.write("[%s]\n" % (self.h_d[EDF_CHAN_PREFILT][last_chan])) fp.write("%s chan_rec_size (%ld) = " % (prefix, self.h_d[EDF_GHDI_NSIG_REC])) for i in range(0, last_chan): fp.write("[%10ld], " % (self.h_d[EDF_CHAN_REC_SIZE][i])) fp.write("[%10ld]\n" % (self.h_d[EDF_CHAN_REC_SIZE][last_chan])) fp.write("%s\n" % (prefix)) # (6) derived values # fp.write("%sBlock 6: Derived Values\n" % (prefix)) fp.write("%s hdr_sample_frequency = %10.1f\n" % (prefix, self.h_d[EDF_SAMPLE_FREQUENCY])) fp.write("%s hdr_num_channels_signal = %10ld\n" % (prefix, self.h_d[EDF_NUM_CHANNELS_SIGNAL])) fp.write("%s hdr_num_channels_annotation = %10ld\n" % (prefix, self.h_d[EDF_NUM_CHANNELS_ANNOTATION])) fp.write("%s duration of recording (secs) = %10.1f\n" % (prefix, (float)(self.h_d[EDF_GHDI_DUR_REC] * self.h_d[EDF_GHDI_NUM_RECS]))) fp.write("%s per channel sample frequencies:\n" % (prefix)) for i in range(0, self.h_d[EDF_GHDI_NSIG_REC]): fp.write("%s channel[%4ld]: %10.1f Hz (%s)\n" % (prefix, i, self.get_sample_frequency(i), self.h_d[EDF_CHAN_LABELS][i])) # exit gracfully # return True # #end of method def print_header_from_file(self, fname, fp, prefix = nft.DELIM_TAB): """ method: print_header_from_file arguments: fname: input file fp: stream to be used for printing prefix: a prefix character to use for printing return: a boolean value indicating status description: This opens a file, reads the header, and pretty prints it. """ # declare local variables # # display debug information # if self.dbgl_d > ndt.BRIEF: print("%s (line: %s) %s::%s: printing Edf header (%s)" % (__FILE__, ndt.__LINE__, Edf.__CLASS_NAME__, ndt.__NAME__, fname)) # make sure this is an edf file # if nft.is_edf(fname) == False: print("Error: %s (line: %s) %s::%s: not an Edf file (%s)" % (__FILE__, ndt.__LINE__, Edf.__CLASS_NAME__, ndt.__NAME__, fname)) return False # open the file # fp_edf = open(fname, "rb") if fp_edf == None: print("Error: %s (line: %s) %s::%s: error opening (%s)" % (__FILE__, ndt.__LINE__, Edf.__CLASS_NAME__, ndt.__NAME__, fname)) # read the header from a file: # note that we will ignore the signal data # if self.get_header(fp_edf) == False: print("Error: %s (line: %s) %s::%s: error opening (%s)" % (__FILE__, ndt.__LINE__, Edf.__CLASS_NAME__, ndt.__NAME__, fname)) return False # print the header # self.print_header(fp, prefix) # exit gracefully # return True # # end of method #-------------------------------------------------------------------------- # # get methods are listed here # #-------------------------------------------------------------------------- def get_header_from_file(self, fname): """ method: get_header_from_file arguments: fname: input filename return: a boolean value indicating status description: This method reads the header of an edf file given a filename. 
""" # open the file # fp_edf = open(fname, "rb") if fp_edf == None: print("Error: %s (line: %s) %s::%s: error opening (%s)" % (__FILE__, ndt.__LINE__, Edf.__CLASS_NAME__, ndt.__NAME__, fname)) return False # read the header from a file: # note that we will ignore the signal data # if self.get_header(fp_edf) == False: print("Error: %s (line: %s) %s::%s: error opening (%s)" % (__FILE__, ndt.__LINE__, Edf.__CLASS_NAME__, ndt.__NAME__, fname)) return False # exit gracefully # return True # # end of method def get_header(self, fp): """ method: get_header arguments: fp: an open file pointer return: a logical value indicating the status of the get operation description: This method reads the header of an edf file. """ # declare local variables # nbytes = int(0) num_items = int(0) # display debug information # if self.dbgl_d > ndt.BRIEF: print("%s (line: %s) %s::%s: fetching an Edf header" % (__FILE__, ndt.__LINE__, Edf.__CLASS_NAME__, ndt.__NAME__)) # rewind the file # fp.seek(0, os.SEEK_SET) # (1) version information # if self.dbgl_d > ndt.BRIEF: print("%s (line: %s) %s::%s: fetching (1)" % (__FILE__, ndt.__LINE__, Edf.__CLASS_NAME__, ndt.__NAME__)) self.h_d[EDF_VERS_NAME] = fp.read(EDF_VERS_BSIZE) if self.h_d[EDF_VERS_NAME] != EDF_VERS: return False # (2) local patient information # # unfortunately, some edf files don't contain all the information # they should. this often occurs because the deidenitification # process overwrites this information. so we zero out the buffers # that won't be filled if the information is missing. # # note also that sometimes this field is blank, so split might # not return an adequate number of fields. # # finally, we want these stored as strings, not bytes # if self.dbgl_d > ndt.BRIEF: print("%s (line: %s) %s::%s: fetching (2)" % (__FILE__, ndt.__LINE__, Edf.__CLASS_NAME__, ndt.__NAME__)) fields = (fp.read(EDF_LPTI_BSIZE)).split() if len(fields) > int(0): self.h_d[EDF_LPTI_PATIENT_ID_NAME] = str(fields[0], nft.DEF_CHAR_ENCODING) else: self.h_d[EDF_LPTI_PATIENT_ID_NAME] = nft.STRING_EMPTY if len(fields) > int(1): self.h_d[EDF_LPTI_GENDER_NAME] = str(fields[1], nft.DEF_CHAR_ENCODING) else: self.h_d[EDF_LPTI_GENDER_NAME] = nft.STRING_EMPTY if len(fields) > int(2): self.h_d[EDF_LPTI_DOB_NAME] = str(fields[2], nft.DEF_CHAR_ENCODING) else: self.h_d[EDF_LPTI_DOB_NAME] = nft.STRING_EMPTY if len(fields) > int(3): self.h_d[EDF_LPTI_FULL_NAME_NAME] = str(fields[3], nft.DEF_CHAR_ENCODING) else: self.h_d[EDF_LPTI_FULL_NAME_NAME] = nft.STRING_EMPTY if len(fields) > int(4): self.h_d[EDF_LPTI_AGE_NAME] = str(fields[4], nft.DEF_CHAR_ENCODING) else: self.h_d[EDF_LPTI_AGE_NAME] = nft.STRING_EMPTY # (3) local recording information # # unfortunately, some edf files don't contain all the information # they should. this often occurs because the deidenitification # process overwrites this information. so we zero out the buffers # that won't be filled if the information is missing. 
        #
        if self.dbgl_d > ndt.BRIEF:
            print("%s (line: %s) %s::%s: fetching (3)" %
                  (__FILE__, ndt.__LINE__, Edf.__CLASS_NAME__, ndt.__NAME__))

        fields = (fp.read(EDF_LRCI_BSIZE)).split()

        if len(fields) > int(0):
            self.h_d[EDF_LRCI_START_DATE_LABEL] = str(fields[0],
                                                      nft.DEF_CHAR_ENCODING)
        else:
            self.h_d[EDF_LRCI_START_DATE_LABEL] = nft.STRING_EMPTY

        if len(fields) > int(1):
            self.h_d[EDF_LRCI_START_DATE] = str(fields[1],
                                                nft.DEF_CHAR_ENCODING)
        else:
            self.h_d[EDF_LRCI_START_DATE] = nft.STRING_EMPTY

        if len(fields) > int(2):
            self.h_d[EDF_LRCI_EEG_ID] = str(fields[2], nft.DEF_CHAR_ENCODING)
        else:
            self.h_d[EDF_LRCI_EEG_ID] = nft.STRING_EMPTY

        if len(fields) > int(3):
            self.h_d[EDF_LRCI_TECH] = str(fields[3], nft.DEF_CHAR_ENCODING)
        else:
            self.h_d[EDF_LRCI_TECH] = nft.STRING_EMPTY

        if len(fields) > int(4):
            self.h_d[EDF_LRCI_MACHINE] = str(fields[4], nft.DEF_CHAR_ENCODING)
        else:
            self.h_d[EDF_LRCI_MACHINE] = nft.STRING_EMPTY

        # (4) general header information
        #
        # get the fourth block of data (non-local information)
        #
        if self.dbgl_d > ndt.BRIEF:
            print("%s (line: %s) %s::%s: fetching (4)" %
                  (__FILE__, ndt.__LINE__, Edf.__CLASS_NAME__, ndt.__NAME__))

        try:
            byte_buf = fp.read(EDF_GHDI_BSIZE)
            buf = str(byte_buf, nft.DEF_CHAR_ENCODING)
        except:
            print("Error: %s (line: %s) %s::%s: char encoding (%s)" %
                  (__FILE__, ndt.__LINE__, Edf.__CLASS_NAME__, ndt.__NAME__,
                   byte_buf))
            return False

        self.h_d[EDF_GHDI_START_DATE] = buf[0:8]
        self.h_d[EDF_GHDI_START_TIME] = buf[8:8+8]
        self.h_d[EDF_GHDI_HSIZE] = nft.atoi(buf[16:16+8])
        self.h_d[EDF_GHDI_FILE_TYPE] = buf[24:24+5]
        self.h_d[EDF_GHDI_RESERVED] = buf[29:29+39]
        self.h_d[EDF_GHDI_NUM_RECS] = nft.atoi(buf[68:68+8])
        self.h_d[EDF_GHDI_DUR_REC] = nft.atof(buf[76:76+8])
        self.h_d[EDF_GHDI_NSIG_REC] = nft.atoi(buf[84:84+4])

        # (5) channel-specific information
        #
        # get the fifth block of data (channel-specific information)
        #
        if self.dbgl_d > ndt.BRIEF:
            print("%s (line: %s) %s::%s: fetching (5)" %
                  (__FILE__, ndt.__LINE__, Edf.__CLASS_NAME__, ndt.__NAME__))

        # (5a) read channel labels
        #
        if self.dbgl_d > ndt.BRIEF:
            print("%s (line: %s) %s::%s: fetching (5a)" %
                  (__FILE__, ndt.__LINE__, Edf.__CLASS_NAME__, ndt.__NAME__))

        buf = fp.read(EDF_LABL_BSIZE * self.h_d[EDF_GHDI_NSIG_REC])
        self.h_d[EDF_NUM_CHANNELS_ANNOTATION] = int(0)
        self.h_d[EDF_CHAN_LABELS] = []

        for i in range(0, self.h_d[EDF_GHDI_NSIG_REC]):

            # grab the channel label
            #
            offset = EDF_LABL_BSIZE * i
            tstr = (str(buf[offset:offset+EDF_LABL_BSIZE],
                        nft.DEF_CHAR_ENCODING)).upper()
            self.h_d[EDF_CHAN_LABELS].append(nft.trim_whitespace(tstr))

            # look for the annotation labels:
            #  note that the label is already upper case
            #
            if EDF_ANNOTATION_KEY in self.h_d[EDF_CHAN_LABELS][i]:
                self.h_d[EDF_NUM_CHANNELS_ANNOTATION] += int(1)

        # (5b) read the transducer type
        #
        if self.dbgl_d > ndt.BRIEF:
            print("%s (line: %s) %s::%s: fetching (5b)" %
                  (__FILE__, ndt.__LINE__, Edf.__CLASS_NAME__, ndt.__NAME__))

        buf = fp.read(EDF_TRNT_BSIZE * self.h_d[EDF_GHDI_NSIG_REC])
        self.h_d[EDF_CHAN_TRANS_TYPE] = []
        for i in range(0, self.h_d[EDF_GHDI_NSIG_REC]):

            # note: the offset must be computed with the transducer field
            # width (EDF_TRNT_BSIZE), not the label width
            #
            offset = EDF_TRNT_BSIZE * i
            tstr = str(buf[offset:offset+EDF_TRNT_BSIZE],
                       nft.DEF_CHAR_ENCODING)
            self.h_d[EDF_CHAN_TRANS_TYPE].append(nft.trim_whitespace(tstr))

        # (5c) read the physical dimension
        #
        if self.dbgl_d > ndt.BRIEF:
            print("%s (line: %s) %s::%s: fetching (5c)" %
                  (__FILE__, ndt.__LINE__, Edf.__CLASS_NAME__, ndt.__NAME__))

        buf = fp.read(EDF_PDIM_BSIZE * self.h_d[EDF_GHDI_NSIG_REC])
        self.h_d[EDF_CHAN_PHYS_DIM] = []
        for i in range(0, self.h_d[EDF_GHDI_NSIG_REC]):
            offset = EDF_PDIM_BSIZE * i
            tstr = \
str(buf[offset:offset+EDF_PDIM_BSIZE], nft.DEF_CHAR_ENCODING) self.h_d[EDF_CHAN_PHYS_DIM].append(nft.trim_whitespace(tstr)) # (5d) read the physical minimum # if self.dbgl_d > ndt.BRIEF: print("%s (line: %s) %s::%s: fetching (5d)" % (__FILE__, ndt.__LINE__, Edf.__CLASS_NAME__, ndt.__NAME__)) buf = fp.read(EDF_PMIN_BSIZE * self.h_d[EDF_GHDI_NSIG_REC]) self.h_d[EDF_CHAN_PHYS_MIN] = [] for i in range(0, self.h_d[EDF_GHDI_NSIG_REC]): offset = EDF_PMIN_BSIZE * i tstr = str(buf[offset:offset+EDF_PMIN_BSIZE], nft.DEF_CHAR_ENCODING) self.h_d[EDF_CHAN_PHYS_MIN].\ append(nft.atof(nft.trim_whitespace(tstr))) # (5e) read the physical maximum # if self.dbgl_d > ndt.BRIEF: print("%s (line: %s) %s::%s: fetching (5e)" % (__FILE__, ndt.__LINE__, Edf.__CLASS_NAME__, ndt.__NAME__)) buf = fp.read(EDF_PMAX_BSIZE * self.h_d[EDF_GHDI_NSIG_REC]) self.h_d[EDF_CHAN_PHYS_MAX] = [] for i in range(0, self.h_d[EDF_GHDI_NSIG_REC]): offset = EDF_PMAX_BSIZE * i tstr = str(buf[offset:offset+EDF_PMAX_BSIZE], nft.DEF_CHAR_ENCODING) self.h_d[EDF_CHAN_PHYS_MAX].\ append(nft.atof(nft.trim_whitespace(tstr))) # (5f) read the digital minimum # if self.dbgl_d > ndt.BRIEF: print("%s (line: %s) %s::%s: fetching (5f)" % (__FILE__, ndt.__LINE__, Edf.__CLASS_NAME__, ndt.__NAME__)) buf = fp.read(EDF_DMIN_BSIZE * self.h_d[EDF_GHDI_NSIG_REC]) self.h_d[EDF_CHAN_DIG_MIN] = [] for i in range(0, self.h_d[EDF_GHDI_NSIG_REC]): offset = EDF_DMIN_BSIZE * i tstr = str(buf[offset:offset+EDF_DMIN_BSIZE], nft.DEF_CHAR_ENCODING) self.h_d[EDF_CHAN_DIG_MIN].\ append(nft.atoi(nft.trim_whitespace(tstr))) # (5g) read the digital maximum # if self.dbgl_d > ndt.BRIEF: print("%s (line: %s) %s::%s: fetching (5g)" % (__FILE__, ndt.__LINE__, Edf.__CLASS_NAME__, ndt.__NAME__)) buf = fp.read(EDF_DMAX_BSIZE * self.h_d[EDF_GHDI_NSIG_REC]) self.h_d[EDF_CHAN_DIG_MAX] = [] for i in range(0, self.h_d[EDF_GHDI_NSIG_REC]): offset = EDF_DMAX_BSIZE * i tstr = str(buf[offset:offset+EDF_DMAX_BSIZE], nft.DEF_CHAR_ENCODING) self.h_d[EDF_CHAN_DIG_MAX].\ append(nft.atoi(nft.trim_whitespace(tstr))) # (5h) read the prefilt labels # if self.dbgl_d > ndt.BRIEF: print("%s (line: %s) %s::%s: fetching (5h)" % (__FILE__, ndt.__LINE__, Edf.__CLASS_NAME__, ndt.__NAME__)) buf = fp.read(EDF_PREF_BSIZE * self.h_d[EDF_GHDI_NSIG_REC]) self.h_d[EDF_CHAN_PREFILT] = [] for i in range(0, self.h_d[EDF_GHDI_NSIG_REC]): offset = EDF_PREF_BSIZE * i tstr = str(buf[offset:offset+EDF_PREF_BSIZE], nft.DEF_CHAR_ENCODING) self.h_d[EDF_CHAN_PREFILT].append(nft.trim_whitespace(tstr)) # (5i) read the rec sizes # if self.dbgl_d > ndt.BRIEF: print("%s (line: %s) %s::%s: fetching (5i)" % (__FILE__, ndt.__LINE__, Edf.__CLASS_NAME__, ndt.__NAME__)) buf = fp.read(EDF_RECS_BSIZE * self.h_d[EDF_GHDI_NSIG_REC]) self.h_d[EDF_CHAN_REC_SIZE] = [] for i in range(0, self.h_d[EDF_GHDI_NSIG_REC]): offset = EDF_RECS_BSIZE * i tstr = str(buf[offset:offset+EDF_RECS_BSIZE], nft.DEF_CHAR_ENCODING) self.h_d[EDF_CHAN_REC_SIZE].\ append(nft.atoi(nft.trim_whitespace(tstr))) # (5j) the last chunk of the header is reserved space # that we don't need to read. however, we need to advance the # file pointer to be safe. 
        #
        if self.dbgl_d > ndt.BRIEF:
            print("%s (line: %s) %s::%s: fetching (5j)" %
                  (__FILE__, ndt.__LINE__, Edf.__CLASS_NAME__, ndt.__NAME__))
        fp.seek(self.h_d[EDF_GHDI_HSIZE], os.SEEK_SET)

        # (6) compute some derived values
        #
        if self.dbgl_d > ndt.BRIEF:
            print("%s (line: %s) %s::%s: fetching (6)" %
                  (__FILE__, ndt.__LINE__, Edf.__CLASS_NAME__, ndt.__NAME__))
        self.h_d[EDF_SAMPLE_FREQUENCY] = \
            (float(self.h_d[EDF_CHAN_REC_SIZE][0]) /
             float(self.h_d[EDF_GHDI_DUR_REC]))
        self.h_d[EDF_NUM_CHANNELS_SIGNAL] = \
            int(self.h_d[EDF_GHDI_NSIG_REC] -
                self.h_d[EDF_NUM_CHANNELS_ANNOTATION])

        # exit gracefully
        #
        return True
    #
    # end of method

    def get_sample_frequency(self, chan = EDF_DEF_CHAN):
        """
        method: get_sample_frequency

        arguments:
         chan: the input channel index

        return: a floating point value containing the sample frequency

        description: none
        """

        # return the header-level sample frequency if no channel is given
        #
        if chan == EDF_DEF_CHAN:
            return self.h_d[EDF_SAMPLE_FREQUENCY]
        else:
            return (float(self.h_d[EDF_CHAN_REC_SIZE][chan]) /
                    float(self.h_d[EDF_GHDI_DUR_REC]))

    def get_num_samples(self, chan = EDF_DEF_CHAN):
        """
        method: get_num_samples

        arguments:
         chan: the input channel index

        return: an integer value containing the number of samples

        description: none
        """
        return int(self.h_d[EDF_CHAN_REC_SIZE][chan] *
                   self.h_d[EDF_GHDI_NUM_RECS])

    def get_duration(self):
        """
        method: get_duration

        arguments: none

        return: a float containing the duration in secs

        description: none
        """
        return (float(self.h_d[EDF_GHDI_DUR_REC] *
                      float(self.h_d[EDF_GHDI_NUM_RECS])))

    #--------------------------------------------------------------------------
    #
    # put methods are listed here
    #
    #--------------------------------------------------------------------------

    def put_header(self, ofile, metadata, num_chan):
        """
        method: put_header

        arguments:
         ofile: address of the output file
         metadata: dictionary with metadata
         num_chan: number of channels

        return: a boolean value indicating status

        description: This method writes the header of an edf file from the
                     metadata dictionary.
""" # display a debug message # if self.dbgl_d > ndt.BRIEF: print("%s (line: %s) %s: creating an Edf Header [%s]" % (__FILE__, ndt.__LINE__, ndt.__NAME__, ofile)) # open file for writing # with open(ofile, 'wb') as fp: # declare the bytearray # header = bytearray() # section (1): version information # # write version # header.extend(EDF_VERS) # section (2): patient information # # write lpti_patient_id # patient_id = metadata.get('id_patient', 'X').ljust(16).encode() header.extend(patient_id) # write lpti_gender # sex = metadata.get('sex', 'X').ljust(16).encode() header.extend(sex) # write lpti_dob # date_of_birth = metadata.get('date_of_birth', 'X').\ ljust(16).encode() header.extend(date_of_birth) # write lpti_full_name # full_name = metadata.get('full_name', 'X').ljust(16).encode() header.extend(full_name) # write lpti_age # age = metadata.get('age', 'X').ljust(16).encode() header.extend(age) # section (3): local recording information # # write lrci_start_date_label # header.extend(EDF_LRCI_START_DATE_LABEL_META.encode().ljust(16)) # write lrci_start_date # header.extend(metadata.get('date_exam_1', '').ljust(16).encode()) # write lrci_eeg_id # header.extend(metadata.get('id_exam', 'X').ljust(16).encode()) # write lrci_tech # header.extend(metadata.get('lrci_tech', '').ljust(16).encode()) # write lrci_machine # header.extend(metadata.get('lrci_machine', '').ljust(16).encode()) # section (4): general header information # # write ghdi_start_date # header.extend(metadata.get('date_exam_2', '').ljust(8).encode()) # write ghdi_start_time # header.extend(metadata.get('start_time', '').ljust(8).encode()) # write ghdi_hsize # hsize = num_chan * EDF_BSIZE + EDF_BSIZE header.extend(str(hsize).ljust(8).encode()) # write ghdi_file_type # header.extend(metadata.get(EDF_GHDI_FILE_TYPE, ' ').\ ljust(5).encode()) # write ghdi_reserved # header.extend(metadata.get(EDF_GHDI_RESERVED,\ ' ').ljust(39).encode()) # write ghdi_num_recs # header.extend(str(metadata.get('num_rec')).ljust(8).encode()) # write ghdi_dur_rec # header.extend(str(metadata.get('rec_dur')).ljust(8).encode()) # write ghdi_nsig_rec # header.extend(str(num_chan).ljust(4).encode()) # section (5): channel-specific information # # write chan_labels # for label in metadata.get('chan_labels', [''] * num_chan): header.extend(label.ljust(16).encode()) # write chan_trans_type # for trans_type in metadata.get('chan_trans_type', [''] * num_chan): header.extend(trans_type.ljust(80).encode()) # write chan_phys_dim # for phys_dim in metadata.get('chan_phys_dim', ['mV'] * num_chan): header.extend(phys_dim.ljust(8).encode()) # write chan_phys_min # for phys_min in metadata.get('min_values', [''] * num_chan): if phys_min == '': phys_min_str = ''.ljust(8) else: try: phys_min_float = float(phys_min) phys_min_str = f"{phys_min_float:.6f}".rjust(8)[:8] except ValueError: phys_min_str = ''.ljust(8) header.extend(phys_min_str.encode()) # write chan_phys_max # for phys_max in metadata.get('max_values', [''] * num_chan): if phys_max == '': phys_max_str = ''.ljust(8) else: try: phys_max_float = float(phys_max) phys_max_str = f"{phys_max_float:.6f}".rjust(8)[:8] except ValueError: phys_max_str = ''.ljust(8) header.extend(phys_max_str.encode()) # write chan_dig_min # for dig_min in metadata.get('chan_dig_min', [str(-EDF_SIG_MAXVAL)] * num_chan): header.extend(dig_min.ljust(8).encode()) # write chan_dig_max # for dig_max in metadata.get('chan_dig_max', [str(EDF_SIG_MAXVAL)] * num_chan): header.extend(dig_max.ljust(8).encode()) # write chan_prefilt # for prefilt in 
            metadata.get('chan_prefilt', [''] * num_chan):
                header.extend(prefilt.ljust(80).encode())

            # write chan_rec_size
            #
            rec_len = str(metadata['samples_per_channel'])
            rec_len_strings = [rec_len.ljust(8) for _ in range(num_chan)]
            for rec_len_str in rec_len_strings:
                header.extend(rec_len_str.encode())

            # write the remainder of the header (the per-channel reserved
            # fields) as spaces, padding out to the declared header size
            #
            extra = ' ' * (hsize - len(header))
            header.extend(extra.encode())

            with open(ofile, 'wb') as fp:
                fp.write(header)

        # display a debug message
        #
        if self.dbgl_d > ndt.BRIEF:
            print("%s (line: %s) %s::%s: done writing the edf header" %
                  (__FILE__, ndt.__LINE__, Edf.__CLASS_NAME__, ndt.__NAME__))

        # exit gracefully
        #
        return True
    #
    # end of method

    def put_signal(self, ofile, metadata, signal, num_chan):
        """
        method: put_signal

        arguments:
         ofile: address of output file
         metadata: dictionary with metadata
         signal: array of arrays with signal samplings
         num_chan: number of channels

        return: a boolean value indicating status

        description: This method writes the signal data from the 2D array
                     to an edf file.
        """

        # display a debug message
        #
        if self.dbgl_d > ndt.BRIEF:
            print("%s (line: %s) %s: writing the data to an Edf file [%s]" %
                  (__FILE__, ndt.__LINE__, ndt.__NAME__, ofile))

        # open the file and put file pointer at the end of the header
        #
        fp = open(ofile, nft.MODE_READ_WRITE_BINARY)
        header_offset = EDF_BSIZE + EDF_BSIZE * num_chan
        fp.seek(header_offset)

        # get number and length of records from metadata
        #
        num_rec = metadata['num_rec']
        rec_len = int(round(metadata['rec_dur'] *
                            metadata['sample_frequency']))

        # write the unscaled signal field as interleaved records:
        #  each sample is clipped to the legal range and written as a
        #  little-endian two-byte integer
        #
        for record in range(num_rec):
            for channel in range(num_chan):
                start_index = record * rec_len
                end_index = start_index + rec_len
                channel_data = signal[channel][start_index:end_index]
                for value in channel_data:
                    value = max(-EDF_SIG_MAXVAL, min(EDF_SIG_MAXVAL, value))
                    fp.write(struct.pack('<h', int(value)))

        # close the file
        #
        fp.close()

        # display a debug message
        #
        if self.dbgl_d > ndt.BRIEF:
            print("%s (line: %s) %s: done writing the data to an Edf file" %
                  (__FILE__, ndt.__LINE__, ndt.__NAME__))

        # exit gracefully
        #
        return True
    #
    # end of method

    #--------------------------------------------------------------------------
    #
    # edf file methods are listed here
    #
    #--------------------------------------------------------------------------

    def read_edf(self, fname, scale, sflag = True):
        """
        method: read_edf

        arguments:
         fname: input filename
         scale: if true, scale the signal based on the header data
         sflag: if true, read the signal data

        return: the header and the signal data as dictionaries

        description: This method reads an edf file and returns the raw
                     signal data.
""" # delcare local variables # sig = OrderedDict() # display debug information # if self.dbgl_d > ndt.BRIEF: print("%s (line: %s) %s::%s: opening an EDF file (%s)" % (__FILE__, ndt.__LINE__, Edf.__CLASS_NAME__, ndt.__NAME__, fname)) # open the file # fp = open(fname, nft.MODE_READ_BINARY) if fp is None: print("Error: %s (line: %s) %s::%s: error opening file (%s)" % (__FILE__, ndt.__LINE__, Edf.__CLASS_NAME__, ndt.__NAME__, fname)) return (None, None) # get the size of the file on disk # fp.seek(0, os.SEEK_END) file_size_in_bytes = fp.tell() fp.seek(0, os.SEEK_SET) if self.dbgl_d > ndt.BRIEF: print("%s (line: %s) %s::%s: file size = %ld bytes" % (__FILE__, ndt.__LINE__, Edf.__CLASS_NAME__, ndt.__NAME__, file_size_in_bytes)) # load the header # if self.get_header(fp) == False: print("Error: %s (line: %s) %s::%s: error in get_header (%s)" % (__FILE__, ndt.__LINE__, Edf.__CLASS_NAME__, ndt.__NAME__, fname)) return (None, None) # exit if necessary # if sflag == False: fp.close() return (self.h_d, None) # display debug information # if self.dbgl_d > ndt.BRIEF: self.print_header(sys.stdout) # position the file to the beginning of the data # using the header information # if self.dbgl_d > ndt.BRIEF: print("%s (line: %s) %s::%s: positioning file pointer" % (__FILE__, ndt.__LINE__, Edf.__CLASS_NAME__, ndt.__NAME__)) fp.seek(self.h_d[EDF_GHDI_HSIZE], os.SEEK_SET) # create space to hold the entire signal: # in python, we only need to size the numpy arrays # for i in range(0, self.h_d[EDF_GHDI_NSIG_REC]): sz = int(self.h_d[EDF_GHDI_NUM_RECS] * self.h_d[EDF_CHAN_REC_SIZE][i]) sig[self.h_d[EDF_CHAN_LABELS][i]] = \ np.empty(shape = sz, dtype = np.float64) if (self.dbgl_d == ndt.FULL) and (i < EDF_DEF_DBG_NF): print("%s (line: %s) %s::%s %s (%s: %ld row, %ld cols)" % (__FILE__, ndt.__LINE__, Edf.__CLASS_NAME__, ndt.__NAME__, "sig dimensions", self.h_d[EDF_CHAN_LABELS][i], i, sig[self.h_d[EDF_CHAN_LABELS][i]].shape[0])) if self.dbgl_d > ndt.BRIEF: print("%s (line: %s) %s::%s signal vector resized" % (__FILE__, ndt.__LINE__, Edf.__CLASS_NAME__, ndt.__NAME__)) # loop over all records # ns_read = np.zeros(shape = self.h_d[EDF_GHDI_NSIG_REC], dtype = int) for i in range(0, self.h_d[EDF_GHDI_NUM_RECS]): # loop over all channels # for j in range(0, self.h_d[EDF_GHDI_NSIG_REC]): # display debug message # if (self.dbgl_d == ndt.FULL) and (i < EDF_DEF_DBG_NF) and \ (j < EDF_DEF_DBG_NF): print("%s (line: %s) %s::%s: %s [%ld %ld]" % (__FILE__, ndt.__LINE__, Edf.__CLASS_NAME__, ndt.__NAME__, "reading record no.", i, j)) # read the data: # store the data after the last sample read # num_samps = self.h_d[EDF_CHAN_REC_SIZE][j] data = fp.read(num_samps * EDF_SIZEOF_SHORT) buf = np.frombuffer(data, dtype = "short", count = num_samps) \ .astype(np.float64) ns_read[j] += num_samps if num_samps != int(len(data) / EDF_SIZEOF_SHORT): print("Error: %s (line: %s) %s::%s: %s [%d][%d]" % (__FILE__, ndt.__LINE__, Edf.__CLASS_NAME__, ndt.__NAME__, "read error", num_samps, int(len(data)/EDF_SIZEOF_SHORT))) return (None, None) # compute scale factors: # this code is identical to the C++ version sum_n = float(self.h_d[EDF_CHAN_PHYS_MAX][j] - \ self.h_d[EDF_CHAN_PHYS_MIN][j]) sum_d = float(self.h_d[EDF_CHAN_DIG_MAX][j] - self.h_d[EDF_CHAN_DIG_MIN][j]) sum = float(1.0) dc = float(0.0) if sum_d != float(0.0): sum = sum_n / sum_d dc = float(self.h_d[EDF_CHAN_PHYS_MAX][j] - sum * \ float(self.h_d[EDF_CHAN_DIG_MAX][j])) if (self.dbgl_d == ndt.FULL) and (i < EDF_DEF_DBG_NF) and \ (j < EDF_DEF_DBG_NF): print("%s (line: %s) %s::%s:" "%s [%ld 
%ld] %f (%f, %f, %f)" % (__FILE__, ndt.__LINE__, Edf.__CLASS_NAME__, ndt.__NAME__, "dc offset = ", i, j, dc, sum_n, sum_d, sum)) # scale the data # if scale == True: buf = buf * sum buf = buf + dc offset = i * self.h_d[EDF_CHAN_REC_SIZE][j] sig[self.h_d[EDF_CHAN_LABELS][j]][offset:offset+num_samps] = \ buf # display debug information # if self.dbgl_d > ndt.BRIEF: print("%s (line: %s) %s::%s closing an EDF file" % (__FILE__, ndt.__LINE__, Edf.__CLASS_NAME__, ndt.__NAME__)) # close the file # fp.close() # display debug information # if self.dbgl_d > ndt.BRIEF: print("%s (line: %s) %s::%s: done closing an EDF file" % (__FILE__, ndt.__LINE__, Edf.__CLASS_NAME__, ndt.__NAME__)) # exit gracefully # return (self.h_d, sig) # # end of method def write_edf(self, ofile, metadata, signal): """ method: write_edf arguments: ofile: output filename metadata: a dictionary containing the metadata for the header signal: the signal data (a vector of vectors) return: a boolean value indicating status description: This method writes an edf file from the metadata and the signal data. """ # find number of channels # num_chan = len(metadata['chan_labels']) # write header # self.put_header(ofile, metadata, num_chan) # write signal # self.put_signal(ofile, metadata, signal, num_chan) # exit gracefully # return True # # end of method #-------------------------------------------------------------------------- # # miscellaneous methods are listed here # #-------------------------------------------------------------------------- def cleanup(self): """ method: cleanup arguments: none return: a boolean value indicating status description: This method cleans up memory. """ # display debug information # if self.dbgl_d > ndt.BRIEF: print("%s (line: %s) %s::%s: starting clean up of memory" % (__FILE__, ndt.__LINE__, Edf.__CLASS_NAME__, ndt.__NAME__)) # clear the header structure # if self.h_d != None: self.h_d = {} # display debug information # if self.dbgl_d > ndt.BRIEF: print("%s (line: %s) %s::%s: done cleaning up memory" % (__FILE__, ndt.__LINE__, Edf.__CLASS_NAME__, ndt.__NAME__)) # exit gracefully # return True def channels_data(self, edf_file, channels_order): """ method: channels_data arguments: channels_order: a list of channel labels to be extracted. the channels will be extracted in this order. return: max_samp_rate: the sample rate of signals sigbufs: a 2D numpy matrix that contains the extracted signals as rows of the matrix description: This method extracts a set of channels from an Edf file. Channels_order is a list of channels to be extracted from edf file. We return the maximum sample rate of the channels and a numpy matrix containing the signal data. """ # read the EDF file # header, signal = self.read_edf(edf_file, False) # find the appropriate sample rate # samp_rates = np.zeros((len(channels_order), )) num_samples = samp_rates.copy() for ch_counter, channel in enumerate(channels_order): # find the specified channel in all the channels in edf file. # if nothing found, something is wrong. stop the whole process to # find the reason. # found = False # loop through the return signal data where signal is an # OrderedDictionary: # Ex: {'EEG FP1-LE' : array([-118., ... , -88. 
]) # # If channel exist within the EDF file, we get its sample frequency # along its sample number # for sig_counter, signal_label in enumerate(signal): if (channel in signal_label): samp_rates[ch_counter] = \ self.get_sample_frequency(sig_counter) num_samples[ch_counter] = self.get_num_samples(sig_counter) found = True break # if we reach this, it means that we couldn't find the channel # in the EDF file # assert found, \ (f"Error: {__FILE__} (line: {ndt.__LINE__}) \ {ndt.__NAME__}: Channel {channel} wasn't found \ in {edf_file}") max_samp_rate = int(samp_rates.max()) nSamples = int(num_samples.max()) # now sample rate and number of samples are found, # just fill the matrix. # sigbufs = np.zeros((len(channels_order), nSamples)) for ch_counter, channel in enumerate(channels_order): # Find the index of channel # for sig_counter, signal_label in enumerate(signal): if (channel in signal_label): step = int(nSamples / self.get_num_samples(ch_counter)) sigbufs[ch_counter, 0::step] = signal[channel] # exit gracefully # return max_samp_rate, sigbufs # # end of method # # end of Edf # class: ApplyMontage # class ApplyMontage: """ class: ApplyMontage arguments: none description: This is a simple class meant to compute the montage for edf signals. """ #-------------------------------------------------------------------------- # # constructors are listed here # #-------------------------------------------------------------------------- def __init__(self, channel_order, montage_order): """ method: constructor arguments: channel_order: the channels that occur in the signal montage_order: the montage to apply to the signal return: none description: This simple method initializes the class arguments """ # set the class name # ApplyMontage.__CLASS_NAME__ = self.__class__.__name__ # set the necessary values # self.ch_order = channel_order self.mon_order = montage_order # exit gracefully # return None # # end of method #-------------------------------------------------------------------------- # # processing methods are listed here # #-------------------------------------------------------------------------- # method: ApplyMontage::apply_montage # # arguments: # sig: signal to apply the montage as a dictionary with channel names as # keys and a list of data as the values: # sig = {'channel1name': [data], 'channel2name': [data] ... } # # returns: the signal after applying the montage in the same form as the # input signal: {'montage1name': [data], 'montage2name': [data] ... } # # This method is the main method of this class and will compute the montage # def apply_montage(self, sig): """ method: apply_montage arguments: sig: signal to apply the montage as a dictionary with channel names as keys and a list of data as the values: sig = {'channel1name': [data], 'channel2name': [data] ... } return: the signal after applying the montage in the same form as the input signal: {'montage1name': [data], 'montage2name': [data] ... description: This method is the main method of this class and computes the montage. 
""" # make sure mon_order exists # if self.mon_order is None: print("Error: %s (line: %s) %s::%s: montage order is none" % (__FILE__, ndt.__LINE__, ApplyMontage.__CLASS_NAME__, ndt.__NAME__)) sys.exit(0) # set dictionary to hold montage signals # monsig = {} # iterate through each montage # for mcounter, montage in enumerate(self.mon_order): # set the raw channels needed and reset index values # rawch1, rawch2 = montage.split(nft.DELIM_DASH) index1, index2 = -1, -1 monsig[montage] = [] # go through all the channels # for ch_counter, channel in enumerate(self.ch_order): # set the indexes of the needed channels # if rawch1 in channel: index1 = ch_counter if rawch2 in channel: index2 = ch_counter # if the channels aren't found print an error message # assert not(index1 == -1 or index2 == -1 or index1 == index2),\ (f"Error: {__FILE__} (line: {ndt.__LINE__}) \ {ndt.__NAME__}: channels({rawch1},{rawch2}) \ hasn't been found or the indices are incorrect") # apply the montage to all signal values in buff # for count in range(len(sig[channel])): monsig[montage].append(sig[self.ch_order[index1]][count] - sig[self.ch_order[index2]][count]) # exit gracfully # return monsig # # end of method # # end of ApplyMontage # # end of file