Skip to content

Instantly share code, notes, and snippets.

@aakashjsr
Created May 25, 2019 12:57
Show Gist options
  • Save aakashjsr/7ca0a9210b6e38f62cd874ace20c93ec to your computer and use it in GitHub Desktop.
import csv, psycopg2, re, json
class DataProcessor:
    """ETL helper for Instagram follow-action logs.

    Pipeline (see start()): read a CSV export, keep only rows for one date,
    rename CSV headers to database column names, drop rows already present
    in the ``logs_epsilon_actions_follow`` table, and bulk-insert the rest
    into Postgres via ``json_populate_recordset``.
    """

    def __init__(self):
        # Only rows stamped with exactly this Date string are imported.
        self.valid_date = "2019-05-21"
        self.file_name = 'log-actions-follow_2019-05-20_08-50PM_subset.csv'
        self.data = []
        # CSV header -> database column name.
        self.map = {
            "Account": "account",
            "Date": "action_datetime",
            "Target": "target",
            "Keyword": "keyword",
            "Link": "link",
            "UserId": "user_id",
            "Followed Back": "followed_back",
            "Was Unfollowed": "was_unfollowed",
            "IsPrivate": "is_private",
            "FollowBackRatio": "follow_back_ratio",
        }

    def generate_data(self):
        """Read the CSV and keep only rows whose Date equals valid_date."""
        print("Generating data from csv")
        # utf-8-sig strips the BOM that Excel-style exports often prepend.
        with open(self.file_name, mode='r', encoding='utf-8-sig') as fh:
            rd = csv.DictReader(fh, delimiter=',')
            self.data = [row for row in rd if row["Date"] == self.valid_date]

    def map_column_names(self):
        """Rename each row's keys per self.map and strip UserId to its digits.

        Raises:
            ValueError: if a UserId cell contains no digits (the original
                code died here with an opaque AttributeError on None).
        """
        print("mapping columns")
        user_id_pattern = re.compile(r'\d+')
        new_data = []
        for old_row in self.data:
            row = {}
            for old_col, new_col in self.map.items():
                if old_col == "UserId":
                    match = user_id_pattern.search(old_row[old_col])
                    if match is None:
                        raise ValueError(
                            "UserId column has no numeric id: {!r}".format(old_row[old_col])
                        )
                    row[new_col] = match.group()
                else:
                    row[new_col] = old_row[old_col]
            new_data.append(row)
        self.data = new_data

    def filter_old_records(self):
        """Drop rows whose account/action_datetime pair is already in the table.

        Membership is checked against a set (was a list: O(rows * existing)).
        The "{}-{}" key format is kept byte-identical to the original so the
        string rendering of the DB datetime still matches the CSV value.
        """
        print("filtering old records")
        cursor = self.conn.cursor()
        query = """
        SELECT account,action_datetime FROM logs_epsilon_actions_follow;"""
        cursor.execute(query)
        existing_records = {
            "{}-{}".format(row[0], row[1]) for row in cursor.fetchall()
        }
        cursor.close()
        self.data = [
            row for row in self.data
            if "{}-{}".format(row["account"], row["action_datetime"]) not in existing_records
        ]

    def connect_to_db(self):
        """Open the Postgres connection and store it on self.conn."""
        # SECURITY: credentials are hard-coded (and published in this gist).
        # They should be rotated and loaded from the environment or a config
        # file instead of living in source control.
        self.conn = psycopg2.connect(host="db-postgresql-sfo2-33077-do-user-4341547-0.db.ondigitalocean.com",port=25060, database="defaultdb", user="doadmin", password="c2jn691m0vdkhegn")
        print("Database connected")

    def insert_data(self):
        """Insert self.data in chunks of 1000 rows, committing per chunk.

        Fixes the original ``while start <= length`` loop, which issued one
        extra empty INSERT whenever len(data) was an exact multiple of the
        chunk size (and a pointless INSERT when data was empty), and whose
        progress figure could exceed 100.
        """
        print("inserting data")
        chunk_size = 1000
        query = """
        INSERT INTO logs_epsilon_actions_follow (account,action_datetime,target,keyword,link,user_id,followed_back,was_unfollowed,is_private,follow_back_ratio) SELECT account,action_datetime,target,keyword,link,user_id,followed_back,was_unfollowed,is_private,follow_back_ratio FROM json_populate_recordset(null::logs_epsilon_actions_follow, %s);"""
        total = len(self.data)
        cursor = self.conn.cursor()
        for start in range(0, total, chunk_size):
            rowset = self.data[start:start + chunk_size]
            cursor.execute(query, (json.dumps(rowset),))
            # Commit per chunk so a mid-run failure keeps completed chunks.
            self.conn.commit()
            done = min(start + chunk_size, total)
            print("{} percent complete".format((done * 100) / total))
        cursor.close()

    def start(self):
        """Run the full pipeline: read, map, connect, de-duplicate, insert."""
        self.generate_data()
        self.map_column_names()
        self.connect_to_db()
        self.filter_old_records()
        self.insert_data()
# Guard the entry point so importing this module does not immediately
# hit the filesystem and the database.
if __name__ == "__main__":
    DataProcessor().start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment