diff --git a/src/discovery.rs b/src/discovery.rs index 374d0bc..61d692c 100644 --- a/src/discovery.rs +++ b/src/discovery.rs @@ -1,10 +1,18 @@ use std::net::{IpAddr, SocketAddr}; +use std::time::Duration; use tokio::net::UdpSocket; use tracing::debug; use socket2::{Socket, Domain, Type, Protocol}; use serde::{Deserialize, Serialize}; +use crate::network::BroadcastCreationInfo; use crate::topology::DeviceCapabilities; +struct NodeInfo { + node_id: String, + node_port: u16, + device_capabilities: DeviceCapabilities, +} + #[derive(Debug, Serialize, Deserialize)] struct DiscoveryMessage { #[serde(rename = "type")] @@ -12,23 +20,56 @@ struct DiscoveryMessage { node_id: String, grpc_port: u16, device_capabilities: DeviceCapabilities, - priority: u32, + priority: u8, interface_name: String, interface_type: String, } -async fn broadcast(address: SocketAddr) { +async fn listen( + broadcast_creation_info: BroadcastCreationInfo, + node_info: NodeInfo, + broadcast_port: u16, + broadcast_interval: Duration, +) { + let socket_addr = SocketAddr::new(IpAddr::V4(broadcast_creation_info.bind_address), 0); + + let socket = bind_to_address(socket_addr); + + let message = serde_json::to_vec(&DiscoveryMessage { + message_type: "discovery".to_string(), + node_id: node_info.node_id, + grpc_port: node_info.node_port, + device_capabilities: node_info.device_capabilities, + priority: broadcast_creation_info.interface_type.priority(), + interface_name: broadcast_creation_info.interface_name, + interface_type: broadcast_creation_info.interface_type.to_string(), + }) + .unwrap(); + + loop { + socket + .send_to( + &message, + SocketAddr::new( + IpAddr::V4(broadcast_creation_info.broadcast_address), + broadcast_port, + ), + ) + .await + .unwrap(); + + tokio::time::sleep(broadcast_interval).await; + } +} + +fn bind_to_address(address: SocketAddr) -> UdpSocket { let socket = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP)).unwrap(); socket.set_broadcast(true).unwrap(); socket.set_reuse_address(true).unwrap(); #[cfg(not(target_os = "windows"))] socket.set_reuse_port(true).unwrap(); - + socket.bind(&address.into()).unwrap(); - let udp: UdpSocket = UdpSocket::from_std(socket.into()).unwrap(); - // - // loop { - // udp.send_to() - // } + UdpSocket::from_std(socket.into()).unwrap() } diff --git a/src/network.rs b/src/network.rs index 553f995..33f2655 100644 --- a/src/network.rs +++ b/src/network.rs @@ -10,7 +10,7 @@ use system_configuration::sys::network_configuration::SCNetworkInterfaceRef; use tokio::net::UdpSocket; #[derive(Copy, Clone, Debug)] -enum InterfaceType { +pub enum InterfaceType { ContainerVirtual, Loopback, Thunderbolt, @@ -21,7 +21,7 @@ enum InterfaceType { } impl InterfaceType { - fn priority(&self) -> u8 { + pub fn priority(&self) -> u8 { match self { InterfaceType::ContainerVirtual => 7, InterfaceType::Loopback => 6, @@ -48,15 +48,16 @@ impl ToString for InterfaceType { } } -struct BroadcastCreationInfo { - interface_name: String, - interface_type: InterfaceType, +#[derive(Debug, Clone)] +pub struct BroadcastCreationInfo { + pub interface_name: String, + pub interface_type: InterfaceType, - bind_address: Ipv4Addr, - broadcast_address: Ipv4Addr, + pub bind_address: Ipv4Addr, + pub broadcast_address: Ipv4Addr, } -fn get_broadcast_creation_info() -> Vec { +pub fn get_broadcast_creation_info() -> Vec { let interfaces = NetworkInterface::show().unwrap(); let mut broadcast_info = Vec::new(); @@ -135,74 +136,7 @@ fn determine_interface_type(name: &str) -> InterfaceType { } } -struct NodeInfo { - node_id: String, - node_port: u16, - device_capabilities: DeviceCapabilities, -} - -#[derive(Debug, Serialize, Deserialize)] -struct DiscoveryMessage { - #[serde(rename = "type")] - message_type: String, - node_id: String, - grpc_port: u16, - device_capabilities: DeviceCapabilities, - priority: u8, - interface_name: String, - interface_type: String, -} - -async fn listen( - broadcast_creation_info: BroadcastCreationInfo, - node_info: NodeInfo, - broadcast_port: u16, - broadcast_interval: Duration, -) { - let socket_addr = SocketAddr::new(IpAddr::V4(broadcast_creation_info.bind_address), 0); - - let socket = bind_to_address(socket_addr); - - let message = serde_json::to_vec(&DiscoveryMessage { - message_type: "discovery".to_string(), - node_id: node_info.node_id, - grpc_port: node_info.node_port, - device_capabilities: node_info.device_capabilities, - priority: broadcast_creation_info.interface_type.priority(), - interface_name: broadcast_creation_info.interface_name, - interface_type: broadcast_creation_info.interface_type.to_string(), - }) - .unwrap(); - - loop { - socket - .send_to( - &message, - SocketAddr::new( - IpAddr::V4(broadcast_creation_info.broadcast_address), - broadcast_port, - ), - ) - .await - .unwrap(); - - tokio::time::sleep(broadcast_interval).await; - } -} - -fn bind_to_address(address: SocketAddr) -> UdpSocket { - let socket = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP)).unwrap(); - socket.set_broadcast(true).unwrap(); - socket.set_reuse_address(true).unwrap(); - - #[cfg(not(target_os = "windows"))] - socket.set_reuse_port(true).unwrap(); - - socket.bind(&address.into()).unwrap(); - UdpSocket::from_std(socket.into()).unwrap() -} - #[test] fn test_interfaces() { - dbg!(get_interfaces()); + dbg!(get_broadcast_creation_info()); }