Skip to content

Instantly share code, notes, and snippets.

@gabrielheinrich
Last active November 8, 2022 14:57
Show Gist options
  • Save gabrielheinrich/64d2a1d1e496967e8021d8115be41728 to your computer and use it in GitHub Desktop.
Save gabrielheinrich/64d2a1d1e496967e8021d8115be41728 to your computer and use it in GitHub Desktop.
Replicache inspired realtime state sync in Rust
use std::collections::HashMap;
#[derive(Debug, Clone)]
struct User {
id: String,
name: String,
}
#[derive(Debug, Clone)]
struct Post {
id: String,
user_id: String,
content: String,
}
struct State {
user: HashMap<String, User>,
post: HashMap<String, Post>,
}
#[derive(Debug, Clone)]
struct Patch {
user: HashMap<String, Option<User>>,
post: HashMap<String, Option<Post>>,
}
struct Tx<'a> {
state: &'a mut State,
patch: Patch,
undo: Patch,
}
impl Patch {
fn new() -> Self {
Self {
user: HashMap::new(),
post: HashMap::new(),
}
}
fn merge(&mut self, other: &Self) {
for (id, user) in other.user.iter() {
self.user.insert(id.clone(), user.clone());
}
for (id, post) in other.post.iter() {
self.post.insert(id.clone(), post.clone());
}
}
}
impl State {
fn new() -> Self {
Self {
user: HashMap::new(),
post: HashMap::new(),
}
}
fn tx<'a>(&'a mut self) -> Tx<'a> {
Tx {
state: self,
patch: Patch::new(),
undo: Patch::new(),
}
}
fn mutate<'a>(&'a mut self, mutation: Mutation) -> Tx<'a> {
let mut tx = self.tx();
match mutation.op {
MutationOp::InsertUser(user) => tx.insert_user(user),
MutationOp::RemoveUser(id) => tx.remove_user(&id),
MutationOp::InsertPost(post) => tx.insert_post(post),
MutationOp::RemovePost(id) => tx.remove_post(&id),
}
tx
}
fn apply_patch(&mut self, patch: &Patch) {
for (id, user) in patch.user.iter() {
match user {
Some(user) => self.user.insert(id.clone(), user.clone()),
None => self.user.remove(id),
};
}
for (id, post) in patch.post.iter() {
match post {
Some(post) => self.post.insert(id.clone(), post.clone()),
None => self.post.remove(id),
};
}
}
}
impl<'a> Tx<'a> {
fn insert_user(&mut self, user: User) {
self.patch.user.insert(user.id.clone(), Some(user.clone()));
self.undo
.user
.insert(user.id.clone(), self.state.user.remove(&user.id));
self.state.user.insert(user.id.clone(), user);
}
fn insert_post(&mut self, post: Post) {
self.patch.post.insert(post.id.clone(), Some(post.clone()));
self.undo
.post
.insert(post.id.clone(), self.state.post.remove(&post.id));
self.state.post.insert(post.id.clone(), post);
}
fn remove_user(&mut self, id: &str) {
self.patch.user.insert(id.to_string(), None);
self.undo
.user
.insert(id.to_string(), self.state.user.remove(id));
}
fn remove_post(&mut self, id: &str) {
self.patch.post.insert(id.to_string(), None);
self.undo
.post
.insert(id.to_string(), self.state.post.remove(id));
}
}
#[derive(Debug, Clone)]
enum MutationOp {
InsertUser(User),
RemoveUser(String),
InsertPost(Post),
RemovePost(String),
}
#[derive(Debug, Clone)]
struct Mutation {
id: usize,
op: MutationOp,
}
struct ServerClientData {
last_mutation_id: usize,
}
struct Server {
state: State,
clients: HashMap<String, ServerClientData>,
global_version: usize,
recent_patches: Vec<Patch>,
}
struct PullRequest {
client_id: String,
cookie: usize,
}
struct PullResponse {
patch: Patch,
cookie: usize,
last_mutation_id: usize,
}
struct PushRequest {
client_id: String,
mutations: Vec<Mutation>,
}
impl Server {
fn new() -> Self {
Self {
state: State::new(),
clients: HashMap::new(),
global_version: 0,
recent_patches: Vec::new(),
}
}
fn push(&mut self, req: PushRequest) {
for mutation in req.mutations {
let tx = self.state.mutate(mutation.clone());
self.recent_patches.push(tx.patch);
self.global_version += 1;
self.clients
.get_mut(&req.client_id)
.unwrap()
.last_mutation_id = mutation.id;
}
}
fn pull(&mut self, req: PullRequest) -> PullResponse {
if (!self.clients.contains_key(&req.client_id)) {
self.clients.insert(
req.client_id.clone(),
ServerClientData {
last_mutation_id: 0,
},
);
}
let client = self.clients.get(&req.client_id).unwrap();
assert!(self.recent_patches.len() == self.global_version);
let patch = self.recent_patches
[self.recent_patches.len() - (self.global_version - req.cookie)..]
.iter()
.fold(Patch::new(), |mut patch, p| {
patch.merge(p);
patch
});
PullResponse {
patch,
cookie: self.global_version,
last_mutation_id: client.last_mutation_id,
}
}
}
#[derive(Debug, Clone)]
struct PendingMutation {
mutation: Mutation,
undo: Patch,
}
struct Client {
state: State,
cookie: usize,
client_id: String,
last_mutation_id: usize,
pending_mutations: Vec<PendingMutation>,
}
impl Client {
fn new(client_id: &str) -> Self {
Self {
state: State::new(),
cookie: 0,
client_id: client_id.to_string(),
last_mutation_id: 0,
pending_mutations: Vec::new(),
}
}
fn push(&mut self, server: &mut Server) {
let req = PushRequest {
client_id: self.client_id.clone(),
mutations: self
.pending_mutations
.iter()
.map(|m| m.mutation.clone())
.collect(),
};
server.push(req);
}
fn mutate(&mut self, op: MutationOp) {
let id = self.last_mutation_id + 1;
self.apply_mutation(Mutation { id, op });
}
fn apply_mutation(&mut self, mutation: Mutation) {
let tx = self.state.mutate(mutation.clone());
self.last_mutation_id = mutation.id;
self.pending_mutations.push(PendingMutation {
mutation,
undo: tx.undo,
});
}
fn pull(&mut self, server: &mut Server) {
let req = PullRequest {
client_id: self.client_id.clone(),
cookie: self.cookie,
};
let resp = server.pull(req);
self.cookie = resp.cookie;
// undo pending mutations
for pending in self.pending_mutations.iter().rev() {
self.state.apply_patch(&pending.undo);
}
// apply patch
self.state.apply_patch(&resp.patch);
// reapply mutations that were not applied by the server
let pending_mutations = std::mem::take(&mut self.pending_mutations);
for pending in pending_mutations.iter() {
self.apply_mutation(pending.mutation.clone());
}
}
}
fn main() {
println!("Hello, world!");
}
#[test]
fn test_state() {
let mut state = State::new();
let mut tx = state.tx();
tx.insert_user(User {
id: "1".to_string(),
name: "Alice".to_string(),
});
tx.insert_post(Post {
id: "1".to_string(),
user_id: "1".to_string(),
content: "Hello, world!".to_string(),
});
assert!(tx.state.user.contains_key("1"));
assert!(tx.state.post.contains_key("1"));
tx.state.apply_patch(&tx.undo);
assert!(!tx.state.user.contains_key("1"));
assert!(!tx.state.post.contains_key("1"));
}
#[test]
fn test_protocol() {
let mut server = Server::new();
let mut client_1 = Client::new("1");
let mut client_2 = Client::new("2");
client_1.pull(&mut server);
client_1.mutate(MutationOp::InsertUser(User {
id: "1".to_string(),
name: "Alice".to_string(),
}));
assert!(client_1.state.user.contains_key("1"));
client_1.push(&mut server);
client_1.pull(&mut server);
assert!(client_1.state.user.contains_key("1"));
client_2.pull(&mut server);
assert!(client_2.state.user.contains_key("1"));
client_2.mutate(MutationOp::InsertUser(User {
id: "2".to_string(),
name: "Bob".to_string(),
}));
client_1.mutate(MutationOp::InsertPost(Post {
id: "1".to_string(),
user_id: "1".to_string(),
content: "Hello, world!".to_string(),
}));
client_1.mutate(MutationOp::InsertPost(Post {
id: "2".to_string(),
user_id: "1".to_string(),
content: "Hello, world!".to_string(),
}));
client_1.mutate(MutationOp::RemovePost("1".to_string()));
client_2.push(&mut server);
client_1.push(&mut server);
client_1.mutate(MutationOp::InsertPost(Post {
id: "3".to_string(),
user_id: "1".to_string(),
content: "Hello, world!".to_string(),
}));
client_1.pull(&mut server);
client_2.pull(&mut server);
client_2.pull(&mut server);
assert!(client_1.state.post.contains_key("3"));
assert!(client_1.state.post.contains_key("2"));
assert!(client_2.state.post.contains_key("2"));
assert!(client_1.state.user.contains_key("2"));
assert!(client_2.state.user.contains_key("2"));
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment