Skip to content

Instantly share code, notes, and snippets.

@TheOnlyArtz
Created October 26, 2021 19:30
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save TheOnlyArtz/c47b651a429301a12c47aeb9ae4253f2 to your computer and use it in GitHub Desktop.
Save TheOnlyArtz/c47b651a429301a12c47aeb9ae4253f2 to your computer and use it in GitHub Desktop.
use net2::TcpBuilder;
use socket2::{Domain, Socket, Type};
use std::io::prelude::*;
use std::net::{SocketAddr, TcpListener, TcpStream};
fn main() -> std::io::Result<()> {
let connection_socket = Socket::new(Domain::IPV4, Type::STREAM, None)?;
connection_socket.reuse_address().unwrap();
let connection_address: SocketAddr = "178.128.32.250:3000".parse().unwrap();
// connection_socket.bind(&connection_address.into())?;
// connection_socket.bind(&connection_address.into())?;
connection_socket.connect(&connection_address.into())?;
// let connection_builder = TcpBuilder::new_v4()?;
// connection_builder.reuse_address(true).unwrap();
let mut stream: TcpStream = connection_socket.into();
let formatted_msg = format!(
"{}:{}",
stream.local_addr()?.ip(),
stream.local_addr()?.port()
);
println!("[ME -> S] publishing local endpoint {}", formatted_msg);
stream.write(formatted_msg.as_bytes())?;
loop {
let mut buf = [0; 1024];
let size = stream.read(&mut buf).unwrap();
let buf = String::from_utf8(buf[..size].to_vec()).unwrap();
println!("[S -> ME] {}", buf);
let cloned_stream = stream.try_clone().unwrap();
std::thread::spawn(move || {
let listen_on = cloned_stream.local_addr().unwrap().to_string();
println!(
"[LISTENING] on the same port used to connect to S {}",
listen_on
);
listen(cloned_stream.local_addr().unwrap().to_string()).unwrap();
});
// PUBLIC
let buf_clone = buf.clone();
std::thread::spawn(move || {
let ips: Vec<&str> = buf_clone.split("|").collect();
let connect_to = ips.get(0).unwrap();
connect(connect_to, "public").unwrap();
});
// PRIVATE
std::thread::spawn(move || {
let ips: Vec<&str> = (&buf).split("|").collect();
let connect_to = ips.get(1).unwrap();
connect(connect_to, "private").unwrap();
});
}
// Ok(())
}
fn connect(ip: &str, flag: &'static str) -> std::io::Result<()> {
loop {
println!("[ME -> B] Trying to connect to {} which is {}", ip, flag);
let connection_socket = Socket::new(Domain::IPV4, Type::STREAM, None)?;
connection_socket.reuse_address().unwrap();
let connection_address: SocketAddr = ip.parse().unwrap();
let stream_connection = connection_socket.connect(&connection_address.into());
if stream_connection.is_err() {
println!("[ME -> B] Connection failed: repeating in 3 seconds");
std::thread::sleep(std::time::Duration::from_millis(3_000));
continue;
}
let mut stream: TcpStream = connection_socket.into();
loop {
let mut buf = [0; 1024];
let size = stream.read(&mut buf).unwrap();
let buf = String::from_utf8(buf[..size].to_vec()).unwrap();
if size != 0 {
println!("[B -> ME]: {}", buf);
}
}
}
}
fn listen(ip: String) -> std::io::Result<()> {
let socket = Socket::new(Domain::IPV4, Type::STREAM, None)?;
socket.reuse_address().unwrap();
let address: SocketAddr = ip.parse().unwrap();
let address = address.into();
socket.bind(&address)?;
socket.listen(128)?;
let listener: TcpListener = socket.into();
for stream in listener.incoming() {
let stream = stream.unwrap();
println!(
"[B -> ME] PEER: {:?} | LOCAL: {:?}",
stream.peer_addr().unwrap(),
stream.local_addr().unwrap()
);
}
Ok(())
}
use std::collections::HashMap;
use std::io::prelude::*;
use std::net::{TcpListener, TcpStream};
use std::sync::{Arc, Mutex};
use std::sync::mpsc::{channel, Sender};
#[derive(Debug, Clone)]
struct Peer {
pub local_address: String,
pub local_port: u16,
pub remote_address: String,
pub remote_port: u16,
}
fn handle_client(mut socket: TcpStream, peers: Arc<Mutex<Vec<Peer>>>, hosts_tx: Sender<(std::string::String, std::string::String)>) -> std::io::Result<()> {
let stringified_address = socket.peer_addr().unwrap().ip().to_string();
let socket_port = socket.peer_addr().unwrap().port();
loop {
let mut buf = [0; 1024];
let size = socket.read(&mut buf);
if buf.len() == 0 || size.is_err() {
let mut lock = peers.lock().unwrap();
let mut iter = lock.iter();
let i = iter.position(|x| {
x.remote_address == stringified_address && x.remote_port == socket_port
});
if let Some(index) = i {
lock.remove(index);
println!("Removed: {:?}", lock);
}
// drop(lock);
return Ok(());
}
let buf = String::from_utf8(buf[..size.unwrap()].to_vec()).unwrap();
println!("[INCOMING] from {} => {}", socket.peer_addr().unwrap(), buf);
let mut local_elements = buf.split(":");
// since it's a POC - it needs to be set and done so let us assume
// that the message looks like xxx.xxx.xxx.xxx:ppppp
let local_address = local_elements.next().unwrap();
let local_port = local_elements.next().unwrap();
let peer = Peer {
local_address: local_address.to_string(),
local_port: local_port.parse::<u16>().unwrap(),
remote_address: socket.peer_addr().unwrap().ip().to_string(),
remote_port: socket
.peer_addr()
.unwrap()
.port()
.to_string()
.parse::<u16>()
.unwrap(),
};
let mut lock = peers.lock().unwrap();
lock.push(peer);
for p in lock.iter() {
let filtered =
filter_peers(&lock, String::from(&p.remote_address), p.remote_port);
if filtered.len() > 0 {
let sent = hosts_tx.send((format!("{}:{}", p.remote_address, p.remote_port), encode_peers(&filtered)));
if let Err(e) = sent {
println!("Error sending payload to channel {}", e);
}
}
}
// drop(lock);
}
}
fn main() -> std::io::Result<()> {
let listener = TcpListener::bind("178.128.32.250:3000")?;
let peers: Arc<Mutex<Vec<Peer>>> = Arc::new(Mutex::new(Vec::<Peer>::new()));
let connections: Arc<Mutex<HashMap<String, TcpStream>>> = Arc::new(Mutex::new(HashMap::<String, TcpStream>::new()));
let (hosts_tx, hosts_rx) = channel::<(String, String)>();
let cloned_connections = Arc::clone(&connections);
// This is the loop which is listening for incoming messages
// from the channel
// the idea behind this channel is to send payloads to the desired
// socket connections
std::thread::spawn(move || {
loop {
let recv = hosts_rx.recv();
if recv.is_err() {
println!("Recv error !");
break
}
// Get the desired socket via the key
let (key, payload) = recv.unwrap();
let mut lock = cloned_connections.lock().unwrap();
let target = lock.get_mut(&key);
if let Some(target) = target {
let written = target.write(payload.as_bytes());
if let Err(e) = written {
println!("Error sending payload to {}: {}", key, e);
}
}
// drop(lock);
}
});
for stream in listener.incoming() {
let peers_arc = Arc::clone(&peers);
let stream = stream.unwrap();
let stringified_address = stream.peer_addr().unwrap().ip().to_string();
let stream_port = stream.peer_addr().unwrap().port();
let key = format!("{}:{}", stringified_address, stream_port);
let mut lock = connections.lock().unwrap();
lock.insert(key, stream.try_clone().unwrap());
// drop(lock);
let hosts_tx_clone = hosts_tx.clone();
std::thread::spawn(move || {
let handled = handle_client(stream, peers_arc, hosts_tx_clone);
if let Err(e) = handled {
println!("Handling client crashed ! {}", e);
}
});
}
Ok(())
}
// filter is remote address
fn filter_peers(peers: &Vec<Peer>, filter_ip: String, filter_port: u16) -> Vec<Peer> {
let mut result: Vec<Peer> = vec![];
for i in peers {
if i.remote_address == filter_ip && i.remote_port == filter_port {
continue;
}
result.push(i.clone());
}
result
}
fn encode_peers(peers: &Vec<Peer>) -> String {
let mut keys: Vec<String> = vec![];
for p in peers {
keys.push(format!("{}:{}|{}:{}", p.remote_address, p.remote_port, p.local_address, p.local_port));
}
keys.join(",")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment