From 5f6f8b6a43d1f64fe1a664d6b459805bf2dad062 Mon Sep 17 00:00:00 2001 From: Joshua Coles Date: Wed, 12 Feb 2025 07:29:47 +0000 Subject: [PATCH] Stash --- src/{discovery.rs => discovery/mod.rs} | 2 + src/discovery/udp.rs | 98 ++++++++++++++++++++++++++ src/main.rs | 1 + src/orchestration.rs | 20 ++++++ 4 files changed, 121 insertions(+) rename src/{discovery.rs => discovery/mod.rs} (99%) create mode 100644 src/discovery/udp.rs create mode 100644 src/orchestration.rs diff --git a/src/discovery.rs b/src/discovery/mod.rs similarity index 99% rename from src/discovery.rs rename to src/discovery/mod.rs index e562c21..a9c54b1 100644 --- a/src/discovery.rs +++ b/src/discovery/mod.rs @@ -1,3 +1,5 @@ +mod udp; + use crate::topology::DeviceCapabilities; use serde::{Deserialize, Serialize}; use std::{ diff --git a/src/discovery/udp.rs b/src/discovery/udp.rs new file mode 100644 index 0000000..8c3bf5c --- /dev/null +++ b/src/discovery/udp.rs @@ -0,0 +1,98 @@ +use std::collections::HashMap; +use std::net::SocketAddr; +use crate::topology::DeviceCapabilities; +use serde::{Deserialize, Serialize}; +use tokio::sync::RwLock; +use tokio::task::JoinHandle; +use tracing::{debug, error, info}; +use crate::discovery::PeerInfo; + +struct UdpDiscovery { + listen_port: u16, + listen_buffer_size: usize, + allowed_interface_types: Option>, + known_peers: RwLock>, + + listen_handle: JoinHandle<()>, + presence_handle: JoinHandle<()>, +} + +#[derive(Debug, Serialize, Deserialize)] +struct DiscoveryMessage { + #[serde(rename = "type")] + message_type: String, + node_id: String, + grpc_port: u16, + device_capabilities: DeviceCapabilities, + priority: i32, + interface_name: String, + interface_type: String, +} + +impl UdpDiscovery { + async fn listen(&self) { + let listen_socket = tokio::net::UdpSocket::bind("0.0.0.0:42069").await.unwrap(); + let mut buf = vec![0; self.listen_buffer_size]; + + loop { + // This will block waiting for a message. + // If this fails it will end the loop and the task, which is what we want. + let Ok((len, addr)) = listen_socket.recv_from(&mut buf).await.unwrap(); + + let Some(message) = serde_json::from_slice::(&buf[..len]) else { + error!( + "Received invalid discovery message from {} {:?}", + addr, + str::from_utf8(&buf[..len]) + ); + continue; + }; + + if let Some(ref allowed_interface_types) = self.allowed_interface_types { + if !allowed_interface_types.contains(&message.interface_type) { + debug!("Ignoring message from {} because interface type {} is not allowed", addr, message.interface_type); + continue; + } + } + + self.on_discovery_message(message, addr).await; + } + } + + async fn on_discovery_message(&self, message: DiscoveryMessage, addr: SocketAddr) { + let known_peers = self.known_peers.write().await; + let existing = known_peers.get(&message.node_id); + + if let Some(existing) = existing { + + } + + /* + if peer_id not in self.known_peers or self.known_peers[peer_id][0].addr() != f"{peer_host}:{peer_port}": + if peer_id in self.known_peers: + existing_peer_prio = self.known_peers[peer_id][3] + if existing_peer_prio >= peer_prio: + if DEBUG >= 1: + print( + f"Ignoring peer {peer_id} at {peer_host}:{peer_port} with priority {peer_prio} because we already know about a peer with higher or equal priority: {existing_peer_prio}") + return + new_peer_handle = self.create_peer_handle(peer_id, f"{peer_host}:{peer_port}", + f"{peer_interface_type} ({peer_interface_name})", + device_capabilities) + if not await new_peer_handle.health_check(): + if DEBUG >= 1: print(f"Peer {peer_id} at {peer_host}:{peer_port} is not healthy. Skipping.") + return + if DEBUG >= 1: print( + f"Adding {peer_id=} at {peer_host}:{peer_port}. Replace existing peer_id: {peer_id in self.known_peers}") + self.known_peers[peer_id] = (new_peer_handle, time.time(), time.time(), peer_prio) + else: + if not await self.known_peers[peer_id][0].health_check(): + if DEBUG >= 1: print(f"Peer {peer_id} at {peer_host}:{peer_port} is not healthy. Removing.") + if peer_id in self.known_peers: del self.known_peers[peer_id] + return + if peer_id in self.known_peers: self.known_peers[peer_id] = ( + self.known_peers[peer_id][0], self.known_peers[peer_id][1], time.time(), peer_prio) + */ + } + } +} diff --git a/src/main.rs b/src/main.rs index 5fb1fec..1ac08d0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,6 @@ mod topology; mod discovery; +mod orchestration; use serde::{Deserialize, Serialize}; use serde_json::Value; diff --git a/src/orchestration.rs b/src/orchestration.rs new file mode 100644 index 0000000..60639cf --- /dev/null +++ b/src/orchestration.rs @@ -0,0 +1,20 @@ +use std::net::SocketAddr; +use crate::topology::DeviceCapabilities; + +struct PeerHandle { + node_id: String, + address: SocketAddr, + device_capabilities: DeviceCapabilities, +} + +impl PeerHandle { + fn new(node_id: String, address: SocketAddr, device_capabilities: DeviceCapabilities) -> Self { + crate::node_service::node_service_client::NodeServiceClient::connect(address); + + Self { + node_id, + address, + device_capabilities, + } + } +} \ No newline at end of file