@gabefair
Last active September 25, 2018 05:34
Json to Mongo file importer
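# v2_import.py -- walks a folder of scraped Gab json files (posts, comments, or users),
# inserts each file as a document into a MongoDB collection, and re-scrapes any ids
# that failed to import.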
# pylint: disable=unsubscriptable-object
import sys, os, json, datetime, platform, traceback, time, random

try:
    import mechanize
except ImportError:
    print("Please run `pip install mechanize` from a command line")
    exit()
try:
    import pymongo
except ImportError:
    print("Please run `pip install pymongo` from a command line")
    exit()
try:
    import scrape_posts
    import scrape_comments
except ImportError:
    print("Please run in the same directory as scrape_posts and scrape_comments")
    exit()
try:
    import dateutil.relativedelta
except ImportError:
    print("Please run `pip install python-dateutil` from a command line")
    exit()
# from bson import json_util
from os.path import basename

try:
    from exceptions import WindowsError
    import curses
except ImportError:
    # WindowsError does not exist outside of Windows, so define a stand-in.
    class WindowsError(OSError):
        pass
except:
    pass

# Global Variables
total_imported = 0
any_errors = False
errors_array = []
error_ids = []
skipped_files = 0
response = 0
def update_progress(total_imported, skipped_files, current_location):
    text = str(total_imported) + "/" + str(total_imported + skipped_files) + " imported. " + str(
        len(error_ids)) + " errors. Looking in folder: " + current_location
    sys.stdout.write('\r' + text)
    sys.stdout.flush()


def report_last_file(full_path, response):
    global total_imported
    print("\nLast file imported: " + str(total_imported) + " At: " + full_path + " with mongodb response: " + str(
        response))
    return
def import_files(run_location, import_type, folder_path, database, destination_coll, connection_url):
    global total_imported, any_errors, error_ids, errors_array, skipped_files, response
    total_imported = 0
    any_errors = False
    errors_array = []
    error_ids = []
    skipped_files = 0
    client = pymongo.MongoClient("mongodb://" + connection_url + ":27017")
    db = client[database]
    collection_conn = db[destination_coll]
    for root, dirs, files in os.walk(folder_path):
        full_base = os.path.join(run_location, root)  # type: str
        if import_type in root:
            try:
                update_progress(total_imported, skipped_files, root)
                for name in files:
                    if name.endswith(".json") and not name == "auth.json":
                        full_path = os.path.join(full_base, name)
                        # Date created is not reliable because it could be the date the file was
                        # copied into the folder, so use the modified date instead.
                        timestamp_of_json_collected = os.path.getmtime(full_path)
                        collected_on = datetime.datetime.fromtimestamp(timestamp_of_json_collected)
                        with open(full_path) as json_file:
                            json_data = json.load(json_file)
                        document = {}
                        document['id'] = os.path.splitext(name)[0]
                        document['collected_on'] = collected_on.strftime('%Y-%m-%d %H:%M:%S')
                        document['data'] = json_data
                        document['type'] = import_type
                        try:
                            response = collection_conn.insert_one(document)
                            total_imported += 1
                        except pymongo.errors.DuplicateKeyError:
                            skipped_files += 1
                        del json_data, json_file
            except KeyboardInterrupt:
                report_last_file(full_path, response)
                save_error_ids(len(error_ids), run_location, import_type)
                try:
                    sys.exit(0)
                except SystemExit:
                    os._exit(0)
            except ValueError:
                any_errors = True
                error_msg = "Broken File: " + full_path
                error_ids.append(basename(name))
                errors_array.append(error_msg)
            except WindowsError:
                raise
            except:
                report_last_file(full_path, response)
                save_error_ids(len(error_ids), run_location, import_type)
                raise
            update_progress(total_imported, skipped_files, root)
def login(username="", password=""):
    if not len(username) or not len(password):
        auth_data = json.load(open("auth.json"))
        try:
            username = auth_data["username"]
        except KeyError:
            print "No username specified."
            return
        try:
            password = auth_data["password"]
        except KeyError:
            print "No password specified."
            return
    browser = mechanize.Browser()
    browser.set_handle_robots(False)
    browser.set_handle_refresh(False)
    browser.addheaders = [("User-agent",
                           "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/56.0.2924.87 Safari/537.36")]
    r = browser.open("https://gab.ai/auth/login")
    browser.select_form(nr=1)
    browser["username"] = username
    browser["password"] = password
    r = browser.submit()
    return browser
def start_scrape(jsons, import_type):
    browser = login()
    if browser is not None:
        if import_type == "users":
            process_users(browser, jsons)
        else:
            process_submissions(browser, jsons, import_type)
    else:
        print "Failed login."
def process_submissions(browser, jsons, import_type):
    """ Scrapes the specified posts or comments. """
    fail = 0
    j = 0
    k = 0
    for i in jsons:
        # Split the ID into the digits used for the nested directory structure.
        num = str(i)
        ones = num[-1]
        tens = num[-2:]
        hundreds = num[-3:]
        # Make directory structure if necessary.
        error_dir = import_type + "_errors"
        if not os.path.exists(error_dir):
            os.makedirs(error_dir)
        if not os.path.exists(error_dir + "/" + ones):
            os.makedirs(error_dir + "/" + ones)
        if not os.path.exists(error_dir + "/" + ones + "/" + tens):
            os.makedirs(error_dir + "/" + ones + "/" + tens)
        if not os.path.exists(error_dir + "/" + ones + "/" + tens + "/" + hundreds):
            os.makedirs(error_dir + "/" + ones + "/" + tens + "/" + hundreds)
        # Read the post
        try:
            if import_type == 'posts':
                r = browser.open("https://gab.ai/posts/" + str(i))
            elif import_type == 'comments':
                # r = browser.open("https://gab.ai/posts/" + str(i) + "/comments?sort=score")  # old version
                r = browser.open("https://gab.ai/posts/" + str(i) + "/comments/index?sort=score")
            else:
                print("Unexpected scrape type. Must be posts or comments")
                exit()
            data = r.read()
            with open(error_dir + "/" + ones + "/" + tens + "/" + hundreds + "/" + str(i) + ".json", "w") as f:
                f.write(data)
        except mechanize.HTTPError as error_data:
            if isinstance(error_data.code, int) and error_data.code == 429:
                print "ALERT TOO MANY REQUESTS SHUT DOWN"
                print i
                sys.exit(-1)
                return
            elif isinstance(error_data.code, int) and error_data.code == 404:
                # print "Gab post deleted or ID not allocated"
                # print i
                fail = fail + 1
            elif isinstance(error_data.code, int) and error_data.code == 400:
                # print "Invalid request -- possibly a private Gab post?"
                # print i
                fail = fail + 1
            else:
                print error_data.code
                print traceback.format_exc()
                print "ERROR: DID NOT WORK"
                print i
        except:
            print traceback.format_exc()
            print "ERROR: STILL DID NOT WORK"
            print i
def process_users(browser, user_names):
    """ Scrapes the specified users, their followers, and who they follow. """
    global error_ids
    j = 0
    k = 0
    total_imported = 0
    skipped_files = 0
    total_users = len(user_names)
    for i in user_names:
        high_intensity_user = 0
        # Check if the user already exists.
        prefix = i[0:2].lower()
        if os.path.isfile("users/" + prefix + "/" + i + ".json"):
            print "Already have user " + i + ". Skipping."
            skipped_files += 1
            continue
        # Make directory structure if necessary.
        if not os.path.exists("users"):
            os.makedirs("users")
        if not os.path.exists("users/" + prefix):
            os.makedirs("users/" + prefix)
        # Read the user
        try:
            # print str(i), "user page"
            r = browser.open("https://gab.ai/users/" + str(i))
            user_data = json.loads(r.read())
            r = browser.open("https://gab.ai/users/" + str(i) + "/followers")
            # print str(i), "follower page"
            follower_data = json.loads(r.read())
            if not follower_data["no-more"]:
                page = 1
                done = 0
                while not done and page < 1500:
                    min_back = page * 30
                    r = browser.open("https://gab.ai/users/" + str(i) + "/followers?before=" + str(min_back))
                    page = page + 1
                    follower_page = json.loads(r.read())
                    if follower_page["no-more"]:
                        done = 1
                    follower_data["data"].extend(follower_page["data"])
                    if page % 10 == 1:
                        # print str(i), "follower page", str(page)
                        time.sleep(3)
                        high_intensity_user = 1
                    else:
                        time.sleep(0.1)
            r = browser.open("https://gab.ai/users/" + str(i) + "/following")
            # print str(i), "following page"
            following_data = json.loads(r.read())
            if i == "aux":
                continue
            if not following_data["no-more"]:
                page = 1
                done = 0
                while not done and page < 1500:
                    min_back = page * 30
                    r = browser.open("https://gab.ai/users/" + str(i) + "/following?before=" + str(min_back))
                    page = page + 1
                    following_page = json.loads(r.read())
                    if following_page["no-more"]:
                        done = 1
                    following_data["data"].extend(following_page["data"])
                    if page % 10 == 1:
                        # print str(i), "following page", str(page)
                        time.sleep(3)
                        high_intensity_user = 1
                    else:
                        time.sleep(0.1)
            data = {"user": user_data, "followers": follower_data, "following": following_data}
            with open("users/" + prefix + "/" + str(i) + ".json", "w") as f:
                json.dump(data, f)
            # print data
            # print i
            # print ""
            total_imported += 1
        # Error handling.
        except mechanize.HTTPError as error_code:
            if isinstance(error_code.code, int) and error_code.code == 429:
                print "TOO MANY REQUESTS. SHUT DOWN."
                print i
                sys.exit(-1)
                return
            # elif isinstance(error_code.code, int) and error_code.code == 404:
            #     print "Gab post deleted or ID not allocated"
            #     print i
            # elif isinstance(error_code.code, int) and error_code.code == 400:
            #     print "Invalid request -- possibly a private Gab post?"
            #     print i
            else:
                print error_code.code
                print traceback.format_exc()
                print "ERROR: DID NOT WORK"
                time.sleep(random.randint(1, 10))
                error_ids.append(i)
                print i
        except:
            print traceback.format_exc()
            print "ERROR: STILL DID NOT WORK"
            print i
        # Pausing between jobs.
        pause_timer = random.randint(1, 100)
        if pause_timer >= 99:
            # print "Waiting..."
            time.sleep(random.randint(1, 3))
        k = k + 1
        j = j + 1
        if k >= 15000:
            # print "Long break."
            time.sleep(random.randint(1, 10))
            k = 0
        if high_intensity_user:
            print "Tough job, time to take a break."
            time.sleep(random.randint(20, 30))
        update_progress(total_imported, total_users, i)
    if any_errors:
        # base_path and import_type are not in scope here, so rebuild the path and pass "users".
        base_path = os.path.split(os.path.abspath(os.path.realpath(sys.argv[0])))[0]
        save_error_ids(str(len(error_ids)), base_path, "users")
def redownload_jsons(error_ids, import_type):
    print("Redownloading " + import_type + " that had an error")
    jsons = []
    for json_id in error_ids:
        jsons.append(json_id[:-5])
    if import_type == "posts" or import_type == "post":
        start_scrape(jsons, "posts")
    elif import_type == "comments" or import_type == "comment":
        start_scrape(jsons, "comments")
    elif import_type == "users" or import_type == "user":
        start_scrape(jsons, "users")
    print "Reimporting now"
    error_dir = import_type + "_errors"
    base_path = os.path.split(os.path.abspath(os.path.realpath(sys.argv[0])))[0]
    import_files(base_path, import_type, error_dir, sys.argv[3], sys.argv[4], sys.argv[5])


def save_error_ids(error_len, base_path, import_type):
    global error_ids
    save_loc = os.path.join(base_path, sys.argv[1] + '_import_errors.txt')
    print("There were " + str(error_len) + " errors.\nA file has been created at " + save_loc +
          " with all the ids of the " + import_type + " that were not collected correctly")
    f = open(save_loc, 'w')
    f.write(str(error_ids))
    f.close()
    redownload_jsons(error_ids, import_type)
def find_os():
    return platform.system()


def rename_folder(input_folder):
    full_address = os.path.realpath(input_folder)
    new_folder_name = input_folder + "_v2_import_complete"
    os.rename(input_folder, new_folder_name)
    print("Renamed: " + str(input_folder) + " to: " + new_folder_name)


def fix_import_type(import_type):
    if import_type == "posts" or import_type == "post":
        import_type = "posts"
    elif import_type == "comments" or import_type == "comment":
        import_type = "comments"
    elif import_type == "users" or import_type == "user":
        import_type = "users"
    else:
        print("Can only import posts, users, or comments")
        exit()
    print("This script will skip the import of any json files that are not " + import_type)
    return import_type
def main():
    arg_length = len(sys.argv)
    if arg_length < 6:
        print(
            "Please specify at least 5 parameters: \n\t\t v2_import.py <posts|comments|users> <folder path> <mongodb database> <mongodb collection name> <connection_url> <Don't rename folder>")
        exit()
    start_time = datetime.datetime.fromtimestamp(time.time())
    print("Starting Import with the following parameters: " + str(sys.argv))
    base_path = os.path.split(os.path.abspath(os.path.realpath(sys.argv[0])))[0]
    import_type = fix_import_type(sys.argv[1])
    import_files(base_path, import_type, sys.argv[2], sys.argv[3], sys.argv[4], sys.argv[5])
    end_time = datetime.datetime.fromtimestamp(time.time())
    total_time = dateutil.relativedelta.relativedelta(end_time, start_time)
    print "\nImport completed in: %d days, %d hours, %d minutes and %d seconds" % (
        total_time.days, total_time.hours, total_time.minutes, total_time.seconds)
    if any_errors:
        save_error_ids(str(len(error_ids)), base_path, import_type)
    # Rename the input folder unless a sixth "don't rename" argument was supplied.
    if arg_length <= 6 or not sys.argv[6]:
        print("Renaming input folder to reflect completed import")
        rename_folder(sys.argv[2])


if __name__ == '__main__':
    os.system('mode con: cols=150 lines=14')
    main()
gabefair commented Jun 4, 2018

Example: python v2_import.py comment data\2018\comments research_db comment_collection
Parameters explained:

  1. The message type: posts, comments, or users
  2. A relative path to the parent folder of the json files
  3. Mongo database name
  4. Mongo collection name
  5. MongoDB connection host (the script connects to mongodb://<host>:27017)
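
Note that main() also requires the connection host, so a full invocation looks like this (the host shown, localhost, is just a placeholder):

python v2_import.py comments data\2018\comments research_db comment_collection localhost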
