-
-
Save jatindhankhar/91ea2d79674e6874f68b87e917e8353a to your computer and use it in GitHub Desktop.
A very messy script
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Requires Python 3.2 and onwards | |
import sqlsoup | |
import sys | |
import itertools | |
import os | |
from imgurpython import ImgurClient | |
import tempfile | |
import hashlib | |
from imgurpython import ImgurClient | |
from operator import is_not | |
from functools import partial | |
import argparse | |
from glob import glob | |
# Place this script inside the aslo-v3 root folder so the aslo imports below resolve
from aslo.service import activity as activity_service | |
from aslo.models.activity import ActivityModel, DeveloperModel | |
from aslo.models.release import ReleaseModel | |
# Connection to the legacy MySQL database named "activities".
db = sqlsoup.SQLSoup("mysql://{}:{}@{}/{}".format("root",
                                                  "YourPassword", "localhost", "activities"))

# Imgur API credentials (placeholders; fill in before running).
IMGUR_CLIENT_ID = "XXXXXXXXXXXXXXXXXXXXX"
IMGUR_CLIENT_SECRET = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
imgur_client = ImgurClient(
    IMGUR_CLIENT_ID, IMGUR_CLIENT_SECRET
)

# Only categories whose en_US name appears here are kept by get_category_text.
POPULAR_CATEGORIES = ["programming", "robotics", "internet", "science", "maths",
                      "language", "geography", "documents", "music", "media", "art", "teacher", "system"]

# Fetch the fallback add-on once: two separate unordered first() queries are
# not guaranteed to return the same row, which could pair one add-on's icon
# data with another add-on's icon type.
_fallback_addon = db.addons.first()
FALLBACK_ICON = _fallback_addon.icondata
FALLBACK_ICON_TYPE = _fallback_addon.icontype
def save_to_temp_file(img_data):
    """Write *img_data* (bytes) to a new temporary ``.png`` file.

    Returns the path of the file. The file is not deleted automatically;
    the caller is responsible for removing it.
    """
    # NamedTemporaryFile creates the file atomically, unlike the deprecated
    # tempfile.mktemp(), which only returns a name that another process
    # could claim before we open it.
    with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as tmp:
        tmp.write(img_data)
        return tmp.name
# https://stackoverflow.com/a/26492671/3455743
def fix_encoding(target_string):
    """Repair text that was stored as latin1 but is really UTF-8.

    ``None`` becomes a single space. Characters that cannot be encoded as
    latin1 are dropped; if the resulting bytes are not valid UTF-8 the
    input is returned unchanged.
    """
    if target_string is None:
        return " "
    try:
        # MySQL uses latin1 by default, so round-trip through it.
        repaired = target_string.encode('latin1', 'ignore').decode('utf8')
    except UnicodeError:
        # Could not reinterpret the bytes; leave the text as it is.
        return target_string
    return repaired
def upload_img_to_imgur(image_path):
    """Upload the image at *image_path* via the module-level Imgur client.

    Returns a ``(link, deletehash)`` tuple taken from the upload response.
    """
    print("Uploading image .. {}".format(image_path))
    response = imgur_client.upload_from_path(image_path)
    link = response['link']
    delete_hash = response['deletehash']
    return (link, delete_hash)
def get_img_hash(img_path, blocksize=2**20):
    """Return the SHA-1 hex digest of the file at *img_path*.

    The file is read in chunks of *blocksize* bytes so it is never loaded
    into memory in one piece.
    """
    digest = hashlib.sha1()
    with open(img_path, 'rb') as stream:
        while True:
            block = stream.read(blocksize)
            if block == b'':
                break
            digest.update(block)
    return str(digest.hexdigest())
