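"""Load follow-action rows from a CSV export into PostgreSQL.

The script reads the CSV, keeps only rows logged on a single target date,
renames the columns to their database names, drops rows that already exist
in the logs_epsilon_actions_follow table, and bulk-inserts the rest in
chunks via json_populate_recordset.
"""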
import csv
import json
import re

import psycopg2
class DataProcessor:
    def __init__(self):
        # Only rows whose "Date" column equals this value are imported.
        self.valid_date = "2019-05-21"
        self.file_name = 'log-actions-follow_2019-05-20_08-50PM_subset.csv'
        self.data = []
        # CSV header name -> database column name.
        self.map = {
            "Account": "account",
            "Date": "action_datetime",
            "Target": "target",
            "Keyword": "keyword",
            "Link": "link",
            "UserId": "user_id",
            "Followed Back": "followed_back",
            "Was Unfollowed": "was_unfollowed",
            "IsPrivate": "is_private",
            "FollowBackRatio": "follow_back_ratio"
        }
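    # For reference, the CSV is expected to carry the header row implied by
    # the keys of the mapping above (an assumption from the mapping, not
    # verified against the actual file):
    #   Account,Date,Target,Keyword,Link,UserId,Followed Back,Was Unfollowed,IsPrivate,FollowBackRatio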
    def generate_data(self):
        print("Generating data from csv")
        # utf-8-sig strips a leading byte-order mark if the export has one.
        with open(self.file_name, mode='r', encoding='utf-8-sig') as fh:
            rd = csv.DictReader(fh, delimiter=',')
            for row in rd:
                # Keep only rows logged on the target date.
                if row["Date"] == self.valid_date:
                    self.data.append(row)
    def map_column_names(self):
        print("mapping columns")
        new_data = []
        # UserId values embed the numeric id among other characters;
        # pull out the first run of digits.
        user_id_pattern = re.compile(r'\d+')
        for old_row in self.data:
            row = {}
            for old_col, new_col in self.map.items():
                if old_col == "UserId":
                    match = user_id_pattern.search(old_row[old_col])
                    # Guard against a UserId with no digits, which would
                    # otherwise raise AttributeError on None.
                    row[new_col] = match.group() if match else None
                    continue
                row[new_col] = old_row[old_col]
            new_data.append(row)
        self.data = new_data
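    # For illustration only (hypothetical values): a raw CSV row such as
    #   {"Account": "acct_a", "Date": "2019-05-21", ..., "UserId": "id: 12345"}
    # comes out of map_column_names as
    #   {"account": "acct_a", "action_datetime": "2019-05-21", ..., "user_id": "12345"}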
    def filter_old_records(self):
        print("filtering old records")
        cursor = self.conn.cursor()
        query = """
            SELECT account, action_datetime FROM logs_epsilon_actions_follow;"""
        cursor.execute(query)
        # Use a set so the per-row membership test below is O(1)
        # instead of a scan of the whole list.
        existing_records = set()
        for row in cursor.fetchall():
            existing_records.add("{}-{}".format(row[0], row[1]))
        filtered_data = []
        for row in self.data:
            # Skip rows already present in the table, keyed on
            # account + action_datetime.
            key = "{}-{}".format(row["account"], row["action_datetime"])
            if key not in existing_records:
                filtered_data.append(row)
        self.data = filtered_data
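    # The dedup key is just the two fields joined with a hyphen, e.g.
    # (hypothetical) "acct_a-2019-05-21". Note this assumes str() of the
    # database value formats the same way as the CSV string does.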
    def connect_to_db(self):
        self.conn = psycopg2.connect(
            host="db-postgresql-sfo2-33077-do-user-4341547-0.db.ondigitalocean.com",
            port=25060,
            database="defaultdb",
            user="doadmin",
            password="c2jn691m0vdkhegn"
        )
        print("Database connected")
    def insert_data(self):
        print("inserting data")
        chunk_size = 1000
        query = """
            INSERT INTO logs_epsilon_actions_follow
                (account, action_datetime, target, keyword, link, user_id,
                 followed_back, was_unfollowed, is_private, follow_back_ratio)
            SELECT account, action_datetime, target, keyword, link, user_id,
                   followed_back, was_unfollowed, is_private, follow_back_ratio
            FROM json_populate_recordset(null::logs_epsilon_actions_follow, %s);"""
        length = len(self.data)
        start = 0
        # `start < length` (not <=) avoids a final round trip with an
        # empty chunk, and skips the loop entirely when there is no data.
        while start < length:
            rowset = self.data[start:start + chunk_size]
            cursor = self.conn.cursor()
            cursor.execute(query, (json.dumps(rowset),))
            start += chunk_size
            self.conn.commit()
            # Cap at 100: the last chunk may push `start` past `length`.
            print("{} percent complete".format(min(start, length) * 100 / length))
    def start(self):
        # Order matters: the database connection must exist before
        # filter_old_records and insert_data use self.conn.
        self.generate_data()
        self.map_column_names()
        self.connect_to_db()
        self.filter_old_records()
        self.insert_data()


DataProcessor().start()
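# A minimal sketch of the table this script assumes already exists. The
# column names come from the INSERT statement above; the types here are
# placeholders only, and the real schema may well differ:
#
# CREATE TABLE logs_epsilon_actions_follow (
#     account            text,
#     action_datetime    text,
#     target             text,
#     keyword            text,
#     link               text,
#     user_id            text,
#     followed_back      text,
#     was_unfollowed     text,
#     is_private         text,
#     follow_back_ratio  text
# );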