@@ -22,7 +22,7 @@ use net::flow_key::{IcmpProtoKey, Uni};
2222use net:: flows:: { ExtractRef , FlowInfo } ;
2323use net:: headers:: { Net , Transport , TryIp , TryIpMut , TryTransportMut } ;
2424use net:: packet:: { DoneReason , Packet , VpcDiscriminant } ;
25- use net:: { FlowKey , FlowKeyData , IpProtoKey } ;
25+ use net:: { FlowKey , IpProtoKey } ;
2626use pipeline:: { NetworkFunction , PipelineData } ;
2727use std:: fmt:: { Debug , Display } ;
2828use std:: net:: { IpAddr , Ipv4Addr , Ipv6Addr } ;
@@ -173,36 +173,53 @@ impl StatefulNat {
173173 Instant :: now ( ) + timeout
174174 }
175175
176- fn create_session < I : NatIpWithBitmap > (
177- & self ,
178- flow_key : & FlowKey ,
176+ fn setup_flow_nat_state < I : NatIpWithBitmap > (
177+ flow_info : & FlowInfo ,
179178 state : NatFlowState < I > ,
180179 dst_vpcd : VpcDiscriminant ,
181- idle_timeout : Duration ,
182180 ) {
183- // Clear the destination VPC so we can make lookups without knowing it
184- let new_flow_key = FlowKey :: Unidirectional ( FlowKeyData :: new (
185- flow_key. data ( ) . src_vpcd ( ) ,
186- * flow_key. data ( ) . src_ip ( ) ,
187- * flow_key. data ( ) . dst_ip ( ) ,
188- * flow_key. data ( ) . proto_key_info ( ) ,
189- ) ) ;
190- debug ! (
191- "{}: Creating new flow session entry: {} -> {}" ,
192- self . name( ) ,
193- new_flow_key. data( ) ,
194- state
195- ) ;
196-
197- let flow_info = FlowInfo :: new ( Self :: session_timeout_time ( idle_timeout) ) ;
181+ let flow_key = flow_info. flowkey ( ) . unwrap_or_else ( || unreachable ! ( ) ) ;
182+ debug ! ( "Setting up new flow: {flow_key} -> {state}" ) ;
198183 if let Ok ( mut write_guard) = flow_info. locked . write ( ) {
199184 write_guard. nat_state = Some ( Box :: new ( state) ) ;
200185 write_guard. dst_vpcd = Some ( Box :: new ( dst_vpcd) ) ;
201186 } else {
202187 // flow info is just locally created
203188 unreachable ! ( )
204189 }
205- self . sessions . insert ( new_flow_key, flow_info) ;
190+ }
191+
192+ fn create_flow_pair < Buf : PacketBufferMut , I : NatIpWithBitmap > (
193+ & self ,
194+ packet : & mut Packet < Buf > ,
195+ flow_key : & FlowKey ,
196+ alloc : AllocationResult < AllocatedIpPort < I > > ,
197+ ) -> Result < ( ) , StatefulNatError > {
198+ // Given that at least one of alloc.src or alloc.dst is set, we should always have at least one timeout set.
199+ let idle_timeout = alloc. idle_timeout ( ) . unwrap_or_else ( || unreachable ! ( ) ) ;
200+
201+ // src and dst vpc of this packet
202+ let src_vpc_id = packet. meta ( ) . src_vpcd . unwrap_or_else ( || unreachable ! ( ) ) ;
203+ let dst_vpc_id = packet. meta ( ) . dst_vpcd . unwrap_or_else ( || unreachable ! ( ) ) ;
204+
205+ // build key for reverse flow
206+ let reverse_key = Self :: new_reverse_session ( flow_key, & alloc, dst_vpc_id) ?;
207+
208+ // build NAT state for both flows
209+ let ( forward_state, reverse_state) = Self :: new_states_from_alloc ( alloc, idle_timeout) ;
210+
211+ // build a flow pair from the keys (without NAT state)
212+ let expires_at = Self :: session_timeout_time ( idle_timeout) ;
213+ let ( forward, reverse) = FlowInfo :: related_pair ( expires_at, * flow_key, reverse_key) ;
214+
215+ // set up their NAT state
216+ Self :: setup_flow_nat_state ( & forward, forward_state, dst_vpc_id) ;
217+ Self :: setup_flow_nat_state ( & reverse, reverse_state, src_vpc_id) ;
218+
219+ // insert in flow-table
220+ self . sessions . insert_from_arc ( * flow_key, & forward) ;
221+ self . sessions . insert_from_arc ( reverse_key, & reverse) ;
222+ Ok ( ( ) )
206223 }
207224
208225 #[ allow( clippy:: unnecessary_wraps) ]
@@ -433,7 +450,6 @@ impl StatefulNat {
433450 let flow_key =
434451 FlowKey :: try_from ( Uni ( & * packet) ) . map_err ( |_| StatefulNatError :: TupleParseError ) ?;
435452
436- let src_vpc_id = packet. meta ( ) . src_vpcd . unwrap_or_else ( || unreachable ! ( ) ) ;
437453 let dst_vpc_id = packet. meta ( ) . dst_vpcd . unwrap_or_else ( || unreachable ! ( ) ) ;
438454
439455 // build extended flow key, with the dst vpc discriminant
@@ -449,22 +465,9 @@ impl StatefulNat {
449465
450466 debug ! ( "{}: Allocated translation data: {alloc}" , self . name( ) ) ;
451467
452- // Given that at least one of alloc.src or alloc.dst is set, we should always have at
453- // least one timeout set.
454- let idle_timeout = alloc. idle_timeout ( ) . unwrap_or_else ( || unreachable ! ( ) ) ;
455-
456468 let translation_data = Self :: get_translation_data ( & alloc. src , & alloc. dst ) ;
457469
458- let reverse_flow_key = Self :: new_reverse_session ( & flow_key, & alloc, dst_vpc_id) ?;
459- let ( forward_state, reverse_state) = Self :: new_states_from_alloc ( alloc, idle_timeout) ;
460-
461- self . create_session ( & flow_key, forward_state, dst_vpc_id, idle_timeout) ;
462- self . create_session (
463- & reverse_flow_key,
464- reverse_state. clone ( ) ,
465- src_vpc_id,
466- idle_timeout,
467- ) ;
470+ self . create_flow_pair ( packet, & flow_key, alloc) ?;
468471
469472 Self :: stateful_translate :: < Buf > ( self . name ( ) , packet, & translation_data) . and ( Ok ( true ) )
470473 }
0 commit comments