This commit is contained in:
Joshua Coles 2025-02-12 11:21:19 +00:00
parent cf15c1097e
commit f56e719e95
2 changed files with 84 additions and 81 deletions

View File

@ -1,7 +1,8 @@
use crate::node_service::node_service_client::NodeServiceClient;
use crate::node_service::HealthCheckRequest;
use crate::topology::DeviceCapabilities;
use std::net::SocketAddr;
use tonic::codec::CompressionEncoding;
use crate::node_service::node_service_client::NodeServiceClient;
use crate::topology::DeviceCapabilities;
pub struct PeerHandle {
pub node_id: String,
@ -13,9 +14,16 @@ pub struct PeerHandle {
}
impl PeerHandle {
pub async fn new(node_id: String, address: SocketAddr, address_priority: u8, description: Option<String>, device_capabilities: DeviceCapabilities) -> Result<Self, tonic::transport::Error> {
pub async fn new(
node_id: String,
address: SocketAddr,
address_priority: u8,
description: Option<String>,
device_capabilities: DeviceCapabilities,
) -> Result<Self, tonic::transport::Error> {
let endpoint = format!("http://{}", address);
let client = NodeServiceClient::connect(endpoint).await?
let client = NodeServiceClient::connect(endpoint)
.await?
.accept_compressed(CompressionEncoding::Gzip);
Ok(Self {
@ -27,4 +35,15 @@ impl PeerHandle {
device_capabilities,
})
}
pub async fn is_healthy(&self) -> bool {
self.client
.lock()
.await
.health_check(HealthCheckRequest::default())
.await
.ok()
.map(|x| x.into_inner().is_healthy)
.unwrap_or(false)
}
}

View File

@ -76,16 +76,56 @@ pub async fn listen_for_discovery(
}
}
struct PeerInfo {
address: SocketAddr,
priority: u8,
}
enum Action {
HealthChecks,
NewPeer(SocketAddr, DiscoveryMessage),
}
async fn handle_new_peer(
peers: &mut HashMap<String, PeerHandle>,
addr: SocketAddr,
message: DiscoveryMessage,
) {
let existing = peers.get(&message.node_id);
let insert_new = match existing {
None => true,
Some(existing) => existing.address != addr && existing.address_priority < message.priority,
};
if !insert_new {
return;
}
let description = format!("{} ({})", message.interface_type, message.interface_name);
let new_peer = PeerHandle::new(
message.node_id.clone(),
addr.clone(),
message.priority,
Some(description),
message.device_capabilities.clone(),
)
.await;
let new_peer = match new_peer {
Ok(v) => v,
Err(error) => {
error!(
"Failed to connect to new peer {} at {}: {}",
message.node_id, addr, error
);
return;
}
};
if !new_peer.is_healthy().await {
error!("New peer {} is not healthy", message.node_id);
return;
}
peers.insert(message.node_id, new_peer);
}
pub async fn manage_discovery(node_info: NodeInfo) {
let mut peers: HashMap<String, PeerHandle> = HashMap::new();
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<(SocketAddr, DiscoveryMessage)>();
@ -98,78 +138,22 @@ pub async fn manage_discovery(node_info: NodeInfo) {
};
match action {
Action::NewPeer(addr, message) => {
let existing = peers.get(&message.node_id);
let insert_new = match existing {
None => true,
Some(existing) => {
existing.address != addr && existing.address_priority < message.priority
}
};
if !insert_new {
continue;
}
let description = format!("{} ({})", message.interface_type, message.interface_name);
let a = PeerHandle::new(
message.node_id.clone(),
addr.clone(),
message.priority,
Some(description),
message.device_capabilities.clone(),
)
.await;
let a = match a {
Ok(a) => a,
Err(error) => {
error!(
"Failed to connect to new peer {} at {}: {}",
message.node_id, addr, error
);
continue;
}
};
let is_healthy = a
.client
.lock()
.await
.health_check(HealthCheckRequest::default())
.await
.ok()
.map(|x| x.into_inner().is_healthy)
.unwrap_or(false);
if !is_healthy {
error!("Peer {} is not healthy", message.node_id);
continue;
}
peers.insert(message.node_id, a);
}
Action::HealthChecks => {
let mut to_remove = Vec::new();
for peer in peers.values() {
let is_healthy = peer.client.lock().await.health_check(HealthCheckRequest::default())
.await
.ok()
.map(|x| x.into_inner().is_healthy)
.unwrap_or(false);
if !is_healthy {
to_remove.push(peer.node_id.clone());
}
}
for peer_id in to_remove {
peers.remove(&peer_id);
}
}
Action::NewPeer(addr, message) => handle_new_peer(&mut peers, addr, message).await,
Action::HealthChecks => perform_health_checks(&mut peers).await,
}
}
}
async fn perform_health_checks(peers: &mut HashMap<String, PeerHandle>) {
let mut to_remove = Vec::new();
for peer in peers.values() {
if !peer.is_healthy().await {
to_remove.push(peer.node_id.clone());
}
}
for peer_id in to_remove {
peers.remove(&peer_id);
}
}