From a08d1091a585e65e74da1ce4a70bf427fbe6d6cd Mon Sep 17 00:00:00 2001 From: Sergey Matov Date: Wed, 4 Mar 2026 16:49:28 +0400 Subject: [PATCH] chore: Move flow expiration from PQ to timers Signed-off-by: Sergey Matov --- Cargo.lock | 13 +- dataplane/src/packet_processor/mod.rs | 6 +- flow-entry/Cargo.toml | 4 +- flow-entry/src/flow_table/README.md | 49 +- flow-entry/src/flow_table/display.rs | 6 +- flow-entry/src/flow_table/mod.rs | 5 +- flow-entry/src/flow_table/nf_expirations.rs | 142 ------ flow-entry/src/flow_table/nf_lookup.rs | 44 +- flow-entry/src/flow_table/table.rs | 445 +++++++++++-------- flow-entry/src/flow_table/thread_local_pq.rs | 327 -------------- nat/src/portfw/test.rs | 11 +- 11 files changed, 305 insertions(+), 747 deletions(-) delete mode 100644 flow-entry/src/flow_table/nf_expirations.rs delete mode 100644 flow-entry/src/flow_table/thread_local_pq.rs diff --git a/Cargo.lock b/Cargo.lock index d882a5173..4e0d89e29 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1354,10 +1354,9 @@ dependencies = [ "dataplane-tracectl", "etherparse", "linkme", - "priority-queue", "shuttle", "thiserror 2.0.18", - "thread_local", + "tokio", "tracing", "tracing-test", ] @@ -4349,16 +4348,6 @@ dependencies = [ "syn 2.0.117", ] -[[package]] -name = "priority-queue" -version = "2.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "93980406f12d9f8140ed5abe7155acb10bb1e69ea55c88960b9c2f117445ef96" -dependencies = [ - "equivalent", - "indexmap 2.13.0", -] - [[package]] name = "proc-macro-crate" version = "2.0.0" diff --git a/dataplane/src/packet_processor/mod.rs b/dataplane/src/packet_processor/mod.rs index b318e2cb8..52d7e6ba7 100644 --- a/dataplane/src/packet_processor/mod.rs +++ b/dataplane/src/packet_processor/mod.rs @@ -12,7 +12,7 @@ use super::packet_processor::ipforward::IpForwarder; use concurrency::sync::Arc; -use flow_entry::flow_table::{ExpirationsNF, FlowLookup, FlowTable}; +use flow_entry::flow_table::{FlowLookup, FlowTable}; use flow_filter::{FlowFilter, FlowFilterTableWriter}; use nat::portfw::{PortForwarder, PortFwTableWriter}; @@ -90,7 +90,6 @@ pub(crate) fn start_router( let stats_stage = Stats::new("stats", writer.clone()); let flow_filter = FlowFilter::new("flow-filter", flowfiltertablesr_factory.handle()); let flow_lookup = FlowLookup::new("flow-lookup", flow_table.clone()); - let flow_expirations_nf = ExpirationsNF::new(flow_table.clone()); let portfw = PortForwarder::new( "port-forwarder", portfw_factory.handle(), @@ -98,7 +97,7 @@ pub(crate) fn start_router( ); // Build the pipeline for a router. The composition of the pipeline (in stages) is currently - // hard-coded. In any pipeline, the Stats and ExpirationsNF stages should go last + // hard-coded. Flow expiration is handled by per-flow tokio timers; no ExpirationsNF needed. DynPipeline::new() .add_stage(stage_ingress) .add_stage(iprouter1) @@ -109,7 +108,6 @@ pub(crate) fn start_router( .add_stage(stateful_nat) .add_stage(iprouter2) .add_stage(stage_egress) - .add_stage(flow_expirations_nf) .add_stage(pktdump) .add_stage(stats_stage) }; diff --git a/flow-entry/Cargo.toml b/flow-entry/Cargo.toml index 570d953fd..22ddd9699 100644 --- a/flow-entry/Cargo.toml +++ b/flow-entry/Cargo.toml @@ -20,14 +20,14 @@ etherparse = { workspace = true } linkme = { workspace = true } net = { workspace = true } pipeline = { workspace = true } -priority-queue = { workspace = true } thiserror = { workspace = true } -thread_local = { workspace = true } +tokio = { workspace = true, features = ["rt", "time"] } tracectl = { workspace = true } tracing = { workspace = true } [dev-dependencies] bolero = { workspace = true, default-features = false } net = { workspace = true, features = ["bolero"] } +tokio = { workspace = true, features = ["macros", "rt", "time"] } tracing-test = { workspace = true, features = [] } shuttle = { workspace = true } diff --git a/flow-entry/src/flow_table/README.md b/flow-entry/src/flow_table/README.md index 5637b050e..9452545fb 100644 --- a/flow-entry/src/flow_table/README.md +++ b/flow-entry/src/flow_table/README.md @@ -1,28 +1,31 @@ # Flow Table -The current implementation of flow table uses `dash_map` and per-thread priority queue's (for timeouts) along with `Arc` and `Weak` to get a reasonable flow table with timeouts. -However, it leaves a lot of room for optimizations. +The flow table uses `DashMap` for concurrent key→value storage and per-flow +tokio timers for expiration. ## Flow Table Implementation -The main `DashMap` holds `Weak` references to all the flow entries so that the memory gets automatically deallocated when the entry times out. - -The priority queue's hold `Arc` references to the flow entries to keep them alive when they are not in any packet meta data. -When the entry times-out and is removed from the priority queue and the last packet referencing that flow is dropped, the memory for the entry is freed. - -Note that in the current implementation, a flow is not removed from the flow table until the last Arc to the flow_info is dropped or the flow entry is replaced. This can be changed if needed, or even have it be an option on the flow as to whether timeout removes the flow or not. - -## Optimizations - -In the current implementation, there has to be periodic or on-timeout reaping the Weak reference in the hash table. -This is better done by having a version of `DashMap` that can reap the dead `Weak` reference as it walks the table on lookups, instead of waiting for key collisions. -The hope, for now, is that the entries in the hash table array will contain a small pointer and not take up too much extra memory. -Those dead `Weak` pointers will prevent shrinking of the hash table though, if the implementation supports that. - -Second, the `priority_queue` crate uses a `HashMap` inside the queue in order to allow fast removal and re-insertion. -However, this wastes space and requires extra hashes. -The better way to do this is to have a custom priority queue integrated with the custom weak-reaping hash map so that the same hash table can be used for both operations. -This improves cache locality, reduces memory utlization, and avoids multiple hash table lookups in many cases. - -However, in the interest of time to completion for the code, this module currently uses existing data structures instead of full custom implementations of everything. -However, the interface should be able to hide a change from the current to the optimized implementation. +The `DashMap` stores `Arc` values directly, so the table is the +primary strong-reference keeper for each flow entry. Callers that need to +retain a flow (e.g. pipeline stages that tag packets) clone the `Arc` out of +`lookup()`. + +When a flow is inserted, a `tokio::task` is spawned (if a tokio runtime is +present) that sleeps until the flow's deadline and then calls +`update_status(FlowStatus::Expired)`. The DashMap entry is not removed by the +timer; instead, `lookup()` performs lazy cleanup: if a looked-up entry is +`Expired` or `Cancelled` it is removed from the map and `None` is returned. +The same lazy path covers the case where a deadline passes without a timer +firing (e.g. in non-tokio test contexts). + +`FlowInfo::related` holds a `Weak` pointing to the reverse-direction +flow of a bidirectional pair. This avoids reference cycles: the two entries +are independent `Arc`s in the table, and the `Weak` merely lets one side +observe the other without keeping it alive. + +## Non-tokio contexts (shuttle / sync tests) + +When no tokio runtime is present `Handle::try_current()` returns `Err` and no +timer is spawned. Tests simulate expiration by calling +`flow_info.update_status(FlowStatus::Expired)` directly, after which the next +`lookup()` call performs the lazy removal. diff --git a/flow-entry/src/flow_table/display.rs b/flow-entry/src/flow_table/display.rs index 0937af90d..cf88a87b2 100644 --- a/flow-entry/src/flow_table/display.rs +++ b/flow-entry/src/flow_table/display.rs @@ -23,10 +23,8 @@ impl Display for FlowTable { Heading(format!("Flow Table ({} entries)", table.len())).fmt(f)?; for entry in table.iter() { let key = entry.key(); - match entry.value().upgrade() { - Some(value) => writeln!(f, "key = {key}\ndata = {value}")?, - None => writeln!(f, "key = {key} NONE")?, - } + let value = entry.value(); + writeln!(f, "key = {key}\ndata = {value}")?; } } else { write!(f, "Failed to lock flow table")?; diff --git a/flow-entry/src/flow_table/mod.rs b/flow-entry/src/flow_table/mod.rs index 58a23a9f0..e92f447e1 100644 --- a/flow-entry/src/flow_table/mod.rs +++ b/flow-entry/src/flow_table/mod.rs @@ -2,17 +2,14 @@ // Copyright Open Network Fabric Authors mod display; -pub mod nf_expirations; pub mod nf_lookup; pub mod table; -mod thread_local_pq; +pub use nf_lookup::FlowLookup; pub use table::FlowTable; pub use net::flows::atomic_instant::AtomicInstant; pub use net::flows::flow_info::*; -pub use nf_expirations::ExpirationsNF; -pub use nf_lookup::FlowLookup; use tracectl::trace_target; trace_target!("flow-table", LevelFilter::INFO, &["pipeline"]); diff --git a/flow-entry/src/flow_table/nf_expirations.rs b/flow-entry/src/flow_table/nf_expirations.rs deleted file mode 100644 index c0d95103a..000000000 --- a/flow-entry/src/flow_table/nf_expirations.rs +++ /dev/null @@ -1,142 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// Copyright Open Network Fabric Authors - -//! Network Function specific flow table. - -use concurrency::sync::Arc; -use net::buffer::PacketBufferMut; -use net::packet::Packet; -use pipeline::NetworkFunction; - -use crate::flow_table::FlowTable; - -use tracectl::trace_target; -trace_target!("flow-expiration", LevelFilter::INFO, &["pipeline"]); - -/// Network Function that reap expired entries from the flow table for the current thread. -/// -/// Note: This only reaps expired entries on the priority queue for the current thread. -/// It does not reap expired entries on other threads. -/// -/// This stage should be run after all other pipeline stages to reap any expired entries. -pub struct ExpirationsNF { - flow_table: Arc, -} - -impl ExpirationsNF { - pub fn new(flow_table: Arc) -> Self { - Self { flow_table } - } -} - -impl NetworkFunction for ExpirationsNF { - fn process<'a, Input: Iterator> + 'a>( - &'a mut self, - input: Input, - ) -> impl Iterator> + 'a { - self.flow_table.reap_expired(); - input - } -} - -#[cfg(test)] -mod test { - use net::buffer::TestBuffer; - use net::flows::FlowInfo; - use net::ip::UnicastIpAddr; - use net::packet::Packet; - use net::packet::VpcDiscriminant; - use net::tcp::TcpPort; - use net::vxlan::Vni; - use pipeline::NetworkFunction; - use std::net::IpAddr; - use std::sync::Arc; - use std::time::{Duration, Instant}; - - use crate::flow_table::FlowTable; - use crate::flow_table::nf_expirations::ExpirationsNF; - use net::{FlowKey, IpProtoKey, TcpProtoKey}; - - #[test] - fn test_expirations_nf() { - let flow_table = Arc::new(FlowTable::default()); - let mut expirations_nf = ExpirationsNF::new(flow_table.clone()); - let src_vpcd = VpcDiscriminant::VNI(Vni::new_checked(100).unwrap()); - let src_ip = "1.2.3.4".parse::().unwrap(); - let dst_ip = "5.6.7.8".parse::().unwrap(); - let src_port = TcpPort::new_checked(1025).unwrap(); - let dst_port = TcpPort::new_checked(2048).unwrap(); - - let flow_key = FlowKey::uni( - Some(src_vpcd), - src_ip.into(), - dst_ip, - IpProtoKey::Tcp(TcpProtoKey { src_port, dst_port }), - ); - - // Insert an already expired flow entry and check that entry is there by looking it up - let flow_info = FlowInfo::new(Instant::now().checked_sub(Duration::from_secs(10)).unwrap()); - flow_table.insert(flow_key, flow_info); - assert!(flow_table.lookup(&flow_key).is_some()); - - // call process() on the NF (no packet is actually needed). NF should expire entry - let _output_iter = expirations_nf.process(std::iter::empty::>()); - assert!(flow_table.lookup(&flow_key).is_none()); - } - - #[test] - fn test_aggressive_expiration() { - const REAP_THRESHOLD_TEST: usize = 10_000; // must be < 64K for testing - - let mut flow_table = FlowTable::default(); - - // set the aggressive reap threshold - flow_table.set_reap_threshold(REAP_THRESHOLD_TEST); - - let flow_table = Arc::from(flow_table); - let mut expirations_nf = ExpirationsNF::new(flow_table.clone()); - let src_vpcd = VpcDiscriminant::VNI(Vni::new_checked(100).unwrap()); - let src_ip = "1.2.3.4".parse::().unwrap(); - let dst_ip = "5.6.7.8".parse::().unwrap(); - - // create REAP_THRESHOLD_TEST + 100 flows - for src_port in 1..=REAP_THRESHOLD_TEST + 100 { - #[allow(clippy::cast_possible_truncation)] - let src_port = TcpPort::new_checked(src_port as u16).unwrap(); - let dst_port = TcpPort::new_checked(100).unwrap(); - let flow_key = FlowKey::uni( - Some(src_vpcd), - src_ip.into(), - dst_ip, - IpProtoKey::Tcp(TcpProtoKey { src_port, dst_port }), - ); - let flow_info = - FlowInfo::new(Instant::now().checked_add(Duration::from_mins(60)).unwrap()); - flow_table.insert(flow_key, flow_info); - } - // check we inserted more flows than the threshold - assert!(flow_table.len().unwrap() > REAP_THRESHOLD_TEST); - - // expire: no flow should be reaped because all are Active - let _: Vec<_> = expirations_nf - .process(std::iter::empty::>()) - .collect(); - assert!(flow_table.len().unwrap() > REAP_THRESHOLD_TEST); - - // pretend that all flows -but one- get Cancelled - for (num, flow_info) in flow_table.table.read().unwrap().iter().enumerate() { - if num != 13 { - flow_info - .upgrade() - .unwrap() - .update_status(net::flows::FlowStatus::Cancelled); - } - } - - // reap again, only one flow should be there - let _: Vec<_> = expirations_nf - .process(std::iter::empty::>()) - .collect(); - assert_eq!(flow_table.len().unwrap(), 1); - } -} diff --git a/flow-entry/src/flow_table/nf_lookup.rs b/flow-entry/src/flow_table/nf_lookup.rs index 485408e94..375686985 100644 --- a/flow-entry/src/flow_table/nf_lookup.rs +++ b/flow-entry/src/flow_table/nf_lookup.rs @@ -77,7 +77,6 @@ mod test { use std::time::{Duration, Instant}; use tracing_test::traced_test; - use crate::flow_table::ExpirationsNF; use crate::flow_table::FlowTable; use crate::flow_table::nf_lookup::FlowLookup; @@ -111,7 +110,7 @@ mod test { assert!(output.meta().flow_info.is_some()); } - // A dummy NF that creates a flow entry for each packet, with a lifetime of 2 seconds + // A dummy NF that creates a flow entry for each packet, with a configurable lifetime struct FlowInfoCreator { flow_table: Arc, timeout: Duration, @@ -139,16 +138,14 @@ mod test { } #[traced_test] - #[test] - fn test_lookup_nf_with_expiration_nf() { + #[tokio::test] + async fn test_lookup_nf_with_expiration() { let flow_table = Arc::new(FlowTable::default()); let lookup_nf = FlowLookup::new("lookup_nf", flow_table.clone()); let flowinfo_creator = FlowInfoCreator::new(flow_table.clone(), Duration::from_secs(1)); - let expirations_nf = ExpirationsNF::new(flow_table.clone()); let mut pipeline: DynPipeline = DynPipeline::new() .add_stage(lookup_nf) - .add_stage(flowinfo_creator) - .add_stage(expirations_nf); + .add_stage(flowinfo_creator); const NUM_PACKETS: u16 = 1000; @@ -161,29 +158,23 @@ mod test { // process the NUM_PACKETS let packets_out = pipeline.process(packets_in.clone()); assert_eq!(packets_out.count(), NUM_PACKETS as usize); - let num_entries = flow_table.len().unwrap(); + let num_entries = flow_table.active_len().unwrap(); assert_eq!(num_entries, NUM_PACKETS as usize); - // wait twice as much as entry lifetimes. All flow entries should be gone after this. - std::thread::sleep(std::time::Duration::from_secs(2)); - pipeline - .process(std::iter::empty::>()) - .count(); + // wait twice as much as entry lifetimes — all flow timers should have fired by now + tokio::time::sleep(Duration::from_secs(2)).await; - // Entries are all gone - let num_entries = flow_table.len().unwrap(); + // All flows are now expired (marked by per-flow tokio timers) + let num_entries = flow_table.active_len().unwrap(); assert_eq!(num_entries, 0); } //#[traced_test] - #[test] - fn test_lookups_with_related_flows() { + #[tokio::test] + async fn test_lookups_with_related_flows() { let flow_table = Arc::new(FlowTable::default()); let lookup_nf = FlowLookup::new("lookup_nf", flow_table.clone()); - let expirations_nf = ExpirationsNF::new(flow_table.clone()); - let mut pipeline: DynPipeline = DynPipeline::new() - .add_stage(lookup_nf) - .add_stage(expirations_nf); + let mut pipeline: DynPipeline = DynPipeline::new().add_stage(lookup_nf); { let mut packet_1 = build_test_udp_ipv4_packet("10.0.0.1", "20.0.0.1", 80, 500); @@ -231,16 +222,13 @@ mod test { assert!(Arc::ptr_eq(&related_1, flow_2_pkt)); assert!(Arc::ptr_eq(&related_2, &flow_1)); assert!(Arc::ptr_eq(&related_2, flow_1_pkt)); - assert_eq!(flow_table.len().unwrap(), 2); + assert_eq!(flow_table.active_len().unwrap(), 2); } - // wait 3 secs. Flow 1 should have been removed - std::thread::sleep(Duration::from_secs(3)); - pipeline - .process(std::iter::empty::>()) - .count(); + // wait 3 secs. Flow 1 should have been expired by its tokio timer + tokio::time::sleep(Duration::from_secs(3)).await; - assert_eq!(flow_table.len().unwrap(), 1); + assert_eq!(flow_table.active_len().unwrap(), 1); // build identical packets and process them again let mut packet_1 = build_test_udp_ipv4_packet("10.0.0.1", "20.0.0.1", 80, 500); diff --git a/flow-entry/src/flow_table/table.rs b/flow-entry/src/flow_table/table.rs index 6bdc98eca..8c1716b06 100644 --- a/flow-entry/src/flow_table/table.rs +++ b/flow-entry/src/flow_table/table.rs @@ -8,11 +8,10 @@ use std::borrow::Borrow; use std::fmt::Debug; use std::hash::Hash; use std::time::Instant; -use tracing::debug; +use tracing::{debug, warn}; -use concurrency::sync::{Arc, RwLock, RwLockReadGuard, Weak}; +use concurrency::sync::{Arc, RwLock, RwLockReadGuard}; -use crate::flow_table::thread_local_pq::{PQAction, ThreadLocalPriorityQueue}; use net::flows::{FlowInfo, FlowStatus}; #[derive(Debug, thiserror::Error)] @@ -21,14 +20,13 @@ pub enum FlowTableError { InvalidShardCount(usize), } -type PriorityQueue = ThreadLocalPriorityQueue>; -type Table = DashMap, RandomState>; +type Table = DashMap, RandomState>; #[derive(Debug)] pub struct FlowTable { // TODO(mvachhar) move this to a cross beam sharded lock pub(crate) table: RwLock, - pub(crate) priority_queue: PriorityQueue, + reap_threshold: usize, } impl Default for FlowTable { @@ -44,6 +42,11 @@ fn hasher_state() -> &'static RandomState { } impl FlowTable { + /// When the raw `DashMap` entry count exceeds this threshold, `insert_common` will + /// proactively purge all stale (Expired / Cancelled / deadline-passed) entries to + /// prevent unbounded memory growth. + pub const AGGRESSIVE_REAP_THRESHOLD: usize = 1_000_000; + #[must_use] pub fn new(num_shards: usize) -> Self { Self { @@ -51,12 +54,12 @@ impl FlowTable { hasher_state().clone(), num_shards, )), - priority_queue: PriorityQueue::new(None), + reap_threshold: Self::AGGRESSIVE_REAP_THRESHOLD, } } pub fn set_reap_threshold(&mut self, reap_threshold: usize) { - self.priority_queue.set_reap_threshold(reap_threshold); + self.reap_threshold = reap_threshold; } /// Reshard the flow table into the given number of shards. @@ -171,17 +174,73 @@ impl FlowTable { fn insert_common(&self, flow_key: FlowKey, val: &Arc) -> Option> { let table = self.table.read().unwrap(); - let expires_at = val.expires_at(); - let result = table.insert(flow_key, Arc::downgrade(val)); - self.priority_queue.push(flow_key, val.clone(), expires_at); - let ret = match result { - Some(w) => w.upgrade(), - None => None, - }; - - let Some(ret) = ret else { - return ret; - }; + let result = table.insert(flow_key, val.clone()); + + // Proactively purge stale entries when the raw table size exceeds the threshold. + // This bounds memory growth when flows expire faster than they are looked up, + // since expired entries otherwise accumulate in the `DashMap` until a lookup hits them. + let raw_len = table.len(); + if raw_len > self.reap_threshold { + warn!( + "The number of flows ({raw_len}) exceeds {}. Reaping stale entries...", + self.reap_threshold + ); + Self::drain_stale_with_read_lock(&table); + } + + // Spawn a per-flow expiration timer when running inside a tokio runtime. + // The timer marks the flow as Expired; the `DashMap` entry is cleaned up + // lazily the next time lookup() is called for this key. + // In non-tokio contexts (shuttle tests, sync unit tests) the guard fails + // gracefully and lazy time-checking in `lookup` handles expiration instead. + // + // Only spawn a timer for a genuinely new Arc. If the same Arc is being + // reinserted (e.g. via reinsert()), its existing timer loop already handles + // extended deadlines via the `new_deadline > deadline` re-check, so spawning + // a second task would be redundant and would cause unbounded task growth. + // + // The timer holds a Weak rather than Arc and drops the + // upgrade before sleeping, so the timer task does not extend the lifetime of + // the FlowInfo allocation. Once the DashMap entry is removed (drain_stale, + // lookup lazy cleanup, or explicit remove) and all other callers drop their + // Arc clones, the allocation is freed even if the timer has not yet woken up. + // The status check after each sleep avoids redundant work for flows that were + // already Cancelled before their deadline elapsed. + let need_timer = result.as_ref().is_none_or(|old| !Arc::ptr_eq(old, val)); + if need_timer && tokio::runtime::Handle::try_current().is_ok() { + let fi_weak = Arc::downgrade(val); + tokio::task::spawn(async move { + loop { + // Upgrade to check status and read the deadline. If the Arc has + // already been dropped (no DashMap entry, no in-flight holders), + // there is nothing left to expire. + let Some(fi) = fi_weak.upgrade() else { break }; + if fi.status() != FlowStatus::Active { + // Already Cancelled or Expired by another path; nothing to do. + break; + } + let deadline = fi.expires_at(); + // Drop the strong ref before sleeping so this task does not + // prevent the FlowInfo allocation from being freed. + drop(fi); + tokio::time::sleep_until(tokio::time::Instant::from_std(deadline)).await; + // Re-acquire after sleeping and re-check before committing. + let Some(fi) = fi_weak.upgrade() else { break }; + if fi.status() != FlowStatus::Active { + break; + } + let new_deadline = fi.expires_at(); + if new_deadline > deadline { + // Deadline was extended (e.g. by StatefulNat); sleep again. + continue; + } + fi.update_status(FlowStatus::Expired); + break; + } + }); + } + + let ret = result?; if ret.status() == FlowStatus::Expired { return None; @@ -192,6 +251,11 @@ impl FlowTable { /// Lookup a flow in the table. /// + /// Performs lazy time-based expiration: if the matched entry is still + /// `Active` but its deadline has passed (e.g. because the tokio timer has + /// not yet fired, or no tokio runtime is present), the entry is marked + /// `Expired` and removed here. + /// /// # Panics /// /// Panics if this thread already holds the read lock on the table or @@ -203,18 +267,22 @@ impl FlowTable { { debug!("lookup: Looking up flow key {:?}", flow_key); let table = self.table.read().unwrap(); - let item = table.get(flow_key)?.upgrade(); - let Some(item) = item else { - debug!( - "lookup: Removing flow key {:?}, found empty weak reference", - flow_key - ); - Self::remove_with_read_lock(&table, flow_key); - return None; - }; + let item = table.get(flow_key)?.value().clone(); let status = item.status(); match status { - FlowStatus::Active => Some(item), + FlowStatus::Active => { + // Lazy expiration: cover non-tokio contexts and timer scheduling lag. + if item.expires_at() <= Instant::now() { + debug!( + "lookup: Flow key {:?} has passed its deadline, expiring", + flow_key + ); + item.update_status(FlowStatus::Expired); + Self::remove_with_read_lock(&table, flow_key); + return None; + } + Some(item) + } FlowStatus::Expired | FlowStatus::Cancelled => { debug!("lookup: Flow key {:?} is '{status}', removing", flow_key); Self::remove_with_read_lock(&table, flow_key); @@ -240,115 +308,79 @@ impl FlowTable { } fn remove_with_read_lock( - table: &RwLockReadGuard, RandomState>>, + table: &RwLockReadGuard, RandomState>>, flow_key: &Q, ) -> Option<(FlowKey, Arc)> where FlowKey: Borrow, Q: Hash + Eq + ?Sized + Debug, { - let result = table.remove(flow_key); - let (k, w) = result?; - let old_val = w.upgrade()?; - if old_val.status() == FlowStatus::Expired { + let (k, v) = table.remove(flow_key)?; + if v.status() == FlowStatus::Expired { return None; } - Some((k, old_val)) - } - - fn decide_expiry(now: &Instant, k: &FlowKey, v: &Arc) -> PQAction { - // Note(mvachhar) - // - //I'm not sure if marking the entry as expired is worthwhile here - // nor am I sure of the performance cost of doing this. - // It isn't strictly needed, though it means other holders of the Arc may - // be able to read stale data and wouldn't know the entry is expired. - // - // If the common case is that the entry has no other references here, - // then this operation should be cheap, though not free due to the - // dereference of the value and the lock acquisition. - let expires_at = v.expires_at(); - if now >= &expires_at { - debug!("decide_expiry: Reap for flow key {k:?} with expires_at {expires_at:?}"); - PQAction::Reap - } else if v.status() == FlowStatus::Cancelled { - debug!("decide_expiry: Cancel for flow key {k:?}, which was cancelled"); - PQAction::Cancel - } else { - debug!("decide_expiry: Update for flow key {k:?} with time {expires_at:?}"); - PQAction::Update(expires_at) - } - } - - // Pass by value here since the PQ doesn't know the value is an Arc - // and we get ownership of the value here - #[allow(clippy::needless_pass_by_value)] - fn do_reap(k: &FlowKey, v: Arc) { - v.update_status(FlowStatus::Expired); - debug!("do_reap: Updated flow status for {k:?} to expired"); - } - - /// Remove all of the flow entries for the provided `FlowKey`s, returning the number of - /// entries removed - /// - /// # Panics - /// - /// Panics if this thread already holds the read lock on the table or - /// if the table lock is poisoned. - fn remove_flow_entries(&self, reaped_keys: &Vec) -> usize { - let num_keys = reaped_keys.len(); - let mut removed = 0; - let table = self.table.read().unwrap(); - for flow_key in reaped_keys { - if let Some((_key, _flow_info)) = table.remove(flow_key) { - removed += 1; - } - } - debug!("Removed {removed} flow-entries out of {num_keys} keys"); - num_keys + Some((k, v)) } - /// Reap expired entries from the priority queue for the current thread. + /// Remove all stale entries from the table (entries that are `Expired`, `Cancelled`, or + /// whose deadline has already passed). /// - /// # Thread Safety - /// - /// This method is thread-safe but should not be called if the current thread is - /// holding a lock on any element in the flow table. + /// Returns the number of entries removed. /// /// # Panics /// - /// Panics if any lock acquired by this method is poisoned. - pub fn reap_expired(&self) -> usize { - let reaped_keys = self - .priority_queue - .reap_expired(Self::decide_expiry, Self::do_reap); - self.remove_flow_entries(&reaped_keys) - } - - pub fn reap_all_expired(&self) -> usize { - let reaped_keys = self - .priority_queue - .reap_all_expired(Self::decide_expiry, Self::do_reap); - self.remove_flow_entries(&reaped_keys) + /// Panics if this thread already holds the read lock on the table or if the lock is poisoned. + pub fn drain_stale(&self) -> usize { + let table = self.table.read().unwrap(); + Self::drain_stale_with_read_lock(&table) } - #[cfg(all(test, feature = "shuttle"))] - pub fn reap_all_expired_with_time(&self, time: &Instant) -> usize { - let reaped_keys = self.priority_queue.reap_all_expired_with_time( - time, - Self::decide_expiry, - Self::do_reap, - ); - self.remove_flow_entries(&reaped_keys) + fn drain_stale_with_read_lock( + table: &RwLockReadGuard, RandomState>>, + ) -> usize { + let now = Instant::now(); + let to_remove: Vec = table + .iter() + .filter_map(|entry| { + let val = entry.value(); + match val.status() { + FlowStatus::Expired | FlowStatus::Cancelled => Some(*entry.key()), + FlowStatus::Active if val.expires_at() <= now => { + // Deadline passed but the tokio timer has not fired yet; mark and remove. + val.update_status(FlowStatus::Expired); + Some(*entry.key()) + } + FlowStatus::Active => None, + } + }) + .collect(); + let removed = to_remove.len(); + for key in &to_remove { + table.remove(key); + } + debug!("drain_stale: Removed {removed} stale flows"); + removed } #[allow(clippy::len_without_is_empty)] - /// Tell how many flows are in the table if it can be locked - /// This is mostly for testing + /// Returns the total number of entries physically stored in the table, regardless of + /// their expiration status. This is mostly for testing. pub fn len(&self) -> Option { let table = self.table.try_read().ok()?; Some(table.len()) } + + /// Returns the number of *active* (non-expired, non-cancelled) flows in the table. + /// This is mostly for testing. + pub fn active_len(&self) -> Option { + let table = self.table.try_read().ok()?; + Some( + table + .iter() + .filter(|e| e.value().status() == FlowStatus::Active) + .count(), + ) + } } #[cfg(test)] @@ -413,20 +445,16 @@ mod tests { let flow_info = FlowInfo::new(now + two_seconds); flow_table.insert(flow_key, flow_info); - // Wait 1 second, should still be present + // Wait 1 second — flow not yet expired, lazy lookup should return Some. thread::sleep(one_second); - // Reap expired entries after 1 second (should not reap our entry) - flow_table.reap_expired(); assert!( flow_table.lookup(&flow_key).is_some(), "Flow key should still be present after 1 second" ); - // Wait another 2 seconds (total 3s), should be expired + // Wait another 2 seconds (total 3s) — flow expired. + // Lazy expiration in lookup cleans it up. thread::sleep(two_seconds); - // Reap expired entries - flow_table.reap_expired(); - assert!( flow_table.lookup(&flow_key).is_none(), "Flow key should have expired and been removed" @@ -434,7 +462,7 @@ mod tests { } #[test] - fn test_flow_table_weak_ref_replaced_on_insert() { + fn test_flow_table_entry_replaced_on_insert() { let now = Instant::now(); let first_expiry_time = now + Duration::from_secs(5); let second_expiry_time = now + Duration::from_secs(10); @@ -450,51 +478,32 @@ mod tests { }), )); - // Insert first entry - let first_flow_info = FlowInfo::new(first_expiry_time); - let first_flow_info_arc = Arc::new(first_flow_info); - let weak_flow_info_reference = Arc::downgrade(&first_flow_info_arc); - flow_table.insert_from_arc(flow_key, &first_flow_info_arc); - drop(first_flow_info_arc); + // Insert first entry. + let first_arc = Arc::new(FlowInfo::new(first_expiry_time)); + flow_table.insert_from_arc(flow_key, &first_arc); - // The weak reference stored in the table should still resolve + // The entry stored in the table should be the first arc. { let table = flow_table.table.read().unwrap(); let entry = table .get(&flow_key) .expect("entry should exist after first insert"); - let resolved = entry - .upgrade() - .expect("weak ref should resolve after first insert"); - assert_eq!(resolved.as_ref().expires_at(), first_expiry_time); - } // drops `entry` (shard read lock) and `table` (outer RwLock read guard) - - // The weak reference we kept outside of the table should still resolve, too. Upgrade - // it: we now have two strong references, one from the priority queue and one from the - // upgrade. - let upgrade = weak_flow_info_reference.upgrade(); - assert_eq!(Arc::strong_count(&upgrade.unwrap()), 2); - - // Insert a second entry under the same key but with a different value. - let second_flow_info = FlowInfo::new(second_expiry_time); - flow_table.insert_from_arc(flow_key, &Arc::new(second_flow_info)); - - // The weak reference should now resolve to second_arc, not first_arc. + assert_eq!(entry.value().expires_at(), first_expiry_time); + } + + // Insert a second entry under the same key. + let second_arc = Arc::new(FlowInfo::new(second_expiry_time)); + flow_table.insert_from_arc(flow_key, &second_arc); + + // The table should now point to the second entry. { let table = flow_table.table.read().unwrap(); let entry = table .get(&flow_key) .expect("entry should exist after second insert"); - let resolved = entry - .upgrade() - .expect("weak ref should resolve after second insert"); - assert_ne!(resolved.as_ref().expires_at(), first_expiry_time); - assert_eq!(resolved.as_ref().expires_at(), second_expiry_time); + assert_ne!(entry.value().expires_at(), first_expiry_time); + assert_eq!(entry.value().expires_at(), second_expiry_time); } - - // The strong reference from the priority queue for the first entry has been dropped. - assert_eq!(Weak::strong_count(&weak_flow_info_reference), 0); - assert!(weak_flow_info_reference.upgrade().is_none()); } #[test] @@ -503,21 +512,22 @@ mod tests { bolero::check!() .with_type::() .for_each(|flow_key| { - flow_table.insert(*flow_key, FlowInfo::new(Instant::now())); - let flow_info_str = format!("{:?}", flow_table.lookup(flow_key).unwrap()); - - // We purposely keep the flow alive here to make sure lookup reaps it - let _flow_info = flow_table.lookup(flow_key).unwrap(); + // Insert with a future expiry so early lookups see the flow. + flow_table.insert( + *flow_key, + FlowInfo::new(Instant::now() + Duration::from_secs(60)), + ); + let flow_info = flow_table.lookup(flow_key).unwrap(); assert!(flow_table.lookup(&flow_key.reverse(None)).is_none()); - thread::sleep(Duration::from_millis(100)); - flow_table.reap_all_expired(); + // Simulate expiration (what the tokio timer would do). + flow_info.update_status(FlowStatus::Expired); + // Lazy cleanup on next lookup. let result = flow_table.lookup(flow_key); assert!( result.is_none(), - "flow_key lookup is not none {result:#?}, inserted {flow_info_str}, now: {:?}", - Instant::now() + "expired flow should be removed by lookup, inserted {flow_info:?}" ); }); } @@ -528,7 +538,11 @@ mod tests { bolero::check!() .with_type::() .for_each(|flow_key| { - flow_table.insert(*flow_key, FlowInfo::new(Instant::now())); + // Use a future expiry so the flow stays active long enough for remove(). + flow_table.insert( + *flow_key, + FlowInfo::new(Instant::now() + Duration::from_secs(60)), + ); let flow_info = flow_table.lookup(flow_key).unwrap(); assert!(flow_table.lookup(&flow_key.reverse(None)).is_none()); @@ -540,6 +554,56 @@ mod tests { assert!(flow_table.lookup(flow_key).is_none()); }); } + + #[test] + fn test_aggressive_reap_threshold() { + // Must be small enough to stay within u16 port range (< 65_535). + const REAP_THRESHOLD_TEST: usize = 10_000; + + let mut flow_table = FlowTable::default(); + flow_table.set_reap_threshold(REAP_THRESHOLD_TEST); + + let src_vpcd = VpcDiscriminant::VNI(Vni::new_checked(100).unwrap()); + let src_ip: IpAddr = "1.2.3.4".parse().unwrap(); + let dst_ip: IpAddr = "5.6.7.8".parse().unwrap(); + + // Insert REAP_THRESHOLD_TEST + 100 flows, all Active with a far-future expiry. + for src_port in 1..=REAP_THRESHOLD_TEST + 100 { + #[allow(clippy::cast_possible_truncation)] + let src_port = TcpPort::new_checked(src_port as u16).unwrap(); + let dst_port = TcpPort::new_checked(100).unwrap(); + let flow_key = FlowKey::Unidirectional(FlowKeyData::new( + Some(src_vpcd), + src_ip, + dst_ip, + IpProtoKey::Tcp(TcpProtoKey { src_port, dst_port }), + )); + let flow_info = FlowInfo::new(Instant::now() + Duration::from_secs(3600)); + flow_table.insert(flow_key, flow_info); + } + + // We inserted more flows than the threshold. + assert!(flow_table.active_len().unwrap() > REAP_THRESHOLD_TEST); + + // drain_stale: nothing should be reaped because all are Active with far-future expiry. + let reaped = flow_table.drain_stale(); + assert_eq!(reaped, 0); + assert!(flow_table.active_len().unwrap() > REAP_THRESHOLD_TEST); + + // Mark all flows except the first one as Cancelled. + let mut kept = 0usize; + for entry in flow_table.table.read().unwrap().iter() { + if kept == 0 { + kept += 1; + continue; + } + entry.value().update_status(FlowStatus::Cancelled); + } + + // drain_stale: all Cancelled flows should be purged, leaving exactly 1. + flow_table.drain_stale(); + assert_eq!(flow_table.active_len().unwrap(), 1); + } } #[concurrency_mode(shuttle)] @@ -554,7 +618,6 @@ mod tests { move || { let now = Instant::now(); let two_seconds = Duration::from_secs(2); - let one_second = Duration::from_secs(1); let flow_table = FlowTable::default(); let flow_key = FlowKey::Unidirectional(FlowKeyData::new( @@ -570,21 +633,21 @@ mod tests { let flow_info = FlowInfo::new(now + two_seconds); flow_table.insert(flow_key, flow_info); - // Reap expired entries after 1 second (should not reap our entry) - // Shuttle does not model time, hence this hack - flow_table.reap_all_expired_with_time(&(now + one_second)); + // Flow is active; lookup should return Some. assert!( flow_table.lookup(&flow_key).is_some(), - "Flow key should still be present after 1 second" + "Flow key should be present" ); - // Reap expired entries - // Shuttle does not model time, hence this hack - flow_table.reap_all_expired_with_time(&(now + two_seconds)); + // Simulate timer expiration by marking the flow directly. + if let Some(fi) = flow_table.lookup(&flow_key) { + fi.update_status(FlowStatus::Expired); + } + // Lazy cleanup on next lookup. assert!( flow_table.lookup(&flow_key).is_none(), - "Flow key should have expired and been removed" + "Flow key should be gone after expiration" ); }, 100, @@ -594,7 +657,7 @@ mod tests { #[allow(clippy::too_many_lines)] #[test] #[tracing_test::traced_test] - fn test_flow_table_concurrent_insert_remove_lookup_timeout() { + fn test_flow_table_concurrent_insert_remove_lookup_expire() { const N: usize = 3; let two_seconds = Duration::from_secs(2); @@ -630,15 +693,20 @@ mod tests { let mut flow_info_holder = Some(flow_info); let mut handles = vec![]; + + // "expirer" thread — simulates what the tokio timer would do. handles.push( thread::Builder::new() - .name("timeout_reaper".to_string()) + .name("expirer".to_string()) .spawn({ let flow_table = flow_table.clone(); + let flow_key = flow_keys[0]; move || { for _ in 0..N { thread::yield_now(); - flow_table.reap_expired(); + if let Some(fi) = flow_table.lookup(&flow_key) { + fi.update_status(FlowStatus::Expired); + } } } }) @@ -703,22 +771,11 @@ mod tests { handle.join().unwrap(); } - // Shuttle does not model time so we need this hack - let reap_time = now + two_seconds; - flow_table.reap_all_expired_with_time(&reap_time); - - // After all threads, all keys should be either gone or expired - for key in &flow_keys { - let result = flow_table.lookup(key); - assert!( - result.is_none(), - "Flow key {:#?} should have expired at {:?} and been removed, now at create: {:?}, reap time: {:?}", - *key, - result.unwrap().expires_at(), - now, - reap_time - ); - } + // After all threads, flow[0] should be expired/gone (expirer thread ran). + assert!( + flow_table.lookup(&flow_keys[0]).is_none(), + "Flow key[0] should have been expired" + ); }, 100, ); diff --git a/flow-entry/src/flow_table/thread_local_pq.rs b/flow-entry/src/flow_table/thread_local_pq.rs deleted file mode 100644 index 60664e67d..000000000 --- a/flow-entry/src/flow_table/thread_local_pq.rs +++ /dev/null @@ -1,327 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// Copyright Open Network Fabric Authors - -use std::cmp::Ordering; -use std::hash::{Hash, Hasher}; -use std::time::Instant; - -use ahash::RandomState; -use concurrency::sync::RwLock; -// Should we just move this to std::collections::BinaryHeap? -// We aren't using the hash table feature right now, though we may want it later. -use priority_queue::PriorityQueue; -use thread_local::ThreadLocal; -use tracing::{debug, warn}; - -use tracectl::trace_target; -trace_target!( - "flow-table-pq", - LevelFilter::INFO, - &["flow-expiration", "pipeline"] -); - -#[repr(transparent)] -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -struct Priority(Instant); - -impl PartialOrd for Priority { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -impl Ord for Priority { - fn cmp(&self, other: &Self) -> Ordering { - match self.0.cmp(&other.0) { - Ordering::Equal => Ordering::Equal, - Ordering::Less => Ordering::Greater, - Ordering::Greater => Ordering::Less, - } - } -} - -#[derive(Debug, Clone)] -pub(crate) struct Entry -where - K: Send + Hash + PartialEq + Eq, - V: Send, -{ - key: K, - value: V, -} - -impl Hash for Entry -where - K: Send + Hash + PartialEq + Eq, - V: Send, -{ - fn hash(&self, state: &mut H) { - self.key.hash(state); - } -} - -impl PartialEq for Entry -where - K: Send + Hash + PartialEq + Eq, - V: Send, -{ - fn eq(&self, other: &Self) -> bool { - self.key == other.key - } -} - -impl Eq for Entry -where - K: Send + Hash + PartialEq + Eq, - V: Send, -{ -} - -#[derive(Debug)] -pub(crate) struct ThreadLocalPriorityQueue -where - K: Send + Hash + PartialEq + Eq, - V: Send, -{ - #[allow(clippy::type_complexity)] - pqs: ThreadLocal, Priority, RandomState>>>, - reap_threshold: usize, -} - -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum PQAction { - Reap, - Cancel, - Update(Instant), -} - -impl ThreadLocalPriorityQueue -where - K: Send + Sync + Hash + PartialEq + Eq, - V: Send + Sync, -{ - pub(crate) const AGRESSIVE_REAP_THRESHOLD: usize = 1_000_000; - pub fn new(reap_threshold: Option) -> Self { - Self { - pqs: ThreadLocal::new(), - reap_threshold: reap_threshold.unwrap_or(Self::AGRESSIVE_REAP_THRESHOLD), - } - } - - pub fn set_reap_threshold(&mut self, reap_threahold: usize) { - self.reap_threshold = reap_threahold; - } - - fn get_pq_lock(&self) -> &RwLock, Priority, RandomState>> { - self.pqs - .get_or(|| RwLock::new(PriorityQueue::with_default_hasher())) - } - - /// Insert an entry into the priority queue. If the queue already contains an entry with the - /// same key, the old entry is first removed from the queue, and then the new entry is inserted. - /// - /// # Thread Safety - /// - /// This method is thread-safe but should not be called if the current thread is - /// holding a lock on any element in the priority queue. - /// - /// # Panics - /// - /// Panics if any lock acquired by this method is poisoned. - pub fn push(&self, key: K, value: V, expires_at: Instant) { - let new_entry = Entry { key, value }; - let pq = self.get_pq_lock(); - let mut pq_lock = pq.write().unwrap(); - - // Calling the PriorityQueue .push() does not ensure that the entry will be in the queue. - // Its documentation mentions: - // - // "If an element equal to item is already in the queue, its priority is updated and the - // old priority is returned in Some; otherwise, item is inserted with priority and None is - // returned." - // - // But "equal to item" actually means "hashing to the same value", and in the case of a - // struct Entry, the hash function only hashes the key, not the value, so if we try to push - // a new entry with the same key but a different value, the old entry will be updated with - // the new priority but the value will not be updated. To avoid this, always remove any - // entry with a similar key (whatever the value) before trying to push the new one. - let _ = pq_lock.remove(&new_entry); - pq_lock.push(new_entry, Priority(expires_at)); - } - - /// Reap expired entries from the priority queue. - /// - /// # Thread Safety - /// - /// This method is thread-safe but should not be called if the current thread is - /// holding a lock on any element in the priority queue. - /// - /// # Panics - /// - /// Panics if any lock acquired by this method is poisoned. - pub fn reap_expired( - &self, - on_expired: impl Fn(&Instant, &K, &V) -> PQAction, - on_reaped: impl Fn(&K, V), - ) -> Vec { - let pql = self.get_pq_lock(); - let mut pq = pql.write().unwrap(); - Self::reap_expired_locked_with_time( - &mut pq, - &Instant::now(), - on_expired, - on_reaped, - self.reap_threshold, - ) - } - - /// Reap expired entries from all priority queues (regardless of current thread) - /// - /// # Thread Safety - /// - /// This method is thread-safe but should not be called if the current thread is - /// holding a lock on any element in the priority queue. - /// - /// # Panics - /// - /// Panics if any lock acquired by this method is poisoned. - pub fn reap_all_expired( - &self, - on_expired: impl Fn(&Instant, &K, &V) -> PQAction, - on_reaped: impl Fn(&K, V), - ) -> Vec { - self.reap_all_expired_with_time_internal(&Instant::now(), &on_expired, &on_reaped) - } - - #[allow(unused)] // This is unused for now if shuttle is not enabled - #[cfg(test)] - pub fn reap_all_expired_with_time( - &self, - now: &Instant, - on_expired: impl Fn(&Instant, &K, &V) -> PQAction, - on_reaped: impl Fn(&K, V), - ) -> Vec { - self.reap_all_expired_with_time_internal(now, &on_expired, &on_reaped) - } - - fn reap_all_expired_with_time_internal( - &self, - now: &Instant, - on_expired: impl Fn(&Instant, &K, &V) -> PQAction, - on_reaped: impl Fn(&K, V), - ) -> Vec { - let pqs = self.pqs.iter(); - let mut all_reaped = vec![]; - for pq in pqs { - let mut pq = pq.write().unwrap(); - let mut reaped = Self::reap_expired_locked_with_time( - &mut pq, - now, - &on_expired, - &on_reaped, - self.reap_threshold, - ); - all_reaped.append(&mut reaped); - } - all_reaped - } - - fn aggressive_reap( - pq: &mut concurrency::sync::RwLockWriteGuard< - PriorityQueue, Priority, RandomState>, - >, - now: &Instant, - on_expired: impl Fn(&Instant, &K, &V) -> PQAction, - on_reaped: impl Fn(&K, V), - ) -> Vec { - let reaped: Vec<_> = pq - .extract_if(|entry, _prio| { - let action = on_expired(now, &entry.key, &entry.value); - action == PQAction::Reap || action == PQAction::Cancel - }) - .map(|(entry, _prio)| { - let key = entry.key; - on_reaped(&key, entry.value); - key - }) - .collect(); - debug!("Reaped {} flows", reaped.len()); - reaped - } - - fn reap_expired_locked_with_time( - pq: &mut concurrency::sync::RwLockWriteGuard< - PriorityQueue, Priority, RandomState>, - >, - now: &Instant, - on_expired: impl Fn(&Instant, &K, &V) -> PQAction, - on_reaped: impl Fn(&K, V), - reap_threshold: usize, - ) -> Vec { - let len = pq.len(); - if len > reap_threshold { - warn!("The number of flows ({len}) exceeds {reap_threshold}. Reaping..."); - return Self::aggressive_reap(pq, now, on_expired, on_reaped); - } - - let mut expired = Vec::new(); - debug!( - "Reaping expired flows at {:?}, queue size {}", - now, - pq.len() - ); - - while let Some((_, expires_at)) = pq.peek() { - if *now >= expires_at.0 { - let ret = pq.pop(); - let Some(entry) = ret else { - break; - }; - // This is going to copy the entry and key, even if it is to be reinserted, - // which sucks. Find a better way to do this and placate the rust - // borrow checker. Without this copy, the borrow checker will - // complain that you cannot pop the entry because of the borrow in the peek. - // - // This is probably fine for now though as we use K that is a FlowKey, - // copying it isn't ideal but probably cheap enough, and the value is an Arc. - expired.push(entry); - } else { - break; - } - } - - debug!( - "Found {} expired flows at {:?}, queue size {}", - expired.len(), - now, - pq.len() - ); - - let mut reaped = vec![]; - for (entry, _) in expired { - match on_expired(now, &entry.key, &entry.value) { - PQAction::Reap | PQAction::Cancel => { - // entry.value is consumed here, but the key is kept so that - // the corresponding flow-entry, pointing to a dropped flow-info - // can be removed from the flow-table. - on_reaped(&entry.key, entry.value); - reaped.push(entry.key); - } - PQAction::Update(new_expires_at) => { - pq.push(entry, Priority(new_expires_at)); - } - } - } - reaped - } -} - -impl Default for ThreadLocalPriorityQueue -where - K: Send + Sync + Hash + PartialEq + Eq, - V: Send + Sync, -{ - fn default() -> Self { - Self::new(None) - } -} diff --git a/nat/src/portfw/test.rs b/nat/src/portfw/test.rs index ec90764b2..e05a76947 100644 --- a/nat/src/portfw/test.rs +++ b/nat/src/portfw/test.rs @@ -6,7 +6,7 @@ mod nf_test { use crate::portfw::protocol::PortFwFlowStatus; use crate::portfw::{PortForwarder, PortFwEntry, PortFwKey, PortFwState, PortFwTableWriter}; - use flow_entry::flow_table::{ExpirationsNF, FlowLookup, FlowTable}; + use flow_entry::flow_table::{FlowLookup, FlowTable}; use net::buffer::TestBuffer; use net::flows::FlowStatus; use net::flows::flow_info_item::ExtractRef; @@ -186,14 +186,11 @@ mod nf_test { ) -> (Arc, DynPipeline, PortFwTableWriter) { // build a pipeline with flow lookup + port forwarder let mut writer = PortFwTableWriter::new(); - let flow_table = Arc::new(FlowTable::new(1024)); + let flow_table = Arc::new(FlowTable::default()); let flow_lookup_nf = FlowLookup::new("flow-lookup", flow_table.clone()); let nf = PortForwarder::new("port-forwarder", writer.reader(), flow_table.clone()); - let flow_expirations = ExpirationsNF::new(flow_table.clone()); - let pipeline: DynPipeline = DynPipeline::new() - .add_stage(flow_lookup_nf) - .add_stage(nf) - .add_stage(flow_expirations); + let pipeline: DynPipeline = + DynPipeline::new().add_stage(flow_lookup_nf).add_stage(nf); // set port-forwarding rules writer.update_table(ruleset).unwrap();