Created
September 25, 2019 10:09
-
-
Save ahmadalsajid/85c7283f5f546a68e8a439ba673df87d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from celery import Celery | |
from celery.schedules import crontab | |
import MySQLdb | |
import random | |
import string | |
import time | |
# Celery application named "db_update"; messages go through the AMQP broker
# on localhost, using the "guest" account.
app = Celery(
    'db_update',
    broker="pyamqp://guest@localhost//",
)

# Turn UTC off so that local time is used instead.
app.conf.enable_utc = False
@app.task
def generate_data(total_rows=200000):
    """Fill the `student_old` table with randomly generated rows.

    Every row gets a random 10-letter lowercase name, an e-mail address
    derived from that name, an address picked from a fixed list of cities
    and a class number between 1 and 10 (inclusive).

    Args:
        total_rows: Number of rows to insert. Defaults to 200000, the value
            that was previously hard-coded.

    Database errors are printed, not raised. On failure the pending inserts
    are rolled back, and the connection is always closed.
    """
    start = time.time()
    try:
        print('check db to track updates.')
        db = MySQLdb.connect(user='root', passwd="qweqwe", db="celery_test")
        try:
            c = db.cursor()
            insert_query = (
                """INSERT INTO `student_old` """
                """(`name`, `email`, `address`, `class`) """
                """VALUES (%s, %s, %s, %s)"""
            )
            addr_list = ['Dhaka', 'Rajshahi', 'Gazipur', 'Rangpur']
            letters = string.ascii_lowercase
            for i in range(total_rows):
                print('id: ', i + 1)
                rand_name = ''.join(random.choice(letters) for _ in range(10))
                query_data = (
                    rand_name,
                    rand_name + '@venturenxt.com',
                    random.choice(addr_list),
                    random.randint(1, 10),
                )
                c.execute(insert_query, query_data)
            # Single commit for the whole run, so a failure part-way through
            # leaves the table untouched instead of half-filled.
            db.commit()
        except Exception as err:
            print(str(err))
            db.rollback()
        finally:
            # Always release the connection, even when an insert failed.
            db.close()
    except Exception as e:
        # Reached when connecting, rolling back or closing fails.
        print(str(e))
    print('Execution time taken: ', time.time() - start)
@app.task
def update_data():
    """Rebuild `student_new` as a copy of `student_old`.

    The target table is emptied and then refilled from `student_old` in
    pages of 30000 rows. Emptying and refilling happen in one transaction,
    so a failure at any point rolls back to the previous contents of
    `student_new` (on a transactional storage engine).

    Database errors are printed, not raised, and the connection is always
    closed.
    """
    start = time.time()
    try:
        print('check db to track updates.')
        db = MySQLdb.connect(user='root', passwd="qweqwe", db="celery_test")
        try:
            c = db.cursor()
            offset, limit = 0, 30000
            # DELETE rather than TRUNCATE: TRUNCATE commits implicitly and
            # cannot be rolled back, which would leave the table empty if a
            # later page failed to copy.
            clear_query = """DELETE FROM `student_new`"""
            # Without ORDER BY the row order is not guaranteed between
            # queries, so LIMIT/OFFSET pages could overlap or skip rows.
            # NOTE(review): assumes `student_old` has an `id` column, like
            # `student_new` - confirm against the schema.
            query_str = (
                """SELECT * from `student_old` """
                """ORDER BY `id` LIMIT %s , %s"""
            )
            insert_query = (
                """INSERT INTO `student_new` """
                """(`id`, `name`, `email`, `address`, `class`) """
                """VALUES (%s, %s, %s, %s, %s)"""
            )

            # Empty the target table first (same transaction as the copy).
            c.execute(clear_query)
            while True:
                # execute() returns the number of rows the SELECT produced.
                results = c.execute(query_str, (offset, limit))
                print('Total retrieved data: ', results)
                offset += limit
                if not results:
                    print('Retrieved all data, exiting the function')
                    break
                # Copy the current page into the new table.
                data = c.fetchall()
                c.executemany(insert_query, data)
            db.commit()
        except Exception as err:
            print(str(err))
            db.rollback()
        finally:
            # Always release the connection, whatever happened above.
            db.close()
    except Exception as e:
        # Reached when connecting, rolling back or closing fails.
        print(str(e))
    print('Execution time taken: ', time.time() - start)
# Beat schedule: run the "update_data" task under the "sync-db" entry,
# triggered by a crontab whose minute field is '*'.
app.conf.beat_schedule = {
    "sync-db": dict(
        task="db_update.update_data",
        schedule=crontab(minute='*'),
    ),
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment