Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ serde = { version = "1.0", features = ["derive"], optional = true }
# Parser dependencies #
#######################
bytes = { version = "1.7", optional = true }
zerocopy = { version = "0.8", features = ["derive"], optional = true }
hex = { version = "0.4.3", optional = true } # bmp/openbmp parsing
oneio = { version = "0.20.0", default-features = false, features = ["http", "gz", "bz"], optional = true }
regex = { version = "1", optional = true } # used in parser filter
Expand All @@ -60,6 +61,7 @@ parser = [
"bytes",
"chrono",
"regex",
"zerocopy",
]
cli = [
"clap",
Expand Down
26 changes: 26 additions & 0 deletions benches/internals.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,19 @@ pub fn criterion_benchmark(c: &mut Criterion) {
})
});

c.bench_function("updates into_raw_record_iter", |b| {
b.iter(|| {
let mut reader = black_box(&updates[..]);

BgpkitParser::from_reader(&mut reader)
.into_raw_record_iter()
.take(RECORD_LIMIT)
.for_each(|x| {
black_box(x);
});
})
});

c.bench_function("rib into_record_iter", |b| {
b.iter(|| {
let mut reader = black_box(&rib_dump[..]);
Expand Down Expand Up @@ -129,6 +142,19 @@ pub fn criterion_benchmark(c: &mut Criterion) {
});
})
});

c.bench_function("rib into_raw_record_iter", |b| {
b.iter(|| {
let mut reader = black_box(&rib_dump[..]);

BgpkitParser::from_reader(&mut reader)
.into_raw_record_iter()
.take(RECORD_LIMIT)
.for_each(|x| {
black_box(x);
});
})
});
}

criterion_group! {
Expand Down
111 changes: 80 additions & 31 deletions src/parser/mrt/mrt_header.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,71 @@
use crate::models::{CommonHeader, EntryType};
use crate::ParserError;
use bytes::{Buf, BufMut, Bytes, BytesMut};
use bytes::Bytes;
use std::io::Read;
use zerocopy::big_endian::{U16, U32};
use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout};

/// On-wire MRT common header layout (12 bytes, network byte order).
#[derive(IntoBytes, FromBytes, KnownLayout, Immutable)]
#[repr(C)]
struct RawMrtCommonHeader {
timestamp: U32,
entry_type: U16,
entry_subtype: U16,
length: U32,
}

const _: () = assert!(size_of::<RawMrtCommonHeader>() == 12);

Comment on lines +18 to +19
Copy link

Copilot AI Feb 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

size_of is used in the const assertions but isn’t imported or fully qualified, which will fail to compile. Import core::mem::size_of/std::mem::size_of (or change these calls to core::mem::size_of::<...>()).

Copilot uses AI. Check for mistakes.
/// On-wire MRT header with microseconds included (16 bytes, network byte order)
#[derive(IntoBytes, FromBytes, KnownLayout, Immutable)]
#[repr(C)]
struct RawMrtEtCommonHeader {
timestamp: U32,
entry_type: U16,
entry_subtype: U16,
length: U32,
microseconds: U32,
}

const _: () = assert!(size_of::<RawMrtEtCommonHeader>() == 16);

enum RawMrtHeader {
Standard(RawMrtCommonHeader),
Et(RawMrtEtCommonHeader),
}

impl From<&CommonHeader> for RawMrtHeader {
fn from(header: &CommonHeader) -> Self {
match header.microsecond_timestamp {
None => RawMrtHeader::Standard(RawMrtCommonHeader {
timestamp: U32::new(header.timestamp),
entry_type: U16::new(header.entry_type as u16),
entry_subtype: U16::new(header.entry_subtype),
length: U32::new(header.length),
}),
Some(microseconds) => RawMrtHeader::Et(RawMrtEtCommonHeader {
timestamp: U32::new(header.timestamp),
entry_type: U16::new(header.entry_type as u16),
entry_subtype: U16::new(header.entry_subtype),
// Internally, we use the length of the MRT payload.
// However in the header, the length includes the space used by the extra timestamp
// data.
length: U32::new(header.length + 4),
microseconds: U32::new(microseconds),
}),
}
}
}

impl RawMrtHeader {
fn as_bytes(&self) -> &[u8] {
match self {
RawMrtHeader::Standard(raw) => raw.as_bytes(),
RawMrtHeader::Et(raw) => raw.as_bytes(),
}
}
}

/// Result of parsing a common header, including the raw bytes.
pub struct ParsedHeader {
Expand Down Expand Up @@ -56,14 +120,16 @@ pub fn parse_common_header<T: Read>(input: &mut T) -> Result<CommonHeader, Parse
pub fn parse_common_header_with_bytes<T: Read>(input: &mut T) -> Result<ParsedHeader, ParserError> {
let mut base_bytes = [0u8; 12];
input.read_exact(&mut base_bytes)?;
let mut data = &base_bytes[..];

let timestamp = data.get_u32();
let entry_type_raw = data.get_u16();
let entry_type = EntryType::try_from(entry_type_raw)?;
let entry_subtype = data.get_u16();
// Single bounds check via zerocopy instead of four sequential cursor reads.
let raw = RawMrtCommonHeader::ref_from_bytes(&base_bytes)
.expect("base_bytes is exactly 12 bytes with no alignment requirement");

let timestamp = raw.timestamp.get();
let entry_type = EntryType::try_from(raw.entry_type.get())?;
let entry_subtype = raw.entry_subtype.get();
// the length field does not include the length of the common header
let mut length = data.get_u32();
let mut length = raw.length.get();

let (microsecond_timestamp, raw_bytes) = match &entry_type {
EntryType::BGP4MP_ET => {
Expand All @@ -76,15 +142,11 @@ pub fn parse_common_header_with_bytes<T: Read>(input: &mut T) -> Result<ParsedHe
));
}
length -= 4;
let mut et_bytes = [0u8; 4];
input.read_exact(&mut et_bytes)?;
let microseconds = (&et_bytes[..]).get_u32();

// Combine base header bytes + ET bytes
let mut combined = BytesMut::with_capacity(16);
combined.put_slice(&base_bytes);
combined.put_slice(&et_bytes);
(Some(microseconds), combined.freeze())
let mut combined = [0u8; 16];
combined[..12].copy_from_slice(&base_bytes);
input.read_exact(&mut combined[12..])?;
let microseconds = u32::from_be_bytes(combined[12..16].try_into().unwrap());
(Some(microseconds), Bytes::copy_from_slice(&combined))
}
_ => (None, Bytes::copy_from_slice(&base_bytes)),
};
Expand All @@ -103,21 +165,8 @@ pub fn parse_common_header_with_bytes<T: Read>(input: &mut T) -> Result<ParsedHe

impl CommonHeader {
pub fn encode(&self) -> Bytes {
let mut bytes = BytesMut::new();
bytes.put_slice(&self.timestamp.to_be_bytes());
bytes.put_u16(self.entry_type as u16);
bytes.put_u16(self.entry_subtype);

match self.microsecond_timestamp {
None => bytes.put_u32(self.length),
Some(microseconds) => {
// When the microsecond timestamp is present, the length must be adjusted to account
// for the stace used by the extra timestamp data.
bytes.put_u32(self.length + 4);
bytes.put_u32(microseconds);
}
};
bytes.freeze()
let raw = RawMrtHeader::from(self);
Bytes::copy_from_slice(raw.as_bytes())
}
}

Expand Down