Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions btplotting/analyzers/plot.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@ class LivePlotAnalyzer(bt.Analyzer):
params = (
('scheme', Blackly()),
('style', 'bar'),
('lookback', 23),
('lookback', 0),
('address', 'localhost'),
('port', 80),
('port', 1234),
('title', None),
('interval', 0.2),
('interval', 0.1),
('paused_at_beginning', False),
('debug', False)
)

def __init__(self, iplot=True, autostart=False, **kwargs):
Expand Down
186 changes: 115 additions & 71 deletions btplotting/live/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from bokeh.layouts import column, row, layout
from bokeh.models import Select, Spacer, Tabs, Button, Slider
from bokeh.document import without_document_lock

from .datahandler import LiveDataHandler
from ..tabs import ConfigTab
Expand All @@ -14,14 +15,13 @@


class LiveClient:

'''
LiveClient provides live plotting functionality.
'''

NAV_BUTTON_WIDTH = 35

def __init__(self, doc, app, strategy, lookback, paused_at_beginning, interval=0.5):
def __init__(self, doc, app, strategy, lookback, paused_at_beginning, interval=0.01):
self._app = app
self._doc = doc
self._strategy = strategy
Expand Down Expand Up @@ -67,10 +67,42 @@ def _t_thread(self):
if len(self._strategy) == self._lastlen:
continue
self._lastlen = len(self._strategy)
self._datahandler.update()
self.refresh()
if self._doc.session_context:
self._doc.add_next_tick_callback(self.safe_update)
time.sleep(self._interval)

def rate_limited(max_per_second):
min_interval = 1.0 / float(max_per_second)
def decorate(func):
last_time_called = [0.0]
@wraps(func)
def rate_limited_function(*args, **kwargs):
elapsed = time.time() - last_time_called[0]
left_to_wait = min_interval - elapsed
if left_to_wait > 0:
time.sleep(left_to_wait)
ret = func(*args, **kwargs)
last_time_called[0] = time.time()
return ret
return rate_limited_function
return decorate


@without_document_lock
def safe_update(self):
try:
# Instead of directly updating, schedule the update for the next tick
self._doc.add_next_tick_callback(self._perform_update)
except Exception as e:
_logger.error(f"Error during safe_update: {e}")

def _perform_update(self):
try:
self._datahandler.update()
self.refresh()
except Exception as e:
_logger.error(f"Error during _perform_update: {e}")

def _createmodel(self):

def on_select_filterdata(self, a, old, new):
Expand All @@ -85,24 +117,23 @@ def on_click_nav_action(self):
self._pause()
else:
self._resume()
update_nav_buttons(self)
self._doc.add_next_tick_callback(partial(update_nav_buttons, self))

def on_click_nav_prev(self, steps=1):
self._pause()
self._set_data_by_idx(self._datahandler.get_last_idx() - steps)
update_nav_buttons(self)
self._doc.add_next_tick_callback(partial(self._set_data_by_idx, self._datahandler.get_last_idx() - steps))
self._doc.add_next_tick_callback(partial(update_nav_buttons, self))

def on_click_nav_next(self, steps=1):
self._pause()
self._set_data_by_idx(self._datahandler.get_last_idx() + steps)
update_nav_buttons(self)
self._doc.add_next_tick_callback(partial(self._set_data_by_idx, self._datahandler.get_last_idx() + steps))
self._doc.add_next_tick_callback(partial(update_nav_buttons, self))

def refresh(self, now=False):
if now:
update_nav_buttons(self)
else:
self._doc.add_next_tick_callback(
partial(update_nav_buttons, self))
self._doc.add_next_tick_callback(partial(update_nav_buttons, self))

def reset_nav_buttons(self):
btn_nav_prev.disabled = True
Expand All @@ -112,26 +143,29 @@ def reset_nav_buttons(self):
btn_nav_action.label = '❙❙'

def update_nav_buttons(self):
last_idx = self._datahandler.get_last_idx()
last_avail_idx = self._app.get_last_idx(self._figid)

if self._paused:
btn_nav_action.label = '▶'
else:
btn_nav_action.label = '❙❙'

if last_idx < self.lookback:
btn_nav_prev.disabled = True
btn_nav_prev_big.disabled = True
else:
btn_nav_prev.disabled = False
btn_nav_prev_big.disabled = False
if last_idx >= last_avail_idx:
btn_nav_next.disabled = True
btn_nav_next_big.disabled = True
else:
btn_nav_next.disabled = False
btn_nav_next_big.disabled = False
try:
last_idx = self._datahandler.get_last_idx()
last_avail_idx = self._app.get_last_idx(self._figid)

