Start to combine
This commit is contained in:
parent
f56e719e95
commit
4b86df17b9
@ -5,12 +5,7 @@ use socket2::{Domain, Protocol, Socket, Type};
|
|||||||
use std::net::{IpAddr, SocketAddr};
|
use std::net::{IpAddr, SocketAddr};
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use tokio::net::UdpSocket;
|
use tokio::net::UdpSocket;
|
||||||
|
use crate::udp_listen::NodeInfo;
|
||||||
struct NodeInfo {
|
|
||||||
node_id: String,
|
|
||||||
node_port: u16,
|
|
||||||
device_capabilities: DeviceCapabilities,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Serialize, Deserialize)]
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
pub struct DiscoveryMessage {
|
pub struct DiscoveryMessage {
|
||||||
@ -36,43 +31,6 @@ fn bind_to_address(address: SocketAddr) -> UdpSocket {
|
|||||||
UdpSocket::from_std(socket.into()).unwrap()
|
UdpSocket::from_std(socket.into()).unwrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn listen(
|
|
||||||
broadcast_creation_info: BroadcastCreationInfo,
|
|
||||||
node_info: NodeInfo,
|
|
||||||
broadcast_port: u16,
|
|
||||||
broadcast_interval: Duration,
|
|
||||||
) {
|
|
||||||
let socket_addr = SocketAddr::new(IpAddr::V4(broadcast_creation_info.bind_address), 0);
|
|
||||||
|
|
||||||
let socket = bind_to_address(socket_addr);
|
|
||||||
|
|
||||||
let message = serde_json::to_vec(&DiscoveryMessage {
|
|
||||||
message_type: "discovery".to_string(),
|
|
||||||
node_id: node_info.node_id,
|
|
||||||
grpc_port: node_info.node_port,
|
|
||||||
device_capabilities: node_info.device_capabilities,
|
|
||||||
priority: broadcast_creation_info.interface_type.priority(),
|
|
||||||
interface_name: broadcast_creation_info.interface_name,
|
|
||||||
interface_type: broadcast_creation_info.interface_type.to_string(),
|
|
||||||
})
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
loop {
|
|
||||||
socket
|
|
||||||
.send_to(
|
|
||||||
&message,
|
|
||||||
SocketAddr::new(
|
|
||||||
IpAddr::V4(broadcast_creation_info.broadcast_address),
|
|
||||||
broadcast_port,
|
|
||||||
),
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
tokio::time::sleep(broadcast_interval).await;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn listen_all(
|
pub async fn listen_all(
|
||||||
node_info: NodeInfo,
|
node_info: NodeInfo,
|
||||||
broadcast_port: u16,
|
broadcast_port: u16,
|
||||||
@ -91,7 +49,7 @@ pub async fn listen_all(
|
|||||||
let message = serde_json::to_vec(&DiscoveryMessage {
|
let message = serde_json::to_vec(&DiscoveryMessage {
|
||||||
message_type: "discovery".to_string(),
|
message_type: "discovery".to_string(),
|
||||||
node_id: node_info.node_id.clone(),
|
node_id: node_info.node_id.clone(),
|
||||||
grpc_port: node_info.node_port,
|
grpc_port: node_info.grpc_port,
|
||||||
device_capabilities: node_info.device_capabilities.clone(),
|
device_capabilities: node_info.device_capabilities.clone(),
|
||||||
priority: broadcast_creation_info.interface_type.priority(),
|
priority: broadcast_creation_info.interface_type.priority(),
|
||||||
interface_name: broadcast_creation_info.interface_name.clone(),
|
interface_name: broadcast_creation_info.interface_name.clone(),
|
||||||
|
|||||||
@ -9,21 +9,24 @@ use tokio::select;
|
|||||||
use tokio::sync::mpsc::UnboundedSender;
|
use tokio::sync::mpsc::UnboundedSender;
|
||||||
use tonic::transport::Error;
|
use tonic::transport::Error;
|
||||||
use tracing::{debug, error, info};
|
use tracing::{debug, error, info};
|
||||||
|
use crate::topology::DeviceCapabilities;
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
struct NodeInfo {
|
pub struct NodeInfo {
|
||||||
id: String,
|
pub node_id: String,
|
||||||
listen_port: u16,
|
pub discovery_listen_port: u16,
|
||||||
allowed_peer_ids: Option<Vec<String>>,
|
pub grpc_port: u16,
|
||||||
allowed_interfaces: Option<Vec<String>>,
|
pub allowed_peer_ids: Option<Vec<String>>,
|
||||||
discovery_timeout: Duration,
|
pub allowed_interfaces: Option<Vec<String>>,
|
||||||
|
pub discovery_timeout: Duration,
|
||||||
|
pub device_capabilities: DeviceCapabilities,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn listen_for_discovery(
|
pub async fn listen_for_discovery(
|
||||||
node_info: NodeInfo,
|
node_info: NodeInfo,
|
||||||
tx: UnboundedSender<(SocketAddr, DiscoveryMessage)>,
|
tx: UnboundedSender<(SocketAddr, DiscoveryMessage)>,
|
||||||
) {
|
) {
|
||||||
let socket = UdpSocket::bind(format!("0.0.0.0:{}", node_info.listen_port))
|
let socket = UdpSocket::bind(format!("0.0.0.0:{}", node_info.discovery_listen_port))
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let mut buf = vec![0u8; 65535];
|
let mut buf = vec![0u8; 65535];
|
||||||
@ -45,7 +48,7 @@ pub async fn listen_for_discovery(
|
|||||||
};
|
};
|
||||||
|
|
||||||
// Validate message
|
// Validate message
|
||||||
if message.message_type != "discovery" || message.node_id == node_info.id {
|
if message.message_type != "discovery" || message.node_id == node_info.node_id {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user