Skip to content

Instantly share code, notes, and snippets.

@ghl3
Last active March 21, 2023 14:07
Show Gist options
  • Star 14 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save ghl3/7e8147ae4dcf08f3d325 to your computer and use it in GitHub Desktop.
Save ghl3/7e8147ae4dcf08f3d325 to your computer and use it in GitHub Desktop.
Implementation of an immutable database in postgres
-- Alternative formulation of the user view: join the history tables onto
-- user_base and collapse back to one row per user with GROUP BY.
-- first_not_null, fed rows newest-first, yields the most recent non-null
-- value of each attribute. Users that have any user_deletion row are omitted.
CREATE OR REPLACE VIEW users_group_by AS (
    SELECT
        user_base.id,
        first_not_null(user_credentials.email ORDER BY user_credentials.time DESC) AS email,
        first_not_null(user_metadata.firstname ORDER BY user_metadata.time DESC) AS firstname,
        first_not_null(user_metadata.lastname ORDER BY user_metadata.time DESC) AS lastname
    FROM user_base
    LEFT JOIN user_credentials
        ON user_credentials.user_id = user_base.id
    LEFT JOIN user_metadata
        ON user_metadata.user_id = user_base.id
    -- Anti-join: keep only users with no deletion record at all.
    WHERE NOT EXISTS (
        SELECT 1
        FROM user_deletion
        WHERE user_deletion.user_id = user_base.id
    )
    GROUP BY user_base.id
);
-- View of each user assembled from the single newest user_credentials row
-- and the single newest user_metadata row. Nulls are not skipped: when the
-- newest row holds NULL for a column, the view reports NULL.
-- No user_deletion filter is applied in this view.
CREATE OR REPLACE VIEW users_strictly_most_recent AS (
    SELECT
        user_base.id,
        user_base.time,
        credentials.email,
        metadata.firstname,
        metadata.lastname
    FROM user_base
    -- Newest credentials row for this user (no row => NULL email).
    LEFT JOIN LATERAL (
        SELECT user_credentials.email
        FROM user_credentials
        WHERE user_credentials.user_id = user_base.id
        ORDER BY user_credentials.time DESC
        LIMIT 1
    ) AS credentials ON TRUE
    -- Newest metadata row for this user (no row => NULL names).
    LEFT JOIN LATERAL (
        SELECT
            user_metadata.firstname,
            user_metadata.lastname
        FROM user_metadata
        WHERE user_metadata.user_id = user_base.id
        ORDER BY user_metadata.time DESC
        LIMIT 1
    ) AS metadata ON TRUE
);
-- Id generator used as the primary-key default below; numbering starts at 101.
DROP SEQUENCE IF EXISTS id_sequence CASCADE;
CREATE SEQUENCE id_sequence START WITH 101;

-- Core user object, created when the user is created.
-- The timestamp default is clock_timestamp() rather than NOW()
-- (presumably so rows written in one transaction still get distinct times).
DROP TABLE IF EXISTS user_base CASCADE;
CREATE TABLE user_base (
    id   INTEGER     PRIMARY KEY DEFAULT nextval('id_sequence'),
    time TIMESTAMPTZ DEFAULT clock_timestamp()
);

CREATE INDEX user_base_time_idx    ON user_base (time);
CREATE INDEX user_base_id_time_idx ON user_base (id, time);
-- State-transition function for the first_not_null aggregate.
--   $1: running state (NULL until a non-null input has been seen)
--   $2: next input value
-- Once the state is non-null, coalesce keeps returning it, so every later
-- input is ignored. Declared IMMUTABLE because the result depends only on
-- its arguments.
CREATE OR REPLACE FUNCTION first_or_null(anyelement, anyelement) RETURNS anyelement AS
$$
    SELECT coalesce($1, $2);
$$ LANGUAGE SQL IMMUTABLE;

-- Polymorphic aggregate: returns the first non-null input in aggregation
-- order (use an ORDER BY inside the call to define that order), or NULL
-- when every input is NULL.
--
-- IF EXISTS lets this script run on a fresh database, where a bare
-- DROP AGGREGATE fails because there is nothing to drop yet.
-- CASCADE lets it re-run while views still depend on the aggregate;
-- those views are dropped here and must be re-created afterwards.
DROP AGGREGATE IF EXISTS first_not_null (anyelement) CASCADE;
CREATE AGGREGATE first_not_null (anyelement)
(
    sfunc = first_or_null,
    stype = anyelement
);
-- Id generator shared by the tables created in this script; starts at 101.
DROP SEQUENCE IF EXISTS id_sequence CASCADE;
CREATE SEQUENCE id_sequence START WITH 101;

-- Core user object, created when the user is created.
-- The timestamp default is clock_timestamp() rather than NOW()
-- (presumably so rows written in one transaction still get distinct times).
DROP TABLE IF EXISTS user_base CASCADE;
CREATE TABLE user_base (
    id   INTEGER     PRIMARY KEY DEFAULT nextval('id_sequence'),
    time TIMESTAMPTZ DEFAULT clock_timestamp()
);

CREATE INDEX user_base_time_idx    ON user_base (time);
CREATE INDEX user_base_id_time_idx ON user_base (id, time);
-- Credential records for a user. A user may have many rows; each row
-- carries its own insertion time so readers can order them newest-first.
DROP TABLE IF EXISTS user_credentials CASCADE;
CREATE TABLE user_credentials (
    id            INTEGER     PRIMARY KEY DEFAULT nextval('id_sequence'),
    time          TIMESTAMPTZ DEFAULT clock_timestamp(),
    user_id       INTEGER     REFERENCES user_base (id),
    email         TEXT,
    password_hash TEXT
);

-- Lookups by insertion time, by row id, and by owning user (with and
-- without time ordering).
CREATE INDEX user_credentials_time_idx         ON user_credentials (time);
CREATE INDEX user_credentials_id_time_idx      ON user_credentials (id, time);
CREATE INDEX user_credentials_user_id_idx      ON user_credentials (user_id);
CREATE INDEX user_credentials_user_id_time_idx ON user_credentials (user_id, time);
-- Name records for a user. A user may have many rows; each row carries
-- its own insertion time so readers can order them newest-first.
DROP TABLE IF EXISTS user_metadata CASCADE;
CREATE TABLE user_metadata (
    id        INTEGER     PRIMARY KEY DEFAULT nextval('id_sequence'),
    time      TIMESTAMPTZ DEFAULT clock_timestamp(),
    user_id   INTEGER     REFERENCES user_base (id),
    firstname TEXT,
    lastname  TEXT
);

-- Lookups by insertion time, by row id, and by owning user (with and
-- without time ordering).
CREATE INDEX user_metadata_time_idx             ON user_metadata (time);
CREATE INDEX user_metadata_id_time_idx          ON user_metadata (id, time);
CREATE INDEX user_metadata_user_id_idx_idx      ON user_metadata (user_id);
CREATE INDEX user_metadata_user_id_idx_time_idx ON user_metadata (user_id, time);
-- Deletion markers for users. A user is "deleted" by inserting a row here
-- rather than by removing rows from the other tables.
-- is_deleted defaults to TRUE.
DROP TABLE IF EXISTS user_deletion CASCADE;
CREATE TABLE user_deletion (
    id         INTEGER     PRIMARY KEY DEFAULT nextval('id_sequence'),
    time       TIMESTAMPTZ DEFAULT clock_timestamp(),
    user_id    INTEGER     REFERENCES user_base (id),
    is_deleted BOOLEAN     DEFAULT TRUE
);

CREATE INDEX user_deletion_id_idx ON user_deletion (id);
CREATE INDEX user_deletion_time_idx ON user_deletion (time);
CREATE INDEX user_deletion_user_id_idx ON user_deletion (user_id);
-- This statement previously lacked its terminating semicolon, which made
-- it and the following CREATE INDEX parse as one invalid statement.
CREATE INDEX user_deletion_is_deleted_idx ON user_deletion (is_deleted);
CREATE INDEX user_deletion_user_id_time_idx ON user_deletion (user_id, time);
CREATE INDEX user_deletion_user_id_time_is_deleted_idx ON user_deletion (user_id, time, is_deleted);
-- Credential records for a user. A user may have many rows; each row
-- carries its own insertion time so readers can order them newest-first.
DROP TABLE IF EXISTS user_credentials CASCADE;
CREATE TABLE user_credentials (
    id            INTEGER     PRIMARY KEY DEFAULT nextval('id_sequence'),
    time          TIMESTAMPTZ DEFAULT clock_timestamp(),
    user_id       INTEGER     REFERENCES user_base (id),
    email         TEXT,
    password_hash TEXT
);

-- Lookups by insertion time, by row id, and by owning user (with and
-- without time ordering).
CREATE INDEX user_credentials_time_idx         ON user_credentials (time);
CREATE INDEX user_credentials_id_time_idx      ON user_credentials (id, time);
CREATE INDEX user_credentials_user_id_idx      ON user_credentials (user_id);
CREATE INDEX user_credentials_user_id_time_idx ON user_credentials (user_id, time);
-- Deletion markers for users. A user is "deleted" by inserting a row here
-- rather than by removing rows from the other tables.
-- is_deleted defaults to TRUE.
DROP TABLE IF EXISTS user_deletion CASCADE;
CREATE TABLE user_deletion (
    id         INTEGER     PRIMARY KEY DEFAULT nextval('id_sequence'),
    time       TIMESTAMPTZ DEFAULT clock_timestamp(),
    user_id    INTEGER     REFERENCES user_base (id),
    is_deleted BOOLEAN     DEFAULT TRUE
);

-- Single-column indexes.
CREATE INDEX user_deletion_id_idx         ON user_deletion (id);
CREATE INDEX user_deletion_time_idx       ON user_deletion (time);
CREATE INDEX user_deletion_user_id_idx    ON user_deletion (user_id);
CREATE INDEX user_deletion_is_deleted_idx ON user_deletion (is_deleted);

-- Composite indexes led by the owning user.
CREATE INDEX user_deletion_user_id_time_idx            ON user_deletion (user_id, time);
CREATE INDEX user_deletion_user_id_time_is_deleted_idx ON user_deletion (user_id, time, is_deleted);
-- Name records for a user. A user may have many rows; each row carries
-- its own insertion time so readers can order them newest-first.
DROP TABLE IF EXISTS user_metadata CASCADE;
CREATE TABLE user_metadata (
    id        INTEGER     PRIMARY KEY DEFAULT nextval('id_sequence'),
    time      TIMESTAMPTZ DEFAULT clock_timestamp(),
    user_id   INTEGER     REFERENCES user_base (id),
    firstname TEXT,
    lastname  TEXT
);

-- Lookups by insertion time, by row id, and by owning user (with and
-- without time ordering).
CREATE INDEX user_metadata_time_idx             ON user_metadata (time);
CREATE INDEX user_metadata_id_time_idx          ON user_metadata (id, time);
CREATE INDEX user_metadata_user_id_idx_idx      ON user_metadata (user_id);
CREATE INDEX user_metadata_user_id_idx_time_idx ON user_metadata (user_id, time);
-- Current-state view of every non-deleted user. Each attribute is the most
-- recent non-null value recorded for that column, or NULL when no such
-- value exists. Users with any row in user_deletion are excluded.
CREATE OR REPLACE VIEW users AS (
    SELECT
        user_base.id,
        user_base.time,
        credentials.email,
        metadata.firstname,
        metadata.lastname
    FROM user_base
    -- Newest non-null email for this user.
    LEFT JOIN LATERAL (
        SELECT
            first_not_null(user_credentials.email ORDER BY user_credentials.time DESC) AS email
        FROM user_credentials
        WHERE user_credentials.user_id = user_base.id
    ) AS credentials ON TRUE
    -- Newest non-null first and last name; each column is resolved
    -- independently, so they may come from different rows.
    LEFT JOIN LATERAL (
        SELECT
            first_not_null(user_metadata.firstname ORDER BY user_metadata.time DESC) AS firstname,
            first_not_null(user_metadata.lastname ORDER BY user_metadata.time DESC) AS lastname
        FROM user_metadata
        WHERE user_metadata.user_id = user_base.id
    ) AS metadata ON TRUE
    -- Anti-join: keep only users with no deletion record at all.
    WHERE NOT EXISTS (
        SELECT 1
        FROM user_deletion
        WHERE user_deletion.user_id = user_base.id
    )
);
import random
import hashlib
import argparse
import psycopg2
from faker import Faker
def add_credentials(cur, user_id, fake):
    """Insert one generated credentials row for a user and return its id.

    Args:
        cur: open DB-API cursor used to run the INSERT.
        user_id: id of the ``user_base`` row the credentials belong to.
        fake: object providing ``email()`` (a ``Faker`` instance in this script).

    Returns:
        The ``id`` of the newly inserted ``user_credentials`` row.
    """
    email = fake.email()
    # hashlib works on bytes; passing a str raises TypeError on Python 3.
    # The "password hash" is just a digest of the email -- test data only.
    password = hashlib.sha224(email.encode('utf-8')).hexdigest()
    # Name the columns explicitly so the statement does not depend on the
    # table's column order; id and time fall back to their defaults.
    cur.execute(
        "INSERT INTO user_credentials (user_id, email, password_hash) "
        "VALUES (%s, %s, %s) RETURNING id",
        (user_id, email, password))
    return cur.fetchone()[0]
def add_metadata(cur, user_id, fake):
    """Insert one generated name record for a user and return its id.

    Args:
        cur: open DB-API cursor used to run the INSERT.
        user_id: id of the ``user_base`` row the metadata belongs to.
        fake: object providing ``first_name()`` and ``last_name()``
            (a ``Faker`` instance in this script).

    Returns:
        The ``id`` of the newly inserted ``user_metadata`` row.
    """
    firstname, lastname = fake.first_name(), fake.last_name()
    # Name the columns explicitly so the statement does not depend on the
    # table's column order; id and time fall back to their defaults.
    cur.execute(
        "INSERT INTO user_metadata (user_id, firstname, lastname) "
        "VALUES (%s, %s, %s) RETURNING id",
        (user_id, firstname, lastname))
    return cur.fetchone()[0]
def delete_user(cur, user_id):
    """Mark a user as deleted by inserting a ``user_deletion`` row.

    Nothing is removed from the other tables; the deletion is recorded as a
    new row.

    Args:
        cur: open DB-API cursor used to run the INSERT.
        user_id: id of the ``user_base`` row to mark as deleted.

    Returns:
        The ``id`` of the newly inserted ``user_deletion`` row.
    """
    # Only user_id is supplied; id, time and is_deleted take their column
    # defaults, exactly as the former positional DEFAULT placeholders did.
    cur.execute(
        "INSERT INTO user_deletion (user_id) VALUES (%s) RETURNING id",
        (user_id,))
    return cur.fetchone()[0]
def random_true(num_true=1, num_false=1):
    """Return a random boolean, weighted ``num_true`` : ``num_false``.

    A pool of ``num_true + num_false`` outcomes is built, the first
    ``num_true`` of them True and the rest False, and one is drawn
    uniformly at random.
    """
    pool_size = num_true + num_false
    outcomes = [slot < num_true for slot in range(pool_size)]
    return random.choice(outcomes)
def main():
    """Populate the database with randomly generated test users.

    For every user a ``user_base`` row is inserted, followed by a random
    number of credentials rows and metadata rows; roughly one user in six
    is then marked as deleted. Connection settings and the number of users
    come from the command line.
    """
    fake = Faker()

    parser = argparse.ArgumentParser(
        description='Generate fake users for the immutable database.')
    parser.add_argument('--num_users', type=int, default=1000,
                        help='Number of test users to create')
    parser.add_argument('--database', help="Database Name")
    parser.add_argument('--user', help="Database User")
    parser.add_argument('--host', default='localhost', help="Database Host")
    args = parser.parse_args()

    conn = psycopg2.connect(database=args.database, user=args.user, host=args.host)
    try:
        # `with conn` only ends the transaction (commit on success, rollback
        # on error); it does not close the connection, hence the finally.
        with conn:
            with conn.cursor() as cur:
                for _ in range(args.num_users):
                    cur.execute("INSERT INTO user_base DEFAULT VALUES RETURNING id")
                    user_id = cur.fetchone()[0]

                    # random_true(2) is True two times out of three, so each
                    # loop adds a geometrically distributed number of rows
                    # (possibly none).
                    while random_true(2):
                        add_credentials(cur, user_id, fake)

                    while random_true(2):
                        add_metadata(cur, user_id, fake)

                    # One chance in six of marking the user as deleted.
                    if random_true(1, 5):
                        delete_user(cur, user_id)

                conn.commit()
    finally:
        conn.close()


if __name__ == '__main__':
    main()
# Build the schema from scratch, then load generated test users.
# psql takes its connection target positionally: <dbname> <username>.

# Runs against the "postgres" database as user "postgres"
# (presumably this script creates immutable_database and the immutable role).
psql postgres postgres -f create_database.sql

# Schema scripts, in dependency order, run against immutable_database as
# user "immutable".
for sql_file in \
    create_functions.sql \
    create_base_user.sql \
    create_user_credentials.sql \
    create_user_metadata.sql \
    create_user_deletion.sql \
    create_user_view.sql
do
    psql immutable_database immutable -f "$sql_file"
done

# Fill the tables with 1000 fake users.
python generate_data.py --num_users 1000 --database immutable_database --user immutable
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment