use crate::orchestration::PeerHandle; use crate::{discovery::DiscoveryMessage, node_service::HealthCheckRequest}; use std::collections::HashMap; use std::net::SocketAddr; use system_configuration::sys::libc::disconnectx; use tokio::net::UdpSocket; use tokio::select; use tokio::sync::mpsc::UnboundedSender; use tonic::transport::Error; use tracing::{debug, error, info}; #[derive(Debug, Clone)] struct NodeInfo { id: String, listen_port: u16, allowed_peer_ids: Option>, allowed_interfaces: Option>, } pub async fn listen_for_discovery( node_info: NodeInfo, tx: UnboundedSender<(SocketAddr, DiscoveryMessage)>, ) { let socket = UdpSocket::bind(format!("0.0.0.0:{}", node_info.listen_port)) .await .unwrap(); let mut buf = vec![0u8; 65535]; loop { let (len, addr) = socket.recv_from(&mut buf).await.unwrap(); if len == 0 { continue; } let Ok(message) = String::from_utf8(buf[..len].to_vec()) else { error!("Invalid UTF-8 message from {}", addr); continue; }; let Ok(message) = serde_json::from_str::(&message) else { error!("Invalid discovery message from {}", addr); continue; }; // Validate message if message.message_type != "discovery" || message.node_id == node_info.id { continue; } if node_info .allowed_peer_ids .as_ref() .map(|ids| !ids.contains(&message.node_id)) .unwrap_or(false) { debug!( "Ignoring peer {peer_id} as it's not in the allowed node IDs list", peer_id = message.node_id ); continue; } if node_info .allowed_interfaces .as_ref() .map(|interfaces| !interfaces.contains(&message.interface_name)) .unwrap_or(false) { debug!("Ignoring peer {peer_id} as it's interface {interface} is not in the allowed interfaces list", peer_id = message.node_id, interface = message.interface_name); continue; } tx.send((addr, message)).unwrap(); } } struct PeerInfo { address: SocketAddr, priority: u8, } pub async fn manage_discovery(node_info: NodeInfo) { let mut peers: HashMap = HashMap::new(); let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<(SocketAddr, DiscoveryMessage)>(); tokio::spawn(listen_for_discovery(node_info.clone(), tx)); while let Some((addr, message)) = rx.recv().await { info!("Received discovery message from {}", message.node_id); let existing = peers.get(&message.node_id); let insert_new = match existing { None => true, Some(existing) => { existing.address != addr && existing.address_priority < message.priority } }; if !insert_new { continue; } let description = format!("{} ({})", message.interface_type, message.interface_name); let a = PeerHandle::new( message.node_id.clone(), addr.clone(), message.priority, Some(description), message.device_capabilities.clone(), ) .await; let a = match a { Ok(a) => a, Err(error) => { error!( "Failed to connect to new peer {} at {}: {}", message.node_id, addr, error ); continue; } }; let is_healthy = a .client .lock() .await .health_check(HealthCheckRequest::default()) .await .ok() .map(|x| x.into_inner().is_healthy) .unwrap_or(false); if !is_healthy { error!("Peer {} is not healthy", message.node_id); continue; } peers.insert(message.node_id, a); } }