-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathDataBuffer.py
More file actions
82 lines (58 loc) · 2.21 KB
/
DataBuffer.py
File metadata and controls
82 lines (58 loc) · 2.21 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
from __future__ import annotations

import os
from typing import Any, Dict, List, Optional
class DataBuffer:
    """Bounded in-memory buffer with optional append-only file logging.

    The buffer keeps at most ``max_size`` values; pushing onto a full buffer
    discards the oldest values first. When both ``filename`` and ``header``
    are supplied, every pushed value is additionally appended as one
    comma-separated line to ``buffers/<filename>`` under the working
    directory that was current when the buffer was created.
    """

    def __init__(self, max_size: int, filename: Optional[str] = None,
                 header: Optional[List[str]] = None):
        """Create the buffer and, if requested, start a fresh log file.

        Args:
            max_size: Maximum number of values retained in memory. A value
                of zero or less means nothing is retained.
            filename: Name of the log file inside the ``buffers/`` folder.
                Logging is enabled only when ``header`` is given as well.
            header: Column names written, comma-joined, as the first line
                of the log file.
        """
        self._buffer_size = max_size
        self._buffer = []
        self._filepath = None
        if filename and header:
            buffers_dir = os.path.join(os.getcwd(), 'buffers/')
            # exist_ok avoids the check-then-create race of exists()+mkdir().
            os.makedirs(buffers_dir, exist_ok=True)
            self._filepath = os.path.join(buffers_dir, filename)
            # Mode "w" truncates a leftover file from an earlier run, so no
            # separate removal step is needed.
            with open(self._filepath, "w") as file:
                file.write(','.join(header) + '\n')

    def get(self, index: int) -> Any:
        """Return the value at ``index`` without removing it.

        Raises:
            IndexError: If ``index`` is outside the current buffer contents.
        """
        return self._buffer[index]

    def get_all(self) -> List[Any]:
        """Return the full buffer without flushing it.

        The returned list is the buffer's own internal list, not a copy.
        """
        return self._buffer

    def push(self, value: Any):
        """Append ``value``, discarding the oldest values if the buffer is full.

        When file logging is enabled the value is also appended to the log
        file, whether or not it is retained in memory: a list is written as
        its items joined by commas, a dict as its values joined by commas,
        and anything else as ``str(value)``.
        """
        # The emptiness guard keeps a non-positive capacity from popping an
        # empty list (which would raise IndexError).
        while self._buffer and len(self._buffer) >= self._buffer_size:
            self._buffer.pop(0)
        if self._buffer_size > 0:
            self._buffer.append(value)
        if self._filepath:
            if isinstance(value, list):
                line = ','.join(str(v) for v in value)
            elif isinstance(value, dict):
                line = ','.join(str(v) for v in value.values())
            else:
                line = str(value)
            with open(self._filepath, "a") as file:
                file.write(line + '\n')

    def pop(self) -> Any:
        """Remove and return the most recently pushed value.

        Raises:
            IndexError: If the buffer is empty.
        """
        return self._buffer.pop(-1)

    def flush(self) -> List[Any]:
        """Return a copy of the whole buffer and clear it."""
        temp = self._buffer.copy()
        self._buffer = []
        return temp

    def is_full(self) -> bool:
        """Return True when the buffer holds exactly ``max_size`` values."""
        return len(self._buffer) == self._buffer_size

    def current_size(self) -> int:
        """Return the number of values currently held in the buffer."""
        return len(self._buffer)

    def __str__(self) -> str:
        """Return a short textual form of the buffer.

        Buffers with more than six values are abbreviated, numpy-style, to
        the first three and last three values around an ellipsis; shorter
        buffers use the plain list representation.
        """
        if len(self._buffer) > 6:
            # Items are converted with str() because join() only accepts
            # strings and the buffer may hold arbitrary values.
            head = ','.join(str(v) for v in self._buffer[:3])
            tail = ','.join(str(v) for v in self._buffer[-3:])
            return f"[{head} ... {tail}]"
        return str(self._buffer)