Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
"""
Dispatch
========
"""Dispatch

Contains factory method for selecting dispatcher type based on Simvue Configuration
"""
Expand All @@ -20,11 +18,11 @@

def Dispatcher(
mode: typing.Literal["direct", "queued"],
callback: typing.Callable[[list[typing.Any], str, dict[str, typing.Any]], None],
callback: typing.Callable[[list[typing.Any], str], None],
object_types: list[str],
termination_trigger: "Event",
name: str | None = None,
**kwargs,
thresholds: dict[str, int | float] | None = None,
) -> "DispatcherBaseClass":
"""Returns instance of dispatcher based on configuration

Expand All @@ -46,6 +44,10 @@ def Dispatcher(
event which triggers termination of the dispatcher
name : str | None, optional
name for the underlying thread, default None
thresholds: dict[str, int | float] | None, default None
if metadata is provided during item addition, specify
thresholds under which dispatch of an item is permitted,
default is None

Returns
-------
Expand All @@ -58,7 +60,7 @@ def Dispatcher(
callback=callback,
object_types=object_types,
termination_trigger=termination_trigger,
**kwargs,
thresholds=thresholds,
)
else:
logger.debug("Using queued dispatch for metric and queue sending")
Expand All @@ -67,5 +69,5 @@ def Dispatcher(
object_types=object_types,
termination_trigger=termination_trigger,
name=name,
**kwargs,
thresholds=thresholds,
)
103 changes: 103 additions & 0 deletions simvue/dispatch/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import threading
import abc
import typing

from simvue.exception import ObjectDispatchError


class DispatcherBaseClass(abc.ABC):
"""Base class to all dispatchers.

A dispatcher is an object which sends data to a location,
in this case it executes a callback based on criteria.
"""

def __init__(
self,
*,
callback: typing.Callable[[list[typing.Any], str], None],
object_types: list[str],
termination_trigger: threading.Event,
thresholds: dict[str, int | float] | None = None,
) -> None:
"""Initialise a dispatcher.

Parameters
----------
callback : Callable[[list[Any]], str] | None
callback to execute on data.
object_types : list[str]
categories of items for separate handling
termination_trigger : Event
trigger for closing this dispatcher
thresholds : dict[str, int | float] | None, optional
any additional thresholds to consider when handling items.
This assumes metadata defining the values to compare to
such thresholds is included when appending.
"""
super().__init__()
self._thresholds: dict[str, int | float] = thresholds or {}
self._object_types: list[str] = object_types
self._termination_trigger = termination_trigger
self._callback = callback

def add_item(
self,
item: typing.Any,
*,
object_type: str,
metadata: dict[str, int | float] | None = None,
**__,
) -> None:
"""Add an item to the dispatcher.

Parameters
----------
item : Any
item to add to dispatch
object_type : str
category of item
metadata : dict[str, int | float] | None, optional
additional metadata relating to the item to be
used for threshold comparisons
"""
_ = item
_ = object_type
if not metadata:
return
for key, threshold in self._thresholds.items():
if key in metadata and metadata[key] > threshold:
raise ObjectDispatchError(
label=key, threshold=threshold, value=metadata[key]
)

@abc.abstractmethod
def run(self) -> None:
"""Start the dispatcher."""
pass

@abc.abstractmethod
def start(self) -> None:
"""Not used, this allows the class to be similar to a thread."""
pass

@abc.abstractmethod
def join(self) -> None:
"""Not used, this allows the class to be similar to a thread."""
pass

@abc.abstractmethod
def purge(self) -> None:
"""Clear the dispatcher of items."""
pass

@abc.abstractmethod
def is_alive(self) -> bool:
"""Whether the dispatcher is operating correctly."""
pass

@property
@abc.abstractmethod
def empty(self) -> bool:
"""Whether the dispatcher is empty."""
pass
18 changes: 16 additions & 2 deletions simvue/factory/dispatch/direct.py → simvue/dispatch/direct.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@ class DirectDispatcher(DispatcherBaseClass):

def __init__(
self,
*,
callback: typing.Callable[[list[typing.Any], str], None],
object_types: list[str],
termination_trigger: threading.Event,
**_,
thresholds: dict[str, int | float] | None = None,
) -> None:
"""Initialise a new DirectDispatcher instance

Expand All @@ -24,15 +25,28 @@ def __init__(
categories, this is mainly used for creation of queues in a QueueDispatcher
termination_trigger : Event
event which triggers termination of the dispatcher
thresholds: int | float
if metadata is provided during item addition, specify
thresholds under which dispatch of an item is permitted,
default is None
"""
super().__init__(
callback=callback,
object_types=object_types,
termination_trigger=termination_trigger,
thresholds=thresholds,
)

def add_item(self, item: typing.Any, object_type: str, *_, **__) -> None:
def add_item(
self,
item: typing.Any,
*,
object_type: str,
metadata: dict[str, int | float] | None = None,
**__,
) -> None:
"""Execute callback on the given item"""
super().add_item(item, object_type=object_type, metadata=metadata)
self._callback([item], object_type)

def run(self) -> None:
Expand Down
46 changes: 36 additions & 10 deletions simvue/factory/dispatch/queued.py → simvue/dispatch/queued.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,14 @@ class QueuedDispatcher(threading.Thread, DispatcherBaseClass):

def __init__(
self,
*,
callback: typing.Callable[[list[typing.Any], str], None],
object_types: list[str],
termination_trigger: threading.Event,
name: str | None = None,
max_buffer_size: int = MAX_BUFFER_SIZE,
max_read_rate: float = MAX_REQUESTS_PER_SECOND,
thresholds: dict[str, int | float] | None = None,
) -> None:
"""
Initialise a new queue based dispatcher
Expand All @@ -58,34 +60,47 @@ def __init__(
maximum number of items allowed in created buffer.
max_read_rate : float
maximum rate at which the callback can be executed
thresholds: dict[str, int | float] | None, optional
if metadata is provided during item addition, specify
thresholds within which a single dispatch is permitted,
default is None
"""
DispatcherBaseClass.__init__(
self,
callback=callback,
object_types=object_types,
termination_trigger=termination_trigger,
thresholds=thresholds,
)
super().__init__(name=name, daemon=True)

self._termination_trigger = termination_trigger
self._callback = callback
self._queues = {label: queue.Queue() for label in object_types}
self._max_read_rate = max_read_rate
self._max_buffer_size = max_buffer_size
self._send_timer = 0
self._termination_trigger: threading.Event = termination_trigger
self._callback: typing.Callable[[list[typing.Any], str], None] = callback
self._queues: dict[str, queue.Queue[typing.Any]] = {
label: queue.Queue() for label in object_types
}
self._max_read_rate: float = max_read_rate
self._max_buffer_size: int = max_buffer_size
self._send_timer: int = 0

def add_item(
self, item: typing.Any, object_type: str, blocking: bool = True
self,
item: typing.Any,
*,
object_type: str,
blocking: bool = True,
metadata: dict[str, int | float] | None = None,
) -> None:
"""Add an item to the specified queue with/without blocking"""
super().add_item(item, object_type=object_type, metadata=metadata)
if self._termination_trigger.is_set():
raise RuntimeError(
f"Cannot append item '{item}' to queue '{object_type}', "
"termination called."
+ "termination called."
)
if object_type not in self._queues:
raise KeyError(f"No queue '{object_type}' found")
self._queues[object_type].put(item, block=blocking)
self._queues[object_type].put((item, metadata or {}), block=blocking)

@property
def empty(self) -> bool:
Expand All @@ -111,12 +126,23 @@ def _create_buffer(self, queue_label: str) -> list[typing.Any]:
The length of the buffer is constrained.
"""
_buffer: list[typing.Any] = []
_criteria: dict[str, int | float] = {}
_threshold_totals: dict[str, float] = {k: 0 for k in self._thresholds}

while (
not self._queues[queue_label].empty()
and len(_buffer) < self._max_buffer_size
and all(
_threshold_totals[key] < self._thresholds[key]
for key in _threshold_totals
)
):
_item = self._queues[queue_label].get(block=False)
_item, _metadata = typing.cast(
"tuple[typing.Any, dict[str, int | float]]",
self._queues[queue_label].get(block=False),
)
for key in _threshold_totals:
_threshold_totals[key] += _metadata.get(key, 0)
_buffer.append(_item)
self._queues[queue_label].task_done()

Expand Down
11 changes: 10 additions & 1 deletion simvue/exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,13 @@ def __init__(self, obj_type: str, name: str, extra: str | None = None) -> None:
class SimvueRunError(RuntimeError):
"""A special sub-class of runtime error specifically for Simvue run errors"""

pass

class ObjectDispatchError(Exception):
"""Raised if object dispatch failed due to condition."""

def __init__(self, label: str, threshold: int | float, value: int | float) -> None:
self.msg = (
f"Object dispatch failed, {label} "
+ f"of {value} exceeds maximum permitted value of {threshold}"
)
super().__init__(self.msg)
6 changes: 0 additions & 6 deletions simvue/factory/__init__.py

This file was deleted.

46 changes: 0 additions & 46 deletions simvue/factory/dispatch/base.py

This file was deleted.

Loading
Loading