13 changes: 1 addition & 12 deletions Cargo.lock

Some generated files are not rendered by default.

6 changes: 2 additions & 4 deletions dataplane/src/packet_processor/mod.rs
@@ -12,7 +12,7 @@ use super::packet_processor::ipforward::IpForwarder;

use concurrency::sync::Arc;

use flow_entry::flow_table::{ExpirationsNF, FlowLookup, FlowTable};
use flow_entry::flow_table::{FlowLookup, FlowTable};
use flow_filter::{FlowFilter, FlowFilterTableWriter};

use nat::portfw::{PortForwarder, PortFwTableWriter};
@@ -90,15 +90,14 @@ pub(crate) fn start_router<Buf: PacketBufferMut>(
let stats_stage = Stats::new("stats", writer.clone());
let flow_filter = FlowFilter::new("flow-filter", flowfiltertablesr_factory.handle());
let flow_lookup = FlowLookup::new("flow-lookup", flow_table.clone());
let flow_expirations_nf = ExpirationsNF::new(flow_table.clone());
let portfw = PortForwarder::new(
"port-forwarder",
portfw_factory.handle(),
flow_table.clone(),
);

// Build the pipeline for a router. The composition of the pipeline (in stages) is currently
// hard-coded. In any pipeline, the Stats and ExpirationsNF stages should go last
// hard-coded. Flow expiration is handled by per-flow tokio timers; no ExpirationsNF needed.
DynPipeline::new()
.add_stage(stage_ingress)
.add_stage(iprouter1)
@@ -109,7 +108,6 @@
.add_stage(stateful_nat)
.add_stage(iprouter2)
.add_stage(stage_egress)
.add_stage(flow_expirations_nf)
.add_stage(pktdump)
.add_stage(stats_stage)
};
4 changes: 2 additions & 2 deletions flow-entry/Cargo.toml
@@ -20,14 +20,14 @@ etherparse = { workspace = true }
linkme = { workspace = true }
net = { workspace = true }
pipeline = { workspace = true }
priority-queue = { workspace = true }
thiserror = { workspace = true }
thread_local = { workspace = true }
tokio = { workspace = true, features = ["rt", "time"] }
tracectl = { workspace = true }
tracing = { workspace = true }

[dev-dependencies]
bolero = { workspace = true, default-features = false }
net = { workspace = true, features = ["bolero"] }
tokio = { workspace = true, features = ["macros", "rt", "time"] }
tracing-test = { workspace = true, features = [] }
shuttle = { workspace = true }
49 changes: 26 additions & 23 deletions flow-entry/src/flow_table/README.md
@@ -1,28 +1,31 @@
# Flow Table

The current implementation of the flow table uses `dash_map` and per-thread priority queues (for timeouts), along with `Arc` and `Weak`, to get a reasonable flow table with timeouts.
However, it leaves a lot of room for optimization.
The flow table uses `DashMap` for concurrent key→value storage and per-flow
tokio timers for expiration.

## Flow Table Implementation

The main `DashMap` holds `Weak` references to all the flow entries so that the memory gets automatically deallocated when the entry times out.

The priority queues hold `Arc` references to the flow entries to keep them alive when they are not referenced by any packet metadata.
When an entry times out and is removed from the priority queue, and the last packet referencing that flow is dropped, the memory for the entry is freed.

Note that in the current implementation, a flow is not removed from the flow table until the last `Arc` to the flow info is dropped or the flow entry is replaced. This could be changed if needed, or even made a per-flow option controlling whether a timeout removes the flow.

## Optimizations

In the current implementation, the dead `Weak` references in the hash table must be reaped periodically or on timeout.
This is better done with a version of `DashMap` that reaps dead `Weak` references as it walks the table on lookups, instead of waiting for key collisions.
The hope, for now, is that the entries in the hash table array contain only a small pointer and do not take up too much extra memory.
Those dead `Weak` pointers will, however, prevent the hash table from shrinking, if the implementation supports that.

Second, the `priority_queue` crate uses a `HashMap` inside the queue in order to allow fast removal and re-insertion.
However, this wastes space and requires extra hashes.
The better way to do this is to have a custom priority queue integrated with the custom weak-reaping hash map so that the same hash table can be used for both operations.
This improves cache locality, reduces memory utilization, and avoids multiple hash table lookups in many cases.

However, in the interest of time to completion, this module currently uses existing data structures instead of fully custom implementations.
The interface should nevertheless be able to hide a switch from the current implementation to the optimized one.
The `DashMap` stores `Arc<FlowInfo>` values directly, so the table is the
primary strong-reference keeper for each flow entry. Callers that need to
retain a flow (e.g. pipeline stages that tag packets) clone the `Arc` out of
`lookup()`.

When a flow is inserted, a `tokio::task` is spawned (if a tokio runtime is
present) that sleeps until the flow's deadline and then calls
`update_status(FlowStatus::Expired)`. The DashMap entry is not removed by the
timer; instead, `lookup()` performs lazy cleanup: if a looked-up entry is
`Expired` or `Cancelled` it is removed from the map and `None` is returned.
The same lazy path covers the case where a deadline passes without a timer
firing (e.g. in non-tokio test contexts).
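The insert/expire/lazy-cleanup flow described above can be sketched in a few lines. This is an illustrative stand-in, not the real module: it substitutes `Mutex<HashMap>` for `DashMap` and a plain `std::thread` timer for the tokio task, and the `FlowInfo`/`FlowTable` shapes here are simplified assumptions modeled on the README, not the actual types.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicU8, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

const ACTIVE: u8 = 0;
const EXPIRED: u8 = 1;

// Simplified stand-in for the real FlowInfo: only a status flag.
struct FlowInfo {
    status: AtomicU8,
}

impl FlowInfo {
    fn new() -> Self {
        FlowInfo { status: AtomicU8::new(ACTIVE) }
    }
    fn update_status(&self, status: u8) {
        self.status.store(status, Ordering::Release);
    }
    fn is_expired(&self) -> bool {
        self.status.load(Ordering::Acquire) != ACTIVE
    }
}

#[derive(Default)]
struct FlowTable {
    // Stand-in for DashMap: the table holds strong Arcs, so it is the
    // primary owner of each entry.
    map: Mutex<HashMap<u64, Arc<FlowInfo>>>,
}

impl FlowTable {
    fn insert(&self, key: u64, ttl: Duration) -> Arc<FlowInfo> {
        let info = Arc::new(FlowInfo::new());
        self.map.lock().unwrap().insert(key, info.clone());
        // Per-flow timer: the real code spawns a tokio task; a thread is
        // used here so the sketch runs without a runtime. The timer only
        // flips the status -- it never touches the map.
        let timer_info = info.clone();
        thread::spawn(move || {
            thread::sleep(ttl);
            timer_info.update_status(EXPIRED);
        });
        info
    }

    // Lazy cleanup: an expired entry is removed on lookup, and None is
    // returned, exactly as the README describes.
    fn lookup(&self, key: u64) -> Option<Arc<FlowInfo>> {
        let mut map = self.map.lock().unwrap();
        let info = map.get(&key)?.clone();
        if info.is_expired() {
            map.remove(&key);
            return None;
        }
        Some(info)
    }
}
```

Because the timer never removes the map entry, the same `lookup()` path also handles a deadline that passed without any timer firing.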

`FlowInfo::related` holds a `Weak<FlowInfo>` pointing to the reverse-direction
flow of a bidirectional pair. This avoids reference cycles: the two entries
are independent `Arc`s in the table, and the `Weak` merely lets one side
observe the other without keeping it alive.
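The cycle-avoidance property of the `Weak` back-link can be shown with a minimal sketch. The `Flow` struct and `link_pair` helper below are hypothetical names for illustration; the point is that each side of the pair only weakly references the other, so dropping one side's `Arc` lets it die even while its peer still holds the link.

```rust
use std::sync::{Arc, Mutex, Weak};

struct Flow {
    name: &'static str,
    // Reverse-direction flow of the bidirectional pair. Weak, so the two
    // entries do not keep each other alive (no reference cycle).
    related: Mutex<Weak<Flow>>,
}

// Hypothetical helper: build two flows and cross-link them weakly.
fn link_pair(fwd_name: &'static str, rev_name: &'static str) -> (Arc<Flow>, Arc<Flow>) {
    let fwd = Arc::new(Flow { name: fwd_name, related: Mutex::new(Weak::new()) });
    let rev = Arc::new(Flow { name: rev_name, related: Mutex::new(Weak::new()) });
    *fwd.related.lock().unwrap() = Arc::downgrade(&rev);
    *rev.related.lock().unwrap() = Arc::downgrade(&fwd);
    (fwd, rev)
}
```

While both `Arc`s are alive, `related.upgrade()` yields the peer; once one side is dropped (e.g. removed from the table), the survivor's `upgrade()` simply returns `None`.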

## Non-tokio contexts (shuttle / sync tests)

When no tokio runtime is present `Handle::try_current()` returns `Err` and no
timer is spawned. Tests simulate expiration by calling
`flow_info.update_status(FlowStatus::Expired)` directly, after which the next
`lookup()` call performs the lazy removal.
6 changes: 2 additions & 4 deletions flow-entry/src/flow_table/display.rs
@@ -23,10 +23,8 @@ impl Display for FlowTable {
Heading(format!("Flow Table ({} entries)", table.len())).fmt(f)?;
for entry in table.iter() {
let key = entry.key();
match entry.value().upgrade() {
Some(value) => writeln!(f, "key = {key}\ndata = {value}")?,
None => writeln!(f, "key = {key} NONE")?,
}
let value = entry.value();
writeln!(f, "key = {key}\ndata = {value}")?;
}
} else {
write!(f, "Failed to lock flow table")?;
5 changes: 1 addition & 4 deletions flow-entry/src/flow_table/mod.rs
@@ -2,17 +2,14 @@
// Copyright Open Network Fabric Authors

mod display;
pub mod nf_expirations;
pub mod nf_lookup;
pub mod table;
mod thread_local_pq;

pub use nf_lookup::FlowLookup;
pub use table::FlowTable;

pub use net::flows::atomic_instant::AtomicInstant;
pub use net::flows::flow_info::*;
pub use nf_expirations::ExpirationsNF;
pub use nf_lookup::FlowLookup;

use tracectl::trace_target;
trace_target!("flow-table", LevelFilter::INFO, &["pipeline"]);
142 changes: 0 additions & 142 deletions flow-entry/src/flow_table/nf_expirations.rs

This file was deleted.
