Skip to content

Instantly share code, notes, and snippets.

@jaytaph
Last active January 22, 2024 09:47
Show Gist options
  • Save jaytaph/d75eba7056a3e94bf00e822a5e977691 to your computer and use it in GitHub Desktop.
Save jaytaph/d75eba7056a3e94bf00e822a5e977691 to your computer and use it in GitHub Desktop.
ipc in rust
use fork::{fork, Fork};
use std::{process, thread};
use std::thread::sleep;
use anyhow::{anyhow, Error};
use ipc_channel::ipc::{IpcReceiver, IpcSelectionResult};
use ipc_channel::ipc::{self, IpcReceiverSet, IpcSender};
use serde::{Deserialize, Serialize};
use rand::Rng;
/// Highest priority a message can carry (upper end of the `i8` priority range).
const MAX_PRIORITY: i8 = 127;
/// Lowest priority a message can carry (lower end of the `i8` priority range).
const MIN_PRIORITY: i8 = -128;
/// Neutral priority; this is what the demo sessions use when sending to the broker.
const DEFAULT_PRIORITY: i8 = 0;
/// A GIPC channel is the broker-side record of a connection to another process.
///
/// Only the sending half is stored here. An `IpcReceiver` cannot be cloned, so the receiving half of a connection
/// is handed to the broker's `IpcReceiverSet` instead of being kept in this struct.
// `out_chan` is stored but not read by any code yet, hence the lint suppression.
#[allow(dead_code)]
struct GipcChannel {
/// The name of the channel; used to detect duplicate registrations and to look up destinations
name: String,
/// Sending half of the channel towards the registered process
out_chan: IpcSender<GipcMessage>
}
/// A GIPC message is a message that is sent between processes. It contains the source and destination of the message,
/// the priority of the message, and the message itself.
///
/// The type derives `Serialize`/`Deserialize` so it can travel over an IPC channel, and `Debug` so the broker can
/// print received messages.
#[derive(Serialize, Deserialize, Debug)]
struct GipcMessage {
/// The source name of the channel. The GIPC broker is meant to fill this in, even if the user has set it
/// (NOTE(review): the broker does not overwrite it yet).
source: String,
/// The destination name of the channel
dest: String,
/// When set, the channel that this message is a reply to
in_reply_to: Option<String>,
/// Priority of the message (+127 (highest) to -128 (lowest))
priority: i8,
/// The message to send (this should become a protobuf-like typed payload, but a simple string is used for now)
msg: String,
}
/// The GIPC broker: the component that deals with sending and receiving messages between processes. It also allows
/// processes to register their connection.
struct GipcBroker {
/// The registered channels (one entry per registered name, holding the sending half towards that process)
channels: Vec<GipcChannel>,
/// When a message needs to be sent, it is added to the message queue. The GIPC broker is meant to send the message
/// to the destination process based on priority.
msg_queue: Vec<GipcMessage>,
/// The IPC receiver set. Receive channels are not stored in the GipcChannel struct, because they cannot be
/// shared/cloned between the gipc-channel struct and the ipc-receiver-set.
ipc_rx_set: IpcReceiverSet,
}
/// A GIPC client is a connection to the GIPC broker. It is used to send and receive messages from a child process
/// back to the broker (or to other clients).
struct GipcClient {
/// The name of the client; used as the `source` of every message it sends
name: String,
/// The IPC sender: the half used by `send_message` to deliver messages towards the broker
tx: IpcSender<GipcMessage>,
/// The IPC receiver: the half for messages coming from the broker (no method reads from it yet)
rx: IpcReceiver<GipcMessage>,
}
impl GipcClient {
    /// Creates a new client with the given name and endpoints.
    ///
    /// * `name` - name of the client; copied into the `source` field of every outgoing message.
    /// * `tx` - sending half used for outgoing messages.
    /// * `rx` - receiving half for incoming messages.
    fn new(name: &str, tx: IpcSender<GipcMessage>, rx: IpcReceiver<GipcMessage>) -> Self {
        GipcClient {
            name: name.to_string(),
            tx,
            rx,
        }
    }

    /// Sends `msg` to the channel named `dest` with priority `prio`.
    ///
    /// The message is never marked as a reply (`in_reply_to` is `None`).
    ///
    /// # Errors
    ///
    /// Returns an error when the underlying IPC send fails. The cause reported by the IPC layer is included in
    /// the error text so a failure can be diagnosed instead of being reduced to a generic message.
    fn send_message(&self, dest: &str, prio: i8, msg: &str) -> Result<(), Error> {
        let message = GipcMessage {
            source: self.name.clone(),
            dest: dest.to_string(),
            in_reply_to: None,
            priority: prio,
            msg: msg.to_string(),
        };

        self.tx
            .send(message)
            .map_err(|err| anyhow!("Failed to send message: {}", err))
    }
}
// unsafe impl Send for GipcBroker {}
// unsafe impl Sync for GipcBroker {}
impl GipcBroker {
    /// Creates a broker with no registered channels, an empty message queue and an empty receiver set.
    ///
    /// # Panics
    ///
    /// Panics when the IPC receiver set cannot be created; the broker cannot work without it.
    fn init() -> GipcBroker {
        GipcBroker {
            channels: Vec::new(),
            msg_queue: Vec::new(),
            ipc_rx_set: IpcReceiverSet::new().expect("Failed to create IPC receiver set"),
        }
    }

