-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathplotter.py
More file actions
132 lines (99 loc) · 4.06 KB
/
plotter.py
File metadata and controls
132 lines (99 loc) · 4.06 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
"""Created to be a plotting thread, receving signals over a queue
Stores data in a deque for efficiency, plots with pyqtgraph
Created by Sophie Chang, Jackson Bremen, Summer 2021
"""
from PyQt5 import QtWidgets, QtCore
import pyqtgraph as pg
import sys # We need sys so that we can pass argv to QApplication
from multiprocessing import Queue
from collections import deque
import warnings
# Suppress every UserWarning for the whole process so that warnings raised
# during plotting do not flood the console.
warnings.simplefilter("ignore", UserWarning)
# NOTE(review): presumably read by pywin32/pythoncom on Windows to choose the
# COM threading model (2 looks like COINIT_APARTMENTTHREADED) -- confirm
# against whichever dependency needs it.
sys.coinit_flags = 2
class MainWindow2(QtWidgets.QMainWindow):
    """Main window that owns the live plot window and its data buffers.

    A pyqtgraph layout window with one plot per channel is created, and a
    QTimer periodically drains the communication queue and redraws the plots.

    Args:
        communication_queue: Queue-like object exposing ``empty()`` and
            ``get_nowait()``. Each item is either the string ``"EXIT"`` or a
            ``(data, titles)`` pair where ``data[0]`` is the time stamp,
            ``data[1:]`` holds one value per channel and ``titles`` holds one
            plot title per channel. ``None`` disables polling.
        app: The QApplication to quit when ``"EXIT"`` is received. When
            ``None``, the current ``QApplication.instance()`` is used.
        *args: Forwarded to ``QMainWindow``.
        **kwargs: Forwarded to ``QMainWindow``.
    """

    # Sentinel string that asks the plotter to shut down.
    EXIT_SIGNAL = "EXIT"

    def __init__(self, communication_queue=None, app=None, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # pg.GraphicsWindow was deprecated and is absent from newer pyqtgraph
        # releases; GraphicsLayoutWidget is its replacement and offers the
        # same addPlot() API. Title, size and show() are applied explicitly.
        self.window = pg.GraphicsLayoutWidget()
        self.window.setWindowTitle("Torque and Force Plots")
        self.window.resize(800, 600)
        self.window.show()

        self.app = app

        # TODO: Set num channels automatically based on length of data intake
        self.num_channels = 10
        # Polling interval of the update timer, in milliseconds.
        self.update_speed_ms = 1
        # Not referenced anywhere else in this class; kept for compatibility.
        self.window_size = 4
        # Maximum number of samples kept (and drawn) per channel.
        self.num_points = 500

        # Titles to be adjusted depending on what variables are graphed in
        # experiment_main
        self.plot_titles = [
            "Elbow Torque(Nm)",
            "Shoulder Torque (Nm)",
            "Bicep (EMG_1)",
            "Tricep lateral (EMG_2)",
            "Anterior Deltoid (EMG_3)",
            "Medial Deltoid (EMG_4)",
            "Posterior Deltoid (EMG_5)",
            "Pectoralis Major (EMG_6)",
            "Lower Trapezius (EMG_7)",
            "Middle Trapezius (EMG_8)",
        ]

        self._init_timeseries()

        self.timer = QtCore.QTimer()
        self.timer.setInterval(self.update_speed_ms)
        self.timer.timeout.connect(
            lambda: self.update_plot_data(communication_queue)
        )
        # Without a queue there is nothing to poll, so leave the timer idle.
        if communication_queue is not None:
            self.timer.start()

    def _init_timeseries(self):
        """Create one plot, one persistent curve and one buffer per channel."""
        # One PlotItem per channel.
        self.plots = []
        # One curve per plot, updated in place on every redraw.
        self.curves = []
        # One bounded deque of samples per channel.
        self.parameters = []
        # Time stamps shared by every channel (x axis).
        self.times = deque(maxlen=self.num_points)

        for i in range(self.num_channels):
            # Fill the grid column by column, four plots per column.
            p = self.window.addPlot(row=i % 4, col=i // 4)
            p.showAxis("left", True)
            # NOTE(review): PlotItem.setMenuEnabled appears to take
            # (enableMenu, enableViewBoxMenu) rather than an axis name, so
            # these calls likely only disable the ViewBox menu -- confirm the
            # intent. The calls are kept exactly as they were.
            p.setMenuEnabled("left", False)
            p.showAxis("bottom", True)
            p.setMenuEnabled("bottom", False)
            p.setTitle(self.plot_titles[i])
            self.plots.append(p)
            self.curves.append(p.plot())
            self.parameters.append(deque(maxlen=self.num_points))

    def update_plot_data(self, comm_queue):
        """Drain ``comm_queue``, buffer every valid sample and redraw once.

        Args:
            comm_queue: Queue-like object (see the class docstring), or
                ``None`` to do nothing.
        """
        if comm_queue is None:
            return

        received_sample = False
        while not comm_queue.empty():
            item = comm_queue.get_nowait()
            # isinstance first, so array-like payloads are never compared
            # element-wise against the sentinel string.
            if isinstance(item, str) and item == self.EXIT_SIGNAL:
                self._shutdown()
                return
            # Every queued sample is stored, not just the most recent one.
            if self._store_sample(item):
                received_sample = True

        if received_sample:
            self._redraw()

    def _store_sample(self, item):
        """Validate one queue item and append it to the buffers.

        Returns:
            bool: True if the sample was stored, False if it was rejected.
        """
        if item is None:
            return False
        try:
            data, titles = item
            sample_len = len(data)
        except (TypeError, ValueError):
            print("Data length is invalid!!!")
            return False

        # First value is the time stamp, followed by one value per channel.
        # Validate BEFORE touching any buffer so that the time axis and the
        # channel buffers can never get out of step.
        if sample_len != self.num_channels + 1:
            print("Data length is invalid!!!")
            return False

        self.times.append(data[0])
        for buffer, datum in zip(self.parameters, data[1:]):
            buffer.append(datum)

        # Only adopt titles that cover every channel; otherwise keep the
        # current ones so that redrawing cannot index past the end.
        if titles is not None and len(titles) >= self.num_channels:
            self.plot_titles = titles
        return True

    def _redraw(self):
        """Push the buffered data and current titles to every plot."""
        x_values = list(self.times)
        for index, (plot, curve, buffer) in enumerate(
            zip(self.plots, self.curves, self.parameters)
        ):
            # Updating the existing curve avoids clearing and rebuilding a
            # plot item for every channel on every timer tick.
            curve.setData(x_values, list(buffer))
            plot.setTitle(self.plot_titles[index])

    def _shutdown(self):
        """Stop polling, close the plot window and quit the application."""
        self.timer.stop()
        self.window.close()
        app = self.app
        if app is None:
            app = QtWidgets.QApplication.instance()
        if app is not None:
            app.quit()
def animation_control(comm_queue):
    """Create the Qt application and the plotter window, then run the loop.

    Args:
        comm_queue (multiprocessing.Queue): Carries plot data and the exit
            signal to the plotter window.

    This function does not return normally: once the Qt event loop ends, the
    process exits with the loop's return code via ``sys.exit``.
    """
    qt_app = QtWidgets.QApplication(sys.argv)
    plot_window = MainWindow2(comm_queue, qt_app)
    plot_window.show()
    exit_code = qt_app.exec_()
    sys.exit(exit_code)
def main():
    """Run the plotter standalone, with an empty queue and no data source."""
    app = QtWidgets.QApplication(sys.argv)
    # MainWindow2's first positional parameter is the communication queue, so
    # the application object must be passed second. An empty queue gives the
    # update timer something valid to poll; the plots simply stay empty.
    w = MainWindow2(Queue(), app)
    w.show()
    sys.exit(app.exec_())


# Only launch the standalone plotter when executed as a script.
if __name__ == "__main__":
    main()