diff --git a/src/discovery.rs b/src/discovery.rs index bbbb507..f220ab7 100644 --- a/src/discovery.rs +++ b/src/discovery.rs @@ -5,12 +5,7 @@ use socket2::{Domain, Protocol, Socket, Type}; use std::net::{IpAddr, SocketAddr}; use std::time::Duration; use tokio::net::UdpSocket; - -struct NodeInfo { - node_id: String, - node_port: u16, - device_capabilities: DeviceCapabilities, -} +use crate::udp_listen::NodeInfo; #[derive(Debug, Serialize, Deserialize)] pub struct DiscoveryMessage { @@ -36,43 +31,6 @@ fn bind_to_address(address: SocketAddr) -> UdpSocket { UdpSocket::from_std(socket.into()).unwrap() } -async fn listen( - broadcast_creation_info: BroadcastCreationInfo, - node_info: NodeInfo, - broadcast_port: u16, - broadcast_interval: Duration, -) { - let socket_addr = SocketAddr::new(IpAddr::V4(broadcast_creation_info.bind_address), 0); - - let socket = bind_to_address(socket_addr); - - let message = serde_json::to_vec(&DiscoveryMessage { - message_type: "discovery".to_string(), - node_id: node_info.node_id, - grpc_port: node_info.node_port, - device_capabilities: node_info.device_capabilities, - priority: broadcast_creation_info.interface_type.priority(), - interface_name: broadcast_creation_info.interface_name, - interface_type: broadcast_creation_info.interface_type.to_string(), - }) - .unwrap(); - - loop { - socket - .send_to( - &message, - SocketAddr::new( - IpAddr::V4(broadcast_creation_info.broadcast_address), - broadcast_port, - ), - ) - .await - .unwrap(); - - tokio::time::sleep(broadcast_interval).await; - } -} - pub async fn listen_all( node_info: NodeInfo, broadcast_port: u16, @@ -91,7 +49,7 @@ pub async fn listen_all( let message = serde_json::to_vec(&DiscoveryMessage { message_type: "discovery".to_string(), node_id: node_info.node_id.clone(), - grpc_port: node_info.node_port, + grpc_port: node_info.grpc_port, device_capabilities: node_info.device_capabilities.clone(), priority: broadcast_creation_info.interface_type.priority(), interface_name: broadcast_creation_info.interface_name.clone(), diff --git a/src/udp_listen.rs b/src/udp_listen.rs index f289f51..6cd7546 100644 --- a/src/udp_listen.rs +++ b/src/udp_listen.rs @@ -9,21 +9,24 @@ use tokio::select; use tokio::sync::mpsc::UnboundedSender; use tonic::transport::Error; use tracing::{debug, error, info}; +use crate::topology::DeviceCapabilities; #[derive(Debug, Clone)] -struct NodeInfo { - id: String, - listen_port: u16, - allowed_peer_ids: Option>, - allowed_interfaces: Option>, - discovery_timeout: Duration, +pub struct NodeInfo { + pub node_id: String, + pub discovery_listen_port: u16, + pub grpc_port: u16, + pub allowed_peer_ids: Option>, + pub allowed_interfaces: Option>, + pub discovery_timeout: Duration, + pub device_capabilities: DeviceCapabilities, } pub async fn listen_for_discovery( node_info: NodeInfo, tx: UnboundedSender<(SocketAddr, DiscoveryMessage)>, ) { - let socket = UdpSocket::bind(format!("0.0.0.0:{}", node_info.listen_port)) + let socket = UdpSocket::bind(format!("0.0.0.0:{}", node_info.discovery_listen_port)) .await .unwrap(); let mut buf = vec![0u8; 65535]; @@ -45,7 +48,7 @@ pub async fn listen_for_discovery( }; // Validate message - if message.message_type != "discovery" || message.node_id == node_info.id { + if message.message_type != "discovery" || message.node_id == node_info.node_id { continue; }