def get_screenshots(addon_id):
    """Upload up to five preview images of an add-on to Imgur.

    Returns ``{'en': {sha1_hex: (link, deletehash), ...}}``. Every
    screenshot is filed under the ``'en'`` locale for now.
    """
    # Local import: the file's import header does not import time.
    import time

    print("Processing Screenshots ")
    # For now default to en
    lang = 'en'
    # Limit to 5 screenshots
    previews = db.previews.filter(db.previews.addon_id == addon_id).limit(5)
    uploaded = {}
    for preview in previews:
        image_path = save_to_temp_file(preview.filedata)
        try:
            image_hash = get_img_hash(image_path)
            link, deletehash = upload_img_to_imgur(image_path)
        finally:
            # The temp file is only needed for hashing and uploading;
            # remove it so a long migration does not fill the temp dir.
            os.remove(image_path)
        # Pause between uploads (presumably to respect Imgur rate limits
        # -- TODO confirm the required interval).
        time.sleep(8)
        uploaded[image_hash] = (link, deletehash)
    return {lang: uploaded}
# Setup log directories
# exist_ok requires Python 3.2 onwards
for _log_dir in ("faulty_addons/", "good_addons/"):
    os.makedirs(_log_dir, exist_ok=True)

# Connect mongoengine to the 'aslo' database used by the aslo models.
import mongoengine as me
me.connect('aslo')
def make_user_hash(user_info):
    """Build the developer dict (name, page, email, avatar) for a user row.

    The name is the encoding-fixed first and last name joined by a space;
    an empty homepage is normalised to ``None``; the avatar is a fixed
    placeholder URL.
    """
    first_name = fix_encoding(user_info.firstname)
    last_name = fix_encoding(user_info.lastname)
    return {
        "name": first_name + ' ' + last_name,
        "page": user_info.homepage or None,
        "email": user_info.email,
        "avatar": 'https://avatars2.githubusercontent.com/u/3996398?v=4',
    }
def modify_locale_underscore(translation_entry):
    """Return ``(locale, text)`` with dashes in the locale replaced by
    underscores and the text passed through ``fix_encoding``.

    A new tuple is returned because the input entry is not modified.
    """
    normalized_locale = translation_entry[0].replace("-", "_")
    repaired_text = fix_encoding(translation_entry[1])
    return (normalized_locale, repaired_text)
def convert_translations(translation_id, without_locale=False, singular=False):
    """Look up the translations stored under *translation_id*.

    * ``translation_id is None`` -> ``None``.
    * ``without_locale=False`` -> dict mapping locale (dashes replaced by
      underscores) to the encoding-fixed string.
    * ``without_locale=True`` -> raw ``localized_string`` rows: the first
      row (or ``None``) when *singular* is true, otherwise all rows.
      No encoding fix is applied on this path.
    """
    # Handle Null/None translation ids
    if translation_id is None:
        return None

    table = db.translations
    matches_id = table.id == translation_id

    if not without_locale:
        rows = table.with_entities(
            table.locale, table.localized_string).filter(matches_id).all()
        return dict(modify_locale_underscore(row) for row in rows)

    query = table.with_entities(table.localized_string).filter(matches_id)
    return query.first() if singular else query.all()
def find_developers(addon_id):
    """Return a list of developer dicts (see ``make_user_hash``) for every
    user linked to *addon_id* in ``addons_users``.
    """
    links = db.addons_users
    id_rows = links.with_entities(links.user_id).filter(
        links.addon_id == addon_id).all()
    return [
        make_user_hash(db.users.filter(db.users.id == id_row[0]).one())
        for id_row in id_rows
    ]
def get_bundle_details(version_id):
    """Return ``(filename, created)`` of a bundle file for *version_id*.

    Returns ``(None, None)`` when the version has no file row.
    """
    files = db.files
    # first() replaces the original count()/one() pair: it needs a single
    # query and does not raise when a version has more than one file row
    # (the first row returned is used).
    row = files.with_entities(files.filename, files.created).filter(
        files.version_id == version_id).first()
    if row is None:
        return None, None
    return row
# App version id is different from version id
def get_sugar_version(app_version_id):
    """Return the ``version`` value of the ``appversions`` row with this id.

    Propagates whatever ``.one()`` raises if the row is missing.
    """
    app_versions = db.appversions
    record = app_versions.filter(app_versions.id == app_version_id).one()
    return record.version
def get_license(license_id):
    """Return the header line of the license text for *license_id*.

    Returns ``None`` when the id is ``None``, the license has no text, the
    text has no translation row, or the header is empty.
    """
    FALLBACK_LICENSE = None
    if license_id is None:
        return FALLBACK_LICENSE
    text_id = db.licenses.filter(db.licenses.id == license_id).one().text
    if text_id is None:
        return FALLBACK_LICENSE
    row = convert_translations(text_id, without_locale=True, singular=True)
    # The translation row, or its string, can be missing; treat both as
    # "no license" instead of failing on None[0] / None.strip().
    if row is None or row[0] is None:
        return FALLBACK_LICENSE
    # Strip license with long texts and return header only
    license_text = row[0].strip().split("\r")[0].strip()
    return license_text or FALLBACK_LICENSE
def flatten_singly_tuple_list(target_list):
    """Return a list holding the first element of each item in
    *target_list* (e.g. ``[(1,), (2,)]`` -> ``[1, 2]``).
    """
    return [entry[0] for entry in target_list]
def get_sugar_info(version_id):
    """Build the ``sugar`` sub-document for a release.

    The ``is_web``, ``is_gtk3`` and ``has_old_toolbars`` flags are always
    ``False``. ``min_sugar_version`` is resolved through the version's
    ``applications_versions`` row, or ``0.0`` when there is none.
    """
    links = db.applications_versions
    # first() replaces the original count()/one() pair: one query, and no
    # exception when several rows exist for the version.
    link = links.filter(links.version_id == version_id).first()
    if link is not None:
        min_sugar_version = get_sugar_version(link.min)
    else:
        min_sugar_version = 0.0
    return {
        'is_web': False,
        'is_gtk3': False,
        'has_old_toolbars': False,
        'min_sugar_version': min_sugar_version,
    }
def get_category_text(category_id):
    """Return the lower-cased en_US name of a category, or ``None`` when
    that name is not listed in ``POPULAR_CATEGORIES``.
    """
    categories = db.categories
    name_translation_id = categories.filter(
        categories.id == category_id).one().name
    # Get only en_US translations
    translations = convert_translations(
        name_translation_id, without_locale=False)
    english_name = translations.get('en_US', "").lower()
    return english_name if english_name in POPULAR_CATEGORIES else None
def get_addon_categories(addon_id):
    """Return the add-on's popular category names as one space-separated
    string; categories that ``get_category_text`` maps to ``None`` are
    left out.
    """
    links = db.addons_categories
    id_rows = links.with_entities(links.category_id).filter(
        links.addon_id == addon_id).all()
    # Resolve every category first, then drop the None results.
    names = [get_category_text(id_row[0]) for id_row in id_rows]
    kept = [name for name in names if name is not None]
    return " ".join(kept)
def generate_old_download_url(bundle_name, addon_id):
    """Return the legacy activities.sugarlabs.org download URL for a bundle
    file of the given add-on.
    """
    url_template = "http://activities.sugarlabs.org/activities/{}/{}"
    path_parts = (addon_id, bundle_name)
    return url_template.format(*path_parts)
def get_addon_releases_info(addon, process_screenshots=False):
    """Migrate every usable version of *addon* into the new database.

    For each version row of the add-on (sorted by the float value of its
    version string) a release dict is built and handed to
    ``activity_service.insert_activity``. Versions are skipped, and the
    reason appended to ``faulty_addons/<addon_id>.log``, when:

    * the version string is not a valid float,
    * no bundle file exists for the version, or
    * no license can be determined.

    Inserted versions are noted in ``good_addons/<addon_id>.log``.

    :param addon: row from the legacy ``addons`` table.
    :param process_screenshots: when true, upload the add-on's screenshots
        and attach them to every release.
    :returns: list of the release dicts that were inserted.
    """
    addon_id = addon.id
    releases = []

    def log_event(directory, message):
        # Append one line to this add-on's log file inside *directory*.
        with open("{}/{}.log".format(directory, addon_id), 'a') as f:
            print(message, file=f)

    def log_skip(version_id, version, reason):
        # Record why a version was not migrated.
        log_event("faulty_addons",
                  "Skipping version id {} and activity version {}. Reason: {} ".format(
                      version_id, version, reason))

    def is_float(el):
        # el is a (version_id, version, ...) row; True if version parses.
        try:
            float(el[1])
        except (TypeError, ValueError):
            # Only conversion failures are expected here; a bare except
            # would also swallow KeyboardInterrupt / SystemExit.
            log_skip(el[0], el[1], "Invalid/non-float version number")
            return False
        return True

    versions = db.versions
    version_info = versions.with_entities(
        versions.id, versions.version, versions.releasenotes,
        versions.license_id).filter(versions.addon_id == addon_id).all()
    # Remove non float versions
    version_info = [el for el in version_info if is_float(el)]
    # Sorting is done in Python rather than in the query.
    # We are sorting to avoid error by activity_service
    version_info = sorted(version_info, key=lambda el: float(el[1]))

    i18n_name = convert_translations(addon.name)
    i18n_summary = convert_translations(addon.summary)
    # Add placeholder in case of no summary
    if i18n_summary is None:
        i18n_summary = {'en': 'No Summary '}

    # Repository link: homepage, else support url, else the Sugar Labs site.
    link_row = convert_translations(
        addon.homepage, without_locale=True, singular=True)
    if link_row is None:
        link_row = convert_translations(
            addon.supporturl, without_locale=True, singular=True)
    repository = link_row[0] if link_row is not None else None
    if repository is None:
        repository = "https://sugarlabs.org"

    developers = find_developers(addon_id)
    categories = get_addon_categories(addon_id)
    bundle_id = addon.guid

    # Use the fallback icon (data and type together) when the add-on has none.
    icon_type = addon.icontype
    icon_bin = addon.icondata
    if icon_bin is None:
        icon_type = FALLBACK_ICON_TYPE
        icon_bin = FALLBACK_ICON

    screenshots = get_screenshots(addon_id) if process_screenshots else None

    # First look for license in the suggested amount
    # NOTE(review): the legacy data apparently keeps license text in
    # ``suggested_amount`` -- confirm against the old schema.
    suggested_license = addon.suggested_amount

    for version_id, version, releasenote_id, license_id in version_info:
        # Run the cheap skip checks before the remaining per-version
        # lookups so skipped versions cost as few queries as possible.
        bundle_name, release_time = get_bundle_details(version_id)
        if bundle_name is None:
            log_skip(version_id, version, "No bundle file found")
            continue

        # Prioritize suggested amount, if None then only get license from license id
        if suggested_license is None:
            release_license = get_license(license_id)
        else:
            release_license = suggested_license
        # Skip Activities with unknown, undefined, and no licenses
        if release_license is None:
            log_skip(version_id, version, "Unknown license")
            continue

        note_row = convert_translations(
            releasenote_id, without_locale=True, singular=True)
        notes = fix_encoding(note_row[0]) if note_row is not None else " "

        release_info = {
            'release': {'time': release_time, 'notes': notes},
            'screenshots': screenshots if process_screenshots else {},
            'sugar': get_sugar_info(version_id),
            'activity_version': version,
            'categories': categories,
            'license': release_license,
            'download_url': generate_old_download_url(
                bundle_name=bundle_name, addon_id=addon_id),
            'bundle_name': bundle_name,
            'i18n_name': i18n_name,
            'i18n_summary': i18n_summary,
            'developers': developers,
            'repository': repository,
            'icon_type': icon_type,
            'icon_bin': icon_bin,
            'bundle_id': bundle_id,
        }

        releases.append(release_info)
        log_event("good_addons",
                  "Added to Database . version id {} and activity version {}".format(
                      version_id, version))
        activity_service.insert_activity(release_info)
    return releases
def extract_addon_id_from_log(log_file):
    """Return the add-on id encoded in a log file path, i.e. the file name
    without its directory and extension (``good_addons/12.log`` -> ``"12"``).
    """
    file_name = os.path.basename(log_file)
    stem, _extension = os.path.splitext(file_name)
    return stem