if self._paused:
btn_nav_action.label = '▶'
else:
btn_nav_action.label = '❙❙'

if last_idx < self.lookback:
btn_nav_prev.disabled = True
btn_nav_prev_big.disabled = True
else:
btn_nav_prev.disabled = False
btn_nav_prev_big.disabled = False
if last_idx >= last_avail_idx:
btn_nav_next.disabled = True
btn_nav_next_big.disabled = True
else:
btn_nav_next.disabled = False
btn_nav_next_big.disabled = False
except Exception as e:
_logger.error(f"Error during update_nav_buttons: {e}")

# filter selection
datanames = get_datanames(self._strategy)
Expand Down Expand Up @@ -190,7 +224,7 @@ def update_nav_buttons(self):
],
sizing_mode='stretch_width')

# return model and a refrash function
# return model and a refresh function
return model, partial(refresh, self)

def _get_filterdata(self):
Expand All @@ -202,26 +236,24 @@ def _get_filterdata(self):
return res

def _get_tabs(self):
# return self.model.select_one({'id': 'tabs'})
return self.model.select_one({'type': Tabs})

def _set_data_by_idx(self, idx=None):
# if a index is provided, ensure that index is within data range
if idx:
# don't allow idx to be smaller than lookback - 1
idx = max(idx, self.lookback - 1)
# don't allow idx to be bigger than max idx
last_avail_idx = self._app.get_last_idx(self._figid)
idx = min(idx, last_avail_idx)

clk = self._figurepage.data_clock._get_clk()
# create DataFrame based on last index with length of lookback
end = self._figurepage.data_clock.get_dt_at_idx(clk, idx)
df = self._app.get_data(
end=end,
figid=self._figid,
back=self.lookback)
self._datahandler.set_df(df)
try:
if idx:
idx = max(idx, self.lookback - 1)
last_avail_idx = self._app.get_last_idx(self._figid)
idx = min(idx, last_avail_idx)

clk = self._figurepage.data_clock._get_clk()
end = self._figurepage.data_clock.get_dt_at_idx(clk, idx)
df = self._app.get_data(
end=end,
figid=self._figid,
back=self.lookback)
self._datahandler.set_df(df)
except Exception as e:
_logger.error(f"Error during _set_data_by_idx: {e}")

def _pause(self):
self._paused = True
Expand All @@ -247,31 +279,43 @@ def is_paused(self):
return self._paused

def refresh(self):
if self._refresh_fnc:
self._refresh_fnc(False)
try:
if self._refresh_fnc:
self._refresh_fnc(False)
except Exception as e:
_logger.error(f"Error during refresh: {e}")

def refreshmodel(self):
if self._datahandler is not None:
self._datahandler.stop()
self._app.update_figurepage(filterdata=self._get_filterdata())
self._datahandler = LiveDataHandler(self)
tab_panels = self._app.generate_bokeh_model_tab_panels()
for t in self._app.tabs:
tab = t(self._app, self._figurepage, self)
if tab.is_useable():
tab_panels.append(tab.get_tab_panel())
self._get_tabs().tabs = list(filter(None.__ne__, tab_panels))
self.refresh()
try:
if self._datahandler is not None:
self._datahandler.stop()
self._app.update_figurepage(filterdata=self._get_filterdata())
self._datahandler = LiveDataHandler(self)
tab_panels = self._app.generate_bokeh_model_tab_panels()
for t in self._app.tabs:
tab = t(self._app, self._figurepage, self)
if tab.is_useable():
tab_panels.append(tab.get_tab_panel())
self._get_tabs().tabs = list(filter(None.__ne__, tab_panels))
self.refresh()
except Exception as e:
_logger.error(f"Error during refreshmodel: {e}")

def next(self):
if self._interval != 0:
return
if len(self._strategy) == self._lastlen:
return
self._lastlen = len(self._strategy)
self._datahandler.update()
try:
if self._interval != 0:
return
if len(self._strategy) == self._lastlen:
return
self._lastlen = len(self._strategy)
self._datahandler.update()
except Exception as e:
_logger.error(f"Error during next: {e}")

def stop(self):
self._running = False
self._thread.join()
self._datahandler.stop()
try:
self._running = False
self._thread.join()
self._datahandler.stop()
except Exception as e:
_logger.error(f"Error during stop: {e}")
Loading