Skip to content

Instantly share code, notes, and snippets.

@kmuthukk
Last active June 13, 2022 23:25
Show Gist options
  • Save kmuthukk/4c7a7e77be023e80be08b838544903b8 to your computer and use it in GitHub Desktop.
Save kmuthukk/4c7a7e77be023e80be08b838544903b8 to your computer and use it in GitHub Desktop.
Sample load script for "fires" synthetic test data.
# Dependencies:
# On CentOS you can install psycopg2 thus:
#
# sudo yum install postgresql-libs
# sudo yum install python-psycopg2
import datetime
import os
from multiprocessing.dummy import Pool as ThreadPool

import psycopg2
# Load-generator settings. Each one can be overridden through an environment
# variable so the script can target another cluster or size without editing
# the source; the defaults are the original hard-coded values.
num_fires_per_thread = int(os.environ.get("FIRES_PER_THREAD", "50000"))  # rows inserted by each worker
num_threads = int(os.environ.get("FIRES_NUM_THREADS", "4"))  # number of concurrent loader threads
# Connection string handed verbatim to psycopg2.connect().
connect_string = os.environ.get(
    "FIRES_CONNECT_STRING",
    "host=127.0.0.1 dbname=yugabyte user=yugabyte port=5433",
)
def create_table(create_state_index=False):
    """Drop and recreate the ``fires`` table on the database at ``connect_string``.

    Any existing ``fires`` table (and all of its rows) is dropped first, so
    every run starts from an empty table.

    :param create_state_index: when True, also create ``state_idx`` on
        ``fires(state)``. Defaults to False, matching the original behaviour
        where that statement was commented out.
    """
    conn = psycopg2.connect(connect_string)
    try:
        # Autocommit so each DDL statement takes effect immediately instead of
        # waiting for an explicit commit.
        conn.set_session(autocommit=True)
        cur = conn.cursor()
        try:
            cur.execute("""DROP TABLE IF EXISTS fires""")
            cur.execute("""CREATE TABLE IF NOT EXISTS fires(
                objectid integer,
                fire_name text,
                fire_year integer,
                discovery_date double precision,
                discovery_time text,
                stat_cause_descr text,
                fire_size double precision,
                fire_size_class text,
                latitude double precision,
                longitude double precision,
                state text,
                county text,
                discovery_date_j text,
                discovery_date_d date,
                PRIMARY KEY(objectid))
            """)
            print("Created fires table")
            print("====================")
            if create_state_index:
                cur.execute("""CREATE INDEX IF NOT EXISTS state_idx ON fires(state)""")
                print("Created state_idx on table")
        finally:
            cur.close()
    finally:
        # Always release the server connection, even if a statement failed.
        conn.close()
def load_data_worker(thread_num):
    """Insert ``num_fires_per_thread`` synthetic rows into ``fires`` for one worker.

    Worker ``thread_num`` writes objectids in the half-open range
    ``[thread_num * num_fires_per_thread, (thread_num + 1) * num_fires_per_thread)``,
    so concurrent workers never collide on the primary key.

    Insert errors are reported and stop this worker only (best-effort load);
    the other workers keep running.

    :param thread_num: zero-based worker index, as supplied by ``pool.map``.
    :returns: the number of rows this worker inserted successfully.
    """
    thread_id = str(thread_num)
    # Values that are identical for every row of this worker; computed once so
    # all rows share the same discovery_date_d even across midnight.
    base_objectid = num_fires_per_thread * thread_num
    year = 2000 + thread_num
    year_text = str(year)
    discovery_date_d = datetime.date.today() - datetime.timedelta(365 * thread_num)

    num_inserts = 0
    conn = psycopg2.connect(connect_string)
    try:
        # Autocommit: every INSERT is its own transaction, so rows inserted
        # before a failure stay committed.
        conn.set_session(autocommit=True)
        cur = conn.cursor()
        try:
            print("Thread-" + thread_id + ": Inserting %d rows..." % (num_fires_per_thread))
            for idx in range(num_fires_per_thread):
                cur.execute("""INSERT INTO fires (objectid, fire_name, fire_year, discovery_date, discovery_time,"""
                            """ stat_cause_descr, fire_size, fire_size_class, latitude, longitude,"""
                            """ state, county, discovery_date_j, discovery_date_d) """
                            """ VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)""",
                            (base_objectid + idx,                      # objectid
                             "fire-" + thread_id + "-" + str(idx),     # fire_name
                             year,                                     # fire_year
                             year,                                     # discovery_date
                             year_text,                                # discovery_time
                             "cause-" + thread_id + "-" + str(idx),    # stat_cause_descr
                             (thread_num * idx + 1234),                # fire_size
                             "fire-size-class--" + str(idx % 100),     # fire_size_class
                             idx,                                      # latitude
                             idx,                                      # longitude
                             "state-" + str(idx % 50),                 # state
                             "county-" + str(idx % 1000),              # county
                             year_text,                                # discovery_date_j
                             discovery_date_d,                         # discovery_date_d
                             ))
                num_inserts += 1
                # Report progress every 10000 rows actually inserted (the old
                # idx-based check fired after 1, 10001, 20001, ... rows).
                if num_inserts % 10000 == 0:
                    print("Thread-" + thread_id + ": Inserted %d rows" % (num_inserts))
        except Exception as e:
            # Best-effort load: report which worker failed and stop it.
            print("Thread-" + thread_id + ": Unexpected exception: " + str(e))
        finally:
            cur.close()
    finally:
        conn.close()
    print("Thread-" + thread_id + ": Inserted %d rows" % (num_inserts))
    return num_inserts
def load_data():
    """Run ``load_data_worker`` for worker indices ``0 .. num_threads-1`` in parallel.

    Uses a thread pool of ``num_threads`` threads and blocks until every
    worker has finished.

    :returns: the per-worker return values of ``load_data_worker``, in worker
        order.
    """
    pool = ThreadPool(num_threads)
    try:
        return pool.map(load_data_worker, range(num_threads))
    finally:
        # Shut the pool down and wait for its threads instead of leaking them.
        pool.close()
        pool.join()
# Main: run the (destructive) table rebuild and load only when this file is
# executed as a script, never as a side effect of importing it.
if __name__ == "__main__":
    create_table()
    load_data()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment