Heart Rate Calculation
Lab 3
Lab 3
In this lab, we use machine learning and a new mode of measuring heart rate (EMG nodes) in order to calculate heart rate. The measuring will be done through IR signals and the EMG signals, but we will also process the obtained signals using a variety of filters. Once the signals are filtered out, we will use unsupervised machine learning in order to teach our program to predict heart beats based on a training data set that we will obtain. Once all of these objectives are met, we move the prototype to a more permanent form by soldering components onto a protoboard, and in doing so we will learn various soldering techniques such as bridging, solder-sucking, etc. Finally, we will implement the functions, such as filtering, collectively as a single software unit. By doing so, we will produce a more "finalized" product with both hardware and software components completed.
1.) First, we connect the EMG to the Arduino and the forearm. The black electrode is placed on the elbow, the white electrode is placed on the tendon of the forearm muscle, and the red electrode is placed on the bulk of the forearm muscle. Refer below for a reference of electrode locations:
Figure 1. Electrode Placement Diagram
Figure 2. Example of Electrode Set Up
2.) Next, we create an Arduino sketch that is capable of reading data from the electrodes and print it out to Serial. In order to do so, we will rely on an interrupt handler. The code is below:
#include <TimerOne.h>
#define signalPin A0
void setup() {
Timer1.initialize(5000);
Timer1.attachInterrupt(measure);
Serial.begin(9600);
while(!Serial) {};
Serial.flush();
}
volatile unsigned long EMGvalue;
unsigned long timestamp = 0;
void measure()
{
EMGvalue = analogRead(signalPin);
}
void loop() {
unsigned long EMGvalue_copy;
unsigned long timestamp = millis();
noInterrupts();
EMGvalue_copy = EMGvalue;
interrupts();
// Serial.print(timestamp);
// Serial.print(' ');
Serial.println(EMGvalue_copy);
}
3.) Now we can open Serial plotter and visualize our data from the electrodes. The resulting graph is shown below:
Figure 3. Serial Plotter of EMG Readings
1. First, we set up the EMG as in Objective 1 before.
2. Next, we will use the sketch from Objective 1 to sample the EMG signal.
3. Write a Python module that will plot the live EMG signal as it is captured over Serial. The figure should be well-formatted. The Python module is pasted below:
# ==================== Imports ====================
import serial
import numpy as np
from matplotlib import pyplot as plt
from matplotlib import animation
from time import sleep
# ==================== Globals ====================
N = 400 # number of samples plotted
NS = 20 # number of samples to read per iteration
sample_count = 0 # current sample count
times, values = np.zeros((2, N)) # data vectors
serial_port = '/dev/cu.usbmodem1411' # 'COM3' # the serial port to use
serial_baud = 9600 # baudrate
# ==================== Grab 'count' samples from Serial ====================
# # ===== This function reads n_samples from serial and updates the global variables times/values =====
def grab_samples(n_samples) :
global sample_count
# Create local arrays of size(n_samples) to store new data
times, values = np.zeros((2,n_samples))
i = 0
while i < n_samples : # while we still need samples, keep reading
try:
data = ser.readline().strip().decode('utf-8')
t, x = data.split(' ')
t = float(t)
x = float(x)
except : # report error if we failed
print('Invalid data: ', data)
continue
# Store the new values into appropriate local arrays
times[i] = t
values[i] = x
# times[:-1] = times[1:] # shift and update our time/values arrays
# times[-1] = t
# values[:-1] = values[1:]
# values[-1] = x
i += 1
sample_count += n_samples # Increment sample_count by amount of n_samples
return times, values # Return two arrays (size n_sample) with new values
def update_plots(i):
global times, values
# shift samples left by 'NS'
times[:N-NS] = times[NS:]
values[:N-NS] = values[NS:]
# grab new samples
times[N-NS:], values[N-NS:] = grab_samples(NS)
# plot
axes.set_xlim(times[0],times[N-1])
live_plot.set_data(times, values)
return live_plot
# ==================== Main ====================
# Note: this assumes data is being sent by the Arduino in the format of "<time> <value>"
# You will need to modify it to suit your needs!
if (__name__ == "__main__") :
# Open Serial
with serial.Serial(port=serial_port, baudrate=serial_baud, timeout=1) as ser:
try:
ser.flush()
sleep(2)
ser.write(b'$') # tell Arduino to start sending data. NOTE!!: you should change this for your setup
# initialize the figure
fig, axes = plt.subplots()
times, values = grab_samples(N)
live_plot = axes.plot(times, values, lw=2)[0]
# initialize the y-axis limits and labels
axes.set_ylim(0, 1200)
axes.set_title('EMG Readings')
axes.set_xlabel('Time (ms)')
axes.set_ylabel('Amplitude')
# set and start the animation and update at 1ms interval (if possible)
anim = animation.FuncAnimation(fig, update_plots, interval= 1)
plt.show()
finally:
ser.flush()
# sleep(2)
ser.write(b'@') # ask to stop sending data. NOTE!!: you should change this for your setup
# grab_samples(N) # initially grab N samples since we have nothing
# plt.plot(times, values) # plot it
# plt.ion() # turn on interactive mode
# while True: # forever keep grabbing NS samples and plot them
# plt.pause(0.0001) # wait 0.0001 seconds before updating the figure
# grab_samples(NS) # grab NS samples
# plt.cla() # clear the figure axes
# plt.plot(times, values) # plot our data
Using this Python script, we can plot a figure of our signal received from Serial. An example is shown below:
Figure 4. Live Plot of EMG Signal
4.) Next, we will write a similar Python module, except this one will live plot a 2-subplot figure. One subplot will display the original EMG signal, and the other will display a transformed signal. In our case, the transformed signal will be a simple inverted signal. Refer to the code below:
# ==================== Imports ====================
import serial
import numpy as np
from matplotlib import pyplot as plt
from matplotlib import animation
from time import sleep
# ==================== Globals ====================
N = 400 # number of samples plotted
NS = 20 # number of samples to read per iteration
sample_count = 0 # current sample count
times, values, transforms = np.zeros((3, N)) # data vectors
serial_port = '/dev/cu.usbmodem1411' # 'COM3' # the serial port to use
serial_baud = 9600 # baudrate
# ==================== Grab 'count' samples from Serial ====================
# # ===== This function reads n_samples from serial and updates the global variables times/values =====
def grab_samples(n_samples) :
global sample_count
# Create local arrays of size(n_samples) to store new data
times, values, transforms = np.zeros((3,n_samples))
i = 0
while i < n_samples : # while we still need samples, keep reading
try:
data = ser.readline().strip().decode('utf-8')
t, x = data.split(' ')
t = float(t)
x = float(x)
y = x * -1
except : # report error if we failed
print('Invalid data: ', data)
continue
# Store the new values into appropriate local arrays
times[i] = t / 1000
values[i] = x
transforms[i] = y
# times[:-1] = times[1:] # shift and update our time/values arrays
# times[-1] = t
# values[:-1] = values[1:]
# values[-1] = x
i += 1
sample_count += n_samples # Increment sample_count by amount of n_samples
return times, values, transforms # Return two arrays (size n_sample) with new values
def update_plots(i):
global times, values, transforms
# shift samples left by 'NS'
times[:N-NS] = times[NS:]
values[:N-NS] = values[NS:]
transforms[:N-NS] = transforms[NS:]
# grab new samples
times[N-NS:], values[N-NS:], transforms[N-NS:] = grab_samples(NS)
# plot
[ax.set_xlim(times[0],times[N-1]) for ax in axes]
live_plots[0].set_data(times, values)
live_plots[1].set_data(times, transforms)
return live_plots
# ==================== Main ====================
# Note: this assumes data is being sent by the Arduino in the format of "<time> <value>"
# You will need to modify it to suit your needs!
if (__name__ == "__main__") :
# Open Serial
with serial.Serial(port=serial_port, baudrate=serial_baud, timeout=10) as ser:
try:
ser.flush()
sleep(2)
ser.write(b'$') # tell Arduino to start sending data. NOTE!!: you should change this for your setup
# initialize the figure
fig, axes = plt.subplots(1,2)
times, values, transforms = grab_samples(N)
live_plots = []
live_plots.append(axes[0].plot(times, values, lw=2)[0])
live_plots.append(axes[1].plot(times, transforms, lw=2)[0])
# initialize the y-axis limits and labels
[ax.set_ylim(-1200, 1200) for ax in axes]
axes[0].set_title('EMG Readings')
axes[0].set_xlabel('Time (s)')
axes[0].set_ylabel('Amplitude')
axes[1].set_title('Inverse Transform')
axes[1].set_ylabel('Amplitude')
axes[1].set_xlabel('Time (s)')
# set and start the animation and update at 1ms interval (if possible)
anim = animation.FuncAnimation(fig, update_plots, interval=1)
plt.show()
finally:
ser.flush()
# sleep(2)
ser.write(b'@') # ask to stop sending data. NOTE!!: you should change this for your setup
# grab_samples(N) # initially grab N samples since we have nothing
# plt.plot(times, values) # plot it
# plt.ion() # turn on interactive mode
# while True: # forever keep grabbing NS samples and plot them
# plt.pause(0.0001) # wait 0.0001 seconds before updating the figure
# grab_samples(NS) # grab NS samples
# plt.cla() # clear the figure axes
# plt.plot(times, values) # plot our data
The code will allow us to display the following graph as well:
Figure 5. Live Subplots of EMG Signals
1.) First, we attach the electrodes similar to Objective 1, but refer to the following diagram in Figure 6 instead:
Figure 6. ECG Electrode Placement Diagram
2.) Next, we reuse the Arduino sketch from Objective 1 to write the ECG signal and timestamp to Serial for our Python script to read. Refer back to Objective 1 for the Arduino sketch.
3.) Write a Python module, live_ecg_plot.py, that will live plot the ECCG signal from Serial and the Heartbeat locations. For the heartbeat locations, we import from the hr_ecg library a method called calculate_hr(). The code is pasted below:
# ==================== Imports ====================
import serial
import numpy as np
from matplotlib import pyplot as plt
from matplotlib import animation
from time import sleep
from hr_ecg import calculate_hr
# ==================== Globals ====================
N = 400 # number of samples plotted
NS = 20 # number of samples to read per iteration
sample_count = 0 # current sample count
heartrate = 0 # current heart rate
times, values, locations = np.zeros((3, N)) # data vectors
ecg_signal = np.ndarray((2,N)) # ecg_signal to send to hr_ecg function
serial_port = '/dev/cu.usbmodem1411' # 'COM3' # the serial port to use
serial_baud = 9600 # baudrate
# ==================== Grab 'count' samples from Serial ====================
# # ===== This function reads n_samples from serial and updates the global variables times/values =====
def grab_samples(n_samples) :
global sample_count
# Create local arrays of size(n_samples) to store new data
times, values = np.zeros((2,n_samples))
i = 0
while i < n_samples : # while we still need samples, keep reading
try:
ser.write(b'$')
data = ser.readline().strip().decode('utf-8')
t, x = data.split(' ')
t = float(t)
x = float(x)
except : # report error if we failed
print('Invalid data: ', data)
continue
# Store the new values into appropriate local arrays
times[i] = t / 1000 # convert time to seconds from ms
values[i] = x
# times[:-1] = times[1:] # shift and update our time/values arrays
# times[-1] = t
# values[:-1] = values[1:]
# values[-1] = x
i += 1
ser.write(b'@')
sample_count += n_samples # Increment sample_count by amount of n_samples
return times, values # Return two arrays (size n_sample) with new values
def update_plots(i):
global times, values, locations
# shift samples left by 'NS'
times[:N-NS] = times[NS:]
values[:N-NS] = values[NS:]
locations[:N-NS] = locations[NS:]
# grab new samples
times[N-NS:], values[N-NS:] = grab_samples(NS)
# store times and values into ecg_signal (ndarray)
ecg_signal[0, :] = times[:]
ecg_signal[1, :] = values[:]
# calculate hr and locations
heartrate, locations = calculate_hr(ecg_signal)
heartrate_str = str(int(heartrate))
# send heartrate to Arduino via Serial
ser.write(b'#')
ser.write(heartrate_str.encode())
ser.write(b'>')
ser.write(b'@')
# plot
axes.set_xlim(times[0],times[N-1])
live_plots[0].set_data(times, values )
live_plots[1].set_data(times, locations)
axes.set_title('EMG Readings, HR = %f' %heartrate)
return live_plots
# ==================== Main ====================
# Note: this assumes data is being sent by the Arduino in the format of "<time> <value>"
# You will need to modify it to suit your needs!
if (__name__ == "__main__") :
# Open Serial
with serial.Serial(port=serial_port, baudrate=serial_baud, timeout=1) as ser:
try:
ser.flush()
sleep(2)
# ser.write(b'$') # tell Arduino to start sending data. NOTE!!: you should change this for your setup
# initialize the figure
fig, axes = plt.subplots()
times, values = grab_samples(N)
ecg_signal[0, :] = times
ecg_signal[1, :] = values
heartrate, locations = calculate_hr(ecg_signal)
# send heartrate to Arduino via Serial
heartrate_str = str(int(heartrate))
ser.write(b'#')
ser.write(heartrate_str.encode())
ser.write(b'>')
ser.write(b'@')
live_plots = []
live_plots.append(axes.plot(times, values, lw=2)[0])
live_plots.append(axes.plot(times, locations, lw=2)[0])
# initialize the y-axis limits and labels
axes.set_ylim(-10, 1100)
axes.set_title('EMG Readings')
axes.set_xlabel('Time (s)')
axes.set_ylabel('Amplitude')
# set and start the animation and update at 1ms interval (if possible)
anim = animation.FuncAnimation(fig, update_plots, interval= 1)
plt.show()
finally:
ser.flush()
# sleep(2)
ser.write(b'@') # ask to stop sending data. NOTE!!: you should change this for your setup
# grab_samples(N) # initially grab N samples since we have nothing
# plt.plot(times, values) # plot it
# plt.ion() # turn on interactive mode
# while True: # forever keep grabbing NS samples and plot them
# plt.pause(0.0001) # wait 0.0001 seconds before updating the figure
# grab_samples(NS) # grab NS samples
# plt.cla() # clear the figure axes
# plt.plot(times, values) # plot our data
Using the code above, we can now plot a live plot of our heart beat signal as well as the heartbeat locations. Figure 7 shows an example of this feature.
Figure 7. Plot of ECG Readings with Heartbeat Locations
4.) The live_ecg_plot.py script also writes the heart rate measurement out to Serial for the Arduino sketch to read. Using the code below, create an Arduino sketch, HR_on_OLED.ino, to read the heart rate from Serial and print it the OLED screen. The OLED screen is used in previous labs, so refer back to Lab 1 and Lab 2 for a refresher on how to use the OLED screen.
/********************** Libraies *********************/
#include <TimerOne.h>
#include <SPI.h>
#include <Wire.h>
#include <Adafruit_GFX.h>
#include <Adafruit_SSD1306.h>
/********************** Definitions *****************/
#define signalPin A0
#define SCALING 32/100 //Ratio of OLED height to IR signal
#define OLED_RESET 4
Adafruit_SSD1306 display(OLED_RESET);
#define NUMFLAKES 10
#define XPOS 0
#define YPOS 1
#define DELTAY 2
#define LOGO16_GLCD_HEIGHT 16
#define LOGO16_GLCD_WIDTH 16
static const unsigned char PROGMEM logo16_glcd_bmp[] =
{ B00000000, B11000000,
B00000001, B11000000,
B00000001, B11000000,
B00000011, B11100000,
B11110011, B11100000,
B11111110, B11111000,
B01111110, B11111111,
B00110011, B10011111,
B00011111, B11111100,
B00001101, B01110000,
B00011011, B10100000,
B00111111, B11100000,
B00111111, B11110000,
B01111100, B11110000,
B01110000, B01110000,
B00000000, B00110000 };
#if (SSD1306_LCDHEIGHT != 32)
#error("Height incorrect, please fix Adafruit_SSD1306.h!");
#endif
/*********************Global Variables****************/
volatile unsigned long EMGvalue;
unsigned long EMGvalue_copy;
unsigned long timestamp = 0;
String inString = "";
char pycmd = ' ';
int HR = 0;
boolean done = false;
/******************* Setup ***************************/
void setup() {
Timer1.initialize(5000);
Timer1.attachInterrupt(measure);
Serial.begin(9600);
while(!Serial) {
;
}
// Initializing OLED
display.begin(SSD1306_SWITCHCAPVCC, 0x3C);
display.clearDisplay();
display.display();
display.setTextSize(1);
display.setTextColor(WHITE);
display.setCursor(0,0);
}
void loop() {
timestamp = millis();
if (Serial.available() > 0) {
// Read from Serial
pycmd = Serial.read();
// Check the command: #, $, or @
if (pycmd == '#') {
readHeartrate();
printOLED(HR);
}
else if (pycmd == '$') {
sendData();
}
else if (pycmd == '@') {
// Do nothing
}
pycmd = ' ';
}
noInterrupts();
EMGvalue_copy = EMGvalue;
interrupts();
}
void measure() { // Measures EMG values from signalPin
EMGvalue = analogRead(signalPin);
}
void printOLED( int HR ) {
display.clearDisplay();
display.setCursor(0,0);
display.print("Heartrate: ");
display.println(HR);
display.display();
}
void sendData() { // Prints the EMG values to Serial
Serial.print(timestamp);
Serial.print(' ');
Serial.println(EMGvalue_copy);
Serial.flush();
}
void readHeartrate() {
done = false;
while (!done) {
int inChar = Serial.read();
if (isDigit(inChar)) {
inString += (char)inChar;
}
if (inChar == '>') {
HR = inString.toInt();
inString = "";
done = true;
}
}
}
1.) For this objective, we will use two Python scripts: filtering.py and live_processed_subplots.py. The first script is the "child" module that will perform the filtering of our data values, whereas the second script is the "mother" module that will call the other script to perform the filtering. The two Python scripts are pasted below:
filtering.py
"""
This function processes the data in 3 steps
Step 1: Normalize from ADC value (0-1023) to voltage
Step 2: Apply LPF filter
Step 3: Apply HPF filter
INPUT:
data: a numpy array, with dimension (N,), contains data to be processed
filter_coeffs: a numpy array, with dimensions (4, M+1), contains filter coefficients
row 0: HPF a coefficients
row 1: HPF b coefficients
row 2: LPF a coefficients
row 3: LPF b coefficients
filter_ICs: a numpy array, with dimension (2, M), contains initial conditions
row 0: HPF initial conditions
row 1: LPF initial conditions
Note, M is the filter order number.
OUTPUT:
proc_data: a numpy array, with dimension (N,), contains processed dat
filter_ICs: a numpy array, with dimension (2, M), contains new filter initial conditions
"""
# ==================== Imports ====================
import numpy as np
import scipy.signal as sig
# ==================== Function ====================
def process_ir(data, filter_coeffs, filter_ICs, first):
# ==================== Step 1 ====================
# Convert ADC value to voltage
gain = 3600
offset = 2.5
Vcc = 5
x = np.zeros( len(data) )
for i in range(len(data)):
x[i] = (data[i]*Vcc/1024 - offset) / gain
# print (x)
# ==================== Step 2 ====================
# Apply LPF filter
if (first == True) :
LPF, zi_new_LPF = sig.lfilter(filter_coeffs[3, :], filter_coeffs[2, :], x, zi = filter_ICs[1, :]*x[0])
filter_ICs[1, :] = zi_new_LPF[:]
else :
LPF, zi_new_LPF = sig.lfilter(filter_coeffs[3, :], filter_coeffs[2, :], x, zi = filter_ICs[1, :])
filter_ICs[1, :] = zi_new_LPF[:]
# ==================== Step 3 ====================
# Apply HPF filter
if (first == True) :
proc_data, zi_new_HPF = sig.lfilter(filter_coeffs[1, :], filter_coeffs[0, :], LPF, zi = filter_ICs[0, :]*LPF[0])
filter_ICs[0, :] = zi_new_HPF[:]
else :
proc_data, zi_new_HPF = sig.lfilter(filter_coeffs[1, :], filter_coeffs[0, :], LPF, zi = filter_ICs[0, :])
filter_ICs[0, :] = zi_new_HPF[:]
return proc_data, filter_ICs
live_processed_subplots.py
import scipy.signal as sig
import serial
import matplotlib.pyplot as plt
from matplotlib import animation
from time import sleep
import numpy as np
import filtering as flt
# ==================== Globals ====================
N = 400 # number of samples plotted
NS = 20 # number of samples to read per iteration
sample_count = 0 # current sample count
filter_order = 3
LPF_cutoff = 10.0/200
HPF_cutoff = 0.5/200
gain = 3600
offset = 2.5
Vcc = 5
times, values, processed = np.zeros((3, N)) # data vectors
filter_ICs = np.zeros((2, filter_order))
filter_coeffs = np.zeros((4, filter_order + 1))
serial_port = '/dev/cu.usbmodem1411' # the serial port to use
serial_baud = 9600 # baudrate
def grab_samples(n_samples) :
global sample_count
# Create local arrays of size(n_samples) to store new data
times, values = np.zeros((2,n_samples))
i = 0
while i < n_samples : # while we still need samples, keep reading
try:
ser.write(b'$')
data = ser.readline().strip().decode('utf-8')
t, x = data.split(' ')
t = float(t)
x = float(x)
except : # report error if we failed
print('Invalid data: ', data)
continue
# Store the new values into appropriate local arrays
times[i] = t / 1000 # convert time to seconds from ms
values[i] = x
i += 1
ser.write(b'@')
sample_count += n_samples # Increment sample_count by amount of n_samples
return times, values # Return two arrays (size n_sample) with new values
def update_plots(i):
global times, values, processed, filter_coeffs, filter_ICs
# shift samples left by 'NS'
times[:N-NS] = times[NS:]
values[:N-NS] = values[NS:]
# grab new samples
times[N-NS:], values[N-NS:] = grab_samples(NS)
processed, filter_ICs = flt.process_ir(values, filter_coeffs, filter_ICs, False)
# plot
[ax.set_xlim(times[0],times[N-1]) for ax in axes]
live_plots[0].set_data(times, values )
live_plots[1].set_data(times, processed)
return live_plots
if (__name__ == "__main__"):
# Open Serial
with serial.Serial(port=serial_port, baudrate=serial_baud, timeout=1) as ser:
try:
ser.flush()
sleep(2)
ser.write(b'$') # tell Arduino to start sending data. NOTE!!: you should change this for your setup
# initialize the figure
fig, axes = plt.subplots(2, 1)
times, values = grab_samples(N)
# get HPF and LPF coefficients
filter_coeffs[3, :], filter_coeffs[2, :] = sig.butter(filter_order, LPF_cutoff, btype='lowpass', analog=False)
filter_coeffs[1, :], filter_coeffs[0, :] = sig.butter(filter_order, HPF_cutoff, btype='highpass', analog=False)
# get HPF and LPF initial conditions
filter_ICs[1, :] = sig.lfilter_zi( filter_coeffs[3, :], filter_coeffs[2, :] )
filter_ICs[0, :] = sig.lfilter_zi( filter_coeffs[1, :], filter_coeffs[0, :] )
processed, filter_ICs = flt.process_ir(values, filter_coeffs, filter_ICs, True)
live_plots = []
live_plots.append(axes[0].plot(times, values, lw=2)[0])
live_plots.append(axes[1].plot(times, processed, lw=2)[0])
# initialize the y-axis limits and labels
axes[0].set_ylim(0, 1200)
axes[0].set_title('Raw IR Readings')
# axes[0].set_xlabel('Time (s)')
axes[0].set_ylabel('Amplitude (V)')
axes[1].set_ylim(-0.0005, 0.0005)
axes[1].set_title('Filtered Signal')
axes[1].set_xlabel('Time (s)')
axes[1].set_ylabel('Amplitude (V)')
# set and start the animation and update at 1ms interval (if possible)
anim = animation.FuncAnimation(fig, update_plots, interval= 1)
plt.show()
finally:
ser.flush()
# sleep(2)
ser.write(b'@')
Using both Python scripts shown above, you should be able to plot a live feed of your IR signal detecting your heart beat. Moreover, the plots should reveal a raw, unfiltered signal on the top and a filtered signal on the bottom. An example is shown in Figure 8 below:
Figure 8. Subplots of Raw IR vs Filtered IR Signals
It is important to note that this objective shows a different quality of filtering compared to the filtering done in previous labs mainly due to the fact that it was performed inside a Python script, rather than inside the Arduino. The most notable difference is the speed in which the filter was applied: Python executes LPFs and HPFs much faster than the Arduino sketch. As far as other signal artifacts, there does not seem to be much more elements that require filtering.
1.) First, let's collect our IR signal data using the Arduino sketch from Objective 1. We will need 30~45 seconds of clear IR data with timestamps. To do so, write a Python module named collect_ir_and_ecg_data.py to read data from Serial and save it to the file ir_ecg_time_data.npy. The Python code is pasted below:
# ---------------------------------------- Import Libraries ---------------------------------------- #
import numpy as np
import matplotlib.pyplot as plt
from scipy import signal as sig
from sklearn.mixture import GaussianMixture as GM
from filtering import process_ir
# ---------------------------------------- Load Data ---------------------------------------- #
# Load data
ir_data = np.genfromtxt(r'ir_ecg_time_data.npy', delimiter = ' ')
times = [x[0]/1000 for x in ir_data]
values = [x[1] for x in ir_data]
# Plot the first 5 seconds of raw IR signal
start_time = times[0]
new_times = [x for x in times if x - start_time <= 5]
new_values = [values[x] for x in range(len(new_times))]
plt.plot(new_times, new_values)
plt.xlabel("Time (s)")
plt.ylabel("Amplitude")
plt.title("First 5 Seconds of Raw Signal")
plt.show()
# ---------------------------------------- Process IR ---------------------------------------- #
# Sampling Frequency
SF = 200
# Cutoff frequencies for filters
LPF_cutoff = 10.0 #originally 10.0
HPF_cutoff = 0.5 #originally 0.5
# Filter order
filt_ord = 3
# Nyquist frequency
fnyq = SF / 2.0
# Cutoff frequency relative to Nyquist
LPF_cutoff = LPF_cutoff/fnyq
HPF_cutoff = HPF_cutoff/fnyq
# Filter coefficients
LPF_b, LPF_a = sig.butter(filt_ord, LPF_cutoff, btype='lowpass', analog=False) # <----- Fill this in <-----
HPF_b, HPF_a = sig.butter(filt_ord, HPF_cutoff, btype='highpass', analog=False) # <----- Fill this in <-----
# Initial conditions
LPF_zf = sig.lfilter_zi(LPF_b, LPF_a)
HPF_zf = sig.lfilter_zi(HPF_b, HPF_a)
# Save these parameters
filter_coeffs = np.zeros((4, filt_ord+1))
filter_ICs = np.zeros((2, filt_ord))
filter_coeffs[3, :] = LPF_b # <----- Store filter coefficients, HERE <-----
filter_coeffs[2, :] = LPF_a
filter_coeffs[1, :] = HPF_b
filter_coeffs[0, :] = HPF_a
filter_ICs[0, :] = HPF_zf # <----- Store filter ICs, HERE <--------------
filter_ICs[1, :] = LPF_zf
# <----- Process IR signal, HERE <----------
processed_values, filter_ICs = process_ir(new_values, filter_coeffs, filter_ICs, True)
plt.plot(new_times, processed_values)
plt.xlabel("Time (s)")
plt.ylabel("Amplitude")
plt.title("First 5 Seconds of Filtered Signal")
plt.show()
# <----- Split IR signal, HERE <------------
# # ---------------------------------------- Find GMM ---------------------------------------- #
# # <----- Plot data histogram, HERE <------------
# # Create GMM object
# gmm = GM(n_components = 2)
# # Fit 2 component Gaussian to the data
# gmm_fit = gmm.fit( # Pass correct parameters, here )
# # Retrieve Gaussian parameters
# mu0 = gmm_fit.means_[0]
# mu1 = gmm_fit.means_[1]
# std0 = np.sqrt(gmm_fit.covariances_[0])
# std1 = np.sqrt(gmm_fit.covariances_[1])
# w0 = gmm_fit.weights_[0]
# w1 = gmm_fit.weights_[1]
# # <----- Plot Gaussians sum over histogram, HERE <------------
# # <----- Plot two Gaussians over histogram, HERE <------------
# # ---------------------------------------- Predict Labels ---------------------------------------- #
# # Predict training labels
# train_pred_lbl = gmm_fit.predict( # Pass correct parameters, here )
# # Predict validation labels
# validation_pred_lbl = gmm_fit.predict( # Pass correct parameters, here )
# # <----- Plot Training Set predictions, HERE <------------
# # <----- Plot Validation Set predictions, HERE <----------
# # <----- Store Training Set predictions, HERE <------------
# # <----- Store Validation Set predictions, HERE <------------
1. First, we set up the OLED and vibrating motor according the diagram below:
Diagram of Feedback Circuit
2. Next, we write an Arduino sketch, Feedback.ino, that will generate a random integer as a timer, and use interrupt service handlers to develop our feedback functionality. The code is pasted below:
/********************** Libraies *********************/
#include <TimerOne.h>
#include <SPI.h>
#include <Wire.h>
#include <Adafruit_GFX.h>
#include <Adafruit_SSD1306.h>
/********************** Definitions *****************/
#define signalPin A0
#define blackPin 3
#define SCALING 32/100 //Ratio of OLED height to IR signal
#define OLED_RESET 4
Adafruit_SSD1306 display(OLED_RESET);
#define NUMFLAKES 10
#define XPOS 0
#define YPOS 1
#define DELTAY 2
#define LOGO16_GLCD_HEIGHT 16
#define LOGO16_GLCD_WIDTH 16
static const unsigned char PROGMEM logo16_glcd_bmp[] =
{ B00000000, B11000000,
B00000001, B11000000,
B00000001, B11000000,
B00000011, B11100000,
B11110011, B11100000,
B11111110, B11111000,
B01111110, B11111111,
B00110011, B10011111,
B00011111, B11111100,
B00001101, B01110000,
B00011011, B10100000,
B00111111, B11100000,
B00111111, B11110000,
B01111100, B11110000,
B01110000, B01110000,
B00000000, B00110000 };
#if (SSD1306_LCDHEIGHT != 32)
#error("Height incorrect, please fix Adafruit_SSD1306.h!");
#endif
/*********************Global Variables****************/
volatile int startNumber;
boolean check;
/******************* Setup ***************************/
void setup() {
Timer1.initialize(1000000);
Timer1.attachInterrupt(my_ISR);
pinMode(blackPin, OUTPUT);
digitalWrite(blackPin, HIGH);
check = false;
// Initializing OLED
display.begin(SSD1306_SWITCHCAPVCC, 0x3C);
display.clearDisplay();
display.display();
display.setTextSize(1);
display.setTextColor(WHITE);
display.setCursor(0,0);
startNumber = random(1,11);
check = true;
}
void loop() {
int startNumber_copy;
noInterrupts();
startNumber_copy = startNumber;
interrupts();
printOLED( startNumber_copy );
if (check) {
if (startNumber_copy == 0) {
// Vibrate motor twice
for (int i = 0; i < 2; i++) {
digitalWrite(blackPin, LOW);
delay(100);
digitalWrite(blackPin, HIGH);
delay(100);
}
// Reset the start number
noInterrupts();
startNumber = random(1,11);
interrupts();
}
else {
check = false;
}
}
}
void my_ISR() {
startNumber--;
check = true;
}
void printOLED( int startnumber ) {
display.clearDisplay();
display.setCursor(0,0);
display.print("Number: ");
display.println(startnumber);
display.display();
}
The code will generate a start number from 1 to 10 as a timer, then display this number on the OLED and decrease it by one every second. When the timer reaches 0, the code will vibrate the motor twice, reset the timer number, and repeat this entire process. A video of the system in action is shown below:
1. First we will require two Arduino libraries for system: I2Cdev and MPU6050. These libraries can be downloaded https://www.i2cdevlib.com/usage. The instructions are also included in the link.
2. A list of necessary components for this lab is listed below:
Stackable Header Pins (1x10pin, 3x8pin, 1x6pin, 1x4pin, 1xOpamp socket)
1 x OLED Module
1 x IMU-6050
2 x Button
1 x HM-10 BLE Module
1 x LM385 Op Amp
1 x IR Emitter (Bottom)
1 x IR Receiver (Top)
1 x Vibrating Motor
Capacitors
(1x100uF, 1x1uF)
Resistors
(1x100, 1x220, 1x1k, 1x2.2k, 1x10k Ohm)
Electrical Wires (4 assorted colors)
3. The following guide is a step-by-step tutorial for the entire soldering process: https://docs.google.com/presentation/d/1CHQusQzaYMer5Dfu2IZmVYt6EamkjQe0dUilOh1v9N8/edit#slide=id.g46fee45b64_0_83
Techniques, such as bridging, is necessary to perform this tutorial. Bridging is simply a "bridge of solder" that provides a connection from one pin to another. Below is an example of the bridging technique:
An example of the board I created is also shown below:
Completed Hardware System
Completed Hardware System