@purpleposeidon
Created November 9, 2021 13:33
Fixing data denormalization with differential events
// Sketch in response to https://lironshapira.medium.com/data-denormalization-is-broken-7b697352f405
// Schema
#[table]
struct room {
    name: String,
}
#[table]
struct msg {
    room: room::Id,
    user: user::Id,
    body: String,
    when: Instant,
}
#[table]
struct member {
    user: user::Id,
    room: room::Id,
    last: Instant,
    unread: usize,
}
#[table]
struct user {
    name: String,
}
// Materialize
fn materialize_member_unread<
    D: Differentiable,
    // This requires generic associated types. It may be possible to do without,
    // particularly if this function gets wrapped in a de-sugaring macro.
>(
    d: D,
    user: D::At<user::Id>,
    user2member: D::Input<IndexOf<member::own::user>>,
    member_last: D::Input<member::own::last>,
    msg_ids: D::Input<msg::Ids>,
    msg_when: D::Input<msg::own::when>,
) -> impl D::Output<user::read::notifications>
{
    // We might write normal-looking Rust code and use a #[macro] to transform it to the below.
    let mut notifications = 0;
    let membership = user2member.get(user);
    // fn user2member::get(user::Id) -> &[room::Id]
    for member in membership {
        for msg in msg_ids.iter().rev() {
            if msg_when[msg] < member_last[member] {
                // btw, this is an optimization that assumes msg_when is always increasing.
                break;
            } else {
                notifications += 1;
            }
        }
    }
    return notifications;
    // Here's what it really looks like.
    // We use << like a Haskell monad, or a C++ output stream,
    // to apply modifications made to the output variable.
    // All flow control must pass through `d`.
    // I don't know how you'd soundly do that with a simple macro, so right now I prefer this more
    // explicit version:
    let membership = user2member.get(user);
    d.output(0) << d._for(membership, |d, member| {
        // `prior` reaches back to the tail of the monoid.
        d.prior() << d._for(msg_ids.iter().rev(), |d, msg| {
            d.prior() << d._if(msg_when[msg] < member_last[member], |d| {
                d.break_flow()
            }, |d| {
                d.prior() + d.output(1)
            })
        })
    })
}
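// For reference, a minimal runnable sketch of the undifferentiated loop above, with the v9
// tables swapped for plain slices. Everything here is an assumption made for illustration:
// `Member`, `count_unread`, and `u64` timestamps standing in for `Instant`. Like the sketch
// above, it does not filter messages by room and relies on `msg_when` being increasing.

```rust
/// Hypothetical stand-in for one `member` row; only the `last` column is needed here.
struct Member {
    last: u64,
}

/// Full recomputation of the unread count, mirroring the "normal-looking" loop.
/// Assumes `msg_when` is sorted in increasing order, which makes the early `break` valid.
fn count_unread(memberships: &[Member], msg_when: &[u64]) -> usize {
    let mut notifications = 0;
    for member in memberships {
        // Walk messages newest-first and stop at the first one older than `last`.
        for &when in msg_when.iter().rev() {
            if when < member.last {
                break;
            }
            notifications += 1;
        }
    }
    notifications
}
```

// This is the value that the differentiated versions are supposed to keep up to date without
// rerunning the whole loop.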
fn init(sys: &mut v9::Universe) {
    // So we construct a derivative "With Respect To" each table that can change...
    sys.materialize(materialize_member_unread)
        .wrt::<msg::Marker>()
        .wrt::<member::Marker>()
        .wrt::<user::Marker>()
        ;
    // ... and also WRT each primal v9 event:
    // - push row
    // - remove row
    // - edit elements
    // Each event consists of a list of row IDs. (And edit events include the prior value.)
    // E.g., in a single game tick, there would be one event removing enemies with < 0 health,
    // and another event that regens all enemies in a healing zone.
}
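// The three primal events could be modelled roughly as below. This is only a sketch: `Event`
// and `game_tick` are hypothetical names, not v9's API, and row IDs are plain `usize` values.
// It replays the game-tick example: one remove event, then one edit event that carries the
// prior values.

```rust
/// Hypothetical event type: each event is a list of row IDs.
#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq)]
enum Event<T> {
    Push(Vec<usize>),
    Remove(Vec<usize>),
    /// Edited rows, each paired with the value it held before the edit.
    Edit(Vec<(usize, T)>),
}

/// One game tick over a toy enemy table; returns the events it produced.
fn game_tick(health: &mut [i32], alive: &mut [bool], healing_zone: &[usize]) -> Vec<Event<i32>> {
    // Event 1: remove every live enemy whose health is below zero.
    let mut removed = Vec::new();
    for id in 0..health.len() {
        if alive[id] && health[id] < 0 {
            alive[id] = false;
            removed.push(id);
        }
    }
    // Event 2: regenerate live enemies in the healing zone, recording prior values.
    let mut edited = Vec::new();
    for &id in healing_zone {
        if alive[id] {
            edited.push((id, health[id]));
            health[id] += 1;
        }
    }
    vec![Event::Remove(removed), Event::Edit(edited)]
}
```

// Batching row IDs per event is what would let a derivative handle a whole tick's worth of
// changes in one call.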
// Let's look at a few instantiations of these.
fn materialize_member_unread__push_user(
    d: delta::DeltaPush<user::Marker>,
    // All arguments become constant...
    user: user::Id,
    user2member: delta::Pushed<IndexOf<member::own::user>>,
    //           ^^^^^^^^^^^^^ ... except for this one.
    member_last: member::own::last,
    msg_ids: msg::Ids,
    msg_when: msg::own::when,
) -> impl delta::Output<user::read::notifications>
{
    // Copying down the same code:
    let membership = user2member.get(user);
    // What happens  ^^^^^^^^^^^^^^^^^^^^^ here?
    d.output(0) << d._for(membership, |d, member| {
        d.prior() << d._for(msg_ids.iter().rev(), |d, msg| {
            d.prior() << d._if(msg_when[msg] < member_last[member], |d| {
                d.break_flow()
            }, |d| {
                d.prior() + d.output(1)
            })
        })
    })
    // IndexOf works like HashMap<UserId, &[RoomId]>.
    // However, delta::Pushed<_> is measuring a CHANGE in this HashMap;
    // in fact just the insertions.
    // So unless our user has just been inserted, the HashMap isn't going to return anything?
    // Not quite. In the undifferentiated case, it returns whatever was in the HashMap (it's
    // guaranteed to be non-panicking by v9's events ensuring consistency.)
    // So what it returns in that case is `NoChange`.
    // (This suggests that materialize_member_unread__push_user is called for all users when
    // something is pushed? So some smarts are needed around that.)
    // Since we're measuring changes, AND there is no further reference to user2member [uh oh, how
    // can we know this?], we don't need to check the body of the for loop.
    //
    // But if there IS an entry, we were just pushed. This means we must be a new user who has
    // never been in a chat, but if we somehow are in one, our notifications get counted.
}
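// One way to picture the `NoChange` behaviour described above, as a sketch only. `Lookup`,
// `PushedIndex`, and `push_user_delta` are hypothetical names, IDs are plain `usize`, and
// timestamps are `u64`. The index holds only the entries inserted by the current push event.

```rust
use std::collections::HashMap;

/// Result of looking a user up in the "pushed" view of the index.
#[derive(Debug, PartialEq)]
enum Lookup<'a> {
    /// The user's entry was not touched by this event, so the output cannot change.
    NoChange,
    /// The entry was inserted by this event; these are its member rows.
    Pushed(&'a [usize]),
}

/// Hypothetical delta view: only the insertions, keyed by user ID.
struct PushedIndex {
    inserted: HashMap<usize, Vec<usize>>,
}

impl PushedIndex {
    fn new(pairs: Vec<(usize, Vec<usize>)>) -> Self {
        PushedIndex { inserted: pairs.into_iter().collect() }
    }

    fn get(&self, user: usize) -> Lookup<'_> {
        match self.inserted.get(&user) {
            Some(rows) => Lookup::Pushed(rows.as_slice()),
            None => Lookup::NoChange,
        }
    }
}

/// Change in the unread count for `user` caused by the push event.
/// Assumes `msg_when` is increasing, like the loop it mirrors.
fn push_user_delta(index: &PushedIndex, user: usize, member_last: &[u64], msg_when: &[u64]) -> i64 {
    match index.get(user) {
        // Nothing changed for this user: skip the loop body entirely.
        Lookup::NoChange => 0,
        // Freshly pushed: count from scratch over the new member rows.
        Lookup::Pushed(members) => {
            let mut delta = 0;
            for &member in members {
                for &when in msg_when.iter().rev() {
                    if when < member_last[member] {
                        break;
                    }
                    delta += 1;
                }
            }
            delta
        }
    }
}
```

// The open question from the comments remains: something still has to decide which users this
// gets called for.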
fn materialize_member_unread__remove_msg(
    d: delta::DeltaRemove<msg::Marker>,
    user: user::Id,
    user2member: IndexOf<member::own::user>,
    member_last: member::own::last,
    msg_ids: delta::Removed<msg::Ids>,
    //       ^^^^^^^^^^^^^^
    msg_when: msg::own::when,
    // Note: msg_when[removed_id] is valid because the ID is still live, like in Drop.
) -> impl delta::Output<user::read::notifications>
{
    let membership = user2member.get(user);
    d.output(0) << d._for(membership, |d, member| {
        d.prior() << d._for(msg_ids.iter().rev(), |d, msg| {
            //              ^^^^^^^ We iterate over all removed IDs, and reverse their effects.
            d.prior() << d._if(msg_when[msg] < member_last[member], |d| {
                // FIXME: uh
                d.break_flow()
            }, |d| {
                // FIXME: uh
                d.prior() + d.output(1)
            })
        })
    })
    // Maybe the constant and differential versions of the `for` get executed in parallel?
    // But that defeats the benefit of not needing to recalculate...
}
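// One possible reading of "reverse their effects", sketched with plain slices. This is an
// assumption, not a resolution of the FIXMEs above: every removed message that had counted as
// unread for a membership contributes -1. `remove_msg_delta` and `count_unread` are
// hypothetical names, and the `break` optimization is dropped because the removed IDs need
// not be ordered.

```rust
/// Full recomputation, used here only to check the delta against.
fn count_unread(member_last: &[u64], msg_when: &[u64]) -> i64 {
    let mut total = 0;
    for &last in member_last {
        total += msg_when.iter().filter(|&&when| when >= last).count() as i64;
    }
    total
}

/// Change in the unread count caused by removing messages with the given timestamps.
fn remove_msg_delta(member_last: &[u64], removed_when: &[u64]) -> i64 {
    let mut delta = 0;
    for &last in member_last {
        for &when in removed_when {
            // A removed message only matters if it had been counted as unread.
            if when >= last {
                delta -= 1;
            }
        }
    }
    delta
}
```

// The cost is proportional to the number of removed rows times the user's memberships, not to
// the size of the msg table.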
fn materialize_member_unread__edit_msg_when(
    d: delta::DeltaEdit<msg::own::when>,
    user: user::Id,
    user2member: IndexOf<member::own::user>,
    member_last: member::own::last,
    msg_ids: msg::Ids,
    msg_when: delta::Edited<msg::own::when>,
    //        ^^^^^^^^^^^^^
) -> impl delta::Output<user::read::notifications>
{
    let membership = user2member.get(user);
    d.output(0) << d._for(membership, |d, member| {
        d.prior() << d._for(msg_ids.iter().rev(), |d, msg| {
            d.prior() << d._if(msg_when[msg] < member_last[member], |d| {
                //             ^^^^^^^^^^^^^
                // Returns the pair (old, new).
                // (FIXME: Rust *STILL* doesn't have IndexGet)
                d.break_flow()
                // FIXME: Editing the timestamp breaks this optimization. To ignore this, let's say
                // you can only edit the last sent message.
            }, |d| {
                d.prior() + d.output(1)
            })
        })
    })
}
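// A sketch of how the (old, new) pair could turn into a change in the count, assuming plain
// `u64` timestamps and a hypothetical `edit_when_delta` helper. It evaluates the same
// comparison on both halves of the pair and takes the difference, without the `break`
// optimization that editing timestamps invalidates.

```rust
/// Change in the unread count caused by editing message timestamps.
/// Each edit is an `(old, new)` pair, as the edit event would provide.
fn edit_when_delta(member_last: &[u64], edits: &[(u64, u64)]) -> i64 {
    let mut delta = 0;
    for &last in member_last {
        for &(old, new) in edits {
            let was_unread = old >= last;
            let is_unread = new >= last;
            // +1 if the message became unread, -1 if it stopped being unread, else 0.
            delta += is_unread as i64 - was_unread as i64;
        }
    }
    delta
}
```

// Restricting edits to the last sent message, as suggested above, would keep `msg_when`
// increasing only as long as the new timestamp does not move before the previous message.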
// The chain rule:
// d/dx f(g(x)) = f'(g(x)) * g'(x)
// I'm not sure it applies to any of this because g' is always 1, in some sense?