Listen all at once

This commit is contained in:
Joshua Coles 2025-02-12 10:33:43 +00:00
parent c949879147
commit 400144fe46

View File

@ -1,11 +1,10 @@
use crate::network::BroadcastCreationInfo;
use crate::topology::DeviceCapabilities;
use serde::{Deserialize, Serialize};
use socket2::{Domain, Protocol, Socket, Type};
use std::net::{IpAddr, SocketAddr};
use std::time::Duration;
use tokio::net::UdpSocket;
use tracing::debug;
use socket2::{Socket, Domain, Type, Protocol};
use serde::{Deserialize, Serialize};
use crate::network::BroadcastCreationInfo;
use crate::topology::DeviceCapabilities;
struct NodeInfo {
node_id: String,
@ -25,6 +24,18 @@ struct DiscoveryMessage {
interface_type: String,
}
fn bind_to_address(address: SocketAddr) -> UdpSocket {
let socket = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP)).unwrap();
socket.set_broadcast(true).unwrap();
socket.set_reuse_address(true).unwrap();
#[cfg(not(target_os = "windows"))]
socket.set_reuse_port(true).unwrap();
socket.bind(&address.into()).unwrap();
UdpSocket::from_std(socket.into()).unwrap()
}
async fn listen(
broadcast_creation_info: BroadcastCreationInfo,
node_info: NodeInfo,
@ -44,7 +55,7 @@ async fn listen(
interface_name: broadcast_creation_info.interface_name,
interface_type: broadcast_creation_info.interface_type.to_string(),
})
.unwrap();
.unwrap();
loop {
socket
@ -62,14 +73,46 @@ async fn listen(
}
}
fn bind_to_address(address: SocketAddr) -> UdpSocket {
let socket = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP)).unwrap();
socket.set_broadcast(true).unwrap();
socket.set_reuse_address(true).unwrap();
pub async fn listen_all(
node_info: NodeInfo,
broadcast_port: u16,
broadcast_interval: Duration,
broadcast_creation_infos: Vec<BroadcastCreationInfo>,
) {
let sockets_and_messages =
broadcast_creation_infos
.iter()
.map(|broadcast_creation_info: &BroadcastCreationInfo| {
let socket_addr =
SocketAddr::new(IpAddr::V4(broadcast_creation_info.bind_address), 0);
#[cfg(not(target_os = "windows"))]
socket.set_reuse_port(true).unwrap();
let socket = bind_to_address(socket_addr);
socket.bind(&address.into()).unwrap();
UdpSocket::from_std(socket.into()).unwrap()
let message = serde_json::to_vec(&DiscoveryMessage {
message_type: "discovery".to_string(),
node_id: node_info.node_id.clone(),
grpc_port: node_info.node_port,
device_capabilities: node_info.device_capabilities.clone(),
priority: broadcast_creation_info.interface_type.priority(),
interface_name: broadcast_creation_info.interface_name.clone(),
interface_type: broadcast_creation_info.interface_type.to_string(),
})
.unwrap();
(socket, broadcast_creation_info.broadcast_address, message)
});
loop {
for (socket, broadcast_address, message) in sockets_and_messages.clone() {
socket
.send_to(
&message,
SocketAddr::new(IpAddr::V4(broadcast_address), broadcast_port),
)
.await
.unwrap();
}
tokio::time::sleep(broadcast_interval).await;
}
}