pub(crate) mod command;
mod bootstrap;
mod comm;
mod core;
mod dispatcher;
mod enduser_registry;
mod event_stream;
mod split_barrier;
#[cfg(test)]
pub(crate) mod tests;
pub use self::event_stream::EventStream;
use self::{
comm::{Comm, ConnectionEvent},
command::Command,
core::Core,
dispatcher::Dispatcher,
};
use crate::{
ed25519,
error::Result,
event::{Elders, Event, NodeElderChange},
messages::RoutingMsgUtils,
network::NetworkUtils,
node::Node,
peer::PeerUtils,
section::{SectionAuthorityProviderUtils, SectionUtils},
Error, TransportConfig, MIN_ADULT_AGE,
};
use bytes::Bytes;
use ed25519_dalek::{Keypair, PublicKey, Signature, Signer, KEYPAIR_LENGTH};
use itertools::Itertools;
use secured_linked_list::SecuredLinkedList;
use sn_messaging::{
client::ClientMsg,
node::{Peer, RoutingMsg},
DestInfo, DstLocation, EndUser, Itinerary, MessageType, SectionAuthorityProvider, WireMsg,
};
use std::{
collections::BTreeSet,
fmt::{self, Debug, Formatter},
net::SocketAddr,
sync::Arc,
};
use tokio::{sync::mpsc, task};
use xor_name::{Prefix, XorName};
#[derive(Debug)]
pub struct Config {
pub first: bool,
pub keypair: Option<Keypair>,
pub transport_config: TransportConfig,
}
impl Default for Config {
fn default() -> Self {
Self {
first: false,
keypair: None,
transport_config: TransportConfig::default(),
}
}
}
pub struct Routing {
dispatcher: Arc<Dispatcher>,
}
static EVENT_CHANNEL_SIZE: usize = 20;
impl Routing {
pub async fn new(config: Config) -> Result<(Self, EventStream)> {
let keypair = config.keypair.unwrap_or_else(|| {
ed25519::gen_keypair(&Prefix::default().range_inclusive(), MIN_ADULT_AGE)
});
let node_name = ed25519::name(&keypair.public);
let (event_tx, event_rx) = mpsc::channel(EVENT_CHANNEL_SIZE);
let (connection_event_tx, mut connection_event_rx) = mpsc::channel(1);
let (state, comm, backlog) = if config.first {
let keypair = ed25519::gen_keypair(&Prefix::default().range_inclusive(), 255);
let node_name = ed25519::name(&keypair.public);
info!("{} Starting a new network as the genesis node.", node_name);
let comm = Comm::new(config.transport_config, connection_event_tx).await?;
let node = Node::new(keypair, comm.our_connection_info());
let state = Core::first_node(node, event_tx)?;
let section = state.section();
let elders = Elders {
prefix: *section.prefix(),
key: *section.chain().last_key(),
remaining: BTreeSet::new(),
added: section.authority_provider().names(),
removed: BTreeSet::new(),
};
state
.send_event(Event::EldersChanged {
elders,
self_status_change: NodeElderChange::Promoted,
})
.await;
(state, comm, vec![])
} else {
info!("{} Bootstrapping a new node.", node_name);
let (comm, bootstrap_addr) =
Comm::bootstrap(config.transport_config, connection_event_tx).await?;
let node = Node::new(keypair, comm.our_connection_info());
let (node, section, backlog) =
bootstrap::join(node, &comm, &mut connection_event_rx, bootstrap_addr).await?;
let state = Core::new(node, section, None, event_tx);
(state, comm, backlog)
};
let dispatcher = Arc::new(Dispatcher::new(state, comm));
let event_stream = EventStream::new(event_rx);
info!("{} Bootstrapped!", node_name);
for (message, sender, dest_info) in backlog {
dispatcher
.clone()
.handle_commands(Command::HandleMessage {
message,
sender: Some(sender),
dest_info,
})
.await?;
}
let _ = task::spawn(handle_connection_events(
dispatcher.clone(),
connection_event_rx,
));
let routing = Self { dispatcher };
Ok((routing, event_stream))
}
pub async fn set_joins_allowed(&self, joins_allowed: bool) -> Result<()> {
let command = Command::SetJoinsAllowed(joins_allowed);
self.dispatcher.clone().handle_commands(command).await
}
pub async fn propose_offline(&self, name: XorName) -> Result<()> {
if !self.is_elder().await {
return Err(Error::InvalidState);
}
let command = Command::ProposeOffline(name);
self.dispatcher.clone().handle_commands(command).await
}
pub async fn start_connectivity_test(&self, name: XorName) -> Result<()> {
let command = Command::StartConnectivityTest(name);
self.dispatcher.clone().handle_commands(command).await
}
pub async fn age(&self) -> u8 {
self.dispatcher.core.read().await.node().age()
}
pub async fn public_key(&self) -> PublicKey {
self.dispatcher.core.read().await.node().keypair.public
}
pub async fn keypair_as_bytes(&self) -> [u8; KEYPAIR_LENGTH] {
self.dispatcher.core.read().await.node().keypair.to_bytes()
}
pub async fn sign_as_node(&self, data: &[u8]) -> Signature {
self.dispatcher.core.read().await.node().keypair.sign(data)
}
pub async fn sign_as_elder(
&self,
data: &[u8],
public_key: &bls::PublicKey,
) -> Result<bls::SignatureShare> {
self.dispatcher
.core
.read()
.await
.sign_with_section_key_share(data, public_key)
}
pub async fn verify(&self, data: &[u8], signature: &Signature) -> bool {
self.dispatcher
.core
.read()
.await
.node()
.keypair
.verify(data, signature)
.is_ok()
}
pub async fn name(&self) -> XorName {
self.dispatcher.core.read().await.node().name()
}
pub fn our_connection_info(&self) -> SocketAddr {
self.dispatcher.comm.our_connection_info()
}
pub async fn section_chain(&self) -> SecuredLinkedList {
self.dispatcher.core.read().await.section_chain().clone()
}
pub async fn our_prefix(&self) -> Prefix {
*self.dispatcher.core.read().await.section().prefix()
}
pub async fn matches_our_prefix(&self, name: &XorName) -> bool {
self.our_prefix().await.matches(name)
}
pub async fn is_elder(&self) -> bool {
self.dispatcher.core.read().await.is_elder()
}
pub async fn our_elders(&self) -> Vec<Peer> {
self.dispatcher
.core
.read()
.await
.section()
.authority_provider()
.peers()
.collect()
}
pub async fn our_elders_sorted_by_distance_to(&self, name: &XorName) -> Vec<Peer> {
self.our_elders()
.await
.into_iter()
.sorted_by(|lhs, rhs| name.cmp_distance(lhs.name(), rhs.name()))
.collect()
}
pub async fn our_adults(&self) -> Vec<Peer> {
self.dispatcher
.core
.read()
.await
.section()
.adults()
.copied()
.collect()
}
pub async fn our_adults_sorted_by_distance_to(&self, name: &XorName) -> Vec<Peer> {
self.our_adults()
.await
.into_iter()
.sorted_by(|lhs, rhs| name.cmp_distance(lhs.name(), rhs.name()))
.collect()
}
pub async fn our_section(&self) -> SectionAuthorityProvider {
self.dispatcher
.core
.read()
.await
.section()
.authority_provider()
.clone()
}
pub async fn other_sections(&self) -> Vec<SectionAuthorityProvider> {
self.dispatcher
.core
.read()
.await
.network()
.all()
.cloned()
.collect()
}
pub async fn section_key(&self, prefix: &Prefix) -> Option<bls::PublicKey> {
self.dispatcher.core.read().await.section_key(prefix)
}
pub async fn matching_section(&self, name: &XorName) -> Result<SectionAuthorityProvider> {
let state = self.dispatcher.core.read().await;
state.matching_section(name)
}
pub async fn send_message(
&self,
itinerary: Itinerary,
content: Bytes,
additional_proof_chain_key: Option<bls::PublicKey>,
) -> Result<()> {
if let DstLocation::EndUser(EndUser { socket_id, xorname }) = itinerary.dst {
if self.our_prefix().await.matches(&xorname) {
let addr = self
.dispatcher
.core
.read()
.await
.get_socket_addr(socket_id)
.copied();
if let Some(socket_addr) = addr {
debug!("Sending client msg to {:?}", socket_addr);
return self
.send_message_to_client(socket_addr, xorname, ClientMsg::from(content)?)
.await;
} else {
debug!(
"Could not find socketaddr corresponding to socket_id {:?}",
socket_id
);
debug!("Relaying user message instead.. (Command::SendUserMessage)");
}
} else {
debug!("Relaying message with sending user message (Command::SendUserMessage)");
}
}
let command = Command::SendUserMessage {
itinerary,
content,
additional_proof_chain_key,
};
self.dispatcher.clone().handle_commands(command).await
}
async fn send_message_to_client(
&self,
recipient: SocketAddr,
user_xorname: XorName,
message: ClientMsg,
) -> Result<()> {
let command = Command::SendMessage {
recipients: vec![(user_xorname, recipient)],
delivery_group_size: 1,
message: MessageType::Client {
msg: message,
dest_info: DestInfo {
dest: user_xorname,
dest_section_pk: *self.section_chain().await.last_key(),
},
},
};
self.dispatcher.clone().handle_commands(command).await
}
pub async fn public_key_set(&self) -> Result<bls::PublicKeySet> {
self.dispatcher.core.read().await.public_key_set()
}
pub async fn our_history(&self) -> SecuredLinkedList {
self.dispatcher.core.read().await.section().chain().clone()
}
pub async fn our_index(&self) -> Result<usize> {
self.dispatcher.core.read().await.our_index()
}
}
impl Drop for Routing {
fn drop(&mut self) {
self.dispatcher.terminate()
}
}
impl Debug for Routing {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
write!(f, "Routing")
}
}
async fn handle_connection_events(
dispatcher: Arc<Dispatcher>,
mut incoming_conns: mpsc::Receiver<ConnectionEvent>,
) {
while let Some(event) = incoming_conns.recv().await {
match event {
ConnectionEvent::Received((src, bytes)) => {
trace!("New message ({} bytes) received from: {}", bytes.len(), src);
handle_message(dispatcher.clone(), bytes, src).await;
}
ConnectionEvent::Disconnected(addr) => {
trace!("Lost connection to {:?}", addr);
let _ = dispatcher
.clone()
.handle_commands(Command::HandleConnectionLost(addr))
.await;
}
}
}
}
async fn handle_message(dispatcher: Arc<Dispatcher>, bytes: Bytes, sender: SocketAddr) {
let wire_msg = match WireMsg::from(bytes) {
Ok(wire_msg) => wire_msg,
Err(error) => {
error!("Failed to deserialize message header: {}", error);
return;
}
};
let span = {
let mut state = dispatcher.core.write().await;
if !state.add_to_filter(&wire_msg).await {
trace!(
"not handling message - already handled: {:?}",
wire_msg.msg_id()
);
return;
}
trace_span!("handle_message", name = %state.node().name(), %sender)
};
let _span_guard = span.enter();
let message_type = match wire_msg.to_message() {
Ok(message_type) => message_type,
Err(error) => {
error!(
"Failed to deserialize message payload ({:?}): {}",
wire_msg.msg_id(),
error
);
return;
}
};
match message_type {
MessageType::SectionInfo { msg, dest_info } => {
let command = Command::HandleSectionInfoMsg {
sender,
message: msg,
dest_info,
};
let _ = task::spawn(dispatcher.handle_commands(command));
}
MessageType::Routing { msg, dest_info } => {
if let Err(err) = RoutingMsg::check_signature(&msg) {
error!(
"Discarding message received ({:?}) due to invalid signature: {:?}",
msg.id, err
);
return;
}
let command = Command::HandleMessage {
message: msg,
sender: Some(sender),
dest_info,
};
let _ = task::spawn(dispatcher.handle_commands(command));
}
MessageType::Node {
msg: _,
dest_info: _,
src_section_pk: _,
} => unimplemented!(),
MessageType::Client { msg, .. } => {
let end_user = dispatcher
.core
.read()
.await
.get_enduser_by_addr(&sender)
.copied();
let end_user = match end_user {
Some(end_user) => {
debug!(
"Message from client {}, socket id already exists: {:?}",
sender, end_user
);
end_user
}
None => {
debug!("First message from client {}, creating a socket id", sender);
match dispatcher.core.write().await.try_add(sender) {
Ok(end_user) => end_user,
Err(err) => {
error!(
"Failed to cache client socket address for message {:?}: {:?}",
msg, err
);
return;
}
}
}
};
let event = Event::ClientMsgReceived {
msg: Box::new(msg),
user: end_user,
};
dispatcher.send_event(event).await;
}
}
}