Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 25 additions & 86 deletions dash-spv/src/network/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,6 @@ pub struct PeerNetworkManager {
tasks: Arc<Mutex<JoinSet<()>>>,
/// Initial peer addresses
initial_peers: Vec<SocketAddr>,
/// Current sync peer (sticky during sync operations)
current_sync_peer: Arc<Mutex<Option<SocketAddr>>>,
/// Data directory for storage
data_dir: PathBuf,
/// Mempool strategy from config
Expand Down Expand Up @@ -114,7 +112,6 @@ impl PeerNetworkManager {
shutdown_token: CancellationToken::new(),
tasks: Arc::new(Mutex::new(JoinSet::new())),
initial_peers: config.peers.clone(),
current_sync_peer: Arc::new(Mutex::new(None)),
data_dir,
mempool_strategy: config.mempool_strategy,
user_agent: config.user_agent.clone(),
Expand Down Expand Up @@ -963,100 +960,41 @@ impl PeerNetworkManager {
});
}

/// Send a message to a single peer (using sticky peer selection for sync consistency)
/// Send a message to a single peer selected by message type requirements.
async fn send_to_single_peer(&self, message: NetworkMessage) -> NetworkResult<()> {
let peers = self.pool.get_all_peers().await;

if peers.is_empty() {
return Err(NetworkError::ConnectionFailed("No connected peers".to_string()));
}

let required_service = match &message {
let preferred_service = match &message {
NetworkMessage::GetCFHeaders(_) | NetworkMessage::GetCFilters(_) => {
Some(ServiceFlags::COMPACT_FILTERS)
Some((ServiceFlags::COMPACT_FILTERS, true))
}
NetworkMessage::GetHeaders(_) | NetworkMessage::GetHeaders2(_) => {
Some((ServiceFlags::NODE_HEADERS_COMPRESSED, false))
}
_ => None,
};
let check_headers2 =
matches!(&message, NetworkMessage::GetHeaders(_) | NetworkMessage::GetHeaders2(_));

let selected_peer = if let Some(flags) = required_service {
let (addr, peer) = if let Some((flags, required)) = preferred_service {
match self.pool.peer_with_service(flags).await {
Some((address, _)) => {
Some((address, peer)) => {
log::debug!("Selected peer {} with {} for {}", address, flags, message.cmd());
address
(address, peer)
}
None => {
None if required => {
log::warn!("No peers support {}, cannot send {}", flags, message.cmd());
return Err(NetworkError::ProtocolError(format!("No peers support {}", flags)));
}
None => self.next_peer(&peers),
}
} else if check_headers2 {
// Prefer a peer that advertises headers2 support
let mut current_sync_peer = self.current_sync_peer.lock().await;
let mut selected: Option<SocketAddr> = None;

if let Some(current_addr) = *current_sync_peer {
if let Some((_, peer)) = peers.iter().find(|(addr, _)| *addr == current_addr) {
let peer_guard = peer.read().await;
if peer_guard.supports_headers2() {
selected = Some(current_addr);
}
}
}

if selected.is_none() {
selected = self
.pool
.peer_with_service(ServiceFlags::NODE_HEADERS_COMPRESSED)
.await
.map(|(addr, _)| addr);
}

let chosen = selected.unwrap_or(peers[0].0);
if Some(chosen) != *current_sync_peer {
log::info!("Sync peer selected for Headers2: {}", chosen);
*current_sync_peer = Some(chosen);
}
drop(current_sync_peer);
chosen
} else {
// For non-filter messages, use the sticky sync peer
let mut current_sync_peer = self.current_sync_peer.lock().await;
let selected = if let Some(current_addr) = *current_sync_peer {
// Check if current sync peer is still connected
if peers.iter().any(|(addr, _)| *addr == current_addr) {
// Keep using the same peer for sync consistency
current_addr
} else {
// Current sync peer disconnected, pick a new one
let new_addr = peers[0].0;
log::info!(
"Sync peer switched from {} to {} (previous peer disconnected)",
current_addr,
new_addr
);
*current_sync_peer = Some(new_addr);
new_addr
}
} else {
// No current sync peer, pick the first available
let new_addr = peers[0].0;
log::info!("Sync peer selected: {}", new_addr);
*current_sync_peer = Some(new_addr);
new_addr
};
drop(current_sync_peer);
selected
self.next_peer(&peers)
};

// Find the peer for the selected address
let (addr, peer) = peers
.iter()
.find(|(a, _)| *a == selected_peer)
.ok_or_else(|| NetworkError::ConnectionFailed("Selected peer not found".to_string()))?;

self.send_message_to_peer(addr, peer, message).await
self.send_message_to_peer(&addr, &peer, message).await
}

/// Send a message distributed across connected peers using round-robin selection.
Expand Down Expand Up @@ -1106,18 +1044,20 @@ impl PeerNetworkManager {
};
}

// Round-robin selection
let idx = self.round_robin_counter.fetch_add(1, Ordering::Relaxed) % selected_peers.len();
let (addr, peer) = &selected_peers[idx];
let (addr, peer) = self.next_peer(&selected_peers);

log::debug!(
"Distributing {} request to peer {} (round-robin idx {})",
message.cmd(),
addr,
idx
);
log::debug!("Distributing {} request to peer {}", message.cmd(), addr);

self.send_message_to_peer(addr, peer, message).await
self.send_message_to_peer(&addr, &peer, message).await
}

/// Pick the next peer from `peers` using round-robin rotation.
fn next_peer(
&self,
peers: &[(SocketAddr, Arc<RwLock<Peer>>)],
) -> (SocketAddr, Arc<RwLock<Peer>>) {
let idx = self.round_robin_counter.fetch_add(1, Ordering::Relaxed) % peers.len();
(peers[idx].0, peers[idx].1.clone())
}

/// Send a message to the given peer.
Expand Down Expand Up @@ -1280,7 +1220,6 @@ impl Clone for PeerNetworkManager {
shutdown_token: self.shutdown_token.clone(),
tasks: self.tasks.clone(),
initial_peers: self.initial_peers.clone(),
current_sync_peer: self.current_sync_peer.clone(),
data_dir: self.data_dir.clone(),
mempool_strategy: self.mempool_strategy,
user_agent: self.user_agent.clone(),
Expand Down
Loading