From c0aa7d1a7ba4a1780a87e808b40a2bfb409fb3ad Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Tue, 11 Nov 2025 18:24:55 +0000 Subject: [PATCH 1/9] feat(node-kafka): Port kafka node-type to C++ and simplify it Signed-off-by: Steffen Vogel --- CMakeLists.txt | 6 +- .../schemas/config/nodes/kafka.yaml | 4 +- etc/examples/nodes/kafka.conf | 36 +- lib/nodes/CMakeLists.txt | 2 +- lib/nodes/kafka.cpp | 851 +++++++----------- lib/nodes/kafka_old.cpp | 600 ++++++++++++ 6 files changed, 980 insertions(+), 519 deletions(-) create mode 100644 lib/nodes/kafka_old.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index cb64b0cce..df9dde8ea 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -103,7 +103,7 @@ pkg_check_modules(LIB60870 IMPORTED_TARGET lib60870>=2.3.1) pkg_check_modules(LIBCONFIG IMPORTED_TARGET libconfig>=1.4.9) pkg_check_modules(MOSQUITTO IMPORTED_TARGET libmosquitto>=1.6.9) pkg_check_modules(MODBUS IMPORTED_TARGET libmodbus>=3.1.0) -pkg_check_modules(RDKAFKA IMPORTED_TARGET rdkafka>=1.5.0) +pkg_check_modules(RDKAFKAPP IMPORTED_TARGET rdkafka++>=1.5.0) pkg_check_modules(HIREDIS IMPORTED_TARGET hiredis>=1.0.0) pkg_check_modules(REDISPP IMPORTED_TARGET redis++>=1.2.0) pkg_check_modules(RABBITMQ_C IMPORTED_TARGET librabbitmq>=0.8.0) @@ -193,7 +193,7 @@ cmake_dependent_option(WITH_NODE_IEC60870 "Build with iec60870 node-types" cmake_dependent_option(WITH_NODE_IEC61850 "Build with iec61850 node-types" "${WITH_DEFAULTS}" "LIBIEC61850_FOUND; NOT WITHOUT_GPL" OFF) cmake_dependent_option(WITH_NODE_INFINIBAND "Build with infiniband node-type" "${WITH_DEFAULTS}" "IBVerbs_FOUND; RDMACM_FOUND" OFF) # Infiniband node-type is currenly broken cmake_dependent_option(WITH_NODE_INFLUXDB "Build with influxdb node-type" "${WITH_DEFAULTS}" "" OFF) -cmake_dependent_option(WITH_NODE_KAFKA "Build with kafka node-type" "${WITH_DEFAULTS}" "RDKAFKA_FOUND" OFF) +cmake_dependent_option(WITH_NODE_KAFKA "Build with kafka node-type" "${WITH_DEFAULTS}" "RDKAFKAPP_FOUND" OFF) 
cmake_dependent_option(WITH_NODE_LOOPBACK "Build with loopback node-type" "${WITH_DEFAULTS}" "" OFF) cmake_dependent_option(WITH_NODE_MODBUS "Build with modbus node-type" "${WITH_DEFAULTS}" "MODBUS_FOUND" OFF) cmake_dependent_option(WITH_NODE_MQTT "Build with mqtt node-type" "${WITH_DEFAULTS}" "MOSQUITTO_FOUND" OFF) @@ -309,7 +309,7 @@ add_feature_info(NODE_MODBUS WITH_NODE_MODBUS "Build with add_feature_info(NODE_MQTT WITH_NODE_MQTT "Build with mqtt node-type") add_feature_info(NODE_NANOMSG WITH_NODE_NANOMSG "Build with nanomsg node-type") add_feature_info(NODE_NGSI WITH_NODE_NGSI "Build with ngsi node-type") -add_feature_info(NODE_OPAL_AYSNC WITH_NODE_OPAL_ASYNC "Build with opal.async node-type") +add_feature_info(NODE_OPAL_ASYNC WITH_NODE_OPAL_ASYNC "Build with opal.async node-type") add_feature_info(NODE_OPAL_ORCHESTRA WITH_NODE_OPAL_ORCHESTRA "Build with opal.orchestra node-type") add_feature_info(NODE_OPENDSS WITH_NODE_OPENDSS "Build with opendss node-type") add_feature_info(NODE_REDIS WITH_NODE_REDIS "Build with redis node-type") diff --git a/doc/openapi/components/schemas/config/nodes/kafka.yaml b/doc/openapi/components/schemas/config/nodes/kafka.yaml index a86024f8f..56624e6e6 100644 --- a/doc/openapi/components/schemas/config/nodes/kafka.yaml +++ b/doc/openapi/components/schemas/config/nodes/kafka.yaml @@ -56,7 +56,7 @@ allOf: in: type: object properties: - consume: + topic: type: string description: The Kafka topic to which this node-type will subscribe for receiving messages. @@ -67,7 +67,7 @@ allOf: out: type: object properties: - produce: + topic: type: string description: The Kafka topic to which this node-type will publish messages. 
diff --git a/etc/examples/nodes/kafka.conf b/etc/examples/nodes/kafka.conf index 4f74d5be6..e17f2ba84 100644 --- a/etc/examples/nodes/kafka.conf +++ b/etc/examples/nodes/kafka.conf @@ -7,27 +7,39 @@ nodes = { format = "json.kafka" - server = "localhost:9094" - protocol = "SASL_SSL" + server = "localhost:9092" + protocol = "PLAINTEXT" client_id = "villas-node" in = { - consume = "test-topic" + topic = "test-topic" group_id = "villas-node" } out = { - produce = "test-topic" + topic = "test-topic" } + } - ssl = { - ca = "/etc/ssl/certs/ca.pem" - } + siggen = { + type = "signal" - sasl = { - mechanisms = "SCRAM-SHA-512" - username = "scram-sha-512-usr" - password = "scram-sha-512-pwd" - } + rate = 20 + values = 5 + signal = "mixed" } } + +paths = ( + { + in = "siggen" + out = "kafka_node" + }, + { + in = "kafka_node" + + hooks = [ + "print" + ] + } +) diff --git a/lib/nodes/CMakeLists.txt b/lib/nodes/CMakeLists.txt index 068ad9fba..efb6d7c02 100644 --- a/lib/nodes/CMakeLists.txt +++ b/lib/nodes/CMakeLists.txt @@ -123,7 +123,7 @@ endif() # Enable Kafka support if(WITH_NODE_KAFKA) list(APPEND NODE_SRC kafka.cpp) - list(APPEND LIBRARIES PkgConfig::RDKAFKA) + list(APPEND LIBRARIES PkgConfig::RDKAFKAPP) endif() # Enable Comedi support diff --git a/lib/nodes/kafka.cpp b/lib/nodes/kafka.cpp index 1c56e1e5f..5f9a60576 100644 --- a/lib/nodes/kafka.cpp +++ b/lib/nodes/kafka.cpp @@ -1,600 +1,449 @@ /* Node type: kafka. 
* * Author: Juan Pablo Noreña + * Author: Steffen Vogel * SPDX-FileCopyrightText: 2021 Universidad Nacional de Colombia + * SPDX-FileCopyrightText: 2025 OPAL-RT Germany GmbH * SPDX-License-Identifier: Apache-2.0 */ -#include +#include #include -#include +#include #include +#include +#include #include -#include +#include +#include #include using namespace villas; -using namespace villas::node; using namespace villas::utils; +using namespace villas::node; -// Each process has a list of clients for which a thread invokes the kafka loop -static struct List clients; -static pthread_t thread; static Logger logger; -static void kafka_logger_cb(const rd_kafka_t *rk, int level, const char *fac, - const char *buf) { - - switch (level) { - case LOG_EMERG: - case LOG_CRIT: - case LOG_ERR: - logger->error("{}: {}", fac, buf); - break; - - case LOG_ALERT: - case LOG_WARNING: - logger->warn("{}: {}", fac, buf); - break; - - case LOG_DEBUG: - logger->debug("{}: {}", fac, buf); - break; - - case LOG_NOTICE: - case LOG_INFO: - default: - logger->info("{}: {}", fac, buf); - break; - } -} - -static void kafka_message_cb(void *ctx, const rd_kafka_message_t *msg) { - int ret; - auto *n = (NodeCompat *)ctx; - auto *k = n->getData(); - struct Sample *smps[n->in.vectorize]; - - n->logger->debug("Received a message of {} bytes from broker {}", msg->len, - k->server); - - ret = sample_alloc_many(&k->pool, smps, n->in.vectorize); - if (ret <= 0) { - n->logger->warn("Pool underrun in consumer"); - return; - } - - ret = k->formatter->sscan((char *)msg->payload, msg->len, nullptr, smps, - n->in.vectorize); - if (ret < 0) { - n->logger->warn("Received an invalid message"); - n->logger->warn(" Payload: {}", (char *)msg->payload); - return; - } - - if (ret == 0) { - n->logger->debug("Skip empty message"); - sample_decref_many(smps, n->in.vectorize); - return; - } - - ret = queue_signalled_push_many(&k->queue, (void **)smps, n->in.vectorize); - if (ret < (int)n->in.vectorize) - 
n->logger->warn("Failed to enqueue samples"); -} - -static void *kafka_loop_thread(void *ctx) { - int ret; - - // Set the cancel type of this thread to async - ret = pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, nullptr); - if (ret != 0) - throw RuntimeError("Unable to set cancel type of Kafka communication " - "thread to asynchronous."); - - while (true) { - for (unsigned i = 0; i < list_length(&clients); i++) { - auto *n = (NodeCompat *)list_at(&clients, i); - auto *k = n->getData(); - - // Execute kafka loop for this client - if (k->consumer.client) { - rd_kafka_message_t *msg = - rd_kafka_consumer_poll(k->consumer.client, k->timeout * 1000); - if (msg) { - kafka_message_cb((void *)n, msg); - rd_kafka_message_destroy(msg); - } - } +class KafkaNode : public Node, public RdKafka::EventCb { + +protected: + // Settings. + std::chrono::milliseconds timeout; + std::string server; // Hostname/IP:Port address of the bootstrap server. + std::string protocol; // Security protocol. + std::string produce; // Producer topic. + std::string consume; // Consumer topic. + std::string client_id; // Client ID. + std::string group_id; // Group ID. + std::string ssl_ca; // SSL CA file. + + struct { + std::unique_ptr client; + std::unique_ptr topic; + } producer; + + struct { + std::unique_ptr client; + std::unique_ptr topic; + std::unique_ptr queue; + int eventFd; + } consumer; + + struct { + std::string mechanisms; // SASL mechanisms. + std::string username; // SSL CA path. + std::string password; // SSL certificate. 
+ } sasl; + + std::unique_ptr formatter; + + int _read(struct Sample *smps[], unsigned cnt) override { + assert(consumer.client != nullptr); + + auto msg = consumer.client->consume(consumer.queue.get(), timeout.count()); + + auto ret = formatter->sscan((char *)msg->payload(), msg->len(), nullptr, + smps, cnt); + if (ret < 0) { + logger->warn("Received an invalid message"); + logger->warn(" Payload: {}", (char *)msg->payload()); + return -1; } - } - - return nullptr; -} - -int villas::node::kafka_reverse(NodeCompat *n) { - auto *k = n->getData(); - - SWAP(k->produce, k->consume); - - return 0; -} - -int villas::node::kafka_init(NodeCompat *n) { - auto *k = n->getData(); - - // Default values - k->server = nullptr; - k->protocol = nullptr; - k->produce = nullptr; - k->consume = nullptr; - k->client_id = nullptr; - k->timeout = 1.0; - - k->consumer.client = nullptr; - k->consumer.group_id = nullptr; - k->producer.client = nullptr; - k->producer.topic = nullptr; - - k->sasl.mechanisms = nullptr; - k->sasl.username = nullptr; - k->sasl.password = nullptr; - - k->ssl.ca = nullptr; - - k->formatter = nullptr; - - return 0; -} - -int villas::node::kafka_parse(NodeCompat *n, json_t *json) { - int ret; - auto *k = n->getData(); - - const char *server; - const char *produce = nullptr; - const char *consume = nullptr; - const char *protocol; - const char *client_id = "villas-node"; - const char *group_id = nullptr; - - json_error_t err; - json_t *json_ssl = nullptr; - json_t *json_sasl = nullptr; - json_t *json_format = nullptr; - - ret = json_unpack_ex(json, &err, 0, - "{ s?: { s?: s }, s?: { s?: s, s?: s }, s?: o, s: s, " - "s?: F, s: s, s?: s, s?: o, s?: o }", - "out", "produce", &produce, "in", "consume", &consume, - "group_id", &group_id, "format", &json_format, "server", - &server, "timeout", &k->timeout, "protocol", &protocol, - "client_id", &client_id, "ssl", &json_ssl, "sasl", - &json_sasl); - if (ret) - throw ConfigError(json, err, "node-config-node-kafka"); - - 
k->server = strdup(server); - k->produce = produce ? strdup(produce) : nullptr; - k->consume = consume ? strdup(consume) : nullptr; - k->protocol = strdup(protocol); - k->client_id = strdup(client_id); - k->consumer.group_id = group_id ? strdup(group_id) : nullptr; - - if (strcmp(protocol, "SSL") && strcmp(protocol, "PLAINTEXT") && - strcmp(protocol, "SASL_SSL") && strcmp(protocol, "SASL_PLAINTEXT")) - throw ConfigError(json, "node-config-node-kafka-protocol", - "Invalid security protocol: {}", protocol); - - if (!k->produce && !k->consume) - throw ConfigError(json, "node-config-node-kafka", - "At least one topic has to be specified for node {}", - n->getName()); - - if (json_ssl) { - const char *ca; - - ret = json_unpack_ex(json_ssl, &err, 0, "{ s: s }", "ca", &ca); - if (ret) - throw ConfigError(json_ssl, err, "node-config-node-kafka-ssl", - "Failed to parse SSL configuration of node {}", - n->getName()); - - k->ssl.ca = strdup(ca); - } - - if (json_sasl) { - const char *mechanisms; - const char *username; - const char *password; - - ret = json_unpack_ex(json_sasl, &err, 0, "{ s: s, s: s, s: s }", - "mechanisms", &mechanisms, "username", &username, - "password", &password); - if (ret) - throw ConfigError(json_sasl, err, "node-config-node-kafka-sasl", - "Failed to parse SASL configuration"); - - k->sasl.mechanisms = strdup(mechanisms); - k->sasl.username = strdup(username); - k->sasl.password = strdup(password); - } - - // Format - if (k->formatter) - delete k->formatter; - k->formatter = json_format ? 
FormatFactory::make(json_format) - : FormatFactory::make("villas.binary"); - if (!k->formatter) - throw ConfigError(json_format, "node-config-node-kafka-format", - "Invalid format configuration"); - - return 0; -} -int villas::node::kafka_prepare(NodeCompat *n) { - int ret; - auto *k = n->getData(); - - k->formatter->start(n->getInputSignals(false), ~(int)SampleFlags::HAS_OFFSET); - - ret = pool_init(&k->pool, 1024, - SAMPLE_LENGTH(n->getInputSignals(false)->size())); - if (ret) return ret; + } - ret = queue_signalled_init(&k->queue, 1024); - if (ret) - return ret; - - return 0; -} - -char *villas::node::kafka_print(NodeCompat *n) { - auto *k = n->getData(); - - char *buf = nullptr; + int _write(struct Sample *smps[], unsigned cnt) override { + assert(producer.client != nullptr); - strcatf(&buf, "bootstrap.server=%s, client.id=%s, security.protocol=%s", - k->server, k->client_id, k->protocol); + size_t wbytes; - // Only show if not default - if (k->produce) - strcatf(&buf, ", out.produce=%s", k->produce); + char data[DEFAULT_FORMAT_BUFFER_LENGTH]; - if (k->consume) - strcatf(&buf, ", in.consume=%s", k->consume); + auto ret = formatter->sprint(data, sizeof(data), &wbytes, smps, cnt); + if (ret < 0) + return ret; - return buf; -} + if (!produce.empty()) { + auto ret = producer.client->produce( + producer.topic.get(), RdKafka::Topic::PARTITION_UA, + RdKafka::Producer::RK_MSG_COPY, data, wbytes, NULL, 0, NULL); + if (ret != RdKafka::ErrorCode::ERR_NO_ERROR) { + logger->warn("Publish failed"); + return -abs(ret); + } + } else + logger->warn( + "No produce possible because no produce topic is configured"); -int villas::node::kafka_destroy(NodeCompat *n) { - int ret; - auto *k = n->getData(); + return cnt; + } - if (k->producer.client) - rd_kafka_destroy(k->producer.client); + int startProducer() { + std::string errstr; - if (k->consumer.client) - rd_kafka_destroy(k->consumer.client); + auto conf_prod = createCommonConf(); + if (!conf_prod) + throw 
MemoryAllocationError(); - if (k->formatter) - delete k->formatter; + producer.client = std::unique_ptr( + RdKafka::Producer::create(conf_prod.get(), errstr)); + if (!producer.client) + throw RuntimeError("{}", errstr); - ret = pool_destroy(&k->pool); - if (ret) - return ret; + auto topic_conf = std::unique_ptr( + RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC)); + if (!topic_conf) + throw MemoryAllocationError(); - ret = queue_signalled_destroy(&k->queue); - if (ret) - return ret; + auto cr = topic_conf->set("acks", "all", errstr); + if (cr != RdKafka::Conf::CONF_OK) + throw RuntimeError("{}", errstr); - if (k->produce) - free(k->produce); + producer.topic = std::unique_ptr(RdKafka::Topic::create( + producer.client.get(), produce, topic_conf.get(), errstr)); + if (!producer.topic) + throw MemoryAllocationError(); - if (k->consume) - free(k->consume); + logger->info("Connected producer to bootstrap server {}", server); - if (k->protocol) - free(k->protocol); + return 0; + } - if (k->client_id) - free(k->client_id); + int startConsumer() { + std::string errstr; - free(k->server); + auto conf_cons = createCommonConf(); + if (!conf_cons) + throw MemoryAllocationError(); - return 0; -} + auto cr = conf_cons->set("group.id", group_id, errstr); + if (cr != RdKafka::Conf::CONF_OK) + throw RuntimeError("{}", errstr); -int villas::node::kafka_start(NodeCompat *n) { - int ret; - char errstr[1024]; - auto *k = n->getData(); + consumer.client = std::unique_ptr( + RdKafka::Consumer::create(conf_cons.get(), errstr)); + if (!consumer.client) + throw MemoryAllocationError(); - rd_kafka_conf_t *rdkconf = rd_kafka_conf_new(); - if (!rdkconf) - throw MemoryAllocationError(); + consumer.topic = std::unique_ptr(RdKafka::Topic::create( + consumer.client.get(), consume, nullptr, errstr)); + if (!consumer.topic) + throw MemoryAllocationError(); - rd_kafka_conf_set_log_cb(rdkconf, kafka_logger_cb); + consumer.queue = std::unique_ptr( + RdKafka::Queue::create(consumer.client.get())); + if 
(!consumer.queue) + throw MemoryAllocationError(); - ret = rd_kafka_conf_set(rdkconf, "client.id", k->client_id, errstr, - sizeof(errstr)); - if (ret != RD_KAFKA_CONF_OK) - goto kafka_config_error; + consumer.eventFd = eventfd(0, 0); - ret = rd_kafka_conf_set(rdkconf, "bootstrap.servers", k->server, errstr, - sizeof(errstr)); - if (ret != RD_KAFKA_CONF_OK) - goto kafka_config_error; + uint64_t incr = 1; + consumer.queue->io_event_enable(consumer.eventFd, &incr, sizeof(incr)); - ret = rd_kafka_conf_set(rdkconf, "security.protocol", k->protocol, errstr, - sizeof(errstr)); - if (ret != RD_KAFKA_CONF_OK) - goto kafka_config_error; + auto ec = consumer.client->start(consumer.topic.get(), 0, 0, + consumer.queue.get()); + if (ec != RdKafka::ErrorCode::ERR_NO_ERROR) + throw RuntimeError("Error subscribing to {} at {}: {}", consume, server, + RdKafka::err2str(ec)); - if (!strcmp(k->protocol, "SASL_SSL") || !strcmp(k->protocol, "SSL")) { - ret = rd_kafka_conf_set(rdkconf, "ssl.ca.location", k->ssl.ca, errstr, - sizeof(errstr)); - if (ret != RD_KAFKA_CONF_OK) - goto kafka_config_error; - } + logger->info("Subscribed consumer from bootstrap server {}", server); - if (!strcmp(k->protocol, "SASL_PLAINTEXT") || - !strcmp(k->protocol, "SASL_SSL")) { - ret = rd_kafka_conf_set(rdkconf, "sasl.mechanisms", k->sasl.mechanisms, - errstr, sizeof(errstr)); - if (ret != RD_KAFKA_CONF_OK) - goto kafka_config_error; - - ret = rd_kafka_conf_set(rdkconf, "sasl.username", k->sasl.username, errstr, - sizeof(errstr)); - if (ret != RD_KAFKA_CONF_OK) - goto kafka_config_error; - - ret = rd_kafka_conf_set(rdkconf, "sasl.password", k->sasl.password, errstr, - sizeof(errstr)); - if (ret != RD_KAFKA_CONF_OK) - goto kafka_config_error; + return 0; } - if (k->produce) { - // rd_kafka_new() will take ownership and free the passed rd_kafka_conf_t object, - // so we will need to create a copy first - rd_kafka_conf_t *rdkconf_prod = rd_kafka_conf_dup(rdkconf); - if (!rdkconf_prod) - throw 
MemoryAllocationError(); - - k->producer.client = - rd_kafka_new(RD_KAFKA_PRODUCER, rdkconf_prod, errstr, sizeof(errstr)); - if (!k->producer.client) - goto kafka_config_error; + std::unique_ptr createCommonConf() { + std::string errstr; - rd_kafka_topic_conf_t *topic_conf = rd_kafka_topic_conf_new(); - if (!topic_conf) + auto conf = std::unique_ptr( + RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL)); + if (!conf) throw MemoryAllocationError(); - ret = rd_kafka_topic_conf_set(topic_conf, "acks", "all", errstr, - sizeof(errstr)); - if (ret != RD_KAFKA_CONF_OK) - goto kafka_config_error; - - k->producer.topic = - rd_kafka_topic_new(k->producer.client, k->produce, topic_conf); - if (!k->producer.topic) - throw MemoryAllocationError(); + auto ret = conf->set("event_cb", this, errstr); + if (ret != RdKafka::Conf::CONF_OK) + throw RuntimeError("{}", errstr); - n->logger->info("Connected producer to bootstrap server {}", k->server); - } + ret = conf->set("client.id", client_id, errstr); + if (ret != RdKafka::Conf::CONF_OK) + throw RuntimeError("{}", errstr); - if (k->consume) { - // rd_kafka_new() will take ownership and free the passed rd_kafka_conf_t object, - // so we will need to create a copy first - rd_kafka_conf_t *rdkconf_cons = rd_kafka_conf_dup(rdkconf); - if (!rdkconf_cons) - throw MemoryAllocationError(); + ret = conf->set("bootstrap.servers", server, errstr); + if (ret != RdKafka::Conf::CONF_OK) + throw RuntimeError("{}", errstr); - rd_kafka_topic_partition_list_t *partitions = - rd_kafka_topic_partition_list_new(1); - if (!partitions) - throw MemoryAllocationError(); + ret = conf->set("security.protocol", protocol, errstr); + if (ret != RdKafka::Conf::CONF_OK) + throw RuntimeError("{}", errstr); - rd_kafka_topic_partition_t *partition = - rd_kafka_topic_partition_list_add(partitions, k->consume, 0); - if (!partition) - throw RuntimeError("Failed to add new partition"); + if (protocol == "SASL_SSL" || protocol == "SSL") { + ret = 
conf->set("ssl.ca.location", ssl_ca, errstr); + if (ret != RdKafka::Conf::CONF_OK) + throw RuntimeError("{}", errstr); + } - ret = rd_kafka_conf_set(rdkconf_cons, "group.id", k->consumer.group_id, - errstr, sizeof(errstr)); - if (ret != RD_KAFKA_CONF_OK) - goto kafka_config_error; + if (protocol == "SASL_PLAINTEXT" || protocol == "SASL_SSL") { + ret = conf->set("sasl.mechanisms", sasl.mechanisms, errstr); + if (ret != RdKafka::Conf::CONF_OK) + throw RuntimeError("{}", errstr); - k->consumer.client = - rd_kafka_new(RD_KAFKA_CONSUMER, rdkconf_cons, errstr, sizeof(errstr)); - if (!k->consumer.client) - throw MemoryAllocationError(); + ret = conf->set("sasl.username", sasl.username, errstr); + if (ret != RdKafka::Conf::CONF_OK) + throw RuntimeError("{}", errstr); - ret = rd_kafka_subscribe(k->consumer.client, partitions); - if (ret != RD_KAFKA_RESP_ERR_NO_ERROR) - throw RuntimeError("Error subscribing to {} at {}: {}", k->consume, - k->server, rd_kafka_err2str((rd_kafka_resp_err_t)ret)); + ret = conf->set("sasl.password", sasl.password, errstr); + if (ret != RdKafka::Conf::CONF_OK) + throw RuntimeError("{}", errstr); + } - n->logger->info("Subscribed consumer from bootstrap server {}", k->server); + return conf; } - // Add client to global list of kafka clients - // so that thread can call kafka loop for this client - list_push(&clients, n); - - rd_kafka_conf_destroy(rdkconf); +public: + KafkaNode(const uuid_t &id = {}, const std::string &name = "") + : Node(id, name), timeout(1000), client_id("villas-node"), producer({}), + consumer({.eventFd = -1}) {} - return 0; + virtual ~KafkaNode() {} -kafka_config_error: - rd_kafka_conf_destroy(rdkconf); + int prepare() override { + formatter->start(getInputSignals(false), ~(int)SampleFlags::HAS_OFFSET); - throw RuntimeError("{}", errstr); + return Node::prepare(); + } - return -1; -} + int parse(json_t *json) override { + const char *svr; + const char *prod = nullptr; + const char *cons = nullptr; + const char *proto; + const 
char *cid = nullptr; + const char *gid = nullptr; + double to = -1; -int villas::node::kafka_stop(NodeCompat *n) { - int ret; - auto *k = n->getData(); + int ret = Node::parse(json); + if (ret) + return ret; + + json_error_t err; + json_t *json_ssl = nullptr; + json_t *json_sasl = nullptr; + json_t *json_format = nullptr; + + ret = json_unpack_ex(json, &err, 0, + "{ s?: { s?: s }, s?: { s?: s, s?: s }, s?: o, s: s, " + "s?: F, s: s, s?: s, s?: o, s?: o }", + "out", "topic", &prod, "in", "topic", &cons, + "group_id", &gid, "format", &json_format, "server", + &svr, "timeout", &to, "protocol", &proto, "client_id", + &cid, "ssl", &json_ssl, "sasl", &json_sasl); + if (ret) + throw ConfigError(json, err, "node-config-node-kafka"); - if (k->producer.client) { - ret = rd_kafka_flush(k->producer.client, k->timeout * 1000); - if (ret != RD_KAFKA_RESP_ERR_NO_ERROR) - n->logger->error("Failed to flush messages: {}", - rd_kafka_err2str((rd_kafka_resp_err_t)ret)); + server = svr; + protocol = proto; - /* If the output queue is still not empty there is an issue - * with producing messages to the clusters. 
*/ - if (rd_kafka_outq_len(k->producer.client) > 0) - n->logger->warn("{} message(s) were not delivered", - rd_kafka_outq_len(k->producer.client)); - } + if (prod) + produce = prod; - // Unregister client from global kafka client list - // so that kafka loop is no longer invoked for this client - // important to do that before disconnecting from broker, otherwise, kafka thread will attempt to reconnect - list_remove_all(&clients, n); + if (cons) + consume = cons; - ret = queue_signalled_close(&k->queue); - if (ret) - return ret; + if (cid) + client_id = cid; - return 0; -} + if (gid) + group_id = gid; -int villas::node::kafka_type_start(villas::node::SuperNode *sn) { - int ret; + if (to >= 0) + timeout = std::chrono::milliseconds((int)(to * 1000)); - logger = Log::get("node:kafka"); + if (protocol != "SSL" && protocol != "PLAINTEXT" && + protocol != "SASL_SSL" && protocol != "SASL_PLAINTEXT") + throw ConfigError(json, "node-config-node-kafka-protocol", + "Invalid security protocol: {}", protocol); - ret = list_init(&clients); - if (ret) - goto kafka_error; + if (produce.empty() && consume.empty()) + throw ConfigError(json, "node-config-node-kafka", + "At least one topic has to be specified for node {}", + getName()); - // Start thread here to run kafka loop for registered clients - ret = pthread_create(&thread, nullptr, kafka_loop_thread, nullptr); - if (ret) - goto kafka_error; + if (json_ssl) { + const char *ca = nullptr; - return 0; + ret = json_unpack_ex(json_ssl, &err, 0, "{ s?: s }", "ca", &ca); + if (ret) + throw ConfigError(json_ssl, err, "node-config-node-kafka-ssl", + "Failed to parse SSL configuration of node {}", + getName()); -kafka_error: - logger->warn("Error initialazing node type kafka"); + if (ca) + ssl_ca = ca; + } - return ret; -} + if (json_sasl) { + const char *mechanisms; + const char *username; + const char *password; + + ret = json_unpack_ex(json_sasl, &err, 0, "{ s: s, s: s, s: s }", + "mechanisms", &mechanisms, "username", &username, + 
"password", &password); + if (ret) + throw ConfigError(json_sasl, err, "node-config-node-kafka-sasl", + "Failed to parse SASL configuration"); + + sasl.mechanisms = mechanisms; + sasl.username = username; + sasl.password = password; + } -int villas::node::kafka_type_stop() { - int ret; + // Format + formatter = std::unique_ptr( + json_format ? FormatFactory::make(json_format) + : FormatFactory::make("villas.binary")); + if (!formatter) + throw ConfigError(json_format, "node-config-node-kafka-format", + "Invalid format configuration"); - // Stop thread here that executes kafka loop - ret = pthread_cancel(thread); - if (ret) - return ret; + return 0; + } - logger->debug( - "Called pthread_cancel() on kafka communication management thread."); + const std::string &getDetails() override { + details = fmt::format("server={}, client_id={}, protocol={}", server, + client_id, protocol); - ret = pthread_join(thread, nullptr); - if (ret) - goto kafka_error; + if (!produce.empty()) + details += fmt::format(", produce={}", produce); - // When this is called the list of clients should be empty - if (list_length(&clients) > 0) - throw RuntimeError( - "List of kafka clients contains elements at time of destruction. 
Call " - "node_stop for each kafka node before stopping node type!"); + if (!consume.empty()) + details += fmt::format(", consume={}", consume); - ret = list_destroy(&clients, nullptr, false); - if (ret) - goto kafka_error; + return details; + } - return 0; + int start() override { + if (!produce.empty()) { + auto ret = startProducer(); + if (ret) + return ret; + } -kafka_error: - logger->warn("Error stoping node type kafka"); + if (!consume.empty()) { + auto ret = startConsumer(); + if (ret) + return ret; + } - return ret; -} + int ret = Node::start(); + if (!ret) + state = State::STARTED; -int villas::node::kafka_read(NodeCompat *n, struct Sample *const smps[], - unsigned cnt) { - int pulled; - auto *k = n->getData(); - struct Sample *smpt[cnt]; + return 0; + } - pulled = queue_signalled_pull_many(&k->queue, (void **)smpt, cnt); + int stop() override { + int ret = Node::stop(); + if (ret) + return ret; + + if (producer.client) { + auto ret = producer.client->flush(timeout.count()); + if (ret != RdKafka::ErrorCode::ERR_NO_ERROR) + logger->error("Failed to flush messages: {}", RdKafka::err2str(ret)); + + // If the output queue is still not empty there is an issue + // with producing messages to the clusters. 
+ if (producer.client->outq_len() > 0) + logger->warn("{} message(s) were not delivered", + producer.client->outq_len()); + } - sample_copy_many(smps, smpt, pulled); - sample_decref_many(smpt, pulled); + return 0; + } - return pulled; -} + int reverse() override { + SWAP(produce, consume); -int villas::node::kafka_write(NodeCompat *n, struct Sample *const smps[], - unsigned cnt) { - int ret; - auto *k = n->getData(); + return 0; + } - size_t wbytes; + std::vector getPollFDs() override { + if (consumer.eventFd >= 0) + return {consumer.eventFd}; - char data[DEFAULT_FORMAT_BUFFER_LENGTH]; + return {}; + } - ret = k->formatter->sprint(data, sizeof(data), &wbytes, smps, cnt); - if (ret < 0) - return ret; + void event_cb(RdKafka::Event &event) override { + switch (event.type()) { + case RdKafka::Event::EVENT_ERROR: + logger->error("Kafka error: {}", event.str()); + break; + + case RdKafka::Event::EVENT_STATS: + logger->info("Kafka stats: {}", event.str()); + break; + + case RdKafka::Event::EVENT_LOG: + switch (event.severity()) { + case RdKafka::Event::EVENT_SEVERITY_DEBUG: + logger->debug("{}", event.str()); + break; + + case RdKafka::Event::EVENT_SEVERITY_NOTICE: + case RdKafka::Event::EVENT_SEVERITY_INFO: + logger->info("{}", event.str()); + break; + + case RdKafka::Event::EVENT_SEVERITY_ALERT: + case RdKafka::Event::EVENT_SEVERITY_WARNING: + logger->warn("{}", event.str()); + break; + + case RdKafka::Event::EVENT_SEVERITY_ERROR: + case RdKafka::Event::EVENT_SEVERITY_CRITICAL: + case RdKafka::Event::EVENT_SEVERITY_EMERG: + logger->error("{}", event.str()); + break; + } - if (k->produce) { - ret = rd_kafka_produce(k->producer.topic, RD_KAFKA_PARTITION_UA, - RD_KAFKA_MSG_F_COPY, data, wbytes, NULL, 0, NULL); + break; - if (ret != RD_KAFKA_RESP_ERR_NO_ERROR) { - n->logger->warn("Publish failed"); - return -abs(ret); + default: + logger->info("Kafka event {}: {}", (int)event.type(), event.str()); + break; } - } else - n->logger->warn( - "No produce possible because no 
produce topic is configured"); - - return cnt; -} - -int villas::node::kafka_poll_fds(NodeCompat *n, int fds[]) { - auto *k = n->getData(); - - fds[0] = queue_signalled_fd(&k->queue); - - return 1; -} - -static NodeCompatType p; - -__attribute__((constructor(110))) static void register_plugin() { - p.name = "kafka"; - p.description = "Kafka event message streaming (rdkafka)"; - p.vectorize = 0; - p.size = sizeof(struct kafka); - p.type.start = kafka_type_start; - p.type.stop = kafka_type_stop; - p.destroy = kafka_destroy; - p.prepare = kafka_prepare; - p.parse = kafka_parse; - p.prepare = kafka_prepare; - p.print = kafka_print; - p.init = kafka_init; - p.destroy = kafka_destroy; - p.start = kafka_start; - p.stop = kafka_stop; - p.read = kafka_read; - p.write = kafka_write; - p.reverse = kafka_reverse; - p.poll_fds = kafka_poll_fds; - - static NodeCompatFactory ncp(&p); -} + } +}; + +// Register node +static char n[] = "kafka"; +static char d[] = "Kafka event message streaming (rdkafka)"; +static NodePlugin + p; diff --git a/lib/nodes/kafka_old.cpp b/lib/nodes/kafka_old.cpp new file mode 100644 index 000000000..1c56e1e5f --- /dev/null +++ b/lib/nodes/kafka_old.cpp @@ -0,0 +1,600 @@ +/* Node type: kafka. 
+ * + * Author: Juan Pablo Noreña + * SPDX-FileCopyrightText: 2021 Universidad Nacional de Colombia + * SPDX-License-Identifier: Apache-2.0 + */ + +#include + +#include +#include + +#include +#include +#include +#include + +using namespace villas; +using namespace villas::node; +using namespace villas::utils; + +// Each process has a list of clients for which a thread invokes the kafka loop +static struct List clients; +static pthread_t thread; +static Logger logger; + +static void kafka_logger_cb(const rd_kafka_t *rk, int level, const char *fac, + const char *buf) { + + switch (level) { + case LOG_EMERG: + case LOG_CRIT: + case LOG_ERR: + logger->error("{}: {}", fac, buf); + break; + + case LOG_ALERT: + case LOG_WARNING: + logger->warn("{}: {}", fac, buf); + break; + + case LOG_DEBUG: + logger->debug("{}: {}", fac, buf); + break; + + case LOG_NOTICE: + case LOG_INFO: + default: + logger->info("{}: {}", fac, buf); + break; + } +} + +static void kafka_message_cb(void *ctx, const rd_kafka_message_t *msg) { + int ret; + auto *n = (NodeCompat *)ctx; + auto *k = n->getData(); + struct Sample *smps[n->in.vectorize]; + + n->logger->debug("Received a message of {} bytes from broker {}", msg->len, + k->server); + + ret = sample_alloc_many(&k->pool, smps, n->in.vectorize); + if (ret <= 0) { + n->logger->warn("Pool underrun in consumer"); + return; + } + + ret = k->formatter->sscan((char *)msg->payload, msg->len, nullptr, smps, + n->in.vectorize); + if (ret < 0) { + n->logger->warn("Received an invalid message"); + n->logger->warn(" Payload: {}", (char *)msg->payload); + return; + } + + if (ret == 0) { + n->logger->debug("Skip empty message"); + sample_decref_many(smps, n->in.vectorize); + return; + } + + ret = queue_signalled_push_many(&k->queue, (void **)smps, n->in.vectorize); + if (ret < (int)n->in.vectorize) + n->logger->warn("Failed to enqueue samples"); +} + +static void *kafka_loop_thread(void *ctx) { + int ret; + + // Set the cancel type of this thread to async + ret 
= pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, nullptr); + if (ret != 0) + throw RuntimeError("Unable to set cancel type of Kafka communication " + "thread to asynchronous."); + + while (true) { + for (unsigned i = 0; i < list_length(&clients); i++) { + auto *n = (NodeCompat *)list_at(&clients, i); + auto *k = n->getData(); + + // Execute kafka loop for this client + if (k->consumer.client) { + rd_kafka_message_t *msg = + rd_kafka_consumer_poll(k->consumer.client, k->timeout * 1000); + if (msg) { + kafka_message_cb((void *)n, msg); + rd_kafka_message_destroy(msg); + } + } + } + } + + return nullptr; +} + +int villas::node::kafka_reverse(NodeCompat *n) { + auto *k = n->getData(); + + SWAP(k->produce, k->consume); + + return 0; +} + +int villas::node::kafka_init(NodeCompat *n) { + auto *k = n->getData(); + + // Default values + k->server = nullptr; + k->protocol = nullptr; + k->produce = nullptr; + k->consume = nullptr; + k->client_id = nullptr; + k->timeout = 1.0; + + k->consumer.client = nullptr; + k->consumer.group_id = nullptr; + k->producer.client = nullptr; + k->producer.topic = nullptr; + + k->sasl.mechanisms = nullptr; + k->sasl.username = nullptr; + k->sasl.password = nullptr; + + k->ssl.ca = nullptr; + + k->formatter = nullptr; + + return 0; +} + +int villas::node::kafka_parse(NodeCompat *n, json_t *json) { + int ret; + auto *k = n->getData(); + + const char *server; + const char *produce = nullptr; + const char *consume = nullptr; + const char *protocol; + const char *client_id = "villas-node"; + const char *group_id = nullptr; + + json_error_t err; + json_t *json_ssl = nullptr; + json_t *json_sasl = nullptr; + json_t *json_format = nullptr; + + ret = json_unpack_ex(json, &err, 0, + "{ s?: { s?: s }, s?: { s?: s, s?: s }, s?: o, s: s, " + "s?: F, s: s, s?: s, s?: o, s?: o }", + "out", "produce", &produce, "in", "consume", &consume, + "group_id", &group_id, "format", &json_format, "server", + &server, "timeout", &k->timeout, "protocol", &protocol, + 
"client_id", &client_id, "ssl", &json_ssl, "sasl", + &json_sasl); + if (ret) + throw ConfigError(json, err, "node-config-node-kafka"); + + k->server = strdup(server); + k->produce = produce ? strdup(produce) : nullptr; + k->consume = consume ? strdup(consume) : nullptr; + k->protocol = strdup(protocol); + k->client_id = strdup(client_id); + k->consumer.group_id = group_id ? strdup(group_id) : nullptr; + + if (strcmp(protocol, "SSL") && strcmp(protocol, "PLAINTEXT") && + strcmp(protocol, "SASL_SSL") && strcmp(protocol, "SASL_PLAINTEXT")) + throw ConfigError(json, "node-config-node-kafka-protocol", + "Invalid security protocol: {}", protocol); + + if (!k->produce && !k->consume) + throw ConfigError(json, "node-config-node-kafka", + "At least one topic has to be specified for node {}", + n->getName()); + + if (json_ssl) { + const char *ca; + + ret = json_unpack_ex(json_ssl, &err, 0, "{ s: s }", "ca", &ca); + if (ret) + throw ConfigError(json_ssl, err, "node-config-node-kafka-ssl", + "Failed to parse SSL configuration of node {}", + n->getName()); + + k->ssl.ca = strdup(ca); + } + + if (json_sasl) { + const char *mechanisms; + const char *username; + const char *password; + + ret = json_unpack_ex(json_sasl, &err, 0, "{ s: s, s: s, s: s }", + "mechanisms", &mechanisms, "username", &username, + "password", &password); + if (ret) + throw ConfigError(json_sasl, err, "node-config-node-kafka-sasl", + "Failed to parse SASL configuration"); + + k->sasl.mechanisms = strdup(mechanisms); + k->sasl.username = strdup(username); + k->sasl.password = strdup(password); + } + + // Format + if (k->formatter) + delete k->formatter; + k->formatter = json_format ? 
FormatFactory::make(json_format) + : FormatFactory::make("villas.binary"); + if (!k->formatter) + throw ConfigError(json_format, "node-config-node-kafka-format", + "Invalid format configuration"); + + return 0; +} + +int villas::node::kafka_prepare(NodeCompat *n) { + int ret; + auto *k = n->getData(); + + k->formatter->start(n->getInputSignals(false), ~(int)SampleFlags::HAS_OFFSET); + + ret = pool_init(&k->pool, 1024, + SAMPLE_LENGTH(n->getInputSignals(false)->size())); + if (ret) + return ret; + + ret = queue_signalled_init(&k->queue, 1024); + if (ret) + return ret; + + return 0; +} + +char *villas::node::kafka_print(NodeCompat *n) { + auto *k = n->getData(); + + char *buf = nullptr; + + strcatf(&buf, "bootstrap.server=%s, client.id=%s, security.protocol=%s", + k->server, k->client_id, k->protocol); + + // Only show if not default + if (k->produce) + strcatf(&buf, ", out.produce=%s", k->produce); + + if (k->consume) + strcatf(&buf, ", in.consume=%s", k->consume); + + return buf; +} + +int villas::node::kafka_destroy(NodeCompat *n) { + int ret; + auto *k = n->getData(); + + if (k->producer.client) + rd_kafka_destroy(k->producer.client); + + if (k->consumer.client) + rd_kafka_destroy(k->consumer.client); + + if (k->formatter) + delete k->formatter; + + ret = pool_destroy(&k->pool); + if (ret) + return ret; + + ret = queue_signalled_destroy(&k->queue); + if (ret) + return ret; + + if (k->produce) + free(k->produce); + + if (k->consume) + free(k->consume); + + if (k->protocol) + free(k->protocol); + + if (k->client_id) + free(k->client_id); + + free(k->server); + + return 0; +} + +int villas::node::kafka_start(NodeCompat *n) { + int ret; + char errstr[1024]; + auto *k = n->getData(); + + rd_kafka_conf_t *rdkconf = rd_kafka_conf_new(); + if (!rdkconf) + throw MemoryAllocationError(); + + rd_kafka_conf_set_log_cb(rdkconf, kafka_logger_cb); + + ret = rd_kafka_conf_set(rdkconf, "client.id", k->client_id, errstr, + sizeof(errstr)); + if (ret != RD_KAFKA_CONF_OK) + goto 
kafka_config_error; + + ret = rd_kafka_conf_set(rdkconf, "bootstrap.servers", k->server, errstr, + sizeof(errstr)); + if (ret != RD_KAFKA_CONF_OK) + goto kafka_config_error; + + ret = rd_kafka_conf_set(rdkconf, "security.protocol", k->protocol, errstr, + sizeof(errstr)); + if (ret != RD_KAFKA_CONF_OK) + goto kafka_config_error; + + if (!strcmp(k->protocol, "SASL_SSL") || !strcmp(k->protocol, "SSL")) { + ret = rd_kafka_conf_set(rdkconf, "ssl.ca.location", k->ssl.ca, errstr, + sizeof(errstr)); + if (ret != RD_KAFKA_CONF_OK) + goto kafka_config_error; + } + + if (!strcmp(k->protocol, "SASL_PLAINTEXT") || + !strcmp(k->protocol, "SASL_SSL")) { + ret = rd_kafka_conf_set(rdkconf, "sasl.mechanisms", k->sasl.mechanisms, + errstr, sizeof(errstr)); + if (ret != RD_KAFKA_CONF_OK) + goto kafka_config_error; + + ret = rd_kafka_conf_set(rdkconf, "sasl.username", k->sasl.username, errstr, + sizeof(errstr)); + if (ret != RD_KAFKA_CONF_OK) + goto kafka_config_error; + + ret = rd_kafka_conf_set(rdkconf, "sasl.password", k->sasl.password, errstr, + sizeof(errstr)); + if (ret != RD_KAFKA_CONF_OK) + goto kafka_config_error; + } + + if (k->produce) { + // rd_kafka_new() will take ownership and free the passed rd_kafka_conf_t object, + // so we will need to create a copy first + rd_kafka_conf_t *rdkconf_prod = rd_kafka_conf_dup(rdkconf); + if (!rdkconf_prod) + throw MemoryAllocationError(); + + k->producer.client = + rd_kafka_new(RD_KAFKA_PRODUCER, rdkconf_prod, errstr, sizeof(errstr)); + if (!k->producer.client) + goto kafka_config_error; + + rd_kafka_topic_conf_t *topic_conf = rd_kafka_topic_conf_new(); + if (!topic_conf) + throw MemoryAllocationError(); + + ret = rd_kafka_topic_conf_set(topic_conf, "acks", "all", errstr, + sizeof(errstr)); + if (ret != RD_KAFKA_CONF_OK) + goto kafka_config_error; + + k->producer.topic = + rd_kafka_topic_new(k->producer.client, k->produce, topic_conf); + if (!k->producer.topic) + throw MemoryAllocationError(); + + n->logger->info("Connected producer to 
bootstrap server {}", k->server); + } + + if (k->consume) { + // rd_kafka_new() will take ownership and free the passed rd_kafka_conf_t object, + // so we will need to create a copy first + rd_kafka_conf_t *rdkconf_cons = rd_kafka_conf_dup(rdkconf); + if (!rdkconf_cons) + throw MemoryAllocationError(); + + rd_kafka_topic_partition_list_t *partitions = + rd_kafka_topic_partition_list_new(1); + if (!partitions) + throw MemoryAllocationError(); + + rd_kafka_topic_partition_t *partition = + rd_kafka_topic_partition_list_add(partitions, k->consume, 0); + if (!partition) + throw RuntimeError("Failed to add new partition"); + + ret = rd_kafka_conf_set(rdkconf_cons, "group.id", k->consumer.group_id, + errstr, sizeof(errstr)); + if (ret != RD_KAFKA_CONF_OK) + goto kafka_config_error; + + k->consumer.client = + rd_kafka_new(RD_KAFKA_CONSUMER, rdkconf_cons, errstr, sizeof(errstr)); + if (!k->consumer.client) + throw MemoryAllocationError(); + + ret = rd_kafka_subscribe(k->consumer.client, partitions); + if (ret != RD_KAFKA_RESP_ERR_NO_ERROR) + throw RuntimeError("Error subscribing to {} at {}: {}", k->consume, + k->server, rd_kafka_err2str((rd_kafka_resp_err_t)ret)); + + n->logger->info("Subscribed consumer from bootstrap server {}", k->server); + } + + // Add client to global list of kafka clients + // so that thread can call kafka loop for this client + list_push(&clients, n); + + rd_kafka_conf_destroy(rdkconf); + + return 0; + +kafka_config_error: + rd_kafka_conf_destroy(rdkconf); + + throw RuntimeError("{}", errstr); + + return -1; +} + +int villas::node::kafka_stop(NodeCompat *n) { + int ret; + auto *k = n->getData(); + + if (k->producer.client) { + ret = rd_kafka_flush(k->producer.client, k->timeout * 1000); + if (ret != RD_KAFKA_RESP_ERR_NO_ERROR) + n->logger->error("Failed to flush messages: {}", + rd_kafka_err2str((rd_kafka_resp_err_t)ret)); + + /* If the output queue is still not empty there is an issue + * with producing messages to the clusters. 
*/ + if (rd_kafka_outq_len(k->producer.client) > 0) + n->logger->warn("{} message(s) were not delivered", + rd_kafka_outq_len(k->producer.client)); + } + + // Unregister client from global kafka client list + // so that kafka loop is no longer invoked for this client + // important to do that before disconnecting from broker, otherwise, kafka thread will attempt to reconnect + list_remove_all(&clients, n); + + ret = queue_signalled_close(&k->queue); + if (ret) + return ret; + + return 0; +} + +int villas::node::kafka_type_start(villas::node::SuperNode *sn) { + int ret; + + logger = Log::get("node:kafka"); + + ret = list_init(&clients); + if (ret) + goto kafka_error; + + // Start thread here to run kafka loop for registered clients + ret = pthread_create(&thread, nullptr, kafka_loop_thread, nullptr); + if (ret) + goto kafka_error; + + return 0; + +kafka_error: + logger->warn("Error initialazing node type kafka"); + + return ret; +} + +int villas::node::kafka_type_stop() { + int ret; + + // Stop thread here that executes kafka loop + ret = pthread_cancel(thread); + if (ret) + return ret; + + logger->debug( + "Called pthread_cancel() on kafka communication management thread."); + + ret = pthread_join(thread, nullptr); + if (ret) + goto kafka_error; + + // When this is called the list of clients should be empty + if (list_length(&clients) > 0) + throw RuntimeError( + "List of kafka clients contains elements at time of destruction. 
Call " + "node_stop for each kafka node before stopping node type!"); + + ret = list_destroy(&clients, nullptr, false); + if (ret) + goto kafka_error; + + return 0; + +kafka_error: + logger->warn("Error stoping node type kafka"); + + return ret; +} + +int villas::node::kafka_read(NodeCompat *n, struct Sample *const smps[], + unsigned cnt) { + int pulled; + auto *k = n->getData(); + struct Sample *smpt[cnt]; + + pulled = queue_signalled_pull_many(&k->queue, (void **)smpt, cnt); + + sample_copy_many(smps, smpt, pulled); + sample_decref_many(smpt, pulled); + + return pulled; +} + +int villas::node::kafka_write(NodeCompat *n, struct Sample *const smps[], + unsigned cnt) { + int ret; + auto *k = n->getData(); + + size_t wbytes; + + char data[DEFAULT_FORMAT_BUFFER_LENGTH]; + + ret = k->formatter->sprint(data, sizeof(data), &wbytes, smps, cnt); + if (ret < 0) + return ret; + + if (k->produce) { + ret = rd_kafka_produce(k->producer.topic, RD_KAFKA_PARTITION_UA, + RD_KAFKA_MSG_F_COPY, data, wbytes, NULL, 0, NULL); + + if (ret != RD_KAFKA_RESP_ERR_NO_ERROR) { + n->logger->warn("Publish failed"); + return -abs(ret); + } + } else + n->logger->warn( + "No produce possible because no produce topic is configured"); + + return cnt; +} + +int villas::node::kafka_poll_fds(NodeCompat *n, int fds[]) { + auto *k = n->getData(); + + fds[0] = queue_signalled_fd(&k->queue); + + return 1; +} + +static NodeCompatType p; + +__attribute__((constructor(110))) static void register_plugin() { + p.name = "kafka"; + p.description = "Kafka event message streaming (rdkafka)"; + p.vectorize = 0; + p.size = sizeof(struct kafka); + p.type.start = kafka_type_start; + p.type.stop = kafka_type_stop; + p.destroy = kafka_destroy; + p.prepare = kafka_prepare; + p.parse = kafka_parse; + p.prepare = kafka_prepare; + p.print = kafka_print; + p.init = kafka_init; + p.destroy = kafka_destroy; + p.start = kafka_start; + p.stop = kafka_stop; + p.read = kafka_read; + p.write = kafka_write; + p.reverse = 
kafka_reverse; + p.poll_fds = kafka_poll_fds; + + static NodeCompatFactory ncp(&p); +} From 7d948f6b88b863d89df066b4493d5b2c5cd8a848 Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Thu, 27 Nov 2025 19:18:43 +0100 Subject: [PATCH 2/9] Harmonize parsing of signal lists Signed-off-by: Steffen Vogel --- include/villas/signal_list.hpp | 19 +++---- lib/hooks/lua.cpp | 2 + lib/node_direction.cpp | 47 +--------------- lib/signal_list.cpp | 100 ++++++++++++++++++--------------- 4 files changed, 68 insertions(+), 100 deletions(-) diff --git a/include/villas/signal_list.hpp b/include/villas/signal_list.hpp index 210a11bbb..aa560dc91 100644 --- a/include/villas/signal_list.hpp +++ b/include/villas/signal_list.hpp @@ -8,6 +8,8 @@ #pragma once #include +#include +#include #include @@ -24,16 +26,9 @@ class SignalList : public std::vector { using Ptr = std::shared_ptr; SignalList() {} - + SignalList(json_t *json); + SignalList(std::string_view dt); SignalList(unsigned len, enum SignalType fmt); - SignalList(const char *dt); - SignalList(json_t *json) { - int ret = parse(json); - if (ret) - throw RuntimeError("Failed to parse signal list"); - } - - int parse(json_t *json); Ptr clone(); @@ -42,9 +37,11 @@ class SignalList : public std::vector { json_t *toJson() const; - int getIndexByName(const std::string &name); - Signal::Ptr getByName(const std::string &name); + int getIndexByName(std::string_view name); + Signal::Ptr getByName(std::string_view name); Signal::Ptr getByIndex(unsigned idx); + + void parse(json_t *json_signals); }; } // namespace node diff --git a/lib/hooks/lua.cpp b/lib/hooks/lua.cpp index 0dd89a402..8683c1746 100644 --- a/lib/hooks/lua.cpp +++ b/lib/hooks/lua.cpp @@ -420,7 +420,9 @@ void LuaHook::parseExpressions(json_t *json_sigs) { size_t i; json_t *json_sig; + expressions.clear(); signalsExpressions->clear(); + ret = signalsExpressions->parse(json_sigs); if (ret) throw ConfigError(json_sigs, "node-config-hook-lua-signals", diff --git a/lib/node_direction.cpp 
b/lib/node_direction.cpp index 39088b014..ed925f99d 100644 --- a/lib/node_direction.cpp +++ b/lib/node_direction.cpp @@ -44,53 +44,10 @@ int NodeDirection::parse(json_t *json) { signals = std::make_shared(); if (!signals) throw MemoryAllocationError(); - } else if (json_is_object(json_signals) || json_is_array(json_signals)) { - signals = std::make_shared(); + } else if (json_signals) { + signals = std::make_shared(json_signals); if (!signals) throw MemoryAllocationError(); - - if (json_is_object(json_signals)) { - json_t *json_name, *json_signal = json_signals; - int count; - - ret = json_unpack_ex(json_signal, &err, 0, "{ s: i }", "count", &count); - if (ret) - throw ConfigError(json_signals, "node-config-node-signals", - "Invalid signal definition"); - - json_signals = json_array(); - for (int i = 0; i < count; i++) { - json_t *json_signal_copy = json_copy(json_signal); - - json_object_del(json_signal, "count"); - - // Append signal index - json_name = json_object_get(json_signal_copy, "name"); - if (json_name) { - const char *name = json_string_value(json_name); - char *name_new; - - int ret __attribute__((unused)); - ret = asprintf(&name_new, "%s%d", name, i); - - json_string_set(json_name, name_new); - } - - json_array_append_new(json_signals, json_signal_copy); - } - json_object_set_new(json, "signals", json_signals); - } - - ret = signals->parse(json_signals); - if (ret) - throw ConfigError(json_signals, "node-config-node-signals", - "Failed to parse signal definition"); - } else if (json_is_string(json_signals)) { - const char *dt = json_string_value(json_signals); - - signals = std::make_shared(dt); - if (!signals) - return -1; } else { signals = std::make_shared(DEFAULT_SAMPLE_LENGTH, SignalType::FLOAT); diff --git a/lib/signal_list.cpp b/lib/signal_list.cpp index dd1359973..38af58bb9 100644 --- a/lib/signal_list.cpp +++ b/lib/signal_list.cpp @@ -17,66 +17,78 @@ using namespace villas; using namespace villas::node; using namespace villas::utils; -int 
SignalList::parse(json_t *json) { - int ret; +SignalList::SignalList(json_t *json_signals) { parse(json_signals); } - if (!json_is_array(json)) - return -1; +SignalList::SignalList(unsigned len, enum SignalType typ) { + auto typ_str = signalTypeToString(typ); - size_t i; - json_t *json_signal; - json_array_foreach(json, i, json_signal) { - auto sig = std::make_shared(); - if (!sig) - throw MemoryAllocationError(); + auto *json_signals = json_pack("{ s: s, s: s, s: i }", "name", "signal", + "type", typ_str.c_str(), "count", len); - ret = sig->parse(json_signal); - if (ret) - return ret; + parse(json_signals); +} - push_back(sig); - } +SignalList::SignalList(std::string_view dt) { + json_t *json_signals = json_array(); - return 0; -} + int i = 0; + char *e; -SignalList::SignalList(unsigned len, enum SignalType typ) { - char name[32]; + auto *dtc = dt.data(); - for (unsigned i = 0; i < len; i++) { - snprintf(name, sizeof(name), "signal%u", i); + for (const char *t = dtc; *t; t = e + 1) { + auto len = strtoul(t, &e, 10); + if (t == e) + len = 1; - auto sig = std::make_shared(name, "", typ); - if (!sig) + auto name = fmt::format("signal_{}", i++); + + auto typ = signalTypeFromFormatString(*e); + if (typ == SignalType::INVALID) throw RuntimeError("Failed to create signal list"); - push_back(sig); + auto typ_str = signalTypeToString(typ); + + auto *json_signal = json_pack("{ s: s, s: s, s: i }", "name", name.c_str(), + "type", typ_str.c_str(), "count", len); + + json_array_append_new(json_signals, json_signal); } + + parse(json_signals); } -SignalList::SignalList(const char *dt) { - int len, i = 0; - char name[32], *e; - enum SignalType typ; +void SignalList::parse(json_t *json_signals) { + clear(); - for (const char *t = dt; *t; t = e + 1) { - len = strtoul(t, &e, 10); - if (t == e) - len = 1; + if (json_is_string(json_signals)) { + SignalList(json_string_value(json_signals)); + } else if (json_is_object(json_signals)) { + auto *json_tmp = json_signals; - typ = 
signalTypeFromFormatString(*e); - if (typ == SignalType::INVALID) - throw RuntimeError("Failed to create signal list"); + json_signals = json_array(); + json_array_append_new(json_signals, json_tmp); + } else { + throw ConfigError(json_signals, "Invalid signal list"); + } + + size_t i; + json_t *json_signal; + json_array_foreach(json_signals, i, json_signal) { + if (!json_is_object(json_signal)) { + throw ConfigError(json_signal, + "Signal definitions must be a JSON object"); + } - for (int j = 0; j < len; j++) { - snprintf(name, sizeof(name), "signal%d", i++); + auto sig = std::make_shared(); + if (!sig) + throw MemoryAllocationError(); - auto sig = std::make_shared(name, "", typ); - if (!sig) - throw RuntimeError("Failed to create signal list"); + auto ret = sig->parse(json_signal); + if (ret) + throw ConfigError(json_signal, "Failed to parse signal definition"); - push_back(sig); - } + push_back(sig); } } @@ -122,7 +134,7 @@ json_t *SignalList::toJson() const { Signal::Ptr SignalList::getByIndex(unsigned idx) { return this->at(idx); } -int SignalList::getIndexByName(const std::string &name) { +int SignalList::getIndexByName(std::string_view name) { unsigned i = 0; for (auto s : *this) { if (name == s->name) @@ -134,7 +146,7 @@ int SignalList::getIndexByName(const std::string &name) { return -1; } -Signal::Ptr SignalList::getByName(const std::string &name) { +Signal::Ptr SignalList::getByName(std::string_view name) { for (auto s : *this) { if (name == s->name) return s; From 64617821723dce4e4110391365c5f486675e9369 Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Sun, 30 Nov 2025 17:18:20 +0100 Subject: [PATCH 3/9] Update auto-generated code Signed-off-by: Steffen Vogel --- python/villas/node/villas_pb2.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/villas/node/villas_pb2.py b/python/villas/node/villas_pb2.py index b764662c0..a3aa59453 100644 --- a/python/villas/node/villas_pb2.py +++ b/python/villas/node/villas_pb2.py @@ -22,6 +22,8 @@ _sym_db = 
_symbol_database.Default() + + DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0cvillas.proto\x12\x0bvillas.node\"/\n\x07Message\x12$\n\x07samples\x18\x01 \x03(\x0b\x32\x13.villas.node.Sample\"\xfe\x01\n\x06Sample\x12,\n\x04type\x18\x01 \x02(\x0e\x32\x18.villas.node.Sample.Type:\x04\x44\x41TA\x12\x10\n\x08sequence\x18\x02 \x01(\x04\x12)\n\tts_origin\x18\x03 \x01(\x0b\x32\x16.villas.node.Timestamp\x12+\n\x0bts_received\x18\x04 \x01(\x0b\x32\x16.villas.node.Timestamp\x12\x11\n\tnew_frame\x18\x05 \x01(\x08\x12\"\n\x06values\x18\x64 \x03(\x0b\x32\x12.villas.node.Value\"%\n\x04Type\x12\x08\n\x04\x44\x41TA\x10\x01\x12\t\n\x05START\x10\x02\x12\x08\n\x04STOP\x10\x03\"&\n\tTimestamp\x12\x0b\n\x03sec\x18\x01 \x02(\r\x12\x0c\n\x04nsec\x18\x02 \x02(\r\"Z\n\x05Value\x12\x0b\n\x01\x66\x18\x01 \x01(\x01H\x00\x12\x0b\n\x01i\x18\x02 \x01(\x03H\x00\x12\x0b\n\x01\x62\x18\x03 \x01(\x08H\x00\x12!\n\x01z\x18\x04 \x01(\x0b\x32\x14.villas.node.ComplexH\x00\x42\x07\n\x05value\"%\n\x07\x43omplex\x12\x0c\n\x04real\x18\x01 \x02(\x02\x12\x0c\n\x04imag\x18\x02 \x02(\x02') _globals = globals() From 1af4e384815a28ccce567d823591ee8c9af30980 Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Sun, 30 Nov 2025 21:57:10 +0100 Subject: [PATCH 4/9] Allow overwriting signal parsing Signed-off-by: Steffen Vogel --- include/villas/node.hpp | 9 +++++- include/villas/node_direction.hpp | 4 ++- include/villas/signal.hpp | 6 ++-- include/villas/signal_list.hpp | 8 +++-- lib/hooks/lua.cpp | 6 +--- lib/node.cpp | 39 ++++++++++++++-------- lib/node_compat.cpp | 2 +- lib/node_direction.cpp | 5 +-- lib/nodes/api.cpp | 2 +- lib/nodes/exec.cpp | 2 +- lib/nodes/fpga.cpp | 2 +- lib/nodes/iec60870.cpp | 2 +- lib/nodes/iec61850_goose.cpp | 7 ++-- lib/nodes/kafka.cpp | 10 +++--- lib/nodes/loopback.cpp | 8 +++-- lib/nodes/modbus.cpp | 3 +- lib/nodes/opal_async.cpp | 7 ++-- lib/nodes/opal_orchestra.cpp | 2 +- lib/nodes/opendss.cpp | 3 +- lib/nodes/signal.cpp | 14 +++++--- lib/nodes/webrtc.cpp | 2 +- 
lib/signal.cpp | 10 ++++++ lib/signal_list.cpp | 54 ++++++++++++++++++++++++------- 23 files changed, 139 insertions(+), 68 deletions(-) diff --git a/include/villas/node.hpp b/include/villas/node.hpp index f4c52a62e..0b7ab6858 100644 --- a/include/villas/node.hpp +++ b/include/villas/node.hpp @@ -7,6 +7,7 @@ #pragma once +#include #include #include @@ -106,6 +107,13 @@ class Node { virtual json_t *_readStatus() const { return nullptr; } + int parseCommon( + json_t *json, + std::function + parse_signal = [](json_t *j, NodeDirection::Direction d) { + return Signal::fromJson(j); + }); + public: // Initialize node with default values Node(const uuid_t &id = {}, const std::string &name = ""); @@ -277,7 +285,6 @@ class Node { }; class NodeFactory : public villas::plugin::Plugin { - friend Node; protected: diff --git a/include/villas/node_direction.hpp b/include/villas/node_direction.hpp index ecd69b356..7ef96cacf 100644 --- a/include/villas/node_direction.hpp +++ b/include/villas/node_direction.hpp @@ -7,6 +7,8 @@ #pragma once +#include + #include #include @@ -50,7 +52,7 @@ class NodeDirection { NodeDirection(enum NodeDirection::Direction dir, Node *n); - int parse(json_t *json); + int parse(json_t *json, std::function parse_signal); void check(); int prepare(); int start(); diff --git a/include/villas/signal.hpp b/include/villas/signal.hpp index 71e90e629..c7fc48464 100644 --- a/include/villas/signal.hpp +++ b/include/villas/signal.hpp @@ -42,12 +42,14 @@ class Signal { // Parse signal description. int parse(json_t *json); - std::string toString(const union SignalData *d = nullptr) const; + virtual std::string toString(const union SignalData *d = nullptr) const; // Produce JSON representation of signal. 
- json_t *toJson() const; + virtual json_t *toJson() const; bool isNext(const Signal &sig); + + static Signal::Ptr fromJson(json_t *json); }; } // namespace node diff --git a/include/villas/signal_list.hpp b/include/villas/signal_list.hpp index aa560dc91..ef6e08daf 100644 --- a/include/villas/signal_list.hpp +++ b/include/villas/signal_list.hpp @@ -7,6 +7,7 @@ #pragma once +#include #include #include #include @@ -26,7 +27,8 @@ class SignalList : public std::vector { using Ptr = std::shared_ptr; SignalList() {} - SignalList(json_t *json); + SignalList(json_t *json, std::function parse_signal = + Signal::fromJson); SignalList(std::string_view dt); SignalList(unsigned len, enum SignalType fmt); @@ -41,7 +43,9 @@ class SignalList : public std::vector { Signal::Ptr getByName(std::string_view name); Signal::Ptr getByIndex(unsigned idx); - void parse(json_t *json_signals); + void + parse(json_t *json_signals, + std::function parse_signal = Signal::fromJson); }; } // namespace node diff --git a/lib/hooks/lua.cpp b/lib/hooks/lua.cpp index 8683c1746..d33951e7c 100644 --- a/lib/hooks/lua.cpp +++ b/lib/hooks/lua.cpp @@ -416,17 +416,13 @@ LuaHook::LuaHook(Path *p, Node *n, int fl, int prio, bool en) LuaHook::~LuaHook() { lua_close(L); } void LuaHook::parseExpressions(json_t *json_sigs) { - int ret; size_t i; json_t *json_sig; expressions.clear(); signalsExpressions->clear(); - ret = signalsExpressions->parse(json_sigs); - if (ret) - throw ConfigError(json_sigs, "node-config-hook-lua-signals", - "Setting 'signals' must be a list of dicts"); + signalsExpressions->parse(json_sigs); // cppcheck-suppress unknownMacro json_array_foreach(json_sigs, i, json_sig) diff --git a/lib/node.cpp b/lib/node.cpp index b7453eb4b..f4dd16f1f 100644 --- a/lib/node.cpp +++ b/lib/node.cpp @@ -97,7 +97,12 @@ int Node::prepare() { return 0; } -int Node::parse(json_t *json) { +int Node::parse(json_t *json) { return parseCommon(json); } + +int Node::parseCommon( + json_t *json, + std::function + 
parse_signal) { assert(state == State::INITIALIZED || state == State::PARSED || state == State::CHECKED); @@ -138,31 +143,37 @@ int Node::parse(json_t *json) { #endif // WITH_NETEM } - struct { + struct Direction { const char *str; - struct NodeDirection *dir; - } dirs[] = {{"in", &in}, {"out", &out}}; + struct NodeDirection *obj; + enum NodeDirection::Direction dir; + }; + std::vector dirs = {{"in", &in, NodeDirection::Direction::IN}, + {"out", &out, NodeDirection::Direction::OUT}}; - const char *fields[] = {"signals", "builtin", "vectorize", "hooks"}; + std::vector fields = {"signals", "builtin", "vectorize", + "hooks"}; - for (unsigned j = 0; j < ARRAY_LEN(dirs); j++) { - json_t *json_dir = json_object_get(json, dirs[j].str); + for (auto &dir : dirs) { + json_t *json_dir = json_object_get(json, dir.str); // Skip if direction is unused if (!json_dir) { json_dir = json_pack("{ s: b }", "enabled", 0); } - // Copy missing fields from main node config to direction config - for (unsigned i = 0; i < ARRAY_LEN(fields); i++) { - json_t *json_field_dir = json_object_get(json_dir, fields[i]); - json_t *json_field_node = json_object_get(json, fields[i]); + // Copy missing fields from main node config to direction config. 
+ for (auto &field : fields) { + json_t *json_field_dir = json_object_get(json_dir, field.c_str()); + json_t *json_field_node = json_object_get(json, field.c_str()); - if (json_field_node && !json_field_dir) - json_object_set(json_dir, fields[i], json_field_node); + if (json_field_node && !json_field_dir) { + json_object_set(json_dir, field.c_str(), json_field_node); + } } - ret = dirs[j].dir->parse(json_dir); + ret = dir.obj->parse(json_dir, + [&](json_t *j) { return parse_signal(j, dir.dir); }); if (ret) return ret; } diff --git a/lib/node_compat.cpp b/lib/node_compat.cpp index 02fcd759c..48bb7a80e 100644 --- a/lib/node_compat.cpp +++ b/lib/node_compat.cpp @@ -74,7 +74,7 @@ int NodeCompat::prepare() { } int NodeCompat::parse(json_t *json) { - int ret = Node::parse(json); + int ret = Node::parseCommon(json); if (ret) return ret; diff --git a/lib/node_direction.cpp b/lib/node_direction.cpp index ed925f99d..ff6d02473 100644 --- a/lib/node_direction.cpp +++ b/lib/node_direction.cpp @@ -22,7 +22,8 @@ NodeDirection::NodeDirection(enum NodeDirection::Direction dir, Node *n) : direction(dir), path(nullptr), node(n), enabled(1), builtin(1), vectorize(1), config(nullptr) {} -int NodeDirection::parse(json_t *json) { +int NodeDirection::parse(json_t *json, + std::function parse_signal) { int ret; json_error_t err; @@ -45,7 +46,7 @@ int NodeDirection::parse(json_t *json) { if (!signals) throw MemoryAllocationError(); } else if (json_signals) { - signals = std::make_shared(json_signals); + signals = std::make_shared(json_signals, parse_signal); if (!signals) throw MemoryAllocationError(); } else { diff --git a/lib/nodes/api.cpp b/lib/nodes/api.cpp index 89b9dfe37..4bb1f9af6 100644 --- a/lib/nodes/api.cpp +++ b/lib/nodes/api.cpp @@ -88,7 +88,7 @@ int APINode::_write(struct Sample *smps[], unsigned cnt) { } int APINode::parse(json_t *json) { - int ret = Node::parse(json); + int ret = Node::parseCommon(json); if (ret) return ret; diff --git a/lib/nodes/exec.cpp 
b/lib/nodes/exec.cpp index 6fe0cbde8..b94134e70 100644 --- a/lib/nodes/exec.cpp +++ b/lib/nodes/exec.cpp @@ -28,7 +28,7 @@ ExecNode::~ExecNode() { } int ExecNode::parse(json_t *json) { - int ret = Node::parse(json); + int ret = Node::parseCommon(json); if (ret) return ret; diff --git a/lib/nodes/fpga.cpp b/lib/nodes/fpga.cpp index 45f1e3f24..dcf3b1bb7 100644 --- a/lib/nodes/fpga.cpp +++ b/lib/nodes/fpga.cpp @@ -112,7 +112,7 @@ int FpgaNode::prepare() { int FpgaNode::stop() { return Node::stop(); } int FpgaNode::parse(json_t *json) { - int ret = Node::parse(json); + int ret = Node::parseCommon(json); if (ret) { return ret; } diff --git a/lib/nodes/iec60870.cpp b/lib/nodes/iec60870.cpp index e28f1d952..5b73647f7 100644 --- a/lib/nodes/iec60870.cpp +++ b/lib/nodes/iec60870.cpp @@ -677,7 +677,7 @@ SlaveNode::SlaveNode(const uuid_t &id, const std::string &name) SlaveNode::~SlaveNode() { destroySlave(); } int SlaveNode::parse(json_t *json) { - int ret = Node::parse(json); + int ret = Node::parseCommon(json); if (ret) return ret; diff --git a/lib/nodes/iec61850_goose.cpp b/lib/nodes/iec61850_goose.cpp index 45003a1c3..35d791a52 100644 --- a/lib/nodes/iec61850_goose.cpp +++ b/lib/nodes/iec61850_goose.cpp @@ -622,16 +622,15 @@ GooseNode::~GooseNode() { } int GooseNode::parse(json_t *json) { - int ret; - json_error_t err; - - ret = Node::parse(json); + int ret = Node::parseCommon(json); if (ret) return ret; json_t *json_keys = nullptr; json_t *json_in = nullptr; json_t *json_out = nullptr; + + json_error_t err; ret = json_unpack_ex(json, &err, 0, // "{ s:?o, s:?o, s:?o }", // "keys", &json_keys, // diff --git a/lib/nodes/kafka.cpp b/lib/nodes/kafka.cpp index 5f9a60576..bce209b8a 100644 --- a/lib/nodes/kafka.cpp +++ b/lib/nodes/kafka.cpp @@ -235,6 +235,10 @@ class KafkaNode : public Node, public RdKafka::EventCb { } int parse(json_t *json) override { + int ret = Node::parseCommon(json); + if (ret) + return ret; + const char *svr; const char *prod = nullptr; const char *cons = 
nullptr; @@ -243,15 +247,11 @@ class KafkaNode : public Node, public RdKafka::EventCb { const char *gid = nullptr; double to = -1; - int ret = Node::parse(json); - if (ret) - return ret; - - json_error_t err; json_t *json_ssl = nullptr; json_t *json_sasl = nullptr; json_t *json_format = nullptr; + json_error_t err; ret = json_unpack_ex(json, &err, 0, "{ s?: { s?: s }, s?: { s?: s, s?: s }, s?: o, s: s, " "s?: F, s: s, s?: s, s?: o, s?: o }", diff --git a/lib/nodes/loopback.cpp b/lib/nodes/loopback.cpp index 5cd411afa..b48e0f119 100644 --- a/lib/nodes/loopback.cpp +++ b/lib/nodes/loopback.cpp @@ -94,11 +94,13 @@ const std::string &LoopbackNode::getDetails() { } int LoopbackNode::parse(json_t *json) { + int ret = Node::parseCommon(json); + if (ret) + return ret; + const char *mode_str = nullptr; json_error_t err; - int ret; - ret = json_unpack_ex(json, &err, 0, "{ s?: i, s?: s }", "queuelen", &queuelen, "mode", &mode_str); if (ret) @@ -120,7 +122,7 @@ int LoopbackNode::parse(json_t *json) { "Unknown mode '{}'", mode_str); } - return Node::parse(json); + return 0; } // Register node diff --git a/lib/nodes/modbus.cpp b/lib/nodes/modbus.cpp index 90ee4a5a5..d55a5b76d 100644 --- a/lib/nodes/modbus.cpp +++ b/lib/nodes/modbus.cpp @@ -818,7 +818,8 @@ unsigned int ModbusNode::parseMappings(std::vector &mappings, } int ModbusNode::parse(json_t *json) { - if (auto ret = Node::parse(json)) + int ret = Node::parseCommon(json); + if (ret) return ret; json_error_t err; diff --git a/lib/nodes/opal_async.cpp b/lib/nodes/opal_async.cpp index f3a5dcee6..4c04e2c44 100644 --- a/lib/nodes/opal_async.cpp +++ b/lib/nodes/opal_async.cpp @@ -139,12 +139,13 @@ void LogSink::sink_it_(const spdlog::details::log_msg &msg) { } int OpalAsyncNode::parse(json_t *json) { - int ret, rply = -1, id = -1; - - ret = Node::parse(json); + int ret = Node::parseCommon(json); if (ret) return ret; + int rply = -1; + int id = -1; + json_error_t err; ret = json_unpack_ex(json, &err, 0, "{ s: i, s?: { s?: b } }", 
"id", &id, "in", "reply", &rply); diff --git a/lib/nodes/opal_orchestra.cpp b/lib/nodes/opal_orchestra.cpp index cba9a456e..72383256b 100644 --- a/lib/nodes/opal_orchestra.cpp +++ b/lib/nodes/opal_orchestra.cpp @@ -408,7 +408,7 @@ class OpalOrchestraNode : public Node { } int parse(json_t *json) override { - int ret = Node::parse(json); + int ret = Node::parseCommon(json); if (ret) return ret; diff --git a/lib/nodes/opendss.cpp b/lib/nodes/opendss.cpp index b9fcb107e..8757453aa 100644 --- a/lib/nodes/opendss.cpp +++ b/lib/nodes/opendss.cpp @@ -133,8 +133,7 @@ void OpenDSS::parseData(json_t *json, bool in) { } int OpenDSS::parse(json_t *json) { - - int ret = Node::parse(json); + int ret = Node::parseCommon(json); if (ret) return ret; diff --git a/lib/nodes/signal.cpp b/lib/nodes/signal.cpp index d5ce3fb24..3713ee68d 100644 --- a/lib/nodes/signal.cpp +++ b/lib/nodes/signal.cpp @@ -195,15 +195,15 @@ int SignalNode::prepare() { } int SignalNode::parse(json_t *json) { - int r = -1, m = -1, ret = Node::parse(json); + int ret = Node::parseCommon(json); if (ret) return ret; - json_error_t err; - - size_t i; - json_t *json_signals, *json_signal; + int r = -1; + int m = -1; + json_t *json_signals; + json_error_t err; ret = json_unpack_ex(json, &err, 0, "{ s?: b, s?: i, s?: F, s?: b, s: { s: o } }", "realtime", &r, "limit", &limit, "rate", &rate, @@ -218,7 +218,11 @@ int SignalNode::parse(json_t *json) { monitor_missed = m != 0; signals.clear(); + unsigned j = 0; + + size_t i; + json_t *json_signal; json_array_foreach(json_signals, i, json_signal) { auto sig = SignalNodeSignal(json_signal); diff --git a/lib/nodes/webrtc.cpp b/lib/nodes/webrtc.cpp index e89026817..e28943d96 100644 --- a/lib/nodes/webrtc.cpp +++ b/lib/nodes/webrtc.cpp @@ -48,7 +48,7 @@ WebRTCNode::~WebRTCNode() { } int WebRTCNode::parse(json_t *json) { - int ret = Node::parse(json); + int ret = Node::parseCommon(json); if (ret) return ret; diff --git a/lib/signal.cpp b/lib/signal.cpp index 875c2b33a..02cc3b405 
100644 --- a/lib/signal.cpp +++ b/lib/signal.cpp @@ -123,3 +123,13 @@ bool Signal::isNext(const Signal &sig) { return isNextName(name, sig.name); } + +Signal::Ptr Signal::fromJson(json_t *json) { + auto signal = std::make_shared(); + + auto ret = signal->parse(json); + if (ret) + return nullptr; + + return signal; +} diff --git a/lib/signal_list.cpp b/lib/signal_list.cpp index 38af58bb9..6ef977521 100644 --- a/lib/signal_list.cpp +++ b/lib/signal_list.cpp @@ -17,7 +17,10 @@ using namespace villas; using namespace villas::node; using namespace villas::utils; -SignalList::SignalList(json_t *json_signals) { parse(json_signals); } +SignalList::SignalList(json_t *json_signals, + std::function parse_signal) { + parse(json_signals, parse_signal); +} SignalList::SignalList(unsigned len, enum SignalType typ) { auto typ_str = signalTypeToString(typ); @@ -58,7 +61,8 @@ SignalList::SignalList(std::string_view dt) { parse(json_signals); } -void SignalList::parse(json_t *json_signals) { +void SignalList::parse(json_t *json_signals, + std::function parse_signal) { clear(); if (json_is_string(json_signals)) { @@ -68,8 +72,9 @@ void SignalList::parse(json_t *json_signals) { json_signals = json_array(); json_array_append_new(json_signals, json_tmp); - } else { - throw ConfigError(json_signals, "Invalid signal list"); + } else if (!json_is_array(json_signals)) { + throw ConfigError(json_signals, "node-config-node-signals", + "Invalid signal list"); } size_t i; @@ -77,18 +82,45 @@ void SignalList::parse(json_t *json_signals) { json_array_foreach(json_signals, i, json_signal) { if (!json_is_object(json_signal)) { throw ConfigError(json_signal, + "node-config-node-signal" "Signal definitions must be a JSON object"); } - auto sig = std::make_shared(); - if (!sig) - throw MemoryAllocationError(); + std::string baseName = "signal"; + bool appendIndex = false; + + int count = 1; + const char *nme = nullptr; + + int ret = json_unpack(json_signal, "{ s?: i, s?: s }", "count", &count, + 
"name", &nme); + if (ret) { + throw ConfigError(json_signal, "node-config-node-signal", + "Failed to parse signal definition"); + } - auto ret = sig->parse(json_signal); - if (ret) - throw ConfigError(json_signal, "Failed to parse signal definition"); + if (count > 1) { + json_object_del(json_signal, "count"); + appendIndex = true; + } - push_back(sig); + if (nme) { + baseName = nme; + } + + for (int j = 0; j < count; j++) { + if (appendIndex) { + auto name = fmt::format("{}_{}", baseName, j); + json_object_set_new(json_signal, "name", json_string(name.c_str())); + } + + auto signal = parse_signal(json_signal); + if (!signal) + throw ConfigError(json_signal, "node-config-node-signal", + "Failed to parse signal definition"); + + push_back(signal); + } } } From 6d6738b38180cf9d06f23974d2efd35dbcf6a7f0 Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Sun, 30 Nov 2025 21:59:30 +0100 Subject: [PATCH 5/9] fix(cmake): Remove superfluous CMake variable Signed-off-by: Steffen Vogel --- tools/run-cppcheck.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/run-cppcheck.sh b/tools/run-cppcheck.sh index ee84ab588..7267efa0d 100755 --- a/tools/run-cppcheck.sh +++ b/tools/run-cppcheck.sh @@ -8,7 +8,7 @@ SOURCE_DIR=${SCRIPT_DIR}/.. 
BUILD_DIR=${SOURCE_DIR}/build # Generate compilation database -cmake -S ${SOURCE_DIR} -B ${BUILD_DIR} -DCMAKE_EXPORT_COMPILE_COMMANDS=ON +cmake -S ${SOURCE_DIR} -B ${BUILD_DIR} touch ${BUILD_DIR}/lib/formats/villas.pb-c.{c,h} From 65c9f56a1eacf50d6eb53473d5616d296fd01067 Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Mon, 1 Dec 2025 10:13:34 +0100 Subject: [PATCH 6/9] fix(node-opal-orchestra): Remove ddf_overwrite_only option Signed-off-by: Steffen Vogel --- .../schemas/config/nodes/opal_orchestra.yaml | 6 --- lib/nodes/opal_orchestra.cpp | 41 +++---------------- 2 files changed, 5 insertions(+), 42 deletions(-) diff --git a/doc/openapi/components/schemas/config/nodes/opal_orchestra.yaml b/doc/openapi/components/schemas/config/nodes/opal_orchestra.yaml index 816bbee25..6fec75b63 100644 --- a/doc/openapi/components/schemas/config/nodes/opal_orchestra.yaml +++ b/doc/openapi/components/schemas/config/nodes/opal_orchestra.yaml @@ -59,12 +59,6 @@ allOf: description: >- If true, the DDF file provided in the 'dff' setting will be overwriting with settings and signals from the VILLASnode configuration. - ddf_overwrite_only: - type: boolean - default: false - description: >- - If true, VILLASnode will overwrite the file provided in the 'ddf' setting, and terminate immediately afterwards. - rate: type: number default: 1 diff --git a/lib/nodes/opal_orchestra.cpp b/lib/nodes/opal_orchestra.cpp index 72383256b..deb4c5cbf 100644 --- a/lib/nodes/opal_orchestra.cpp +++ b/lib/nodes/opal_orchestra.cpp @@ -269,16 +269,7 @@ class OpalOrchestraNode : public Node { // Overwrite the data definition file (DDF). bool dataDefinitionFileOverwrite; - // Overwrite the data definition file (DDF) and terminate VILLASnode. 
- bool dataDefinitionFileWriteOnly; - int _read(struct Sample *smps[], unsigned cnt) override { - if (dataDefinitionFileWriteOnly) { - logger->warn("Stopping node after writing the DDF file"); - setState(State::STOPPING); - return -1; - } - assert(cnt == 1); if (!domain.synchronous) { @@ -314,12 +305,6 @@ class OpalOrchestraNode : public Node { } int _write(struct Sample *smps[], unsigned cnt) override { - if (dataDefinitionFileWriteOnly) { - logger->warn("Stopping node after writing the DDF file"); - setState(State::STOPPING); - return -1; - } - assert(cnt == 1); try { @@ -348,8 +333,7 @@ class OpalOrchestraNode : public Node { unsigned int key = 0) : Node(id, name), task(), connectionKey(key), status(nullptr), domain(), subscribeMappings(), publishMappings(), rate(1), connectTimeout(5), - skipWaitToGo(false), dataDefinitionFileOverwrite(false), - dataDefinitionFileWriteOnly(false) {} + skipWaitToGo(false), dataDefinitionFileOverwrite(false) {} void parseSignals(json_t *json, SignalList::Ptr signals, DataSet &dataSet, std::unordered_map, @@ -431,13 +415,13 @@ class OpalOrchestraNode : public Node { ret = json_unpack_ex( json, &err, 0, "{ s: s, s?: b, s?: b, s?: o, s?: s, s?: o, s?: o, s?: o, s?: b, s?: " - "b, s?: b, s?: F, s?: { s?: o }, s?: { s?: o } }", + "b, s?: F, s?: { s?: o }, s?: { s?: o } }", "domain", &dn, "synchronous", &sy, "states", &sts, "connection", &json_connection, "ddf", &ddf, "connect_timeout", &json_connect_timeout, "flag_delay", &json_flag_delay, "flag_delay_tool", &json_flag_delay_tool, "skip_wait_to_go", &sw, "ddf_overwrite", &ow, - "ddf_overwrite_only", &owo, "rate", &rate, "in", "signals", - &json_in_signals, "out", "signals", &json_out_signals); + "rate", &rate, "in", "signals", &json_in_signals, "out", "signals", + &json_out_signals); if (ret) { throw ConfigError(json, err, "node-config-node-opal-orchestra"); } @@ -464,10 +448,6 @@ class OpalOrchestraNode : public Node { dataDefinitionFileOverwrite = ow > 0; } - if (owo >= 0) { - 
dataDefinitionFileWriteOnly = owo > 0; - } - if (json_connect_timeout) { connectTimeout = parse_duration(json_connect_timeout); @@ -531,8 +511,7 @@ class OpalOrchestraNode : public Node { int prepare() override { // Write DDF. - if (dataDefinitionFilename && - (dataDefinitionFileOverwrite || dataDefinitionFileWriteOnly)) { + if (dataDefinitionFilename && dataDefinitionFileOverwrite) { // TODO: Possibly merge Orchestra domains from all nodes into one DDF. auto ddf = DataDefinitionFile(); @@ -541,10 +520,6 @@ class OpalOrchestraNode : public Node { logger->info("Wrote Orchestra Data Definition file (DDF) to '{}'", *dataDefinitionFilename); - - if (dataDefinitionFileWriteOnly) { - return Node::prepare(); - } } logger->debug("Connecting to Orchestra framework: domain={}, ddf={}, " @@ -623,14 +598,8 @@ class OpalOrchestraNode : public Node { } int start() override { - if (dataDefinitionFileWriteOnly) { - return Node::start(); - } - RTConnectionLockGuard guard(connectionKey); - RTWaitReadyToGo(); - if (!domain.synchronous) { task.setRate(rate); } From 58b716484237547b40926a574370bfc53d760055 Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Mon, 1 Dec 2025 10:14:33 +0100 Subject: [PATCH 7/9] fix(node-opal-orchestra): Fix skip_wait_to_go setting Signed-off-by: Steffen Vogel --- lib/nodes/opal_orchestra.cpp | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/lib/nodes/opal_orchestra.cpp b/lib/nodes/opal_orchestra.cpp index deb4c5cbf..633bf3d32 100644 --- a/lib/nodes/opal_orchestra.cpp +++ b/lib/nodes/opal_orchestra.cpp @@ -530,11 +530,15 @@ class OpalOrchestraNode : public Node { RTConnectionLockGuard guard(connectionKey); - auto ret = - dataDefinitionFilename - ? 
RTConnectWithFile(dataDefinitionFilename->c_str(), - domain.name.c_str(), connectTimeout.count()) - : RTConnect(domain.name.c_str(), connectTimeout.count()); + auto ret = RTSetSkipWaitToGoAtConnection(skipWaitToGo); + if (ret != RTAPI_SUCCESS) { + throw RTError(ret, "Failed to check ready to go"); + } + + ret = dataDefinitionFilename + ? RTConnectWithFile(dataDefinitionFilename->c_str(), + domain.name.c_str(), connectTimeout.count()) + : RTConnect(domain.name.c_str(), connectTimeout.count()); if (ret != RTAPI_SUCCESS) { throw RTError(ret, "Failed to connect to Orchestra framework"); } @@ -560,11 +564,6 @@ class OpalOrchestraNode : public Node { } } - ret = RTSetSkipWaitToGoAtConnection(skipWaitToGo); - if (ret != RTAPI_SUCCESS) { - throw RTError(ret, "Failed to check ready to go"); - } - if (std::shared_ptr c = std::dynamic_pointer_cast(domain.connection)) { ret = RTSetupRefMemRemoteConnection(c->name.c_str(), c->pciIndex); From 80db91e3f4ae212b22f910aaecc53c6ad47bce00 Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Mon, 1 Dec 2025 10:18:44 +0100 Subject: [PATCH 8/9] fix(node-opal-orchestra): Use new signal parsing infrastructure Signed-off-by: Steffen Vogel --- lib/nodes/opal_orchestra.cpp | 120 ++++++++++++++++------------------- 1 file changed, 54 insertions(+), 66 deletions(-) diff --git a/lib/nodes/opal_orchestra.cpp b/lib/nodes/opal_orchestra.cpp index 633bf3d32..715f916f7 100644 --- a/lib/nodes/opal_orchestra.cpp +++ b/lib/nodes/opal_orchestra.cpp @@ -335,93 +335,91 @@ class OpalOrchestraNode : public Node { subscribeMappings(), publishMappings(), rate(1), connectTimeout(5), skipWaitToGo(false), dataDefinitionFileOverwrite(false) {} - void parseSignals(json_t *json, SignalList::Ptr signals, DataSet &dataSet, - std::unordered_map, - OpalOrchestraMapping> &mappings) { - if (!json_is_array(json)) { - throw ConfigError(json, "node-config-node-opal-orchestra-signals", - "Signals must be an array"); - } + Signal::Ptr parseSignal(json_t *json_signal, 
NodeDirection::Direction dir) { + auto signal = Signal::fromJson(json_signal); - size_t i; - json_t *json_signal; - json_error_t err; + DataSet &dataSet = + dir == NodeDirection::Direction::IN ? domain.publish : domain.subscribe; + std::unordered_map, OpalOrchestraMapping> + &mappings = dir == NodeDirection::Direction::IN ? publishMappings + : subscribeMappings; - json_array_foreach(json, i, json_signal) { - auto signal = signals->getByIndex(i); + const char *nme = nullptr; + const char *typ = nullptr; + int oi = -1; - const char *nme = nullptr; - const char *typ = nullptr; - int oi = -1; + json_error_t err; + auto ret = json_unpack_ex(json_signal, &err, 0, "{ s?: s, s?: s, s?: i }", + "orchestra_name", &nme, "orchestra_type", &typ, + "orchestra_index", &oi); + if (ret) { + throw ConfigError(json_signal, err, + "node-config-node-opal-orchestra-signals"); + } - auto ret = json_unpack_ex(json_signal, &err, 0, "{ s?: s, s?: s, s?: i }", - "orchestra_name", &nme, "orchestra_type", &typ, - "orchestra_index", &oi); - if (ret) { - throw ConfigError(json_signal, err, - "node-config-node-opal-orchestra-signals"); - } + std::optional orchestraIdx; - std::optional orchestraIdx; + if (oi >= 0) { + orchestraIdx = oi; + } - if (oi >= 0) { - orchestraIdx = oi; - } + auto defaultValue = + signal->init.cast(signal->type, node::SignalType::FLOAT); - auto defaultValue = - signal->init.cast(signal->type, node::SignalType::FLOAT); + auto orchestraType = typ ? orchestra::signalTypeFromString(typ) + : orchestra::toOrchestraSignalType(signal->type); - auto orchestraType = typ ? orchestra::signalTypeFromString(typ) - : orchestra::toOrchestraSignalType(signal->type); + auto orchestraName = nme ? nme : signal->name; - auto orchestraName = nme ? 
nme : signal->name; + bool inserted = false; + auto item = dataSet.upsertItem(orchestraName, inserted); - bool inserted = false; - auto item = dataSet.upsertItem(orchestraName, inserted); + if (inserted) { + item->type = orchestraType; + item->defaultValue = defaultValue.f; - if (inserted) { - item->type = orchestraType; - item->defaultValue = defaultValue.f; + mappings.emplace(item, OpalOrchestraMapping(item, orchestraName)); + } - mappings.emplace(item, OpalOrchestraMapping(item, orchestraName)); - } + auto &mapping = mappings.at(item); + mapping.addSignal(signal, orchestraIdx); - auto &mapping = mappings.at(item); - mapping.addSignal(signal, orchestraIdx); - } + return signal; } int parse(json_t *json) override { - int ret = Node::parseCommon(json); - if (ret) - return ret; + domain = Domain(); + publishMappings.clear(); + subscribeMappings.clear(); + + int reti = parseCommon( + json, [&](json_t *json_signal, NodeDirection::Direction dir) { + return parseSignal(json_signal, dir); + }); + if (reti) + return reti; + int sw = -1; + int ow = -1; + int sy = -1; + int sts = -1; const char *dn = nullptr; const char *ddf = nullptr; - json_t *json_in_signals = nullptr; - json_t *json_out_signals = nullptr; json_t *json_connection = nullptr; json_t *json_connect_timeout = nullptr; json_t *json_flag_delay = nullptr; json_t *json_flag_delay_tool = nullptr; - int sw = -1; - int ow = -1; - int owo = -1; - int sy = -1; - int sts = -1; - json_error_t err; - ret = json_unpack_ex( + auto ret = json_unpack_ex( json, &err, 0, "{ s: s, s?: b, s?: b, s?: o, s?: s, s?: o, s?: o, s?: o, s?: b, s?: " - "b, s?: F, s?: { s?: o }, s?: { s?: o } }", + "b, s?: F }", "domain", &dn, "synchronous", &sy, "states", &sts, "connection", &json_connection, "ddf", &ddf, "connect_timeout", &json_connect_timeout, "flag_delay", &json_flag_delay, "flag_delay_tool", &json_flag_delay_tool, "skip_wait_to_go", &sw, "ddf_overwrite", &ow, - "rate", &rate, "in", "signals", &json_in_signals, "out", "signals", - 
&json_out_signals); + "rate", &rate); if (ret) { throw ConfigError(json, err, "node-config-node-opal-orchestra"); } @@ -466,16 +464,6 @@ class OpalOrchestraNode : public Node { domain.connection = Connection::fromJson(json_connection); } - if (json_in_signals) { - parseSignals(json_in_signals, in.getSignals(false), domain.publish, - publishMappings); - } - - if (json_out_signals) { - parseSignals(json_out_signals, out.getSignals(false), domain.subscribe, - subscribeMappings); - } - return 0; } From 27de851db450349b0571ffad9c6dd069ee6e597b Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Wed, 4 Mar 2026 14:17:17 +0000 Subject: [PATCH 9/9] wip --- .../components/schemas/formats/edgeflex.yaml | 8 +- doc/openapi/paths/nodes.yaml | 14 +- etc/examples/nodes/opal_orchestra.conf | 3 +- etc/examples/typhoon.conf | 10 +- include/villas/node.hpp | 6 +- include/villas/node/config.hpp.in | 2 +- include/villas/node_direction.hpp | 2 +- include/villas/nodes/nanomsg.hpp | 3 - include/villas/nodes/socket.hpp | 3 +- include/villas/signal.hpp | 5 +- include/villas/signal_list.hpp | 10 +- lib/formats/column.cpp | 17 +- lib/formats/iotagent_ul.cpp | 5 +- lib/formats/json.cpp | 11 +- lib/formats/json_edgeflex.cpp | 5 +- lib/formats/json_reserve.cpp | 5 +- lib/formats/msg.cpp | 6 +- lib/formats/opal_asyncip.cpp | 4 +- lib/formats/protobuf.cpp | 6 +- lib/formats/value.cpp | 9 +- lib/formats/villas_human.cpp | 8 +- lib/hooks/dp.cpp | 2 - lib/hooks/lua.cpp | 2 - lib/hooks/stats.cpp | 2 +- lib/node.cpp | 6 +- lib/node_direction.cpp | 2 +- lib/nodes/amqp.cpp | 2 +- lib/nodes/kafka.cpp | 51 ++-- lib/nodes/mqtt.cpp | 2 +- lib/nodes/nanomsg.cpp | 4 +- lib/nodes/opal_orchestra.cpp | 234 +++++++----------- lib/nodes/redis.cpp | 4 +- lib/signal.cpp | 41 +-- lib/signal_list.cpp | 41 +-- tests/integration/hook-average.sh | 4 +- tests/integration/hook-cast.sh | 6 +- tests/integration/hook-frame.sh | 22 +- tests/integration/hook-gate.sh | 2 +- tests/integration/hook-lua.sh | 12 +- 
tests/integration/hook-lua_script.sh | 6 +- tests/integration/hook-reorder_ts.sh | 2 +- tests/integration/hook-scale.sh | 4 +- tests/integration/node-loopback-socket.sh | 8 +- .../integration/pipe-loopback-iec61850-9-2.sh | 4 +- tests/integration/pipe-loopback-nanomsg.sh | 2 +- tests/integration/pipe-loopback-rtp-dual.sh | 4 +- tests/integration/pipe-loopback-rtp-remote.sh | 4 +- tests/integration/pipe-loopback-rtp-tbf.sh | 4 +- tests/integration/pipe-loopback-rtp.sh | 2 +- tests/integration/pipe-loopback-socket.sh | 2 +- tests/integration/pipe-loopback-zeromq.sh | 2 +- 51 files changed, 259 insertions(+), 366 deletions(-) diff --git a/doc/openapi/components/schemas/formats/edgeflex.yaml b/doc/openapi/components/schemas/formats/edgeflex.yaml index 2826cbb02..be88ce0fd 100644 --- a/doc/openapi/components/schemas/formats/edgeflex.yaml +++ b/doc/openapi/components/schemas/formats/edgeflex.yaml @@ -34,9 +34,9 @@ additionalProperties: example: - created: 1633791645123 - signal0: 123.456 - signal1: true - signal2: 1234 - signal3: + signal_0: 123.456 + signal_1: true + signal_2: 1234 + signal_3: real: 1234.4556 imag: 23232.12312 diff --git a/doc/openapi/paths/nodes.yaml b/doc/openapi/paths/nodes.yaml index 223eb5232..03d7b80da 100644 --- a/doc/openapi/paths/nodes.yaml +++ b/doc/openapi/paths/nodes.yaml @@ -41,28 +41,28 @@ get: signals: - type: float enabled: true - name: signal0 + name: signal_0 - type: float enabled: true - name: signal1 + name: signal_1 - type: float enabled: true - name: signal2 + name: signal_2 - type: float enabled: true - name: signal3 + name: signal_3 out: vectorize: 2 signals: - type: float enabled: true - name: signal0 + name: signal_0 - type: float enabled: true - name: signal1 + name: signal_1 - type: float enabled: true - name: signal2 + name: signal_2 type: websocket vectorize: 2 series: diff --git a/etc/examples/nodes/opal_orchestra.conf b/etc/examples/nodes/opal_orchestra.conf index f319e495a..7aa0dc83a 100644 --- 
a/etc/examples/nodes/opal_orchestra.conf +++ b/etc/examples/nodes/opal_orchestra.conf @@ -74,7 +74,6 @@ nodes = { orchestra_name = "pub_signal_float" orchestra_type = "float64" - orchestra_index = 0 }, { name = "pub_signal2" @@ -83,7 +82,7 @@ nodes = { orchestra_name = "pub_signal_float" orchestra_type = "float64" }, - { name = "signal_float", orchestra_name = "some_bus/signal_float", orchestra_type = "float64", orchestra_index = 2 }, + { name = "signal_float", orchestra_name = "some_bus/signal_float", orchestra_type = "float64" }, { name = "signal_bool", orchestra_name = "some_bus/signal_bool", orchestra_type = "boolean" }, { name = "signal_uint8", orchestra_name = "some_bus/some_nested_bus/signal_uint8", orchestra_type = "unsigned int8" }, { name = "signal_uint8_2", orchestra_type = "unsigned int8" } diff --git a/etc/examples/typhoon.conf b/etc/examples/typhoon.conf index 536dd1646..076a333e6 100644 --- a/etc/examples/typhoon.conf +++ b/etc/examples/typhoon.conf @@ -21,11 +21,11 @@ nodes = { ) signals = ( - { name = "signal0", type="float" }, - { name = "signal1", type="integer" }, - { name = "signal2", type="boolean" }, - { name = "signal3", type="float" }, - { name = "signal4", type="complex" } + { name = "signal_0", type="float" }, + { name = "signal_1", type="integer" }, + { name = "signal_2", type="boolean" }, + { name = "signal_3", type="float" }, + { name = "signal_4", type="complex" } ) } diff --git a/include/villas/node.hpp b/include/villas/node.hpp index 0b7ab6858..a30402919 100644 --- a/include/villas/node.hpp +++ b/include/villas/node.hpp @@ -109,9 +109,9 @@ class Node { int parseCommon( json_t *json, - std::function - parse_signal = [](json_t *j, NodeDirection::Direction d) { - return Signal::fromJson(j); + std::function + parse_signal = [](json_t *json, NodeDirection::Direction dir, unsigned index) { + return Signal::fromJson(json); }); public: diff --git a/include/villas/node/config.hpp.in b/include/villas/node/config.hpp.in index 
2ea3cb756..441b56c57 100644 --- a/include/villas/node/config.hpp.in +++ b/include/villas/node/config.hpp.in @@ -16,7 +16,7 @@ #define DEFAULT_SAMPLE_LENGTH 64u #define DEFAULT_QUEUE_LENGTH 1024u #define MAX_SAMPLE_LENGTH 512u -#define DEFAULT_FORMAT_BUFFER_LENGTH 4096u +#define DEFAULT_FORMAT_BUFFER_LENGTH (4096u*1024u) /* Number of hugepages which are requested from the the kernel. * @see https://www.kernel.org/doc/Documentation/vm/hugetlbpage.txt diff --git a/include/villas/node_direction.hpp b/include/villas/node_direction.hpp index 7ef96cacf..0e79be060 100644 --- a/include/villas/node_direction.hpp +++ b/include/villas/node_direction.hpp @@ -52,7 +52,7 @@ class NodeDirection { NodeDirection(enum NodeDirection::Direction dir, Node *n); - int parse(json_t *json, std::function parse_signal); + int parse(json_t *json, std::function parse_signal); void check(); int prepare(); int start(); diff --git a/include/villas/nodes/nanomsg.hpp b/include/villas/nodes/nanomsg.hpp index 775149002..5bc8ed7d6 100644 --- a/include/villas/nodes/nanomsg.hpp +++ b/include/villas/nodes/nanomsg.hpp @@ -16,9 +16,6 @@ namespace node { // Forward declarations class NodeCompat; -// The maximum length of a packet which contains stuct msg. -#define NANOMSG_MAX_PACKET_LEN 1500 - struct nanomsg { struct { int socket; diff --git a/include/villas/nodes/socket.hpp b/include/villas/nodes/socket.hpp index 8bdd7b49e..af72e850d 100644 --- a/include/villas/nodes/socket.hpp +++ b/include/villas/nodes/socket.hpp @@ -17,8 +17,7 @@ namespace node { // Forward declarations class NodeCompat; -// The maximum length of a packet which contains stuct msg. 
-#define SOCKET_INITIAL_BUFFER_LEN (64 * 1024) +#define SOCKET_INITIAL_BUFFER_LEN DEFAULT_FORMAT_BUFFER_LENGTH struct Socket { int sd; // The socket descriptor diff --git a/include/villas/signal.hpp b/include/villas/signal.hpp index c7fc48464..acd4f86b8 100644 --- a/include/villas/signal.hpp +++ b/include/villas/signal.hpp @@ -8,6 +8,7 @@ #pragma once #include +#include #include @@ -35,6 +36,8 @@ class Signal { union SignalData init; // The initial value of the signal. enum SignalType type; + Signal::Ptr previous; // Previous signal in vector. + // Initialize a signal with default values. Signal(const std::string &n = "", const std::string &u = "", enum SignalType t = SignalType::INVALID); @@ -47,8 +50,6 @@ class Signal { // Produce JSON representation of signal. virtual json_t *toJson() const; - bool isNext(const Signal &sig); - static Signal::Ptr fromJson(json_t *json); }; diff --git a/include/villas/signal_list.hpp b/include/villas/signal_list.hpp index ef6e08daf..7df07ea12 100644 --- a/include/villas/signal_list.hpp +++ b/include/villas/signal_list.hpp @@ -27,8 +27,10 @@ class SignalList : public std::vector { using Ptr = std::shared_ptr; SignalList() {} - SignalList(json_t *json, std::function parse_signal = - Signal::fromJson); + SignalList(json_t *json, std::function parse_signal = + [](json_t *json, unsigned index) { + return Signal::fromJson(json); + }); SignalList(std::string_view dt); SignalList(unsigned len, enum SignalType fmt); @@ -45,7 +47,9 @@ class SignalList : public std::vector { void parse(json_t *json_signals, - std::function parse_signal = Signal::fromJson); + std::function parse_signal = [](json_t *json, unsigned index) { + return Signal::fromJson(json); + }); }; } // namespace node diff --git a/lib/formats/column.cpp b/lib/formats/column.cpp index 4bf854645..851690862 100644 --- a/lib/formats/column.cpp +++ b/lib/formats/column.cpp @@ -13,6 +13,7 @@ #include #include #include +#include using namespace villas; using namespace villas::node; @@ 
-48,10 +49,8 @@ size_t ColumnLineFormat::sprintLine(char *buf, size_t len, if (flags & (int)SampleFlags::HAS_DATA) { if (smp->flags & (int)SampleFlags::HAS_DATA) { - for (unsigned i = 0; i < smp->length; i++) { + for (unsigned i = 0; i < MIN(smp->length, smp->signals->size()); i++) { auto sig = smp->signals->getByIndex(i); - if (!sig) - break; off += snprintf(buf + off, len - off, "%c", separator); off += smp->data[i].printString(sig->type, buf + off, len - off, @@ -107,17 +106,15 @@ size_t ColumnLineFormat::sscanLine(const char *buf, size_t len, smp->flags |= (int)SampleFlags::HAS_SEQUENCE; - for (ptr = end + 1, i = 0; i < smp->capacity; ptr = end + 1, i++) { + for (ptr = end + 1, i = 0; i < MIN(smp->capacity, signals->size()); ptr = end + 1, i++) { if (*end == delimiter) - goto out; + break; - auto sig = smp->signals->getByIndex(i); - if (!sig) - goto out; + auto sig = signals->getByIndex(i); ret = smp->data[i].parseString(sig->type, ptr, &end); if (ret || end == ptr) // There are no valid values anymore. 
- goto out; + break; } out: @@ -151,8 +148,6 @@ void ColumnLineFormat::header(FILE *f, const SignalList::Ptr sigs) { if (flags & (int)SampleFlags::HAS_DATA) { for (unsigned i = 0; i < sigs->size(); i++) { auto sig = sigs->getByIndex(i); - if (!sig) - break; if (!sig->name.empty()) fprintf(f, "%s", sig->name.c_str()); diff --git a/lib/formats/iotagent_ul.cpp b/lib/formats/iotagent_ul.cpp index 8ba278a1b..038cee581 100644 --- a/lib/formats/iotagent_ul.cpp +++ b/lib/formats/iotagent_ul.cpp @@ -15,6 +15,7 @@ #include #include #include +#include using namespace villas; using namespace villas::node; @@ -25,10 +26,8 @@ int IotAgentUltraLightFormat::sprint(char *buf, size_t len, size_t *wbytes, size_t printed = 0; const struct Sample *smp = smps[0]; - for (unsigned i = 0; (i < smp->length) && (printed < len); i++) { + for (unsigned i = 0; i < MIN(smp->length, smp->signals->size()) && printed < len; i++) { auto sig = smp->signals->getByIndex(i); - if (!sig) - return -1; if (!sig->name.empty()) printed += snprintf(buf + printed, len - printed, "%s|%f|", diff --git a/lib/formats/json.cpp b/lib/formats/json.cpp index 795c0c129..818bdf0d9 100644 --- a/lib/formats/json.cpp +++ b/lib/formats/json.cpp @@ -10,6 +10,7 @@ #include #include #include +#include using namespace villas; using namespace villas::node; @@ -165,10 +166,8 @@ int JsonFormat::packSample(json_t **json_smp, const struct Sample *smp) { if (flags & (int)SampleFlags::HAS_DATA) { json_t *json_data = json_array(); - for (unsigned i = 0; i < smp->length; i++) { + for (unsigned i = 0; i < MIN(smp->length, smp->signals->size()); i++) { auto sig = smp->signals->getByIndex(i); - if (!sig) - return -1; json_t *json_value = smp->data[i].toJson(sig->type); @@ -240,12 +239,10 @@ int JsonFormat::unpackSample(json_t *json_smp, struct Sample *smp) { } json_array_foreach(json_data, i, json_value) { - if (i >= smp->capacity) + if (i >= smp->capacity || i >= signals->size()) break; - auto sig = smp->signals->getByIndex(i); - if 
(!sig) - return -1; + auto sig = signals->getByIndex(i); enum SignalType fmt = detect(json_value); if (sig->type != fmt) diff --git a/lib/formats/json_edgeflex.cpp b/lib/formats/json_edgeflex.cpp index 861a2314c..01b270d26 100644 --- a/lib/formats/json_edgeflex.cpp +++ b/lib/formats/json_edgeflex.cpp @@ -9,6 +9,7 @@ #include #include +#include using namespace villas::node; @@ -22,10 +23,8 @@ int JsonEdgeflexFormat::packSample(json_t **json_smp, json_data = json_object(); - for (unsigned i = 0; i < smp->length; i++) { + for (unsigned i = 0; i < MIN(smp->length, smp->signals->size()); i++) { auto sig = smp->signals->getByIndex(i); - if (!sig) - return -1; json_value = smp->data[i].toJson(sig->type); json_object_set(json_data, sig->name.c_str(), json_value); diff --git a/lib/formats/json_reserve.cpp b/lib/formats/json_reserve.cpp index 42e91c5ac..250e0ced8 100644 --- a/lib/formats/json_reserve.cpp +++ b/lib/formats/json_reserve.cpp @@ -9,6 +9,7 @@ #include #include +#include using namespace villas::node; @@ -32,10 +33,8 @@ int JsonReserveFormat::packSample(json_t **json_smp, const struct Sample *smp) { json_data = json_array(); - for (unsigned i = 0; i < smp->length; i++) { + for (unsigned i = 0; i < MIN(smp->length, smp->signals->size()); i++) { auto sig = smp->signals->getByIndex(i); - if (!sig) - return -1; if (!sig->name.empty()) json_name = json_string(sig->name.c_str()); diff --git a/lib/formats/msg.cpp b/lib/formats/msg.cpp index 31818b8f2..6b5a4b413 100644 --- a/lib/formats/msg.cpp +++ b/lib/formats/msg.cpp @@ -69,8 +69,6 @@ int villas::node::msg_to_sample(const struct Message *msg, struct Sample *smp, unsigned len = MIN(msg->length, smp->capacity); for (i = 0; i < MIN(len, sigs->size()); i++) { auto sig = sigs->getByIndex(i); - if (!sig) - return -1; switch (sig->type) { case SignalType::FLOAT: @@ -111,10 +109,8 @@ int villas::node::msg_from_sample(struct Message *msg_in, msg_in->ts.sec = smp->ts.origin.tv_sec; msg_in->ts.nsec = smp->ts.origin.tv_nsec; - for 
(unsigned i = 0; i < smp->length; i++) { + for (unsigned i = 0; i < MIN(smp->length, sigs->size()); i++) { auto sig = sigs->getByIndex(i); - if (!sig) - return -1; switch (sig->type) { case SignalType::FLOAT: diff --git a/lib/formats/opal_asyncip.cpp b/lib/formats/opal_asyncip.cpp index aa05b4f19..6ca20a178 100644 --- a/lib/formats/opal_asyncip.cpp +++ b/lib/formats/opal_asyncip.cpp @@ -38,7 +38,7 @@ int OpalAsyncIPFormat::sprint(char *buf, size_t len, size_t *wbytes, "We only send the first {}..", MAXSIZE, MAXSIZE); - for (unsigned j = 0; j < MIN(MAXSIZE, smp->length); j++) { + for (unsigned j = 0; j < MIN(MAXSIZE, MIN(smp->length, smp->signals->size())); j++) { auto sig = smp->signals->getByIndex(j); auto d = smp->data[j]; @@ -78,7 +78,7 @@ int OpalAsyncIPFormat::sscan(const char *buf, size_t len, size_t *rbytes, smp->flags = (int)SampleFlags::HAS_SEQUENCE | (int)SampleFlags::HAS_DATA; smp->signals = signals; - for (unsigned j = 0; j < MIN(smp->length, smp->capacity); j++) { + for (unsigned j = 0; j < MIN(MIN(smp->length, signals->size()), smp->capacity); j++) { auto sig = signals->getByIndex(j); SignalData d; diff --git a/lib/formats/protobuf.cpp b/lib/formats/protobuf.cpp index c34913bcc..ccb0f84e0 100644 --- a/lib/formats/protobuf.cpp +++ b/lib/formats/protobuf.cpp @@ -184,14 +184,12 @@ int ProtobufFormat::sscan(const char *buf, size_t len, size_t *rbytes, smp->ts.origin.tv_nsec = pb_smp->ts_origin->nsec; } - for (j = 0; j < MIN(pb_smp->n_values, smp->capacity); j++) { + for (j = 0; j < MIN(pb_smp->n_values, MIN(smp->length, signals->size())); j++) { Villas__Node__Value *pb_val = pb_smp->values[j]; enum SignalType fmt = detect(pb_val); - auto sig = smp->signals->getByIndex(j); - if (!sig) - return -1; + auto sig = signals->getByIndex(j); if (sig->type != fmt) throw RuntimeError("Received invalid data type in Protobuf payload: " diff --git a/lib/formats/value.cpp b/lib/formats/value.cpp index e4970006d..bd9a503e1 100644 --- a/lib/formats/value.cpp +++ 
b/lib/formats/value.cpp @@ -8,6 +8,7 @@ #include #include #include +#include using namespace villas::node; @@ -22,10 +23,8 @@ int ValueFormat::sprint(char *buf, size_t len, size_t *wbytes, buf[0] = '\0'; - for (i = 0; i < smp->length; i++) { + for (i = 0; i < MIN(smp->length, smp->signals->size()); i++) { auto sig = smp->signals->getByIndex(i); - if (!sig) - return -1; off += smp->data[i].printString(sig->type, buf, len, real_precision); off += snprintf(buf + off, len - off, "\n"); @@ -49,10 +48,8 @@ int ValueFormat::sscan(const char *buf, size_t len, size_t *rbytes, printf("Reading: %s", buf); - if (smp->capacity >= 1) { + if (smp->capacity >= 1 && signals->size() >= 1) { auto sig = signals->getByIndex(i); - if (!sig) - return -1; ret = smp->data[i].parseString(sig->type, ptr, &end); if (ret || end == ptr) // There are no valid values anymore. diff --git a/lib/formats/villas_human.cpp b/lib/formats/villas_human.cpp index 6353e7488..c7cd0262e 100644 --- a/lib/formats/villas_human.cpp +++ b/lib/formats/villas_human.cpp @@ -49,10 +49,8 @@ size_t VILLASHumanFormat::sprintLine(char *buf, size_t len, } if (flags & (int)SampleFlags::HAS_DATA) { - for (unsigned i = 0; i < smp->length; i++) { + for (unsigned i = 0; i < MIN(smp->length, smp->signals->size()); i++) { auto sig = smp->signals->getByIndex(i); - if (!sig) - break; off += snprintf(buf + off, len - off, "\t"); off += smp->data[i].printString(sig->type, buf + off, len - off, @@ -131,13 +129,11 @@ size_t VILLASHumanFormat::sscanLine(const char *buf, size_t len, } unsigned i; - for (ptr = end + 1, i = 0; i < smp->capacity; ptr = end + 1, i++) { + for (ptr = end + 1, i = 0; i < MIN(smp->capacity, signals->size()); ptr = end + 1, i++) { if (*end == delimiter) goto out; auto sig = signals->getByIndex(i); - if (!sig) - goto out; ret = smp->data[i].parseString(sig->type, ptr, &end); if (ret || end == ptr) // There are no valid values anymore. 
diff --git a/lib/hooks/dp.cpp b/lib/hooks/dp.cpp index 75697c9e5..d5fc46042 100644 --- a/lib/hooks/dp.cpp +++ b/lib/hooks/dp.cpp @@ -226,8 +226,6 @@ class DPHook : public Hook { signals->insert(signals->begin() + signal_index, new_sig); } else { auto orig_sig = signals->getByIndex(signal_index); - if (!orig_sig) - throw RuntimeError("Failed to find signal"); if (orig_sig->type != SignalType::FLOAT) throw RuntimeError("Signal is not float"); diff --git a/lib/hooks/lua.cpp b/lib/hooks/lua.cpp index d33951e7c..4b41266ac 100644 --- a/lib/hooks/lua.cpp +++ b/lib/hooks/lua.cpp @@ -691,8 +691,6 @@ Hook::Reason LuaHook::process(struct Sample *smp) { for (unsigned i = 0; i < expressions.size(); i++) { auto sig = signalsExpressions->getByIndex(i); - if (!sig) - continue; expressions[i].evaluate(&smp->data[i], sig->type); } diff --git a/lib/hooks/stats.cpp b/lib/hooks/stats.cpp index d5c5edc2e..7c16dc5bd 100644 --- a/lib/hooks/stats.cpp +++ b/lib/hooks/stats.cpp @@ -132,7 +132,7 @@ class StatsHook : public Hook { state = State::STOPPED; } - virtual void restart() { + virtual void restart() override { assert(state == State::STARTED); stats->reset(); diff --git a/lib/node.cpp b/lib/node.cpp index f4dd16f1f..040a2f7ec 100644 --- a/lib/node.cpp +++ b/lib/node.cpp @@ -101,7 +101,7 @@ int Node::parse(json_t *json) { return parseCommon(json); } int Node::parseCommon( json_t *json, - std::function + std::function parse_signal) { assert(state == State::INITIALIZED || state == State::PARSED || state == State::CHECKED); @@ -173,7 +173,9 @@ int Node::parseCommon( } ret = dir.obj->parse(json_dir, - [&](json_t *j) { return parse_signal(j, dir.dir); }); + [&](json_t *json, unsigned index) { + return parse_signal(json, dir.dir, index); + }); if (ret) return ret; } diff --git a/lib/node_direction.cpp b/lib/node_direction.cpp index ff6d02473..747647a29 100644 --- a/lib/node_direction.cpp +++ b/lib/node_direction.cpp @@ -23,7 +23,7 @@ NodeDirection::NodeDirection(enum NodeDirection::Direction 
dir, Node *n) vectorize(1), config(nullptr) {} int NodeDirection::parse(json_t *json, - std::function parse_signal) { + std::function parse_signal) { int ret; json_error_t err; diff --git a/lib/nodes/amqp.cpp b/lib/nodes/amqp.cpp index 07da7a5a6..1f5e19b1a 100644 --- a/lib/nodes/amqp.cpp +++ b/lib/nodes/amqp.cpp @@ -338,7 +338,7 @@ int villas::node::amqp_write(NodeCompat *n, struct Sample *const smps[], unsigned cnt) { int ret; auto *a = n->getData(); - char data[1500]; + char data[DEFAULT_FORMAT_BUFFER_LENGTH]; size_t wbytes; ret = a->formatter->sprint(data, sizeof(data), &wbytes, smps, cnt); diff --git a/lib/nodes/kafka.cpp b/lib/nodes/kafka.cpp index bce209b8a..3fb50bf08 100644 --- a/lib/nodes/kafka.cpp +++ b/lib/nodes/kafka.cpp @@ -20,6 +20,8 @@ #include #include +#include + using namespace villas; using namespace villas::utils; using namespace villas::node; @@ -40,6 +42,8 @@ class KafkaNode : public Node, public RdKafka::EventCb { std::string ssl_ca; // SSL CA file. struct { + std::vector buffer; + std::unique_ptr client; std::unique_ptr topic; } producer; @@ -78,26 +82,25 @@ class KafkaNode : public Node, public RdKafka::EventCb { int _write(struct Sample *smps[], unsigned cnt) override { assert(producer.client != nullptr); - size_t wbytes; - - char data[DEFAULT_FORMAT_BUFFER_LENGTH]; + if (produce.empty()) { + logger->warn( + "No produce possible because no produce topic is configured"); + return 0; + } - auto ret = formatter->sprint(data, sizeof(data), &wbytes, smps, cnt); + size_t wbytes; + auto ret = formatter->sprint(producer.buffer.data(), producer.buffer.size(), &wbytes, smps, cnt); if (ret < 0) return ret; - - if (!produce.empty()) { - auto ret = producer.client->produce( - producer.topic.get(), RdKafka::Topic::PARTITION_UA, - RdKafka::Producer::RK_MSG_COPY, data, wbytes, NULL, 0, NULL); - if (ret != RdKafka::ErrorCode::ERR_NO_ERROR) { - logger->warn("Publish failed"); - return -abs(ret); - } - } else - logger->warn( - "No produce possible because no 
produce topic is configured"); - + + auto pr = producer.client->produce( + producer.topic.get(), RdKafka::Topic::PARTITION_UA, + RdKafka::Producer::RK_MSG_COPY, producer.buffer.data(), wbytes, NULL, 0, NULL); + if (pr != RdKafka::ErrorCode::ERR_NO_ERROR) { + logger->warn("Publish failed"); + return -abs(pr); + } + return cnt; } @@ -163,12 +166,18 @@ class KafkaNode : public Node, public RdKafka::EventCb { uint64_t incr = 1; consumer.queue->io_event_enable(consumer.eventFd, &incr, sizeof(incr)); - auto ec = consumer.client->start(consumer.topic.get(), 0, 0, + auto ec = consumer.client->start(consumer.topic.get(), 0, RdKafka::Topic::OFFSET_END, consumer.queue.get()); if (ec != RdKafka::ErrorCode::ERR_NO_ERROR) throw RuntimeError("Error subscribing to {} at {}: {}", consume, server, RdKafka::err2str(ec)); + + ec = consumer.client->seek(consumer.topic.get(), 0, RdKafka::Topic::OFFSET_END, 0); + if (ec != RdKafka::ErrorCode::ERR_NO_ERROR) + throw RuntimeError("Error seeking to end of {} at {}: {}", consume, server, + RdKafka::err2str(ec)); + logger->info("Subscribed consumer from bootstrap server {}", server); return 0; @@ -224,7 +233,11 @@ class KafkaNode : public Node, public RdKafka::EventCb { public: KafkaNode(const uuid_t &id = {}, const std::string &name = "") : Node(id, name), timeout(1000), client_id("villas-node"), producer({}), - consumer({.eventFd = -1}) {} + consumer({}) { + consumer.eventFd = -1; + + producer.buffer = std::vector(MSG_LEN(100000)); + } virtual ~KafkaNode() {} diff --git a/lib/nodes/mqtt.cpp b/lib/nodes/mqtt.cpp index d76704ff0..f52215257 100644 --- a/lib/nodes/mqtt.cpp +++ b/lib/nodes/mqtt.cpp @@ -455,7 +455,7 @@ int villas::node::mqtt_write(NodeCompat *n, struct Sample *const smps[], size_t wbytes; - char data[1500]; + char data[DEFAULT_FORMAT_BUFFER_LENGTH]; ret = m->formatter->sprint(data, sizeof(data), &wbytes, smps, cnt); if (ret < 0) diff --git a/lib/nodes/nanomsg.cpp b/lib/nodes/nanomsg.cpp index 15ad6076a..3e55494fa 100644 --- 
a/lib/nodes/nanomsg.cpp +++ b/lib/nodes/nanomsg.cpp @@ -224,7 +224,7 @@ int villas::node::nanomsg_read(NodeCompat *n, struct Sample *const smps[], unsigned cnt) { auto *m = n->getData(); int bytes; - char data[NANOMSG_MAX_PACKET_LEN]; + char data[DEFAULT_FORMAT_BUFFER_LENGTH]; // Receive payload bytes = nn_recv(m->in.socket, data, sizeof(data), 0); @@ -241,7 +241,7 @@ int villas::node::nanomsg_write(NodeCompat *n, struct Sample *const smps[], size_t wbytes; - char data[NANOMSG_MAX_PACKET_LEN]; + char data[DEFAULT_FORMAT_BUFFER_LENGTH]; ret = m->formatter->sprint(data, sizeof(data), &wbytes, smps, cnt); if (ret <= 0) diff --git a/lib/nodes/opal_orchestra.cpp b/lib/nodes/opal_orchestra.cpp index 715f916f7..4911ca499 100644 --- a/lib/nodes/opal_orchestra.cpp +++ b/lib/nodes/opal_orchestra.cpp @@ -47,47 +47,40 @@ class OpalOrchestraMapping { public: std::shared_ptr item; std::string path; - std::vector signals; + Signal::Ptr firstSignal; + Signal::Ptr lastSignal; - // Cached signal indices - // We keep a vector of indices to map the signal index in the signal list. - SignalList::Ptr signalList; // Signal list for which the indices are valid. - std::vector> indices; + unsigned villasOffset; + unsigned villasLength; // Run-time members which will be retrieved from Orchestra in prepare(). - unsigned short key; - char *buffer; - unsigned int typeSize; // sizeof() of the signal type. See RTSignalType. - unsigned short length; + unsigned short orchestraLength; + unsigned short orchestraKey; + char *orchestraBuffer; + unsigned int orchestraTypeSize; // sizeof() of the signal type. See RTSignalType. 
OpalOrchestraMapping(std::shared_ptr item, std::string path) - : item(item), path(std::move(path)), signals(), signalList(), indices(), - key(0), buffer(nullptr), typeSize(0), length(0) {} - - void addSignal(Signal::Ptr signal, std::optional orchestraIdx) { - if (!orchestraIdx) { - orchestraIdx = signals.size(); - } - - if (*orchestraIdx < signals.size()) { - if (signals[*orchestraIdx]) { - throw RuntimeError("Index {} of Orchestra signal already mapped", - *orchestraIdx); - } - } else { - signals.resize(*orchestraIdx + 1, nullptr); - item->length = signals.size(); - } - - signals[*orchestraIdx] = signal; + : item(item), path(std::move(path)), + orchestraKey(0), orchestraBuffer(nullptr), orchestraTypeSize(0) {} + + void addSignal(Signal::Ptr signal, unsigned index) { + if (!firstSignal) { + villasOffset = index; + firstSignal = signal; + } else if (signal->type != lastSignal->type) { + throw RuntimeError("Signal type mismatch: '{}' != '{}' for signal '{}'", + signalTypeToString(signal->type), + signalTypeToString(lastSignal->type), signal->name); + } else if (signal->init.f != lastSignal->init.f) { + throw RuntimeError("Signal default value mismatch for signal '{}'", + signal->name); + } + + lastSignal = signal; + villasLength++; } void check() { - if (signals.empty()) { - throw RuntimeError("No signal mapped to Orchestra signal '{}'", - item->name); - } - if (item->name.empty()) { throw RuntimeError("Signal name cannot be empty"); } @@ -105,24 +98,6 @@ class OpalOrchestraMapping { throw RuntimeError("Signal name '{}' is too long", item->name); } - auto firstSignal = signals[0]; - - for (auto &signal : signals) { - if (signal->type != firstSignal->type) { - throw RuntimeError("Signal type mismatch: {} vs {} for signal '{}'", - signalTypeToString(signal->type), - signalTypeToString(firstSignal->type), signal->name); - } - - if (signal->init.f != firstSignal->init.f) { - throw RuntimeError("Signal default value mismatch: {} vs {} for signal " - "'{}'", - 
signal->init.toString(signal->type), - firstSignal->init.toString(firstSignal->type), - signal->name); - } - } - auto orchestraType = toOrchestraSignalType(firstSignal->type); if (item->type != orchestraType) { throw RuntimeError("Signal type mismatch: {} vs {} for signal '{}'", @@ -131,106 +106,59 @@ class OpalOrchestraMapping { } } - void prepare(unsigned int connectionKey) { - auto ret = RTGetInfoForItem(path.c_str(), &typeSize, &length); + void prepare(unsigned int orchestraConnectionKey) { + auto ret = RTGetInfoForItem(path.c_str(), &orchestraTypeSize, &orchestraLength); if (ret != RTAPI_SUCCESS) { throw RTError(ret, "Failed to get info for signal '{}'", item->name); } - ret = RTGetKeyForItem(path.c_str(), &key); + ret = RTGetKeyForItem(path.c_str(), &orchestraKey); if (ret != RTAPI_SUCCESS) { throw RTError(ret, "Failed to get key for signal '{}'", item->name); } - ret = RTGetBuffer(connectionKey, key, (void **)&buffer); + ret = RTGetBuffer(orchestraConnectionKey, orchestraKey, (void **)&orchestraBuffer); if (ret != RTAPI_SUCCESS) { throw RTError(ret, "Failed to get buffer for signal '{}'", item->name); } auto logger = Log::get("orchestra"); logger->trace( - "Prepared mapping: path='{}', type={}, typeSize={}, key={}, buffer={}" - "length={}, default={}", - path, orchestra::signalTypeToString(item->type), key, buffer, typeSize, - length, item->defaultValue); + "Prepared mapping: path='{}', type={}, orchestra.type_size={}, orchestra.key={}, orchestra.buffer={}" + "orchestra.length={}, villas.length={}, villas.offset={}, default={}", + path, orchestra::signalTypeToString(item->type), orchestraTypeSize, orchestraKey, orchestraBuffer, + orchestraLength, villasLength, villasOffset, item->defaultValue); } void publish(struct Sample *smp) { - updateIndices(smp->signals); + unsigned length; - auto *orchestraDataPtr = buffer; - for (auto &index : indices) { - if (!index || *index >= smp->length) { - orchestraDataPtr += typeSize; - continue; // Unused index or index out 
of range. - } - - auto signal = smp->signals->getByIndex(*index); - if (!signal) { - throw RuntimeError("Signal {} not found", index); - } - - toOrchestraSignalData(orchestraDataPtr, item->type, smp->data[*index], - signal->type); + length = MIN(villasLength, orchestraLength); + length = MIN(length, smp->length); - orchestraDataPtr += typeSize; + for (unsigned index = villasOffset; index < length; index++) { + toOrchestraSignalData(orchestraBuffer + index * orchestraTypeSize, item->type, smp->data[villasOffset + index], + firstSignal->type); } } void subscribe(struct Sample *smp) { - updateIndices(smp->signals); - - auto *orchestraDataPtr = buffer; - for (auto &index : indices) { - if (!index || *index >= smp->capacity) { - continue; // Unused index or index out of range. - } - - for (unsigned i = smp->length; i < *index; i++) { - smp->data[i].i = 0; - } - - auto signal = smp->signals->getByIndex(*index); - if (!signal) { - throw RuntimeError("Signal {} not found", *index); - } + unsigned length; + + length = MIN(villasLength, orchestraLength); + length = MIN(length, smp->capacity); + for (unsigned index = 0; index < length; index++) { node::SignalType villasType; SignalData villasData = - toNodeSignalData(orchestraDataPtr, item->type, villasType); - - smp->data[*index] = villasData.cast(villasType, signal->type); - - if (index >= static_cast(smp->length)) { - smp->length = *index + 1; - } - - orchestraDataPtr += typeSize; + toNodeSignalData(orchestraBuffer + index * orchestraTypeSize, item->type, villasType); + + smp->data[villasOffset + index] = villasData.cast(villasType, firstSignal->type); } - } -protected: - void updateIndices(SignalList::Ptr newSignalList) { - if (signalList == newSignalList) { - return; // Already up to date. 
- } - - indices.clear(); - - for (const auto &signal : signals) { - if (signal) { - auto idx = newSignalList->getIndexByName(signal->name); - if (idx < 0) { - throw RuntimeError("Signal '{}' not found", signal->name); - } - - indices.push_back(idx); - } else { - indices.emplace_back(); // Unused index - } + if (villasOffset + length > smp->length) { + smp->length = villasOffset + length; } - - signalList = newSignalList; } }; @@ -328,14 +256,34 @@ class OpalOrchestraNode : public Node { } } + void checkDuplicateNames() { + for (auto &[name, item] : domain.publish.items) { + if (domain.subscribe.items.find(name) != domain.subscribe.items.end()) { + throw RuntimeError( + "Orchestra signal '{}' is used for both publish and subscribe", + name); + } + } + } + + void checkMappings() { + for (auto &mapping : subscribeMappings) { + mapping.second.check(); + } + + for (auto &mapping : publishMappings) { + mapping.second.check(); + } + } + public: OpalOrchestraNode(const uuid_t &id = {}, const std::string &name = "", unsigned int key = 0) : Node(id, name), task(), connectionKey(key), status(nullptr), domain(), - subscribeMappings(), publishMappings(), rate(1), connectTimeout(5), + subscribeMappings(), publishMappings(), rate(1000), connectTimeout(5), skipWaitToGo(false), dataDefinitionFileOverwrite(false) {} - Signal::Ptr parseSignal(json_t *json_signal, NodeDirection::Direction dir) { + Signal::Ptr parseSignal(json_t *json_signal, NodeDirection::Direction dir, unsigned index) { auto signal = Signal::fromJson(json_signal); DataSet &dataSet = @@ -346,22 +294,17 @@ class OpalOrchestraNode : public Node { const char *nme = nullptr; const char *typ = nullptr; - int oi = -1; + int length = 1; json_error_t err; - auto ret = json_unpack_ex(json_signal, &err, 0, "{ s?: s, s?: s, s?: i }", - "orchestra_name", &nme, "orchestra_type", &typ, - "orchestra_index", &oi); + auto ret = json_unpack_ex(json_signal, &err, 0, "{ s?: i, s?: s, s?: s }", + "length", &length,"orchestra_name", &nme, 
"orchestra_type", &typ); if (ret) { throw ConfigError(json_signal, err, "node-config-node-opal-orchestra-signals"); } - - std::optional orchestraIdx; - - if (oi >= 0) { - orchestraIdx = oi; - } + + bool isVector = length > 1; auto defaultValue = signal->init.cast(signal->type, node::SignalType::FLOAT); @@ -379,10 +322,12 @@ class OpalOrchestraNode : public Node { item->defaultValue = defaultValue.f; mappings.emplace(item, OpalOrchestraMapping(item, orchestraName)); + } else if (!isVector) { + throw ConfigError(json_signal, "node-config-node-opal-orchestra.signals", "Only vectors can have same Orchestra item name"); } auto &mapping = mappings.at(item); - mapping.addSignal(signal, orchestraIdx); + mapping.addSignal(signal, index); return signal; } @@ -393,8 +338,8 @@ class OpalOrchestraNode : public Node { subscribeMappings.clear(); int reti = parseCommon( - json, [&](json_t *json_signal, NodeDirection::Direction dir) { - return parseSignal(json_signal, dir); + json, [&](json_t *json_signal, NodeDirection::Direction dir, unsigned index) { + return parseSignal(json_signal, dir, index); }); if (reti) return reti; @@ -467,22 +412,12 @@ class OpalOrchestraNode : public Node { return 0; } - void checkDuplicateNames(const DataSet &a, const DataSet &b) { - for (auto &[name, item] : a.items) { - if (b.items.find(name) != b.items.end()) { - throw RuntimeError( - "Orchestra signal '{}' is used for both publish and subscribe", - name); - } - } - } - int check() override { if (dataDefinitionFilename) { if (!fs::exists(*dataDefinitionFilename) && !dataDefinitionFileOverwrite) { throw RuntimeError("OPAL-RT Orchestra Data Definition file (DDF) at " - "'{}' does not exist", + "{} does not exist", *dataDefinitionFilename); } } else { @@ -492,7 +427,8 @@ class OpalOrchestraNode : public Node { } } - checkDuplicateNames(domain.publish, domain.subscribe); + checkDuplicateNames(); + checkMappings(); return Node::check(); } diff --git a/lib/nodes/redis.cpp b/lib/nodes/redis.cpp index 
1a69c8a40..c33c3b73c 100644 --- a/lib/nodes/redis.cpp +++ b/lib/nodes/redis.cpp @@ -565,7 +565,7 @@ int villas::node::redis_write(NodeCompat *n, struct Sample *const smps[], switch (r->mode) { case RedisMode::CHANNEL: for (unsigned i = 0; i < cnt; i++) { - char buf[1500]; + char buf[DEFAULT_FORMAT_BUFFER_LENGTH]; size_t wbytes; ret = r->formatter->sprint(buf, sizeof(buf), &wbytes, &smps[i], cnt); @@ -579,7 +579,7 @@ int villas::node::redis_write(NodeCompat *n, struct Sample *const smps[], break; case RedisMode::KEY: { - char buf[1500]; + char buf[DEFAULT_FORMAT_BUFFER_LENGTH]; size_t wbytes; ret = r->formatter->sprint(buf, sizeof(buf), &wbytes, smps, cnt); diff --git a/lib/signal.cpp b/lib/signal.cpp index 02cc3b405..71449b86d 100644 --- a/lib/signal.cpp +++ b/lib/signal.cpp @@ -24,7 +24,7 @@ int Signal::parse(json_t *json) { json_t *json_init = nullptr; const char *name_str = nullptr; const char *unit_str = nullptr; - const char *type_str = "float"; + const char *type_str = nullptr; ret = json_unpack_ex(json, &err, 0, "{ s?: s, s?: s, s?: s, s?: o }", "name", &name_str, "unit", &unit_str, "type", &type_str, "init", @@ -42,7 +42,8 @@ int Signal::parse(json_t *json) { type = signalTypeFromString(type_str); if (type == SignalType::INVALID) return -1; - } + } else + type = SignalType::FLOAT; if (json_init) { ret = init.parseJson(type, json_init); @@ -88,42 +89,6 @@ std::string Signal::toString(const union SignalData *d) const { return ss.str(); } -/* Check if two signal names are numbered ascendingly - * - * E.g. 
signal3 -> signal4 - */ -static bool isNextName(const std::string &a, const std::string &b) { - // Find common prefix - std::string::const_iterator ia, ib; - for (ia = a.cbegin(), ib = b.cbegin(); - ia != b.cend() && ib != b.cend() && *ia == *ib; ++ia, ++ib) - ; - - // Suffixes - auto sa = std::string(ia, a.cend()); - auto sb = std::string(ib, b.cend()); - - try { - size_t ea, eb; - auto na = std::stoul(sa, &ea, 10); - auto nb = std::stoul(sb, &eb, 10); - - return na + 1 == nb; - } catch (std::exception &) { - return false; - } -} - -bool Signal::isNext(const Signal &sig) { - if (type != sig.type) - return false; - - if (!unit.empty() && !sig.unit.empty() && unit != sig.unit) - return false; - - return isNextName(name, sig.name); -} - Signal::Ptr Signal::fromJson(json_t *json) { auto signal = std::make_shared(); diff --git a/lib/signal_list.cpp b/lib/signal_list.cpp index 6ef977521..f6611636d 100644 --- a/lib/signal_list.cpp +++ b/lib/signal_list.cpp @@ -18,15 +18,15 @@ using namespace villas::node; using namespace villas::utils; SignalList::SignalList(json_t *json_signals, - std::function parse_signal) { + std::function parse_signal) { parse(json_signals, parse_signal); } -SignalList::SignalList(unsigned len, enum SignalType typ) { +SignalList::SignalList(unsigned length, enum SignalType typ) { auto typ_str = signalTypeToString(typ); auto *json_signals = json_pack("{ s: s, s: s, s: i }", "name", "signal", - "type", typ_str.c_str(), "count", len); + "type", typ_str.c_str(), "length", length); parse(json_signals); } @@ -40,9 +40,9 @@ SignalList::SignalList(std::string_view dt) { auto *dtc = dt.data(); for (const char *t = dtc; *t; t = e + 1) { - auto len = strtoul(t, &e, 10); + auto length = strtoul(t, &e, 10); if (t == e) - len = 1; + length = 1; auto name = fmt::format("signal_{}", i++); @@ -53,7 +53,7 @@ SignalList::SignalList(std::string_view dt) { auto typ_str = signalTypeToString(typ); auto *json_signal = json_pack("{ s: s, s: s, s: i }", "name", name.c_str(), 
- "type", typ_str.c_str(), "count", len); + "type", typ_str.c_str(), "length", length); json_array_append_new(json_signals, json_signal); } @@ -62,7 +62,7 @@ SignalList::SignalList(std::string_view dt) { } void SignalList::parse(json_t *json_signals, - std::function parse_signal) { + std::function parse_signal) { clear(); if (json_is_string(json_signals)) { @@ -77,6 +77,8 @@ void SignalList::parse(json_t *json_signals, "Invalid signal list"); } + unsigned index = 0; + size_t i; json_t *json_signal; json_array_foreach(json_signals, i, json_signal) { @@ -89,18 +91,17 @@ void SignalList::parse(json_t *json_signals, std::string baseName = "signal"; bool appendIndex = false; - int count = 1; + int length = 1; const char *nme = nullptr; - int ret = json_unpack(json_signal, "{ s?: i, s?: s }", "count", &count, + int ret = json_unpack(json_signal, "{ s?: i, s?: s }", "length", &length, "name", &nme); if (ret) { throw ConfigError(json_signal, "node-config-node-signal", "Failed to parse signal definition"); } - if (count > 1) { - json_object_del(json_signal, "count"); + if (length > 1) { appendIndex = true; } @@ -108,18 +109,22 @@ void SignalList::parse(json_t *json_signals, baseName = nme; } - for (int j = 0; j < count; j++) { + Signal::Ptr previousSignal; + for (int j = 0; j < length; j++) { if (appendIndex) { auto name = fmt::format("{}_{}", baseName, j); json_object_set_new(json_signal, "name", json_string(name.c_str())); } - auto signal = parse_signal(json_signal); + auto signal = parse_signal(json_signal, index++); if (!signal) throw ConfigError(json_signal, "node-config-node-signal", "Failed to parse signal definition"); push_back(signal); + + signal->previous = previousSignal; + previousSignal = signal; } } } @@ -129,12 +134,12 @@ void SignalList::dump(Logger logger, const union SignalData *data, const char *pfx; bool abbrev = false; - Signal::Ptr prevSig; + Signal::Ptr previousSignal; unsigned i = 0; - for (auto sig : *this) { + for (auto signal : *this) { // Check 
if this is a sequence of similar signals which can be abbreviated if (i >= 1 && i < size() - 1) { - if (prevSig->isNext(*sig)) { + if (signal->previous == previousSignal) { abbrev = true; goto skip; } @@ -147,10 +152,10 @@ void SignalList::dump(Logger logger, const union SignalData *data, pfx = " "; logger->info(" {}{:>3}: {}", pfx, i, - sig->toString(i < len ? &data[i] : nullptr)); + signal->toString(i < len ? &data[i] : nullptr)); skip: - prevSig = sig; + previousSignal = signal; i++; } } diff --git a/tests/integration/hook-average.sh b/tests/integration/hook-average.sh index f2c5aa37f..196d30a61 100755 --- a/tests/integration/hook-average.sh +++ b/tests/integration/hook-average.sh @@ -31,7 +31,7 @@ cat > input.dat < expect.dat < expect.dat < output.dat +villas hook -o offset=0 -o signals=signal_0,signal_1,signal_2,signal_3,signal_4 average < input.dat > output.dat villas compare output.dat expect.dat diff --git a/tests/integration/hook-cast.sh b/tests/integration/hook-cast.sh index 70e9b1557..a79d25594 100755 --- a/tests/integration/hook-cast.sh +++ b/tests/integration/hook-cast.sh @@ -18,7 +18,7 @@ function finish { trap finish EXIT cat > input.dat < input.dat < expect.dat < expect.dat < output.dat +villas hook cast -o new_name=test -o new_unit=V -o new_type=integer -o signal=signal_1 < input.dat > output.dat villas compare output.dat expect.dat diff --git a/tests/integration/hook-frame.sh b/tests/integration/hook-frame.sh index b2c058d04..a0607adcd 100755 --- a/tests/integration/hook-frame.sh +++ b/tests/integration/hook-frame.sh @@ -32,17 +32,17 @@ cat > input.dat < expect.dat < output.dat diff --git a/tests/integration/hook-gate.sh b/tests/integration/hook-gate.sh index 8fe5647f5..c7b294ac7 100755 --- a/tests/integration/hook-gate.sh +++ b/tests/integration/hook-gate.sh @@ -32,7 +32,7 @@ cat > input.dat < expect.dat < config.json <= 0", "type": "boolean" }, - { "name": "abs(signal1)", "expression": "math.abs(smp.data.signal1)" }, - { "name": 
"signal4_scaled", "expression": "smp.data.signal4 * 100 + 55" }, - { "name": "sequence", "expression": "smp.sequence", "type": "integer" }, - { "name": "ts_origin", "expression": "smp.ts_origin[0] + smp.ts_origin[1] * 1e-9" } + { "name": "signal_1_positive", "expression": "smp.data.signal_1 >= 0", "type": "boolean" }, + { "name": "abs(signal_1)", "expression": "math.abs(smp.data.signal_1)" }, + { "name": "signal_4_scaled", "expression": "smp.data.signal_4 * 100 + 55" }, + { "name": "sequence", "expression": "smp.sequence", "type": "integer" }, + { "name": "ts_origin", "expression": "smp.ts_origin[0] + smp.ts_origin[1] * 1e-9" } ] } EOF @@ -44,7 +44,7 @@ cat > input.dat < expect.dat < config.json < input.dat < expect.dat < input.dat < input.dat < expect.dat < expect.dat < output.dat +villas hook scale -o scale=100 -o offset=55 -o signal=signal_4 < input.dat > output.dat villas compare output.dat expect.dat diff --git a/tests/integration/node-loopback-socket.sh b/tests/integration/node-loopback-socket.sh index 338938323..9bf442513 100755 --- a/tests/integration/node-loopback-socket.sh +++ b/tests/integration/node-loopback-socket.sh @@ -35,7 +35,7 @@ cat > config.json < config.json < config.json < config.json < config.json << EOF "sv_id": "1234", "signals": { "iec_type": "float32", - "count": 64 + "length": 64 } }, "in": { "signals": { "iec_type": "float32", - "count": 64 + "length": 64 } } } diff --git a/tests/integration/pipe-loopback-nanomsg.sh b/tests/integration/pipe-loopback-nanomsg.sh index dbf19bbd1..26d2136a7 100755 --- a/tests/integration/pipe-loopback-nanomsg.sh +++ b/tests/integration/pipe-loopback-nanomsg.sh @@ -36,7 +36,7 @@ cat > config.json << EOF "signals": { "type": "float", - "count": 5 + "length": 5 } }, "out": { diff --git a/tests/integration/pipe-loopback-rtp-dual.sh b/tests/integration/pipe-loopback-rtp-dual.sh index de52f205f..b18e07688 100755 --- a/tests/integration/pipe-loopback-rtp-dual.sh +++ b/tests/integration/pipe-loopback-rtp-dual.sh @@ 
-47,7 +47,7 @@ cat > src.json << EOF "in": { "address": "0.0.0.0:12002", "signals": { - "count": 5, + "length": 5, "type": "float" } }, @@ -80,7 +80,7 @@ cat > dest.json << EOF "in": { "address": "0.0.0.0:12000", "signals": { - "count": 5, + "length": 5, "type": "float" } }, diff --git a/tests/integration/pipe-loopback-rtp-remote.sh b/tests/integration/pipe-loopback-rtp-remote.sh index f846065df..ad01771ec 100755 --- a/tests/integration/pipe-loopback-rtp-remote.sh +++ b/tests/integration/pipe-loopback-rtp-remote.sh @@ -58,7 +58,7 @@ cat > src.json << EOF "in": { "address": "0.0.0.0:33466", "signals": { - "count": 5, + "length": 5, "type": "float" } }, @@ -95,7 +95,7 @@ cat > dest.json << EOF "in": { "address": "0.0.0.0:33464", "signals": { - "count": 5, + "length": 5, "type": "float" } }, diff --git a/tests/integration/pipe-loopback-rtp-tbf.sh b/tests/integration/pipe-loopback-rtp-tbf.sh index 97b741a67..2f8f20388 100755 --- a/tests/integration/pipe-loopback-rtp-tbf.sh +++ b/tests/integration/pipe-loopback-rtp-tbf.sh @@ -52,7 +52,7 @@ cat > src.json << EOF "in": { "address": "0.0.0.0:12002", "signals": { - "count": ${NUM_VALUES}, + "length": ${NUM_VALUES}, "type": "float" } }, @@ -84,7 +84,7 @@ cat > dest.json << EOF "in": { "address": "0.0.0.0:12000", "signals": { - "count": ${NUM_VALUES}, + "length": ${NUM_VALUES}, "type": "float" } }, diff --git a/tests/integration/pipe-loopback-rtp.sh b/tests/integration/pipe-loopback-rtp.sh index 426bdfeb9..1e3d81bf0 100755 --- a/tests/integration/pipe-loopback-rtp.sh +++ b/tests/integration/pipe-loopback-rtp.sh @@ -54,7 +54,7 @@ cat > config.json << EOF "signals": { "type": "float", - "count": 5 + "length": 5 } }, "out": { diff --git a/tests/integration/pipe-loopback-socket.sh b/tests/integration/pipe-loopback-socket.sh index 94121fd6f..9ccafee86 100755 --- a/tests/integration/pipe-loopback-socket.sh +++ b/tests/integration/pipe-loopback-socket.sh @@ -69,7 +69,7 @@ cat > config.json << EOF "in": { "address": "${LOCAL}", 
"signals": { - "count": ${NUM_VALUES}, + "length": ${NUM_VALUES}, "type": "float" } } diff --git a/tests/integration/pipe-loopback-zeromq.sh b/tests/integration/pipe-loopback-zeromq.sh index 9e3c299b6..c3b3b417f 100755 --- a/tests/integration/pipe-loopback-zeromq.sh +++ b/tests/integration/pipe-loopback-zeromq.sh @@ -38,7 +38,7 @@ cat > config.json << EOF "subscribe": "tcp://127.0.0.1:12000", "signals": { "type": "float", - "count": 5 + "length": 5 } } }