diff --git a/src/discovery.rs b/src/discovery.rs index 61d692c..cb86949 100644 --- a/src/discovery.rs +++ b/src/discovery.rs @@ -1,11 +1,10 @@ +use crate::network::BroadcastCreationInfo; +use crate::topology::DeviceCapabilities; +use serde::{Deserialize, Serialize}; +use socket2::{Domain, Protocol, Socket, Type}; use std::net::{IpAddr, SocketAddr}; use std::time::Duration; use tokio::net::UdpSocket; -use tracing::debug; -use socket2::{Socket, Domain, Type, Protocol}; -use serde::{Deserialize, Serialize}; -use crate::network::BroadcastCreationInfo; -use crate::topology::DeviceCapabilities; struct NodeInfo { node_id: String, @@ -25,6 +24,18 @@ struct DiscoveryMessage { interface_type: String, } +fn bind_to_address(address: SocketAddr) -> UdpSocket { + let socket = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP)).unwrap(); + socket.set_broadcast(true).unwrap(); + socket.set_reuse_address(true).unwrap(); + + #[cfg(not(target_os = "windows"))] + socket.set_reuse_port(true).unwrap(); + + socket.bind(&address.into()).unwrap(); + UdpSocket::from_std(socket.into()).unwrap() +} + async fn listen( broadcast_creation_info: BroadcastCreationInfo, node_info: NodeInfo, @@ -44,7 +55,7 @@ async fn listen( interface_name: broadcast_creation_info.interface_name, interface_type: broadcast_creation_info.interface_type.to_string(), }) - .unwrap(); + .unwrap(); loop { socket @@ -62,14 +73,46 @@ async fn listen( } } -fn bind_to_address(address: SocketAddr) -> UdpSocket { - let socket = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP)).unwrap(); - socket.set_broadcast(true).unwrap(); - socket.set_reuse_address(true).unwrap(); +pub async fn listen_all( + node_info: NodeInfo, + broadcast_port: u16, + broadcast_interval: Duration, + broadcast_creation_infos: Vec, +) { + let sockets_and_messages = + broadcast_creation_infos + .iter() + .map(|broadcast_creation_info: &BroadcastCreationInfo| { + let socket_addr = + SocketAddr::new(IpAddr::V4(broadcast_creation_info.bind_address), 0); - #[cfg(not(target_os = "windows"))] - socket.set_reuse_port(true).unwrap(); + let socket = bind_to_address(socket_addr); - socket.bind(&address.into()).unwrap(); - UdpSocket::from_std(socket.into()).unwrap() + let message = serde_json::to_vec(&DiscoveryMessage { + message_type: "discovery".to_string(), + node_id: node_info.node_id.clone(), + grpc_port: node_info.node_port, + device_capabilities: node_info.device_capabilities.clone(), + priority: broadcast_creation_info.interface_type.priority(), + interface_name: broadcast_creation_info.interface_name.clone(), + interface_type: broadcast_creation_info.interface_type.to_string(), + }) + .unwrap(); + + (socket, broadcast_creation_info.broadcast_address, message) + }); + + loop { + for (socket, broadcast_address, message) in sockets_and_messages.clone() { + socket + .send_to( + &message, + SocketAddr::new(IpAddr::V4(broadcast_address), broadcast_port), + ) + .await + .unwrap(); + } + + tokio::time::sleep(broadcast_interval).await; + } }