Skip to content

Instantly share code, notes, and snippets.

@edvardm
Last active April 28, 2022 20:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save edvardm/661522923300a1fc657ac7480530faa7 to your computer and use it in GitHub Desktop.
Save edvardm/661522923300a1fc657ac7480530faa7 to your computer and use it in GitHub Desktop.
single producer/consumer skeleton for keeping Queue full of db rows to process
# Using sample data from https://www.gov.uk/government/statistical-data-sets/price-paid-data-downloads
# See https://wiki.postgresql.org/wiki/Sample_Databases for the table schema and instructions on how to populate it.
# Simple script to test the idea of keeping a Queue full of database rows ready for consumption, using
# appropriately sized chunks of rows.
import multiprocessing as mp
import queue
import psycopg2
# Upper bound on the number of batches the queue may hold at once (passed as maxsize).
MAX_QUEUE_SIZE = 8
# Number of rows requested per fetchmany() call, i.e. the size of one queued batch.
BATCH_SIZE = 5000
# Seconds the consumer waits for the next batch before it stops.
TIMEOUT = 30
# Connection opened when the module is loaded; fetch_from_db() creates its cursor from it.
# NOTE(review): credentials are hard-coded in the DSN -- consider reading them from the environment.
db_conn = psycopg2.connect("postgresql://postgres:sekrit@localhost/test")
def consume(q):
    """Take batches off ``q`` and process each one until the queue runs dry.

    Args:
        q: Queue-like object whose ``get`` returns one batch per call.

    There is no explicit end-of-stream marker: the loop ends as soon as no
    batch arrives within ``TIMEOUT`` seconds.
    """
    while True:
        # Only the blocking get() is guarded, so a queue.Empty raised from
        # inside process_batch() can never be mistaken for "no more data".
        try:
            batch = q.get(block=True, timeout=TIMEOUT)
        except queue.Empty:
            print("All done, I'm outta here")
            break
        process_batch(batch)
def run():
    """Start one producer and one consumer process around a bounded queue.

    The producer (``fetch_from_db``) is started first, then the consumer
    (``consume``); both receive the same queue. The call blocks until both
    processes have exited and then closes the queue.
    """
    q = mp.Queue(maxsize=MAX_QUEUE_SIZE)
    workers = []
    # A single producer followed by a single consumer. Either side could be
    # scaled to several processes given a reasonable way to partition entries.
    for target in (fetch_from_db, consume):
        worker = mp.Process(target=target, args=(q,))
        worker.start()
        workers.append(worker)
    # Wait for the processes to finish, in the order they were started.
    print("reap threads")
    for worker in workers:
        worker.join()
    q.close()
def fetch_from_db(q):
    """Stream rows of ``land_registry_price_paid_uk`` into ``q`` in batches.

    Args:
        q: Queue-like object; every non-empty batch (the list returned by
            ``fetchmany``) is handed over with a blocking ``q.put``.

    A named (server-side) cursor is used so that rows are transferred from
    PostgreSQL one ``BATCH_SIZE`` chunk at a time. A default client-side
    cursor would pull the complete result set into this process during
    ``execute()``, which defeats the purpose of batching. Named cursors need
    an open transaction, which is psycopg2's default (non-autocommit) mode.
    """
    print("Executing query")
    # The ``with`` block closes the cursor even if a fetch or put() fails.
    with db_conn.cursor(name="fetch_from_db") as cursor:
        cursor.execute(
            "SELECT transaction, price, transfer_date, postcode, street, city from land_registry_price_paid_uk"
        )
        count = 0  # total rows handed to the queue so far
        idx = 0  # 1-based number of the batch being queued
        while True:
            batch = cursor.fetchmany(BATCH_SIZE)
            if not batch:
                # An empty result from fetchmany() means the cursor is exhausted.
                print(f"*** Producer done after {count} entries")
                break
            idx += 1
            count += len(batch)
            print(f"Inserting batch {idx}, head {batch[0]}", end="... ")
            # Blocks while the queue is full, which throttles the producer.
            q.put(batch)
            print("done")
def process_batch(chunk):
    """Append every row of ``chunk`` to ``tmp.dat``, one row per line.

    Args:
        chunk: Sequence of rows; each row is rendered with an f-string and
            followed by a newline.

    An empty chunk is a no-op: nothing is printed and the file is not
    opened. ``tmp.dat`` is resolved relative to the current working
    directory and opened in append mode, so repeated calls accumulate.
    """
    if not chunk:
        # Nothing to report or write; also avoids IndexError on chunk[0].
        return
    print(f"processing chunk of size {len(chunk)}, head {chunk[0]}", end="... ")
    with open("tmp.dat", "a") as fh:
        # One writelines() call instead of a separate write() per row.
        fh.writelines(f"{row}\n" for row in chunk)
    print("done")
# Run the pipeline only when executed as a script, not when the module is imported.
if __name__ == "__main__":
    run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment