The events application is built on a Kafka StatefulSet.
The Kafka Manager is preconfigured and accessible at the address https://events.MYDOMAIN.
To change most of the relevant settings, override the broker server configuration file.
The following steps let you call and test Kafka locally. This is useful for testing and debugging an application that listens or writes to the queue.
Forward the Kafka broker to local port 9092:
kubectl port-forward --namespace mnp $(kubectl get po -n mnp | grep kafka-0 | awk '{print $1;}') 9092:9092
Also add the following entry to your hosts file:
127.0.0.1 kafka-0.broker.mnp.svc.cluster.local bootstrap.mnp.svc.cluster.local
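
Before pointing an application at the forwarded broker, it can help to confirm that the port is actually open. The following is a minimal sketch using only the Python standard library; the helper name `broker_reachable` is hypothetical and not part of CloudHarness. It only checks that a TCP connection can be opened, not that Kafka itself is healthy.

```python
import socket


def broker_reachable(host: str = "127.0.0.1", port: int = 9092,
                     timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port can be opened.

    Hypothetical helper: it verifies reachability only, not Kafka health.
    """
    try:
        # create_connection resolves the host name and opens a TCP socket.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and name resolution failures.
        return False
```

With the port-forward running and the hosts entry in place, `broker_reachable("kafka-0.broker.mnp.svc.cluster.local")` should return `True`; a `False` result usually means the port-forward has stopped or the hosts entry is missing.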
Data change (CDC) events are a special kind of event used to notify the system that some data has been created, changed, or deleted.
The best way to send a CDC event is to apply a decorator to a service function:
from cloudharness.events.decorators import send_event

@send_event(message_type="my_object", operation="create")
def create_myobject(self, body):
    created_object = ...  # database logic
    return created_object

The above event can be consumed as:
from cloudharness.events.client import EventClient
from cloudharness.models import CDCEvent

def handler(app, event_client, message: CDCEvent):
    ...

event_client = EventClient("my_object")
event_client.async_consume(handler=handler, group_id="ch-notifications")

For a concrete code example of CDC events, see the notification application.
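
To illustrate the decorator pattern behind CDC events, here is a minimal, self-contained sketch. It is not the CloudHarness implementation of `send_event`: the name `send_event_sketch`, the `publish` parameter, and the shape of the event dictionary are all assumptions made for illustration, and an in-memory list stands in for the Kafka topic.

```python
import functools
from typing import Any, Callable


def send_event_sketch(message_type: str, operation: str,
                      publish: Callable[[dict], None]) -> Callable:
    """Hypothetical stand-in for a send_event-style decorator."""
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            result = func(*args, **kwargs)  # run the service function first
            # Publish only after the function returned without raising.
            # The event layout below is an assumption, not the CDCEvent model.
            publish({"message_type": message_type,
                     "operation": operation,
                     "resource": result})
            return result
        return wrapper
    return decorator


sent = []  # in-memory stand-in for the Kafka topic


@send_event_sketch(message_type="my_object", operation="create",
                   publish=sent.append)
def create_myobject(body):
    return {"id": 1, **body}  # placeholder for database logic


create_myobject({"name": "demo"})
```

In this sketch the event is emitted after the wrapped function returns, so a failed database operation does not produce a notification. Whether the real decorator behaves the same way should be checked against the CloudHarness source.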
To consume messages from a generic topic:
from cloudharness.events.client import EventClient

def my_callback(event_client, message):
    ...

client = EventClient("my-topic")
client.async_consume(group_id="my-group", handler=my_callback)

To send a message to a topic:
from cloudharness.workflows.utils import notify_queue
my_message = {"a": "b"}
notify_queue("my-topic", my_message)
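
Kafka carries message values as bytes, so a dictionary such as `my_message` has to be serialized before it is sent and deserialized when it is consumed. The sketch below assumes UTF-8 JSON as the wire format, which is a common choice but is not stated in this document; the helper names `encode_message` and `decode_message` are hypothetical. It can be used to check that a payload survives the round trip before sending it.

```python
import json


def encode_message(message: dict) -> bytes:
    """Serialize a message to UTF-8 JSON bytes (assumed wire format)."""
    return json.dumps(message).encode("utf-8")


def decode_message(raw: bytes) -> dict:
    """Inverse of encode_message."""
    return json.loads(raw.decode("utf-8"))


my_message = {"a": "b"}
raw = encode_message(my_message)

# A payload that is not JSON-serializable (for example one containing a
# datetime object) would raise TypeError in encode_message.
round_tripped = decode_message(raw)
```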