diff --git a/src/udp_listen.rs b/src/udp_listen.rs index 6bbc98c..e532599 100644 --- a/src/udp_listen.rs +++ b/src/udp_listen.rs @@ -2,6 +2,7 @@ use crate::orchestration::PeerHandle; use crate::{discovery::DiscoveryMessage, node_service::HealthCheckRequest}; use std::collections::HashMap; use std::net::SocketAddr; +use std::time::Duration; use system_configuration::sys::libc::disconnectx; use tokio::net::UdpSocket; use tokio::select; @@ -15,6 +16,7 @@ struct NodeInfo { listen_port: u16, allowed_peer_ids: Option>, allowed_interfaces: Option>, + discovery_timeout: Duration, } pub async fn listen_for_discovery( @@ -79,62 +81,95 @@ struct PeerInfo { priority: u8, } +enum Action { + HealthChecks, + NewPeer(SocketAddr, DiscoveryMessage), +} + pub async fn manage_discovery(node_info: NodeInfo) { let mut peers: HashMap = HashMap::new(); let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<(SocketAddr, DiscoveryMessage)>(); tokio::spawn(listen_for_discovery(node_info.clone(), tx)); - while let Some((addr, message)) = rx.recv().await { - info!("Received discovery message from {}", message.node_id); - let existing = peers.get(&message.node_id); - let insert_new = match existing { - None => true, - Some(existing) => { - existing.address != addr && existing.address_priority < message.priority - } + loop { + let action = select! { + _ = tokio::time::sleep(node_info.discovery_timeout) => Action::HealthChecks, + Some((addr, message)) = rx.recv() => Action::NewPeer(addr, message), }; - if !insert_new { - continue; - } + match action { + Action::NewPeer(addr, message) => { + let existing = peers.get(&message.node_id); + let insert_new = match existing { + None => true, + Some(existing) => { + existing.address != addr && existing.address_priority < message.priority + } + }; - let description = format!("{} ({})", message.interface_type, message.interface_name); + if !insert_new { + continue; + } - let a = PeerHandle::new( - message.node_id.clone(), - addr.clone(), - message.priority, - Some(description), - message.device_capabilities.clone(), - ) - .await; + let description = format!("{} ({})", message.interface_type, message.interface_name); - let a = match a { - Ok(a) => a, - Err(error) => { - error!( + let a = PeerHandle::new( + message.node_id.clone(), + addr.clone(), + message.priority, + Some(description), + message.device_capabilities.clone(), + ) + .await; + + let a = match a { + Ok(a) => a, + Err(error) => { + error!( "Failed to connect to new peer {} at {}: {}", message.node_id, addr, error ); - continue; + continue; + } + }; + + let is_healthy = a + .client + .lock() + .await + .health_check(HealthCheckRequest::default()) + .await + .ok() + .map(|x| x.into_inner().is_healthy) + .unwrap_or(false); + + if !is_healthy { + error!("Peer {} is not healthy", message.node_id); + continue; + } + + peers.insert(message.node_id, a); } - }; - let is_healthy = a - .client - .lock() - .await - .health_check(HealthCheckRequest::default()) - .await - .ok() - .map(|x| x.into_inner().is_healthy) - .unwrap_or(false); + Action::HealthChecks => { + let mut to_remove = Vec::new(); + + for peer in peers.values() { + let is_healthy = peer.client.lock().await.health_check(HealthCheckRequest::default()) + .await + .ok() + .map(|x| x.into_inner().is_healthy) + .unwrap_or(false); - if !is_healthy { - error!("Peer {} is not healthy", message.node_id); - continue; + if !is_healthy { + to_remove.push(peer.node_id.clone()); + } + } + + for peer_id in to_remove { + peers.remove(&peer_id); + } + } } - - peers.insert(message.node_id, a); } }