Add livelihood checking

This commit is contained in:
Joshua Coles 2025-02-12 11:15:57 +00:00
parent 203cb50380
commit cf15c1097e

View File

@ -2,6 +2,7 @@ use crate::orchestration::PeerHandle;
use crate::{discovery::DiscoveryMessage, node_service::HealthCheckRequest}; use crate::{discovery::DiscoveryMessage, node_service::HealthCheckRequest};
use std::collections::HashMap; use std::collections::HashMap;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::time::Duration;
use system_configuration::sys::libc::disconnectx; use system_configuration::sys::libc::disconnectx;
use tokio::net::UdpSocket; use tokio::net::UdpSocket;
use tokio::select; use tokio::select;
@ -15,6 +16,7 @@ struct NodeInfo {
listen_port: u16, listen_port: u16,
allowed_peer_ids: Option<Vec<String>>, allowed_peer_ids: Option<Vec<String>>,
allowed_interfaces: Option<Vec<String>>, allowed_interfaces: Option<Vec<String>>,
discovery_timeout: Duration,
} }
pub async fn listen_for_discovery( pub async fn listen_for_discovery(
@ -79,62 +81,95 @@ struct PeerInfo {
priority: u8, priority: u8,
} }
enum Action {
HealthChecks,
NewPeer(SocketAddr, DiscoveryMessage),
}
pub async fn manage_discovery(node_info: NodeInfo) { pub async fn manage_discovery(node_info: NodeInfo) {
let mut peers: HashMap<String, PeerHandle> = HashMap::new(); let mut peers: HashMap<String, PeerHandle> = HashMap::new();
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<(SocketAddr, DiscoveryMessage)>(); let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<(SocketAddr, DiscoveryMessage)>();
tokio::spawn(listen_for_discovery(node_info.clone(), tx)); tokio::spawn(listen_for_discovery(node_info.clone(), tx));
while let Some((addr, message)) = rx.recv().await { loop {
info!("Received discovery message from {}", message.node_id); let action = select! {
let existing = peers.get(&message.node_id); _ = tokio::time::sleep(node_info.discovery_timeout) => Action::HealthChecks,
let insert_new = match existing { Some((addr, message)) = rx.recv() => Action::NewPeer(addr, message),
None => true,
Some(existing) => {
existing.address != addr && existing.address_priority < message.priority
}
}; };
if !insert_new { match action {
continue; Action::NewPeer(addr, message) => {
} let existing = peers.get(&message.node_id);
let insert_new = match existing {
None => true,
Some(existing) => {
existing.address != addr && existing.address_priority < message.priority
}
};
let description = format!("{} ({})", message.interface_type, message.interface_name); if !insert_new {
continue;
}
let a = PeerHandle::new( let description = format!("{} ({})", message.interface_type, message.interface_name);
message.node_id.clone(),
addr.clone(),
message.priority,
Some(description),
message.device_capabilities.clone(),
)
.await;
let a = match a { let a = PeerHandle::new(
Ok(a) => a, message.node_id.clone(),
Err(error) => { addr.clone(),
error!( message.priority,
Some(description),
message.device_capabilities.clone(),
)
.await;
let a = match a {
Ok(a) => a,
Err(error) => {
error!(
"Failed to connect to new peer {} at {}: {}", "Failed to connect to new peer {} at {}: {}",
message.node_id, addr, error message.node_id, addr, error
); );
continue; continue;
}
};
let is_healthy = a
.client
.lock()
.await
.health_check(HealthCheckRequest::default())
.await
.ok()
.map(|x| x.into_inner().is_healthy)
.unwrap_or(false);
if !is_healthy {
error!("Peer {} is not healthy", message.node_id);
continue;
}
peers.insert(message.node_id, a);
} }
};
let is_healthy = a Action::HealthChecks => {
.client let mut to_remove = Vec::new();
.lock()
.await for peer in peers.values() {
.health_check(HealthCheckRequest::default()) let is_healthy = peer.client.lock().await.health_check(HealthCheckRequest::default())
.await .await
.ok() .ok()
.map(|x| x.into_inner().is_healthy) .map(|x| x.into_inner().is_healthy)
.unwrap_or(false); .unwrap_or(false);
if !is_healthy { if !is_healthy {
error!("Peer {} is not healthy", message.node_id); to_remove.push(peer.node_id.clone());
continue; }
}
for peer_id in to_remove {
peers.remove(&peer_id);
}
}
} }
peers.insert(message.node_id, a);
} }
} }