    /// Registers a new channel called `name` and returns the client endpoints that the child process must use.
    ///
    /// # Errors
    ///
    /// Returns an error when a channel with the same name is already registered, when one of the underlying IPC
    /// channels cannot be created, or when the receiving half cannot be added to the receiver set.
    fn register(&mut self, name: &str) -> Result<GipcClient, Error> {
        if self.channels.iter().any(|channel| channel.name == name) {
            return Err(anyhow!("Channel {} already exists", name));
        }

        // IPC connections are unidirectional, so we need to create two channels for each connection
        let (tx_to, rx_to) = ipc::channel::<GipcMessage>()?;
        let (tx_from, rx_from) = ipc::channel::<GipcMessage>()?;

        // Add the receiver first: if this fails, no half-registered name is left behind in `channels`.
        self.ipc_rx_set.add(rx_from)?;
        self.channels.push(GipcChannel {
            name: name.to_string(),
            out_chan: tx_to,
        });

        // Return the gipc client that the child process can use to send and receive messages
        Ok(GipcClient::new(name, tx_from, rx_to))
    }

    /// Not yet used. Queues `msg` for delivery to its destination process.
    ///
    /// # Errors
    ///
    /// Returns an error string when no channel named `msg.dest` is registered.
    #[allow(dead_code)]
    fn send(&mut self, msg: GipcMessage) -> Result<(), String> {
        if !self.channels.iter().any(|channel| channel.name == msg.dest) {
            return Err(format!("Channel {} does not exist", msg.dest));
        }

        self.msg_queue.push(msg);
        Ok(())
    }

    /// The main loop of the broker. It waits for incoming IPC events and prints them. Outgoing messages are not
    /// handled yet. Note: should we have a separate thread for outgoing messages?
    ///
    /// The loop only ends when waiting on the receiver set fails. A single message that cannot be decoded is
    /// reported and skipped, so one misbehaving client cannot take the broker down.
    fn run(&mut self) {
        loop {
            println!("[{}] IPC thread loop", process::id());

            let events = match self.ipc_rx_set.select() {
                Ok(events) => events,
                Err(err) => {
                    eprintln!("[{}] IPC select failed, stopping broker: {}", process::id(), err);
                    return;
                }
            };

            for event in events {
                match event {
                    IpcSelectionResult::MessageReceived(id, message) => match message.to::<GipcMessage>() {
                        Ok(msg) => {
                            println!("[{}] Message {} received: {:?}", process::id(), id, msg);
                        }
                        Err(err) => {
                            eprintln!("[{}] Message {} could not be decoded: {}", process::id(), id, err);
                        }
                    },
                    IpcSelectionResult::ChannelClosed(id) => {
                        println!("[{}] Channel closed: {}", process::id(), id);
                    }
                }
            }
        }
    }
}
/// Entry point: sets up the broker, registers the "main" endpoint and four demo sessions (each of which forks a
/// child process), then runs the broker loop on a background thread for 100 seconds.
fn main() {
    println!("[{}] Hello, world!", process::id());
    let mut broker = GipcBroker::init();

    // Keep the returned client alive for the lifetime of the program: dropping it right away would close
    // the endpoints of the "main" channel again.
    let _main_client = broker.register("main").expect("Failed to register main");

    // Register all channels up front. `broker.run()` takes ownership of the broker inside the thread below, so the
    // broker cannot be used from this thread afterwards (doing so is a use-after-move compile error). Registering
    // channels while the broker is running would need a different design, e.g. a command channel into the loop.
    create_session(&mut broker, "session#1");
    create_session(&mut broker, "session#2");
    create_session(&mut broker, "session#3");
    create_session(&mut broker, "session#4");

    // start the broker in a new thread
    println!("[{}] Starting IPC thread", process::id());
    thread::spawn(move || broker.run());

    sleep(std::time::Duration::from_secs(100));
    println!("[{}] End program", process::id());
}
/// Creates a new session called `name`.
///
/// The session is registered with `broker` and then the process is forked. The parent returns to the caller; the
/// child never returns and keeps sending a greeting to "main" at a random interval of 1 to 4 seconds.
///
/// # Panics
///
/// Panics when the session cannot be registered, when the fork fails, or (in the child) when sending fails.
fn create_session(broker: &mut GipcBroker, name: &str) {
    // Register a new client to the broker
    let gipc_client = broker.register(name).expect("Failed to register broker client");

    match fork() {
        Ok(Fork::Parent(child)) => {
            // Just continue execution
            println!("[{}] fork::parent (child has: {})", process::id(), child);
        }
        Ok(Fork::Child) => {
            // Child process
            println!("[{}] fork::child", process::id());
            println!("[{}] Continuing execution in child process", process::id());

            // The text never changes, so build it once instead of on every iteration.
            let greeting = format!("Hello from child {} to main", name);
            loop {
                gipc_client
                    .send_message("main", DEFAULT_PRIORITY, &greeting)
                    .expect("Failed to send message");
                let ms = rand::thread_rng().gen_range(1000..4000);
                sleep(std::time::Duration::from_millis(ms));
            }
        }
        Err(err) => panic!("Fork failed: {}", err),
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment