Last active
November 8, 2022 14:57
-
-
Save gabrielheinrich/64d2a1d1e496967e8021d8115be41728 to your computer and use it in GitHub Desktop.
Replicache-inspired realtime state sync in Rust
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use std::collections::HashMap; | |
/// A user record.
///
/// `id` doubles as the key under which the record is stored in the state's
/// `user` map. Equality is derived so records can be compared directly.
#[derive(Debug, Clone, PartialEq, Eq)]
struct User {
    /// Identifier; also the map key for this record.
    id: String,
    /// The user's name.
    name: String,
}
/// A post record.
///
/// `id` doubles as the key under which the record is stored in the state's
/// `post` map. Equality is derived so records can be compared directly.
#[derive(Debug, Clone, PartialEq, Eq)]
struct Post {
    /// Identifier; also the map key for this record.
    id: String,
    /// Identifier of a user (presumably the author; nothing in this file
    /// enforces that the user exists).
    user_id: String,
    /// Text of the post.
    content: String,
}
struct State { | |
user: HashMap<String, User>, | |
post: HashMap<String, Post>, | |
} | |
#[derive(Debug, Clone)] | |
struct Patch { | |
user: HashMap<String, Option<User>>, | |
post: HashMap<String, Option<Post>>, | |
} | |
struct Tx<'a> { | |
state: &'a mut State, | |
patch: Patch, | |
undo: Patch, | |
} | |
impl Patch { | |
fn new() -> Self { | |
Self { | |
user: HashMap::new(), | |
post: HashMap::new(), | |
} | |
} | |
fn merge(&mut self, other: &Self) { | |
for (id, user) in other.user.iter() { | |
self.user.insert(id.clone(), user.clone()); | |
} | |
for (id, post) in other.post.iter() { | |
self.post.insert(id.clone(), post.clone()); | |
} | |
} | |
} | |
impl State { | |
fn new() -> Self { | |
Self { | |
user: HashMap::new(), | |
post: HashMap::new(), | |
} | |
} | |
fn tx<'a>(&'a mut self) -> Tx<'a> { | |
Tx { | |
state: self, | |
patch: Patch::new(), | |
undo: Patch::new(), | |
} | |
} | |
fn mutate<'a>(&'a mut self, mutation: Mutation) -> Tx<'a> { | |
let mut tx = self.tx(); | |
match mutation.op { | |
MutationOp::InsertUser(user) => tx.insert_user(user), | |
MutationOp::RemoveUser(id) => tx.remove_user(&id), | |
MutationOp::InsertPost(post) => tx.insert_post(post), | |
MutationOp::RemovePost(id) => tx.remove_post(&id), | |
} | |
tx | |
} | |
fn apply_patch(&mut self, patch: &Patch) { | |
for (id, user) in patch.user.iter() { | |
match user { | |
Some(user) => self.user.insert(id.clone(), user.clone()), | |
None => self.user.remove(id), | |
}; | |
} | |
for (id, post) in patch.post.iter() { | |
match post { | |
Some(post) => self.post.insert(id.clone(), post.clone()), | |
None => self.post.remove(id), | |
}; | |
} | |
} | |
} | |
impl<'a> Tx<'a> { | |
fn insert_user(&mut self, user: User) { | |
self.patch.user.insert(user.id.clone(), Some(user.clone())); | |
self.undo | |
.user | |
.insert(user.id.clone(), self.state.user.remove(&user.id)); | |
self.state.user.insert(user.id.clone(), user); | |
} | |
fn insert_post(&mut self, post: Post) { | |
self.patch.post.insert(post.id.clone(), Some(post.clone())); | |
self.undo | |
.post | |
.insert(post.id.clone(), self.state.post.remove(&post.id)); | |
self.state.post.insert(post.id.clone(), post); | |
} | |
fn remove_user(&mut self, id: &str) { | |
self.patch.user.insert(id.to_string(), None); | |
self.undo | |
.user | |
.insert(id.to_string(), self.state.user.remove(id)); | |
} | |
fn remove_post(&mut self, id: &str) { | |
self.patch.post.insert(id.to_string(), None); | |
self.undo | |
.post | |
.insert(id.to_string(), self.state.post.remove(id)); | |
} | |
} | |
#[derive(Debug, Clone)] | |
enum MutationOp { | |
InsertUser(User), | |
RemoveUser(String), | |
InsertPost(Post), | |
RemovePost(String), | |
} | |
#[derive(Debug, Clone)] | |
struct Mutation { | |
id: usize, | |
op: MutationOp, | |
} | |
/// What the server remembers about one client.
///
/// `Default` yields a `last_mutation_id` of 0, the value the server uses when
/// it registers a client it has not seen before.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
struct ServerClientData {
    /// Id of the most recent mutation from this client that the server
    /// applied.
    last_mutation_id: usize,
}
struct Server { | |
state: State, | |
clients: HashMap<String, ServerClientData>, | |
global_version: usize, | |
recent_patches: Vec<Patch>, | |
} | |
/// A client's request for the changes it has not seen yet.
#[derive(Debug, Clone, PartialEq, Eq)]
struct PullRequest {
    /// Id of the requesting client.
    client_id: String,
    /// The cookie from the client's previous pull response (0 before the
    /// first pull).
    cookie: usize,
}
struct PullResponse { | |
patch: Patch, | |
cookie: usize, | |
last_mutation_id: usize, | |
} | |
struct PushRequest { | |
client_id: String, | |
mutations: Vec<Mutation>, | |
} | |
impl Server { | |
fn new() -> Self { | |
Self { | |
state: State::new(), | |
clients: HashMap::new(), | |
global_version: 0, | |
recent_patches: Vec::new(), | |
} | |
} | |
fn push(&mut self, req: PushRequest) { | |
for mutation in req.mutations { | |
let tx = self.state.mutate(mutation.clone()); | |
self.recent_patches.push(tx.patch); | |
self.global_version += 1; | |
self.clients | |
.get_mut(&req.client_id) | |
.unwrap() | |
.last_mutation_id = mutation.id; | |
} | |
} | |
fn pull(&mut self, req: PullRequest) -> PullResponse { | |
if (!self.clients.contains_key(&req.client_id)) { | |
self.clients.insert( | |
req.client_id.clone(), | |
ServerClientData { | |
last_mutation_id: 0, | |
}, | |
); | |
} | |
let client = self.clients.get(&req.client_id).unwrap(); | |
assert!(self.recent_patches.len() == self.global_version); | |
let patch = self.recent_patches | |
[self.recent_patches.len() - (self.global_version - req.cookie)..] | |
.iter() | |
.fold(Patch::new(), |mut patch, p| { | |
patch.merge(p); | |
patch | |
}); | |
PullResponse { | |
patch, | |
cookie: self.global_version, | |
last_mutation_id: client.last_mutation_id, | |
} | |
} | |
} | |
#[derive(Debug, Clone)] | |
struct PendingMutation { | |
mutation: Mutation, | |
undo: Patch, | |
} | |
struct Client { | |
state: State, | |
cookie: usize, | |
client_id: String, | |
last_mutation_id: usize, | |
pending_mutations: Vec<PendingMutation>, | |
} | |
impl Client { | |
fn new(client_id: &str) -> Self { | |
Self { | |
state: State::new(), | |
cookie: 0, | |
client_id: client_id.to_string(), | |
last_mutation_id: 0, | |
pending_mutations: Vec::new(), | |
} | |
} | |
fn push(&mut self, server: &mut Server) { | |
let req = PushRequest { | |
client_id: self.client_id.clone(), | |
mutations: self | |
.pending_mutations | |
.iter() | |
.map(|m| m.mutation.clone()) | |
.collect(), | |
}; | |
server.push(req); | |
} | |
fn mutate(&mut self, op: MutationOp) { | |
let id = self.last_mutation_id + 1; | |
self.apply_mutation(Mutation { id, op }); | |
} | |
fn apply_mutation(&mut self, mutation: Mutation) { | |
let tx = self.state.mutate(mutation.clone()); | |
self.last_mutation_id = mutation.id; | |
self.pending_mutations.push(PendingMutation { | |
mutation, | |
undo: tx.undo, | |
}); | |
} | |
fn pull(&mut self, server: &mut Server) { | |
let req = PullRequest { | |
client_id: self.client_id.clone(), | |
cookie: self.cookie, | |
}; | |
let resp = server.pull(req); | |
self.cookie = resp.cookie; | |
// undo pending mutations | |
for pending in self.pending_mutations.iter().rev() { | |
self.state.apply_patch(&pending.undo); | |
} | |
// apply patch | |
self.state.apply_patch(&resp.patch); | |
// reapply mutations that were not applied by the server | |
let pending_mutations = std::mem::take(&mut self.pending_mutations); | |
for pending in pending_mutations.iter() { | |
self.apply_mutation(pending.mutation.clone()); | |
} | |
} | |
} | |
/// Binary entry point. It only prints a greeting.
fn main() {
    println!("Hello, world!");
}
/// Inserting a user and a post through one transaction makes both visible in
/// the state, and applying the transaction's `undo` patch removes them again.
#[test]
fn test_state() {
    let alice = User {
        id: String::from("1"),
        name: String::from("Alice"),
    };
    let greeting = Post {
        id: String::from("1"),
        user_id: String::from("1"),
        content: String::from("Hello, world!"),
    };

    let mut state = State::new();
    let mut tx = state.tx();
    tx.insert_user(alice);
    tx.insert_post(greeting);

    // Both records are written straight through to the borrowed state.
    assert!(tx.state.user.contains_key("1"));
    assert!(tx.state.post.contains_key("1"));

    // Reverting restores the empty state.
    tx.state.apply_patch(&tx.undo);
    assert!(!tx.state.user.contains_key("1"));
    assert!(!tx.state.post.contains_key("1"));
}
/// Drives one server and two clients through interleaved mutate / push /
/// pull steps and checks that both clients end up seeing each other's
/// changes as well as their own unpushed ones.
#[test]
fn test_protocol() {
    // Builders for the operations used below.
    let insert_user = |id: &str, name: &str| {
        MutationOp::InsertUser(User {
            id: id.to_string(),
            name: name.to_string(),
        })
    };
    let insert_post = |id: &str| {
        MutationOp::InsertPost(Post {
            id: id.to_string(),
            user_id: "1".to_string(),
            content: "Hello, world!".to_string(),
        })
    };

    let mut server = Server::new();
    let mut first = Client::new("1");
    let mut second = Client::new("2");

    // The first client creates a user and round-trips it through the server.
    first.pull(&mut server);
    first.mutate(insert_user("1", "Alice"));
    assert!(first.state.user.contains_key("1"));
    first.push(&mut server);
    first.pull(&mut server);
    assert!(first.state.user.contains_key("1"));

    // The second client picks the user up on its first pull.
    second.pull(&mut server);
    assert!(second.state.user.contains_key("1"));

    // Both clients mutate locally before either of them pushes.
    second.mutate(insert_user("2", "Bob"));
    first.mutate(insert_post("1"));
    first.mutate(insert_post("2"));
    first.mutate(MutationOp::RemovePost("1".to_string()));
    second.push(&mut server);
    first.push(&mut server);

    // This mutation stays local: it is made after the first client's push.
    first.mutate(insert_post("3"));

    first.pull(&mut server);
    second.pull(&mut server);
    second.pull(&mut server);

    assert!(first.state.post.contains_key("3"));
    assert!(first.state.post.contains_key("2"));
    assert!(second.state.post.contains_key("2"));
    assert!(first.state.user.contains_key("2"));
    assert!(second.state.user.contains_key("2"));
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment