Skip to content

Instantly share code, notes, and snippets.

@ahmadalsajid
Created September 25, 2019 10:09
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ahmadalsajid/85c7283f5f546a68e8a439ba673df87d to your computer and use it in GitHub Desktop.
Save ahmadalsajid/85c7283f5f546a68e8a439ba673df87d to your computer and use it in GitHub Desktop.
from celery import Celery
from celery.schedules import crontab
import MySQLdb
import random
import string
import time
app = Celery('db_update', broker="pyamqp://guest@localhost//")
# disable UTC to use local time
app.conf.enable_utc = False
@app.task
def generate_data():
start = time.time()
try:
print('check db to track updates.')
db = MySQLdb.connect(user='root', passwd="qweqwe", db="celery_test")
c = db.cursor()
# c.execute("""SELECT * FROM `student_old`""")
# print(c.fetchall())
insert_query = """INSERT INTO `student_old` (`name`, `email`, `address`, `class`) VALUES (%s, %s, %s, %s)"""
addr_list = ['Dhaka', 'Rajshahi', 'Gazipur', 'Rangpur']
letters = string.ascii_lowercase
for i in range(200000):
print('id: ', i + 1)
rand_name = ''.join(random.choice(letters) for _ in range(10))
query_data = (rand_name, rand_name + '@venturenxt.com', random.choice(addr_list), random.randint(1, 10))
c.execute(insert_query, query_data)
db.commit()
db.close()
except Exception as e:
print(str(e))
print('Execution time taken: ', time.time() - start)
@app.task
def update_data():
start = time.time()
try:
print('check db to track updates.')
db = MySQLdb.connect(user='root', passwd="qweqwe", db="celery_test")
c = db.cursor()
offset, limit = 0, 30000
truncate_query = """TRUNCATE TABLE `student_new`"""
query_str = """SELECT * from `student_old` LIMIT %s , %s"""
insert_query = """INSERT INTO `student_new` (`id`, `name`, `email`, `address`, `class`) VALUES (%s, %s, %s, %s, %s)"""
try:
# Truncate the table first
c.execute(truncate_query)
db.commit()
while True:
# get data from old table
results = c.execute(query_str, (offset, limit))
print('Total retrieved data: ', results)
offset += limit
# rollback test
# if offset == 30000:
# raise Exception('Roll back test')
if not results:
print('Retrieved all data, exiting the function')
break
# insert into new table
data = c.fetchall()
c.executemany(insert_query, data)
db.commit()
except Exception as err:
print(str(err))
db.rollback()
db.close()
except Exception as e:
print(str(e))
print('Execution time taken: ', time.time() - start)
# add "update_data" task to the beat schedule
app.conf.beat_schedule = {
"sync-db": {
"task": "db_update.update_data",
"schedule": crontab(minute='*'),
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment