From f2fe73b6e30be323c7cf247d991d668094dabe9c Mon Sep 17 00:00:00 2001 From: xdustinface Date: Tue, 10 Mar 2026 12:13:12 +0700 Subject: [PATCH] refactor: remove `current_sync_peer` from network manager Simplify peer selection in `send_to_single_peer` by removing the sticky sync peer tracking. Peers are now selected directly based on message type requirements and the round-robin counter. --- dash-spv/src/network/manager.rs | 111 +++++++------------------------- 1 file changed, 25 insertions(+), 86 deletions(-) diff --git a/dash-spv/src/network/manager.rs b/dash-spv/src/network/manager.rs index 8146066a5..cd807ad8e 100644 --- a/dash-spv/src/network/manager.rs +++ b/dash-spv/src/network/manager.rs @@ -58,8 +58,6 @@ pub struct PeerNetworkManager { tasks: Arc>>, /// Initial peer addresses initial_peers: Vec, - /// Current sync peer (sticky during sync operations) - current_sync_peer: Arc>>, /// Data directory for storage data_dir: PathBuf, /// Mempool strategy from config @@ -114,7 +112,6 @@ impl PeerNetworkManager { shutdown_token: CancellationToken::new(), tasks: Arc::new(Mutex::new(JoinSet::new())), initial_peers: config.peers.clone(), - current_sync_peer: Arc::new(Mutex::new(None)), data_dir, mempool_strategy: config.mempool_strategy, user_agent: config.user_agent.clone(), @@ -963,7 +960,7 @@ impl PeerNetworkManager { }); } - /// Send a message to a single peer (using sticky peer selection for sync consistency) + /// Send a message to a single peer selected by message type requirements. async fn send_to_single_peer(&self, message: NetworkMessage) -> NetworkResult<()> { let peers = self.pool.get_all_peers().await; @@ -971,92 +968,33 @@ impl PeerNetworkManager { return Err(NetworkError::ConnectionFailed("No connected peers".to_string())); } - let required_service = match &message { + let preferred_service = match &message { NetworkMessage::GetCFHeaders(_) | NetworkMessage::GetCFilters(_) => { - Some(ServiceFlags::COMPACT_FILTERS) + Some((ServiceFlags::COMPACT_FILTERS, true)) + } + NetworkMessage::GetHeaders(_) | NetworkMessage::GetHeaders2(_) => { + Some((ServiceFlags::NODE_HEADERS_COMPRESSED, false)) } _ => None, }; - let check_headers2 = - matches!(&message, NetworkMessage::GetHeaders(_) | NetworkMessage::GetHeaders2(_)); - let selected_peer = if let Some(flags) = required_service { + let (addr, peer) = if let Some((flags, required)) = preferred_service { match self.pool.peer_with_service(flags).await { - Some((address, _)) => { + Some((address, peer)) => { log::debug!("Selected peer {} with {} for {}", address, flags, message.cmd()); - address + (address, peer) } - None => { + None if required => { log::warn!("No peers support {}, cannot send {}", flags, message.cmd()); return Err(NetworkError::ProtocolError(format!("No peers support {}", flags))); } + None => self.next_peer(&peers), } - } else if check_headers2 { - // Prefer a peer that advertises headers2 support - let mut current_sync_peer = self.current_sync_peer.lock().await; - let mut selected: Option = None; - - if let Some(current_addr) = *current_sync_peer { - if let Some((_, peer)) = peers.iter().find(|(addr, _)| *addr == current_addr) { - let peer_guard = peer.read().await; - if peer_guard.supports_headers2() { - selected = Some(current_addr); - } - } - } - - if selected.is_none() { - selected = self - .pool - .peer_with_service(ServiceFlags::NODE_HEADERS_COMPRESSED) - .await - .map(|(addr, _)| addr); - } - - let chosen = selected.unwrap_or(peers[0].0); - if Some(chosen) != *current_sync_peer { - log::info!("Sync peer selected for Headers2: {}", chosen); - *current_sync_peer = Some(chosen); - } - drop(current_sync_peer); - chosen } else { - // For non-filter messages, use the sticky sync peer - let mut current_sync_peer = self.current_sync_peer.lock().await; - let selected = if let Some(current_addr) = *current_sync_peer { - // Check if current sync peer is still connected - if peers.iter().any(|(addr, _)| *addr == current_addr) { - // Keep using the same peer for sync consistency - current_addr - } else { - // Current sync peer disconnected, pick a new one - let new_addr = peers[0].0; - log::info!( - "Sync peer switched from {} to {} (previous peer disconnected)", - current_addr, - new_addr - ); - *current_sync_peer = Some(new_addr); - new_addr - } - } else { - // No current sync peer, pick the first available - let new_addr = peers[0].0; - log::info!("Sync peer selected: {}", new_addr); - *current_sync_peer = Some(new_addr); - new_addr - }; - drop(current_sync_peer); - selected + self.next_peer(&peers) }; - // Find the peer for the selected address - let (addr, peer) = peers - .iter() - .find(|(a, _)| *a == selected_peer) - .ok_or_else(|| NetworkError::ConnectionFailed("Selected peer not found".to_string()))?; - - self.send_message_to_peer(addr, peer, message).await + self.send_message_to_peer(&addr, &peer, message).await } /// Send a message distributed across connected peers using round-robin selection. @@ -1106,18 +1044,20 @@ impl PeerNetworkManager { }; } - // Round-robin selection - let idx = self.round_robin_counter.fetch_add(1, Ordering::Relaxed) % selected_peers.len(); - let (addr, peer) = &selected_peers[idx]; + let (addr, peer) = self.next_peer(&selected_peers); - log::debug!( - "Distributing {} request to peer {} (round-robin idx {})", - message.cmd(), - addr, - idx - ); + log::debug!("Distributing {} request to peer {}", message.cmd(), addr); - self.send_message_to_peer(addr, peer, message).await + self.send_message_to_peer(&addr, &peer, message).await + } + + /// Pick the next peer from `peers` using round-robin rotation. + fn next_peer( + &self, + peers: &[(SocketAddr, Arc>)], + ) -> (SocketAddr, Arc>) { + let idx = self.round_robin_counter.fetch_add(1, Ordering::Relaxed) % peers.len(); + (peers[idx].0, peers[idx].1.clone()) } /// Send a message to the given peer. @@ -1280,7 +1220,6 @@ impl Clone for PeerNetworkManager { shutdown_token: self.shutdown_token.clone(), tasks: self.tasks.clone(), initial_peers: self.initial_peers.clone(), - current_sync_peer: self.current_sync_peer.clone(), data_dir: self.data_dir.clone(), mempool_strategy: self.mempool_strategy, user_agent: self.user_agent.clone(),