Joshua Coles 2025-02-12 07:29:47 +00:00
parent 70d58995ec
commit 5f6f8b6a43
4 changed files with 121 additions and 0 deletions


@@ -1,3 +1,5 @@
mod udp;
use crate::topology::DeviceCapabilities;
use serde::{Deserialize, Serialize};
use std::{

src/discovery/udp.rs Normal file

@@ -0,0 +1,98 @@
use std::collections::HashMap;
use std::net::SocketAddr;
use crate::topology::DeviceCapabilities;
use serde::{Deserialize, Serialize};
use tokio::sync::RwLock;
use tokio::task::JoinHandle;
use tracing::{debug, error, info};
use crate::discovery::PeerInfo;

struct UdpDiscovery {
    listen_port: u16,
    listen_buffer_size: usize,
    allowed_interface_types: Option<Vec<String>>,
    known_peers: RwLock<HashMap<String, PeerInfo>>,
    listen_handle: JoinHandle<()>,
    presence_handle: JoinHandle<()>,
}

#[derive(Debug, Serialize, Deserialize)]
struct DiscoveryMessage {
    #[serde(rename = "type")]
    message_type: String,
    node_id: String,
    grpc_port: u16,
    device_capabilities: DeviceCapabilities,
    priority: i32,
    interface_name: String,
    interface_type: String,
}

impl UdpDiscovery {
    async fn listen(&self) {
        let listen_socket = tokio::net::UdpSocket::bind(("0.0.0.0", self.listen_port))
            .await
            .unwrap();
        let mut buf = vec![0; self.listen_buffer_size];

        loop {
            // This will block waiting for a message.
            // If this fails it will end the loop and the task, which is what we want.
            let Ok((len, addr)) = listen_socket.recv_from(&mut buf).await else {
                break;
            };

            let Ok(message) = serde_json::from_slice::<DiscoveryMessage>(&buf[..len]) else {
                error!(
                    "Received invalid discovery message from {} {:?}",
                    addr,
                    std::str::from_utf8(&buf[..len])
                );
                continue;
            };

            if let Some(ref allowed_interface_types) = self.allowed_interface_types {
                if !allowed_interface_types.contains(&message.interface_type) {
                    debug!(
                        "Ignoring message from {} because interface type {} is not allowed",
                        addr, message.interface_type
                    );
                    continue;
                }
            }

            self.on_discovery_message(message, addr).await;
        }
    }

    async fn on_discovery_message(&self, message: DiscoveryMessage, addr: SocketAddr) {
        let known_peers = self.known_peers.write().await;
        let existing = known_peers.get(&message.node_id);

        if let Some(existing) = existing {
            // TODO: port the peer bookkeeping from the Python reference below.
        }

        /*
        if peer_id not in self.known_peers or self.known_peers[peer_id][0].addr() != f"{peer_host}:{peer_port}":
            if peer_id in self.known_peers:
                existing_peer_prio = self.known_peers[peer_id][3]
                if existing_peer_prio >= peer_prio:
                    if DEBUG >= 1:
                        print(
                            f"Ignoring peer {peer_id} at {peer_host}:{peer_port} with priority {peer_prio} because we already know about a peer with higher or equal priority: {existing_peer_prio}")
                    return
            new_peer_handle = self.create_peer_handle(peer_id, f"{peer_host}:{peer_port}",
                                                      f"{peer_interface_type} ({peer_interface_name})",
                                                      device_capabilities)
            if not await new_peer_handle.health_check():
                if DEBUG >= 1: print(f"Peer {peer_id} at {peer_host}:{peer_port} is not healthy. Skipping.")
                return
            if DEBUG >= 1: print(
                f"Adding {peer_id=} at {peer_host}:{peer_port}. Replace existing peer_id: {peer_id in self.known_peers}")
            self.known_peers[peer_id] = (new_peer_handle, time.time(), time.time(), peer_prio)
        else:
            if not await self.known_peers[peer_id][0].health_check():
                if DEBUG >= 1: print(f"Peer {peer_id} at {peer_host}:{peer_port} is not healthy. Removing.")
                if peer_id in self.known_peers: del self.known_peers[peer_id]
                return
            if peer_id in self.known_peers: self.known_peers[peer_id] = (
                self.known_peers[peer_id][0], self.known_peers[peer_id][1], time.time(), peer_prio)
        */
    }
}
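
The commented-out Python above (lifted from exo's UDP discovery) is the peer bookkeeping this port still has to implement. Below is a minimal, self-contained Rust sketch of that decision flow; the KnownPeer struct, its fields, and the synchronous health_check stand-in are assumptions for illustration, not the project's real PeerInfo/PeerHandle types.

use std::collections::HashMap;
use std::time::Instant;

// Hypothetical stand-in for the crate's PeerInfo / PeerHandle bookkeeping.
struct KnownPeer {
    address: String,
    priority: i32,
    first_seen: Instant,
    last_seen: Instant,
}

// Stand-in for PeerHandle::health_check(); the real check would be an async gRPC call.
fn health_check(_peer: &KnownPeer) -> bool {
    true
}

// Mirrors the Python logic: a known peer announcing from a new address only replaces the
// existing entry if it has strictly higher priority and passes a health check; a known
// peer at the same address is health-checked and refreshed; unhealthy peers are dropped.
fn on_announcement(
    known_peers: &mut HashMap<String, KnownPeer>,
    node_id: &str,
    address: String,
    priority: i32,
) {
    let same_address = match known_peers.get(node_id) {
        Some(existing) if existing.address == address => true,
        // Address changed, but the peer we already track has higher or equal priority.
        Some(existing) if existing.priority >= priority => return,
        _ => false,
    };

    if same_address {
        if !health_check(&known_peers[node_id]) {
            known_peers.remove(node_id);
            return;
        }
        if let Some(existing) = known_peers.get_mut(node_id) {
            existing.last_seen = Instant::now();
            existing.priority = priority;
        }
        return;
    }

    // New peer, or a higher-priority announcement from a different address.
    let candidate = KnownPeer {
        address,
        priority,
        first_seen: Instant::now(),
        last_seen: Instant::now(),
    };
    if health_check(&candidate) {
        known_peers.insert(node_id.to_string(), candidate);
    }
}

fn main() {
    let mut peers = HashMap::new();
    on_announcement(&mut peers, "node-a", "10.0.0.2:9000".to_string(), 10);
    // Ignored: same node, new address, but lower priority than the entry we already hold.
    on_announcement(&mut peers, "node-a", "10.0.0.3:9000".to_string(), 5);
    assert_eq!(peers["node-a"].address, "10.0.0.2:9000");
}

The same-address branch refreshes last_seen and priority in place, mirroring how the Python keeps the original handle and its first connection timestamp.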


@@ -1,5 +1,6 @@
mod topology;
mod discovery;
mod orchestration;
use serde::{Deserialize, Serialize};
use serde_json::Value;

src/orchestration.rs Normal file

@@ -0,0 +1,20 @@
use std::net::SocketAddr;
use crate::topology::DeviceCapabilities;

struct PeerHandle {
    node_id: String,
    address: SocketAddr,
    device_capabilities: DeviceCapabilities,
}

impl PeerHandle {
    async fn new(node_id: String, address: SocketAddr, device_capabilities: DeviceCapabilities) -> Self {
        // The generated client's connect() is async and takes an endpoint URI rather than
        // a bare SocketAddr; the connection is not stored on the handle yet.
        let _ = crate::node_service::node_service_client::NodeServiceClient::connect(
            format!("http://{}", address),
        )
        .await;

        Self {
            node_id,
            address,
            device_capabilities,
        }
    }
}
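
PeerHandle::new reaches for the generated gRPC client but does not yet keep the connection around. As a rough sketch of the direction this could take, here is a self-contained example that opens a tonic Channel from a SocketAddr and stores it on a handle; ConnectedPeer and its fields are hypothetical, and the project's real NodeServiceClient would wrap the channel.

use std::net::SocketAddr;
use tonic::transport::{Channel, Endpoint};

// Hypothetical handle that keeps the transport channel so later RPCs
// (e.g. a health check) can reuse it.
struct ConnectedPeer {
    address: SocketAddr,
    channel: Channel,
}

impl ConnectedPeer {
    async fn connect(address: SocketAddr) -> Result<Self, tonic::transport::Error> {
        // tonic endpoints are URIs, not bare socket addresses.
        let endpoint = Endpoint::from_shared(format!("http://{}", address))?;
        let channel = endpoint.connect().await?;
        Ok(Self { address, channel })
    }
}

#[tokio::main]
async fn main() {
    let address: SocketAddr = "127.0.0.1:50051".parse().unwrap();
    match ConnectedPeer::connect(address).await {
        Ok(peer) => println!("connected to {}", peer.address),
        Err(err) => eprintln!("connection to {} failed: {}", address, err),
    }
}

With the channel stored, a health_check method along the lines of the Python PeerHandle above becomes a plain async RPC on the wrapped client.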