def patch_existing_addons_with_screenshots(skip_before_bundle_id=None):
    """Upload screenshots for activities already stored in Mongo and attach
    them to the latest and all previous releases.

    :param skip_before_bundle_id: when given, activities listed before this
        bundle id are skipped, so an interrupted run can be resumed.
    :raises ValueError: if *skip_before_bundle_id* matches no activity.

    Exits the process with status 1 if fetching or uploading screenshots
    fails for an activity.
    """
    activities = list(ActivityModel.objects())
    if skip_before_bundle_id:
        bundle_ids = [el.bundle_id for el in activities]
        if skip_before_bundle_id not in bundle_ids:
            # Same exception type list.index() would raise, but with a
            # message that names the actual problem.
            raise ValueError(
                "Unknown bundle id {!r}; cannot resume from it".format(
                    skip_before_bundle_id))
        skip_index = bundle_ids.index(skip_before_bundle_id)
        print("Skipping all activities before {}".format(skip_before_bundle_id))
        activities = activities[skip_index:]

    for activity in activities:
        bundle_id = activity.bundle_id
        # Find addon_id of bundle from old db
        addon_id = db.addons.filter(db.addons.guid == bundle_id).one().id
        print("Patching screenshots for {} -- {}".format(bundle_id, addon_id))
        # Fetch Screenshot from the Database
        try:
            screenshots = get_screenshots(addon_id)
        except Exception as error:
            print("Stopped due to {} at {} .Resume by passing bundle_id and it will pick up from where it left".format(
                error, bundle_id))
            # Non-zero status so callers and shell scripts see the failure.
            sys.exit(1)
        # Update screenshots
        activity.latest_release.update(set__screenshots=screenshots)
        # Update previous releases as well
        for old_release in activity.previous_releases:
            old_release.update(set__screenshots=screenshots)
if __name__ == "__main__":
    # Command-line entry point. Modes:
    #   --patch-existing    attach screenshots to activities already in Mongo
    #   --with-screenshots  re-process add-ons listed in good_addons/*.log,
    #                       this time uploading their screenshots
    #   (neither)           migrate every add-on without screenshots
    parser = argparse.ArgumentParser(
        description='Migrate old aslo data from Mysql to Mongo')
    # const=True makes the bare flag (e.g. just "-ws") truthy. With
    # nargs="?" and no const, a bare flag parses to None and the mode is
    # silently ignored. Passing an explicit value still works as before.
    parser.add_argument('-ws', "--with-screenshots",
                        help='Uploads good addons screenshots',
                        nargs="?", const=True)
    parser.add_argument('-pe', "--patch-existing",
                        help="Patch existing Mongo database with screenshots (overrides if they already exist",
                        nargs="?", const=True)
    parser.add_argument('-b_id', "--bundle-id",
                        help="Bundle id, all activities before this bundle_id will be skipped",
                        nargs="?", default=None)
    args = vars(parser.parse_args())
    print(args)

    # Counters for add-ons that produced at least one / no release.
    non_zero = 0
    zero = 0

    if args['patch_existing']:
        patch_existing_addons_with_screenshots(
            skip_before_bundle_id=args['bundle_id'])
    elif args['with_screenshots']:
        # Local import: the file's import header does not import time.
        import time

        print("Processing good addons with screenshots")
        addon_ids = [extract_addon_id_from_log(log_path)
                     for log_path in glob("good_addons/*.log")]
        for addon_id in addon_ids:
            print("Addon id {}".format(addon_id))
            addon = db.addons.filter(db.addons.id == addon_id).one()
            result = get_addon_releases_info(addon, process_screenshots=True)
            print("Mandatory pause ...")
            time.sleep(12)
            if len(result) > 0:
                non_zero = non_zero + 1
                print("Non zero release")
            else:
                zero = zero + 1
                print("Zero release")
        print("Non-zero activities : {}".format(non_zero))
        print("Zero activities : {} ".format(zero))
    else:
        for addon in db.addons.all():
            print("Addon id {}".format(addon.id))
            result = get_addon_releases_info(addon, process_screenshots=False)
            if len(result) > 0:
                non_zero = non_zero + 1
                print("Non zero release")
            else:
                zero = zero + 1
                print("Zero release")
        print("Non-zero activities : {}".format(non_zero))
        print("Zero activities : {} ".format(zero))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment