Skip to content

Commit eb36728

Browse files
authored
feat: add packet monitoring 'stats' example (#20)
* feat: add packet monitoring 'stats' example * chore: update testing peripheral
1 parent 1be63bc commit eb36728

8 files changed

Lines changed: 357 additions & 5 deletions

File tree

CMakeLists.txt

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,10 +112,22 @@ install(
112112

113113
if ("examples" IN_LIST VCPKG_MANIFEST_FEATURES)
114114
add_executable(stream_out examples/stream_out/main.cpp)
115+
file(GLOB_RECURSE TEST_SOURCES "${CMAKE_CURRENT_SOURCE_DIR}/examples/stream_out/*.cpp")
116+
target_sources(stream_out PRIVATE ${TEST_SOURCES})
117+
target_include_directories(stream_out PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/examples)
115118
target_link_libraries(stream_out PRIVATE ${PROJECT_NAME} science::scipp)
116119
set_target_properties(stream_out PROPERTIES
117120
RUNTIME_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/examples"
118121
)
122+
123+
add_executable(stats examples/stats/main.cpp)
124+
file(GLOB_RECURSE TEST_SOURCES "${CMAKE_CURRENT_SOURCE_DIR}/examples/stats/*.cpp")
125+
target_sources(stats PRIVATE ${TEST_SOURCES})
126+
target_include_directories(stats PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/examples)
127+
target_link_libraries(stats PRIVATE ${PROJECT_NAME} science::scipp)
128+
set_target_properties(stats PROPERTIES
129+
RUNTIME_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/examples"
130+
)
119131
endif()
120132

121133
if ("tests" IN_LIST VCPKG_MANIFEST_FEATURES)

examples/stats/main.cpp

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
#include <memory>
2+
#include <chrono>
3+
4+
#include "science/scipp/status.h"
5+
#include "science/synapse/channel.h"
6+
#include "science/synapse/data.h"
7+
#include "science/synapse/device.h"
8+
#include "science/synapse/nodes/broadband_source.h"
9+
#include "science/synapse/nodes/stream_out.h"
10+
#include "packet_monitoring.h"
11+
12+
using science::libndtp::NDTPHeader;
13+
using synapse::Ch;
14+
using synapse::Config;
15+
using synapse::Device;
16+
using synapse::DeviceInfo;
17+
using synapse::Electrodes;
18+
using synapse::NodeType;
19+
using synapse::Signal;
20+
using synapse::StreamOut;
21+
using synapse::SynapseData;
22+
using synapse::NodeConfig;
23+
using synapse::Node;
24+
25+
auto configure_stream(Device& device, std::shared_ptr<StreamOut>* stream_out_ptr) -> science::Status {
26+
const uint32_t N_CHANNELS = 32; // Using more channels for stats testing
27+
if (stream_out_ptr == nullptr) {
28+
return { science::StatusCode::kInvalidArgument, "stream out pointer is null" };
29+
}
30+
31+
science::Status s;
32+
DeviceInfo info;
33+
s = device.info(&info);
34+
if (!s.ok()) return s;
35+
36+
// Configure signal with more channels for statistics gathering
37+
Signal signal{
38+
Electrodes{
39+
.channels = {},
40+
.low_cutoff_hz = 500,
41+
.high_cutoff_hz = 6000
42+
}
43+
};
44+
auto& electrodes = std::get<Electrodes>(signal.signal);
45+
electrodes.channels.reserve(N_CHANNELS);
46+
for (unsigned int i = 0; i < N_CHANNELS; i++) {
47+
electrodes.channels.push_back(Ch{
48+
.id = i,
49+
.electrode_id = i * 2,
50+
.reference_id = i * 2 + 1
51+
});
52+
}
53+
54+
Config config;
55+
auto broadband_source = std::make_shared<synapse::BroadbandSource>(100, 16, 30000, 20.0, signal);
56+
57+
NodeConfig stream_out_config;
58+
auto* stream_out_proto = stream_out_config.mutable_stream_out();
59+
stream_out_proto->set_multicast_group("224.0.0.115");
60+
61+
std::shared_ptr<Node> stream_out_node;
62+
s = StreamOut::from_proto(stream_out_config, &stream_out_node);
63+
if (!s.ok()) return s;
64+
65+
*stream_out_ptr = std::dynamic_pointer_cast<StreamOut>(stream_out_node);
66+
if (!*stream_out_ptr) {
67+
return { science::StatusCode::kInternal, "failed to cast stream out node" };
68+
}
69+
70+
s = config.add_node(broadband_source);
71+
if (!s.ok()) return s;
72+
73+
s = config.add_node(*stream_out_ptr);
74+
if (!s.ok()) return s;
75+
76+
s = config.connect(broadband_source, *stream_out_ptr);
77+
if (!s.ok()) return s;
78+
79+
s = device.configure(&config);
80+
if (!s.ok()) return s;
81+
82+
std::cout << "Configured device..." << std::endl;
83+
84+
s = device.start();
85+
if (!s.ok()) return s;
86+
87+
std::cout << "Started device..." << std::endl;
88+
return s;
89+
}
90+
91+
auto stream(const std::string& uri) -> int {
92+
synapse::Device device(uri);
93+
science::Status s;
94+
95+
std::shared_ptr<synapse::StreamOut> stream_out;
96+
s = configure_stream(device, &stream_out);
97+
if (!s.ok()) {
98+
std::cout << "error configuring stream: ("
99+
<< static_cast<int>(s.code()) << ") " << s.message() << std::endl;
100+
return 1;
101+
}
102+
103+
if (stream_out == nullptr) {
104+
std::cout << "stream out node not initialized" << std::endl;
105+
return 1;
106+
}
107+
108+
std::cout << "Monitoring packet statistics..." << std::endl;
109+
110+
// Initialize packet monitor
111+
PacketMonitor monitor;
112+
monitor.start_monitoring();
113+
auto last_stats_time = std::chrono::steady_clock::now();
114+
115+
while (true) {
116+
size_t bytes_read;
117+
NDTPHeader header;
118+
SynapseData out;
119+
s = stream_out->read(&out, &header, &bytes_read);
120+
if (s.code() == science::StatusCode::kUnavailable) {
121+
continue;
122+
}
123+
124+
if (!s.ok()) {
125+
std::cout << "error reading from stream out node: ("
126+
<< static_cast<int>(s.code()) << ") " << s.message() << std::endl;
127+
continue;
128+
}
129+
130+
monitor.process_packet(header.seq_number, bytes_read);
131+
132+
auto now = std::chrono::steady_clock::now();
133+
if (std::chrono::duration_cast<std::chrono::seconds>(now - last_stats_time).count() >= 1) {
134+
monitor.print_stats();
135+
last_stats_time = now;
136+
}
137+
}
138+
139+
return 0;
140+
}
141+
142+
int main(int argc, char** argv) {
143+
if (argc != 2) {
144+
std::cout << "Usage: " << argv[0] << " <uri>" << std::endl;
145+
std::cout << " uri: device URI (e.g., 192.168.0.1:647)" << std::endl;
146+
return 1;
147+
}
148+
149+
std::string uri = argv[1];
150+
return stream(uri);
151+
}
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
#include "stats/packet_monitoring.h"
2+
#include <iomanip>
3+
#include <sstream>
4+
5+
PacketMonitor::PacketMonitor()
6+
: packet_count_(0)
7+
, last_seq_number_(0)
8+
, dropped_packets_(0)
9+
, out_of_order_packets_(0)
10+
, bytes_received_(0)
11+
, bytes_received_in_interval_(0)
12+
, last_jitter_(0)
13+
, avg_jitter_(0) {}
14+
15+
void PacketMonitor::start_monitoring() {
16+
start_time_ = std::chrono::steady_clock::now();
17+
last_stats_time_ = start_time_;
18+
last_bandwidth_time_ = start_time_;
19+
}
20+
21+
bool PacketMonitor::process_packet(uint16_t seq_number, size_t bytes_read) {
22+
auto now = std::chrono::steady_clock::now();
23+
24+
if (packet_count_ == 0) {
25+
first_packet_time_ = now;
26+
last_packet_time_ = now;
27+
auto elapsed = std::chrono::duration<double>(now - start_time_).count();
28+
std::cout << "First packet received after " << std::fixed << std::setprecision(3)
29+
<< elapsed << " seconds\n\n";
30+
} else {
31+
// Calculate jitter
32+
auto interval = std::chrono::duration<double>(now - last_packet_time_).count();
33+
if (packet_count_ > 1) {
34+
double jitter_diff = std::abs(interval - last_jitter_);
35+
avg_jitter_ += (jitter_diff - avg_jitter_) / 16.0; // RFC 3550 algorithm
36+
}
37+
last_jitter_ = interval;
38+
last_packet_time_ = now;
39+
40+
// Check for dropped or out-of-order packets
41+
uint16_t expected = (last_seq_number_ + 1) % (1 << 16);
42+
if (seq_number != expected) {
43+
if (seq_number > expected) {
44+
dropped_packets_ += (seq_number - expected) % (1 << 16);
45+
} else {
46+
out_of_order_packets_++;
47+
}
48+
}
49+
}
50+
51+
packet_count_++;
52+
bytes_received_ += bytes_read;
53+
bytes_received_in_interval_ += bytes_read;
54+
last_seq_number_ = seq_number;
55+
56+
return true;
57+
}
58+
59+
void PacketMonitor::clear_line() const {
60+
// Move to start of line and clear it
61+
std::cout << "\r" << std::string(80, ' ') << "\r";
62+
}
63+
64+
std::string PacketMonitor::format_stats() const {
65+
auto now = std::chrono::steady_clock::now();
66+
std::stringstream ss;
67+
68+
// Runtime
69+
auto runtime = std::chrono::duration<double>(now - start_time_).count();
70+
ss << "Runtime " << std::fixed << std::setprecision(1) << runtime << "s | ";
71+
72+
// Drop calculation
73+
double drop_percent = (static_cast<double>(dropped_packets_) / std::max<uint64_t>(1, packet_count_)) * 100.0;
74+
ss << "Dropped: " << dropped_packets_ << "/" << packet_count_
75+
<< " (" << std::setprecision(1) << drop_percent << "%) | ";
76+
77+
// Bandwidth calculation
78+
auto dt_sec = std::chrono::duration<double>(now - last_bandwidth_time_).count();
79+
if (dt_sec > 0) {
80+
double bytes_per_second = bytes_received_in_interval_ / dt_sec;
81+
double megabits_per_second = (bytes_per_second * 8) / 1'000'000;
82+
ss << "Mbit/sec: " << std::setprecision(1) << megabits_per_second << " | ";
83+
}
84+
85+
// Jitter (in milliseconds)
86+
double jitter_ms = avg_jitter_ * 1000;
87+
ss << "Jitter: " << std::setprecision(2) << jitter_ms << " ms | ";
88+
89+
// Out of order packets
90+
ss << "Out of Order: " << out_of_order_packets_;
91+
92+
return ss.str();
93+
}
94+
95+
void PacketMonitor::print_stats() {
96+
clear_line();
97+
std::cout << format_stats() << std::flush;
98+
99+
// Reset interval counters
100+
bytes_received_in_interval_ = 0;
101+
last_bandwidth_time_ = std::chrono::steady_clock::now();
102+
}

examples/stats/packet_monitoring.h

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
#pragma once
2+
3+
#include <chrono>
4+
#include <cstdint>
5+
#include <iostream>
6+
#include <string>
7+
8+
class PacketMonitor {
9+
public:
10+
PacketMonitor();
11+
12+
void start_monitoring();
13+
void print_stats();
14+
bool process_packet(uint16_t seq_number, size_t bytes_read);
15+
16+
private:
17+
// Packet tracking
18+
uint64_t packet_count_;
19+
uint16_t last_seq_number_;
20+
uint64_t dropped_packets_;
21+
uint64_t out_of_order_packets_;
22+
23+
// Timing metrics
24+
std::chrono::steady_clock::time_point start_time_;
25+
std::chrono::steady_clock::time_point first_packet_time_;
26+
std::chrono::steady_clock::time_point last_packet_time_;
27+
std::chrono::steady_clock::time_point last_stats_time_;
28+
29+
// Bandwidth tracking
30+
uint64_t bytes_received_;
31+
uint64_t bytes_received_in_interval_;
32+
std::chrono::steady_clock::time_point last_bandwidth_time_;
33+
34+
// Jitter tracking
35+
double last_jitter_;
36+
double avg_jitter_;
37+
38+
// Helper methods
39+
void clear_line() const;
40+
std::string format_stats() const;
41+
};

examples/stream_out/main.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ auto stream_new(Device& device, std::shared_ptr<StreamOut>* stream_out_ptr) -> s
4949
}
5050

5151
Config config;
52-
auto broadband_source = std::make_shared<synapse::BroadbandSource>(1, 16, 30000, 20.0, signal);
52+
auto broadband_source = std::make_shared<synapse::BroadbandSource>(100, 16, 30000, 20.0, signal);
5353
*stream_out_ptr = std::make_shared<synapse::StreamOut>("out", group);
5454

5555
s = config.add_node(broadband_source);

include/science/synapse/nodes/stream_out.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ class StreamOut : public UdpNode {
1616
public:
1717
StreamOut(const std::string& label, const std::string& multicast_group);
1818

19-
auto read(science::libndtp::SynapseData* out) -> science::Status;
19+
auto read(science::libndtp::SynapseData* out, science::libndtp::NDTPHeader* header = nullptr, size_t* bytes_read = nullptr) -> science::Status;
2020

2121
[[nodiscard]] static auto from_proto(
2222
const synapse::NodeConfig& proto,

src/science/synapse/nodes/stream_out.cpp

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@ using science::libndtp::SynapseData;
2020

2121
auto unpack(
2222
const std::vector<uint8_t>& bytes,
23-
SynapseData* data
23+
SynapseData* data,
24+
science::libndtp::NDTPHeader* header,
25+
size_t* bytes_read
2426
) -> science::Status {
2527
NDTPMessage msg;
2628
try {
@@ -30,6 +32,13 @@ auto unpack(
3032
return { science::StatusCode::kInternal, "error unpacking NDTP message: " + std::string(e.what()) };
3133
}
3234

35+
if (header != nullptr) {
36+
*header = msg.header;
37+
}
38+
if (bytes_read != nullptr) {
39+
*bytes_read = bytes.size();
40+
}
41+
3342
switch (msg.header.data_type) {
3443
case DataType::kBroadband:
3544
*data = ElectricalBroadbandData::unpack(msg);
@@ -116,7 +125,7 @@ auto StreamOut::init() -> science::Status {
116125
return {};
117126
}
118127

119-
auto StreamOut::read(SynapseData* data) -> science::Status {
128+
auto StreamOut::read(SynapseData* data, science::libndtp::NDTPHeader* header, size_t* bytes_read) -> science::Status {
120129
if (!sock() || !addr()) {
121130
auto s = init();
122131
if (!s.ok()) {
@@ -155,7 +164,7 @@ auto StreamOut::read(SynapseData* data) -> science::Status {
155164
}
156165

157166
buf.resize(rc);
158-
return unpack(buf, data);
167+
return unpack(buf, data, header, bytes_read);
159168
}
160169

161170
auto StreamOut::p_to_proto(synapse::NodeConfig* proto) -> science::Status {

0 commit comments

Comments
 (0)