-
-
Save TheOnlyArtz/c47b651a429301a12c47aeb9ae4253f2 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use net2::TcpBuilder; | |
use socket2::{Domain, Socket, Type}; | |
use std::io::prelude::*; | |
use std::net::{SocketAddr, TcpListener, TcpStream}; | |
fn main() -> std::io::Result<()> { | |
let connection_socket = Socket::new(Domain::IPV4, Type::STREAM, None)?; | |
connection_socket.reuse_address().unwrap(); | |
let connection_address: SocketAddr = "178.128.32.250:3000".parse().unwrap(); | |
// connection_socket.bind(&connection_address.into())?; | |
// connection_socket.bind(&connection_address.into())?; | |
connection_socket.connect(&connection_address.into())?; | |
// let connection_builder = TcpBuilder::new_v4()?; | |
// connection_builder.reuse_address(true).unwrap(); | |
let mut stream: TcpStream = connection_socket.into(); | |
let formatted_msg = format!( | |
"{}:{}", | |
stream.local_addr()?.ip(), | |
stream.local_addr()?.port() | |
); | |
println!("[ME -> S] publishing local endpoint {}", formatted_msg); | |
stream.write(formatted_msg.as_bytes())?; | |
loop { | |
let mut buf = [0; 1024]; | |
let size = stream.read(&mut buf).unwrap(); | |
let buf = String::from_utf8(buf[..size].to_vec()).unwrap(); | |
println!("[S -> ME] {}", buf); | |
let cloned_stream = stream.try_clone().unwrap(); | |
std::thread::spawn(move || { | |
let listen_on = cloned_stream.local_addr().unwrap().to_string(); | |
println!( | |
"[LISTENING] on the same port used to connect to S {}", | |
listen_on | |
); | |
listen(cloned_stream.local_addr().unwrap().to_string()).unwrap(); | |
}); | |
// PUBLIC | |
let buf_clone = buf.clone(); | |
std::thread::spawn(move || { | |
let ips: Vec<&str> = buf_clone.split("|").collect(); | |
let connect_to = ips.get(0).unwrap(); | |
connect(connect_to, "public").unwrap(); | |
}); | |
// PRIVATE | |
std::thread::spawn(move || { | |
let ips: Vec<&str> = (&buf).split("|").collect(); | |
let connect_to = ips.get(1).unwrap(); | |
connect(connect_to, "private").unwrap(); | |
}); | |
} | |
// Ok(()) | |
} | |
fn connect(ip: &str, flag: &'static str) -> std::io::Result<()> { | |
loop { | |
println!("[ME -> B] Trying to connect to {} which is {}", ip, flag); | |
let connection_socket = Socket::new(Domain::IPV4, Type::STREAM, None)?; | |
connection_socket.reuse_address().unwrap(); | |
let connection_address: SocketAddr = ip.parse().unwrap(); | |
let stream_connection = connection_socket.connect(&connection_address.into()); | |
if stream_connection.is_err() { | |
println!("[ME -> B] Connection failed: repeating in 3 seconds"); | |
std::thread::sleep(std::time::Duration::from_millis(3_000)); | |
continue; | |
} | |
let mut stream: TcpStream = connection_socket.into(); | |
loop { | |
let mut buf = [0; 1024]; | |
let size = stream.read(&mut buf).unwrap(); | |
let buf = String::from_utf8(buf[..size].to_vec()).unwrap(); | |
if size != 0 { | |
println!("[B -> ME]: {}", buf); | |
} | |
} | |
} | |
} | |
fn listen(ip: String) -> std::io::Result<()> { | |
let socket = Socket::new(Domain::IPV4, Type::STREAM, None)?; | |
socket.reuse_address().unwrap(); | |
let address: SocketAddr = ip.parse().unwrap(); | |
let address = address.into(); | |
socket.bind(&address)?; | |
socket.listen(128)?; | |
let listener: TcpListener = socket.into(); | |
for stream in listener.incoming() { | |
let stream = stream.unwrap(); | |
println!( | |
"[B -> ME] PEER: {:?} | LOCAL: {:?}", | |
stream.peer_addr().unwrap(), | |
stream.local_addr().unwrap() | |
); | |
} | |
Ok(()) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use std::collections::HashMap; | |
use std::io::prelude::*; | |
use std::net::{TcpListener, TcpStream}; | |
use std::sync::{Arc, Mutex}; | |
use std::sync::mpsc::{channel, Sender}; | |
/// A client registered with the rendezvous server, described by both ends of
/// its connection.
#[derive(Debug, Clone, PartialEq, Eq)]
struct Peer {
    /// Address the client reported for itself in its registration message.
    pub local_address: String,
    /// Port the client reported for itself in its registration message.
    pub local_port: u16,
    /// Address the server observed as the source of the client's connection.
    pub remote_address: String,
    /// Port the server observed as the source of the client's connection.
    pub remote_port: u16,
}
fn handle_client(mut socket: TcpStream, peers: Arc<Mutex<Vec<Peer>>>, hosts_tx: Sender<(std::string::String, std::string::String)>) -> std::io::Result<()> { | |
let stringified_address = socket.peer_addr().unwrap().ip().to_string(); | |
let socket_port = socket.peer_addr().unwrap().port(); | |
loop { | |
let mut buf = [0; 1024]; | |
let size = socket.read(&mut buf); | |
if buf.len() == 0 || size.is_err() { | |
let mut lock = peers.lock().unwrap(); | |
let mut iter = lock.iter(); | |
let i = iter.position(|x| { | |
x.remote_address == stringified_address && x.remote_port == socket_port | |
}); | |
if let Some(index) = i { | |
lock.remove(index); | |
println!("Removed: {:?}", lock); | |
} | |
// drop(lock); | |
return Ok(()); | |
} | |
let buf = String::from_utf8(buf[..size.unwrap()].to_vec()).unwrap(); | |
println!("[INCOMING] from {} => {}", socket.peer_addr().unwrap(), buf); | |
let mut local_elements = buf.split(":"); | |
// since it's a POC - it needs to be set and done so let us assume | |
// that the message looks like xxx.xxx.xxx.xxx:ppppp | |
let local_address = local_elements.next().unwrap(); | |
let local_port = local_elements.next().unwrap(); | |
let peer = Peer { | |
local_address: local_address.to_string(), | |
local_port: local_port.parse::<u16>().unwrap(), | |
remote_address: socket.peer_addr().unwrap().ip().to_string(), | |
remote_port: socket | |
.peer_addr() | |
.unwrap() | |
.port() | |
.to_string() | |
.parse::<u16>() | |
.unwrap(), | |
}; | |
let mut lock = peers.lock().unwrap(); | |
lock.push(peer); | |
for p in lock.iter() { | |
let filtered = | |
filter_peers(&lock, String::from(&p.remote_address), p.remote_port); | |
if filtered.len() > 0 { | |
let sent = hosts_tx.send((format!("{}:{}", p.remote_address, p.remote_port), encode_peers(&filtered))); | |
if let Err(e) = sent { | |
println!("Error sending payload to channel {}", e); | |
} | |
} | |
} | |
// drop(lock); | |
} | |
} | |
fn main() -> std::io::Result<()> { | |
let listener = TcpListener::bind("178.128.32.250:3000")?; | |
let peers: Arc<Mutex<Vec<Peer>>> = Arc::new(Mutex::new(Vec::<Peer>::new())); | |
let connections: Arc<Mutex<HashMap<String, TcpStream>>> = Arc::new(Mutex::new(HashMap::<String, TcpStream>::new())); | |
let (hosts_tx, hosts_rx) = channel::<(String, String)>(); | |
let cloned_connections = Arc::clone(&connections); | |
// This is the loop which is listening for incoming messages | |
// from the channel | |
// the idea behind this channel is to send payloads to the desired | |
// socket connections | |
std::thread::spawn(move || { | |
loop { | |
let recv = hosts_rx.recv(); | |
if recv.is_err() { | |
println!("Recv error !"); | |
break | |
} | |
// Get the desired socket via the key | |
let (key, payload) = recv.unwrap(); | |
let mut lock = cloned_connections.lock().unwrap(); | |
let target = lock.get_mut(&key); | |
if let Some(target) = target { | |
let written = target.write(payload.as_bytes()); | |
if let Err(e) = written { | |
println!("Error sending payload to {}: {}", key, e); | |
} | |
} | |
// drop(lock); | |
} | |
}); | |
for stream in listener.incoming() { | |
let peers_arc = Arc::clone(&peers); | |
let stream = stream.unwrap(); | |
let stringified_address = stream.peer_addr().unwrap().ip().to_string(); | |
let stream_port = stream.peer_addr().unwrap().port(); | |
let key = format!("{}:{}", stringified_address, stream_port); | |
let mut lock = connections.lock().unwrap(); | |
lock.insert(key, stream.try_clone().unwrap()); | |
// drop(lock); | |
let hosts_tx_clone = hosts_tx.clone(); | |
std::thread::spawn(move || { | |
let handled = handle_client(stream, peers_arc, hosts_tx_clone); | |
if let Err(e) = handled { | |
println!("Handling client crashed ! {}", e); | |
} | |
}); | |
} | |
Ok(()) | |
} | |
// filter is remote address | |
fn filter_peers(peers: &Vec<Peer>, filter_ip: String, filter_port: u16) -> Vec<Peer> { | |
let mut result: Vec<Peer> = vec![]; | |
for i in peers { | |
if i.remote_address == filter_ip && i.remote_port == filter_port { | |
continue; | |
} | |
result.push(i.clone()); | |
} | |
result | |
} | |
fn encode_peers(peers: &Vec<Peer>) -> String { | |
let mut keys: Vec<String> = vec![]; | |
for p in peers { | |
keys.push(format!("{}:{}|{}:{}", p.remote_address, p.remote_port, p.local_address, p.local_port)); | |
} | |
keys.join(",") | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment