Skip to content

Instantly share code, notes, and snippets.

@dslaw
Last active August 9, 2019 21:25
Show Gist options
  • Save dslaw/ce1a72e53cc4eba48272d6524870d77f to your computer and use it in GitHub Desktop.
Save dslaw/ce1a72e53cc4eba48272d6524870d77f to your computer and use it in GitHub Desktop.
Example of change data capture (CDC) using a PostgreSQL trigger and LISTEN/NOTIFY.

Create a dev database:

docker run \
    -d \
    -p 8011:5432 \
    -e POSTGRES_USER=dev \
    -e POSTGRES_DB=dev \
    -e POSTGRES_PASSWORD=dev \
    --name=pubsub-example \
    postgres:11

psql --host=localhost --port=8011 --dbname=dev --username=dev -f schema.sql

Install dependencies:

pip install persist-queue psycopg2-binary

Run the example:

python subscriber.py &
python writer.py --number=50
-- uuid-ossp supplies uuid_generate_v4(), which cdc_publish() uses to give each
-- published message its own id.
create extension if not exists "uuid-ossp";
-- Current case counts for one disease in one region; (disease, region) is the key,
-- so there is at most one row per pair.
create table if not exists outbreak (
disease text not null,
region text not null,
infected integer not null,
dead integer not null,
treated integer not null,
-- timestamp without time zone; the writer script stores a naive UTC value here
modified_at timestamp not null,
-- counts may never go negative
check (infected >= 0),
check (dead >= 0),
check (treated >= 0),
primary key (disease, region)
);
-- Audit log of write batches: one row per batch, naming the table that was
-- written, a free-text status, and the batch timestamp.
create table if not exists table_audit (
id serial primary key,
tablename text not null,
status text not null,
modified_at timestamp not null
);
-- Trigger function: publish one JSON message per changed row on the 'processed'
-- notification channel.
--
-- Message keys:
--   id    - freshly generated UUID (text) identifying this message
--   op    - TG_OP: 'INSERT', 'UPDATE' or 'DELETE'
--   table - schema-qualified name of the table that fired the trigger
--   data  - the affected row as JSON: the new row for insert/update,
--           the removed row for delete
--
-- NOTE: in the default configuration a NOTIFY payload must be shorter than
-- 8000 bytes, so this is only suitable for narrow rows.
create or replace function cdc_publish() returns trigger as $$
declare
    changed_row json;
begin
    -- NEW is not populated for DELETE, so publish the row being removed (OLD)
    -- instead of sending a message without any row data.
    if tg_op = 'DELETE' then
        changed_row := row_to_json(old);
    else
        changed_row := row_to_json(new);
    end if;

    perform pg_notify(
        'processed',
        json_build_object(
            'id', uuid_generate_v4()::text,
            'op', tg_op,
            'table', tg_table_schema || '.' || tg_table_name,
            'data', changed_row
        )::text
    );

    -- The return value of a row-level AFTER trigger is ignored.
    return null;
end;
$$ language plpgsql;
-- Drop any existing trigger first so this script can be re-run safely.
drop trigger if exists outbreak_cdc_publish on outbreak;
-- Call cdc_publish() once per affected row after every insert, update or
-- delete on outbreak.
create trigger outbreak_cdc_publish
after insert or update or delete on outbreak
for each row execute procedure cdc_publish();
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
import persistqueue
import psycopg2
import select
from writer import params
# Notification channel to LISTEN on; the same name the cdc_publish() trigger
# function passes to pg_notify().
CHANNEL = "processed"
# Timeout, in seconds, handed to select.select() while waiting for the
# connection to become readable.
POLL_TIMEOUT = 5
def poll(conn, callback):
    """Wait on *conn* forever, passing each ``CHANNEL`` notification to *callback*.

    The connection is watched with ``select`` using ``POLL_TIMEOUT`` as the
    timeout. Whenever it becomes readable, pending notifications are drained
    in arrival order; those on other channels are discarded. This function
    never returns normally.
    """
    while True:
        if not any(select.select([conn], [], [], POLL_TIMEOUT)):
            # Timed out with nothing to read; go back to waiting.
            continue

        conn.poll()
        while conn.notifies:
            note = conn.notifies.pop(0)
            if note.channel != CHANNEL:
                continue
            print("Received: ", note.payload)
            callback(note)
def push_message(queue, notification):
    """Put the payload of *notification* onto *queue*.

    Only ``notification.payload`` is enqueued; nothing is returned.
    """
    queue.put(notification.payload)
def main():
    """Listen on ``CHANNEL`` and persist every received payload to a local queue.

    Payloads are stored in a ``persistqueue.SQLiteAckQueue`` located at
    ``queue``. The database connection is closed when polling stops for any
    reason (error or interrupt).
    """
    pq = persistqueue.SQLiteAckQueue("queue", auto_commit=True)

    def handler(notification):
        return push_message(pq, notification)

    # psycopg2's ``with connection`` block only ends the transaction; it never
    # closes the connection. Manage the lifetime explicitly instead, and switch
    # to autocommit before any statement runs so LISTEN takes effect at once.
    conn = psycopg2.connect(**params)
    try:
        conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
        with conn.cursor() as cursor:
            cursor.execute(f"listen {CHANNEL}")
        poll(conn, handler)
    finally:
        conn.close()


if __name__ == "__main__":
    main()
from datetime import datetime
from time import sleep
import argparse
import psycopg2
import random
# Keyword arguments for psycopg2.connect(); they match the dev database
# container started by the README's `docker run` command.
params = dict(
    host="localhost",
    port=8011,
    database="dev",
    user="dev",
    password="dev",
)
def make_parser():
    """Build the command-line parser.

    The only option is ``--number`` / ``-n``: an integer, 50 when omitted.
    """
    cli = argparse.ArgumentParser()
    cli.add_argument("--number", "-n", default=50, type=int)
    return cli
def update_counts(previous):
    """Return a copy of *previous* with its three counts randomly nudged.

    ``infected``, ``dead`` and ``treated`` each move by a random integer in
    [-10, 10] and are floored at zero. All other keys are carried over
    unchanged, and *previous* itself is not modified.
    """
    refreshed = dict(previous)
    for name in ("infected", "dead", "treated"):
        base = previous[name]
        delta = random.randint(-10, 10)
        refreshed[name] = max(base + delta, 0)
    return refreshed
def update_outbreak_status(cursor, state, timestamp):
    """Randomly adjust *state* and upsert the result into ``outbreak``.

    New counts come from ``update_counts(state)``. The row keyed by
    (disease, region) is inserted, or updated in place if it already exists,
    with ``modified_at`` set to *timestamp*. Returns the adjusted state dict.
    """
    refreshed = update_counts(state)

    # Keys in the refreshed state take precedence over the timestamp entry.
    bind_values = {"modified_at": timestamp}
    bind_values.update(refreshed)

    upsert = """
insert into outbreak
(disease, region, infected, dead, treated, modified_at)
values
(%(disease)s, %(region)s, %(infected)s, %(dead)s, %(treated)s, %(modified_at)s)
on conflict (disease, region) do update
set
infected = %(infected)s,
dead = %(dead)s,
treated = %(treated)s,
modified_at = %(modified_at)s
"""
    cursor.execute(upsert, bind_values)
    return refreshed
def update_outbreak_statuses(cursor, states):
    """Update a random batch of *states* and record the batch in ``table_audit``.

    Between one and three states (never more than are available) are chosen
    at random and passed through ``update_outbreak_status``; the rest are
    returned untouched. One audit row with status ``'Success'`` is written
    per call, sharing the batch timestamp.

    Args:
        cursor: open database cursor used for all statements.
        states: iterable of state dicts (disease, region and the counts).

    Returns:
        A list of states in the original order, with the chosen ones replaced
        by their updated versions.
    """
    states = list(states)
    modified_at = datetime.utcnow()
    batch_size = random.choice([1, 2, 3])

    # Sample from the positions that actually exist rather than a fixed
    # [0, 1, 2]: with more than three states the extras would never be picked,
    # and with fewer the batch could update fewer rows than intended.
    chosen = set(
        random.sample(range(len(states)), k=min(batch_size, len(states)))
    )

    updated = [
        update_outbreak_status(cursor, state, modified_at)
        if idx in chosen
        else state
        for idx, state in enumerate(states)
    ]

    cursor.execute("""
insert into table_audit
(tablename, status, modified_at)
values
('outbreak', 'Success', %(modified_at)s)
""", {"modified_at": modified_at})
    return updated
def main():
    """Reset the tables, then write ``--number`` random update batches.

    ``outbreak`` and ``table_audit`` are truncated first. Each batch is
    committed on its own, followed by a random pause of 500-1199 ms. The
    random generator is seeded with a fixed value, so runs are repeatable.
    """
    args = make_parser().parse_args()

    initial_states = [
        {
            "disease": "Cholera",
            "region": "Congo Basin",
            "infected": 2,
            "dead": 0,
            "treated": 0,
        }, {
            "disease": "SARS",
            "region": "Western China",
            "infected": 20,
            "dead": 4,
            "treated": 2,
        }, {
            "disease": "Avian Flu",
            "region": "Southern China",
            "infected": 4,
            "dead": 0,
            "treated": 1,
        }
    ]

    random.seed(13)

    # psycopg2's ``with connection`` block only ends the transaction; it never
    # closes the connection. Close it explicitly so it is released even when a
    # batch fails part-way (uncommitted work is discarded on close).
    conn = psycopg2.connect(**params)
    try:
        with conn.cursor() as cursor:
            cursor.execute("truncate table outbreak")
            cursor.execute("truncate table table_audit")
            conn.commit()

            states = initial_states
            for _ in range(args.number):
                states = update_outbreak_statuses(cursor, states)
                conn.commit()

                delay_ms = random.randrange(500, 1_200)
                sleep(delay_ms / 1_000)
    finally:
        conn.close()


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment