From f56e719e95a9cdfb1cac2871a40b6b89465861f1 Mon Sep 17 00:00:00 2001 From: Joshua Coles Date: Wed, 12 Feb 2025 11:21:19 +0000 Subject: [PATCH] Refactor --- src/orchestration.rs | 27 +++++++-- src/udp_listen.rs | 138 +++++++++++++++++++------------------------ 2 files changed, 84 insertions(+), 81 deletions(-) diff --git a/src/orchestration.rs b/src/orchestration.rs index bdab95c..45dda57 100644 --- a/src/orchestration.rs +++ b/src/orchestration.rs @@ -1,7 +1,8 @@ +use crate::node_service::node_service_client::NodeServiceClient; +use crate::node_service::HealthCheckRequest; +use crate::topology::DeviceCapabilities; use std::net::SocketAddr; use tonic::codec::CompressionEncoding; -use crate::node_service::node_service_client::NodeServiceClient; -use crate::topology::DeviceCapabilities; pub struct PeerHandle { pub node_id: String, @@ -13,9 +14,16 @@ pub struct PeerHandle { } impl PeerHandle { - pub async fn new(node_id: String, address: SocketAddr, address_priority: u8, description: Option, device_capabilities: DeviceCapabilities) -> Result { + pub async fn new( + node_id: String, + address: SocketAddr, + address_priority: u8, + description: Option, + device_capabilities: DeviceCapabilities, + ) -> Result { let endpoint = format!("http://{}", address); - let client = NodeServiceClient::connect(endpoint).await? + let client = NodeServiceClient::connect(endpoint) + .await? .accept_compressed(CompressionEncoding::Gzip); Ok(Self { @@ -27,4 +35,15 @@ impl PeerHandle { device_capabilities, }) } + + pub async fn is_healthy(&self) -> bool { + self.client + .lock() + .await + .health_check(HealthCheckRequest::default()) + .await + .ok() + .map(|x| x.into_inner().is_healthy) + .unwrap_or(false) + } } diff --git a/src/udp_listen.rs b/src/udp_listen.rs index e532599..f289f51 100644 --- a/src/udp_listen.rs +++ b/src/udp_listen.rs @@ -76,16 +76,56 @@ pub async fn listen_for_discovery( } } -struct PeerInfo { - address: SocketAddr, - priority: u8, -} - enum Action { HealthChecks, NewPeer(SocketAddr, DiscoveryMessage), } +async fn handle_new_peer( + peers: &mut HashMap, + addr: SocketAddr, + message: DiscoveryMessage, +) { + let existing = peers.get(&message.node_id); + let insert_new = match existing { + None => true, + Some(existing) => existing.address != addr && existing.address_priority < message.priority, + }; + + if !insert_new { + return; + } + + let description = format!("{} ({})", message.interface_type, message.interface_name); + + let new_peer = PeerHandle::new( + message.node_id.clone(), + addr.clone(), + message.priority, + Some(description), + message.device_capabilities.clone(), + ) + .await; + + let new_peer = match new_peer { + Ok(v) => v, + Err(error) => { + error!( + "Failed to connect to new peer {} at {}: {}", + message.node_id, addr, error + ); + return; + } + }; + + if !new_peer.is_healthy().await { + error!("New peer {} is not healthy", message.node_id); + return; + } + + peers.insert(message.node_id, new_peer); +} + pub async fn manage_discovery(node_info: NodeInfo) { let mut peers: HashMap = HashMap::new(); let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<(SocketAddr, DiscoveryMessage)>(); @@ -98,78 +138,22 @@ pub async fn manage_discovery(node_info: NodeInfo) { }; match action { - Action::NewPeer(addr, message) => { - let existing = peers.get(&message.node_id); - let insert_new = match existing { - None => true, - Some(existing) => { - existing.address != addr && existing.address_priority < message.priority - } - }; - - if !insert_new { - continue; - } - - let description = format!("{} ({})", message.interface_type, message.interface_name); - - let a = PeerHandle::new( - message.node_id.clone(), - addr.clone(), - message.priority, - Some(description), - message.device_capabilities.clone(), - ) - .await; - - let a = match a { - Ok(a) => a, - Err(error) => { - error!( - "Failed to connect to new peer {} at {}: {}", - message.node_id, addr, error - ); - continue; - } - }; - - let is_healthy = a - .client - .lock() - .await - .health_check(HealthCheckRequest::default()) - .await - .ok() - .map(|x| x.into_inner().is_healthy) - .unwrap_or(false); - - if !is_healthy { - error!("Peer {} is not healthy", message.node_id); - continue; - } - - peers.insert(message.node_id, a); - } - - Action::HealthChecks => { - let mut to_remove = Vec::new(); - - for peer in peers.values() { - let is_healthy = peer.client.lock().await.health_check(HealthCheckRequest::default()) - .await - .ok() - .map(|x| x.into_inner().is_healthy) - .unwrap_or(false); - - if !is_healthy { - to_remove.push(peer.node_id.clone()); - } - } - - for peer_id in to_remove { - peers.remove(&peer_id); - } - } + Action::NewPeer(addr, message) => handle_new_peer(&mut peers, addr, message).await, + Action::HealthChecks => perform_health_checks(&mut peers).await, } } } + +async fn perform_health_checks(peers: &mut HashMap) { + let mut to_remove = Vec::new(); + + for peer in peers.values() { + if !peer.is_healthy().await { + to_remove.push(peer.node_id.clone()); + } + } + + for peer_id in to_remove { + peers.remove(&peer_id); + } +}