diff --git a/Cargo.lock b/Cargo.lock index f2ec38e..54fd630 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -163,12 +163,37 @@ version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f61dac84819c6588b558454b194026eb1f09c293b9036ae9b159e74e73ab6cf9" +[[package]] +name = "cc" +version = "1.2.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7777341816418c02e033934a09f20dc0ccaf65a5201ef8a450ae0105a573fda" +dependencies = [ + "shlex", +] + [[package]] name = "cfg-if" version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "core-foundation" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91e195e091a93c46f7102ec7818a2aa394e1e1771c3ab4825963fa03e45afb8f" +dependencies = [ + "core-foundation-sys", + "libc", +] + +[[package]] +name = "core-foundation-sys" +version = "0.8.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" + [[package]] name = "crc32fast" version = "1.4.2" @@ -204,10 +229,13 @@ dependencies = [ name = "exo-rs" version = "0.1.0" dependencies = [ + "network-interface", "prost", "serde", "serde_json", - "thiserror", + "socket2", + "system-configuration", + "thiserror 2.0.11", "tokio", "tonic", "tonic-build", @@ -560,6 +588,18 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03" +[[package]] +name = "network-interface" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "433419f898328beca4f2c6c73a1b52540658d92b0a99f0269330457e0fd998d5" +dependencies = [ + "cc", + "libc", + "thiserror 1.0.69", + "winapi", +] + [[package]] name = "nu-ansi-term" version = "0.46.0" @@ -897,6 +937,12 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "shlex" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" + [[package]] name = "signal-hook-registry" version = "1.4.2" @@ -948,6 +994,27 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0bf256ce5efdfa370213c1dabab5935a12e49f2c58d15e9eac2870d3b4f27263" +[[package]] +name = "system-configuration" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c879d448e9d986b661742763247d3693ed13609438cf3d006f51f5368a5ba6b" +dependencies = [ + "bitflags", + "core-foundation", + "system-configuration-sys", +] + +[[package]] +name = "system-configuration-sys" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e1d1b10ced5ca923a1fcb8d03e96b8d3268065d724548c0211415ff6ac6bac4" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "tempfile" version = "3.16.0" @@ -962,13 +1029,33 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "thiserror" +version = "1.0.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52" +dependencies = [ + "thiserror-impl 1.0.69", +] + [[package]] name = "thiserror" version = "2.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d452f284b73e6d76dd36758a0c8684b1d5be31f92b89d07fd5822175732206fc" dependencies = [ - "thiserror-impl", + "thiserror-impl 2.0.11", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" +dependencies = [ + "proc-macro2", + "quote", + "syn", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 40f66df..d737ec6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,6 +12,9 @@ tonic = { version = "0.12.3", features = ["gzip"] } thiserror = "2.0" tracing = "0.1" tracing-subscriber = "0.3" +socket2 = "0.5.8" +system-configuration = "0.6.1" +network-interface = "2.0.0" [build-dependencies] tonic-build = "0.12.3" diff --git a/src/discovery.rs b/src/discovery.rs new file mode 100644 index 0000000..374d0bc --- /dev/null +++ b/src/discovery.rs @@ -0,0 +1,34 @@ +use std::net::{IpAddr, SocketAddr}; +use tokio::net::UdpSocket; +use tracing::debug; +use socket2::{Socket, Domain, Type, Protocol}; +use serde::{Deserialize, Serialize}; +use crate::topology::DeviceCapabilities; + +#[derive(Debug, Serialize, Deserialize)] +struct DiscoveryMessage { + #[serde(rename = "type")] + message_type: String, + node_id: String, + grpc_port: u16, + device_capabilities: DeviceCapabilities, + priority: u32, + interface_name: String, + interface_type: String, +} + +async fn broadcast(address: SocketAddr) { + let socket = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP)).unwrap(); + socket.set_broadcast(true).unwrap(); + socket.set_reuse_address(true).unwrap(); + + #[cfg(not(target_os = "windows"))] + socket.set_reuse_port(true).unwrap(); + + socket.bind(&address.into()).unwrap(); + let udp: UdpSocket = UdpSocket::from_std(socket.into()).unwrap(); + // + // loop { + // udp.send_to() + // } +} diff --git a/src/main.rs b/src/main.rs index a9542b1..0453fd3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,7 @@ mod topology; mod orchestration; +mod discovery; +mod network; use serde::{Deserialize, Serialize}; use serde_json::Value; diff --git a/src/network.rs b/src/network.rs new file mode 100644 index 0000000..567b5f4 --- /dev/null +++ b/src/network.rs @@ -0,0 +1,132 @@ +use network_interface::{Addr, NetworkInterface, NetworkInterfaceConfig}; +use socket2::{Domain, Protocol, Socket, Type}; +use std::net::{IpAddr, Ipv4Addr, SocketAddr}; +use std::time::Duration; +use serde::{Deserialize, Serialize}; +use tokio::net::UdpSocket; +use crate::topology::DeviceCapabilities; + +#[derive(Copy, Clone, Debug)] +enum InterfaceType { + ContainerVirtual, + Loopback, + Thunderbolt, + Ethernet, + WiFi, + ExternalVirtual, + Other, +} + +impl InterfaceType { + fn priority(&self) -> u8 { + match self { + InterfaceType::ContainerVirtual => 7, + InterfaceType::Loopback => 6, + InterfaceType::Thunderbolt => 5, + InterfaceType::Ethernet => 4, + InterfaceType::WiFi => 3, + InterfaceType::ExternalVirtual => 1, + InterfaceType::Other => 2, + } + } +} + +impl ToString for InterfaceType { + fn to_string(&self) -> String { + match self { + InterfaceType::ContainerVirtual => "Container Virtual".to_string(), + InterfaceType::Loopback => "Loopback".to_string(), + InterfaceType::Thunderbolt => "Thunderbolt".to_string(), + InterfaceType::Ethernet => "Ethernet".to_string(), + InterfaceType::WiFi => "WiFi".to_string(), + InterfaceType::ExternalVirtual => "External Virtual".to_string(), + InterfaceType::Other => "Other".to_string(), + } + } +} + + +struct BroadcastCreationInfo { + interface_name: String, + interface_type: InterfaceType, + + bind_address: Ipv4Addr, + broadcast_address: Ipv4Addr, +} + +fn get_broadcast_creation_info() -> Vec {} + +struct NodeInfo { + node_id: String, + node_port: u16, + device_capabilities: DeviceCapabilities, +} + +#[derive(Debug, Serialize, Deserialize)] +struct DiscoveryMessage { + #[serde(rename = "type")] + message_type: String, + node_id: String, + grpc_port: u16, + device_capabilities: DeviceCapabilities, + priority: u8, + interface_name: String, + interface_type: String, +} + +async fn listen(broadcast_creation_info: BroadcastCreationInfo, node_info: NodeInfo, broadcast_port: u16, broadcast_interval: Duration) { + let socket_addr = SocketAddr::new( + IpAddr::V4(broadcast_creation_info.bind_address), + 0, + ); + + let socket = bind_to_address(socket_addr); + + let message = serde_json::to_vec(&DiscoveryMessage { + message_type: "discovery".to_string(), + node_id: node_info.node_id, + grpc_port: node_info.node_port, + device_capabilities: node_info.device_capabilities, + priority: broadcast_creation_info.interface_type.priority(), + interface_name: broadcast_creation_info.interface_name, + interface_type: broadcast_creation_info.interface_type.to_string(), + }).unwrap(); + + loop { + socket.send_to( + &message, + SocketAddr::new(IpAddr::V4(broadcast_creation_info.broadcast_address), broadcast_port), + ).await.unwrap(); + + tokio::time::sleep(broadcast_interval).await; + } +} + +fn bind_to_address(address: SocketAddr) -> UdpSocket { + let socket = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP)).unwrap(); + socket.set_broadcast(true).unwrap(); + socket.set_reuse_address(true).unwrap(); + + #[cfg(not(target_os = "windows"))] + socket.set_reuse_port(true).unwrap(); + + socket.bind(&address.into()).unwrap(); + UdpSocket::from_std(socket.into()).unwrap() +} + +#[test] +fn test_interfaces() { + let raw = NetworkInterface::show().unwrap(); + let names_and_addrs = raw + .iter() + .flat_map(|network_interface| { + let v4_addrs = network_interface + .addr + .iter() + .filter(|addr: &&Addr| matches!(addr, Addr::V4(..))) + .map(|addr: &Addr| (network_interface.name.clone(), *addr)); + + v4_addrs + }) + .collect::>(); +}