A very messy script
# Requires Python 3.2 and onwards | |
import sqlsoup | |
import sys | |
import itertools | |
import os | |
from imgurpython import ImgurClient | |
import tempfile | |
import hashlib | |
from imgurpython import ImgurClient | |
from operator import is_not | |
from functools import partial | |
import argparse | |
from glob import glob | |
# Place this script inside the aslo-v3 root folder so the aslo imports below resolve
from aslo.service import activity as activity_service | |
from aslo.models.activity import ActivityModel, DeveloperModel | |
from aslo.models.release import ReleaseModel | |
# Connection to the legacy MySQL database that is being migrated.
# NOTE(review): the credentials below look like placeholders; replace them
# before running.
db = sqlsoup.SQLSoup(
    "mysql://{}:{}@{}/{}".format(
        "root", "YourPassword", "localhost", "activities"))

# Imgur API credentials and the shared client used for screenshot uploads.
IMGUR_CLIENT_ID = "XXXXXXXXXXXXXXXXXXXXX"
IMGUR_CLIENT_SECRET = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
imgur_client = ImgurClient(IMGUR_CLIENT_ID, IMGUR_CLIENT_SECRET)

# Only category names found in this list are kept for migrated activities.
POPULAR_CATEGORIES = ["programming", "robotics", "internet", "science", "maths",
                      "language", "geography", "documents", "music", "media",
                      "art", "teacher", "system"]

# Icon used for addons that have no icon data. The row is fetched once so
# that the icon bytes and the icon type are guaranteed to belong together
# (two separate unordered first() calls give no such guarantee).
_fallback_addon = db.addons.first()
FALLBACK_ICON = _fallback_addon.icondata
FALLBACK_ICON_TYPE = _fallback_addon.icontype
def save_to_temp_file(img_data):
    """Write ``img_data`` to a new temporary ``.png`` file and return its path.

    The file is created with ``tempfile.NamedTemporaryFile`` instead of
    ``tempfile.mktemp``: ``mktemp`` only invents a name, leaving a window in
    which another process can create that path first. The file is not
    removed automatically (``delete=False``); the caller owns the path.

    :param img_data: raw image bytes to store.
    :returns: path of the written file, ending in ``.png``.
    """
    with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as tmp_file:
        tmp_file.write(img_data)
        return tmp_file.name
# https://stackoverflow.com/a/26492671/3455743
def fix_encoding(target_string):
    """Repair text whose UTF-8 bytes were read back as latin1 characters.

    The string is encoded to latin1 (recovering the original bytes) and
    decoded as UTF-8. Encoding is strict on purpose: with ``'ignore'`` any
    character outside latin1 was silently dropped, so text that was already
    correct (for example CJK) came back truncated or empty. Now any string
    that cannot make the round trip is returned unchanged.

    :param target_string: text to repair, or ``None``.
    :returns: the repaired text, the original text when it cannot be
        repaired, or ``" "`` when ``target_string`` is ``None``.
    """
    if target_string is None:
        return " "
    try:
        # MySql uses latin1 by default :(
        return target_string.encode('latin1').decode('utf8')
    except UnicodeError:
        # If it cannot be re-encoded/decoded, leave it as it is
        return target_string
def upload_img_to_imgur(image_path):
    """Upload the image at ``image_path`` with the module-level Imgur client.

    :param image_path: path of the image file to upload.
    :returns: ``(link, deletehash)`` taken from the upload response.
    """
    announcement = "Uploading image .. {}".format(image_path)
    print(announcement)
    response = imgur_client.upload_from_path(image_path)
    link = response['link']
    delete_hash = response['deletehash']
    return (link, delete_hash)
def get_img_hash(img_path, blocksize=2**20):
    """Return the SHA-1 hex digest of the file at ``img_path``.

    The file is read in pieces of at most ``blocksize`` bytes so that it
    never has to be held in memory as a whole.

    :param img_path: path of the file to hash.
    :param blocksize: maximum number of bytes read per step.
    :returns: the hexadecimal SHA-1 digest as a string.
    """
    digest = hashlib.sha1()
    with open(img_path, 'rb') as stream:
        while True:
            block = stream.read(blocksize)
            if block == b'':
                # An empty read marks the end of the file.
                break
            digest.update(block)
    return str(digest.hexdigest())
def get_screenshots(addon_id):
    """Upload up to five preview images of an addon to Imgur.

    Each preview's binary data is written to a temporary file, hashed, and
    uploaded. The temporary file is removed afterwards (it used to be left
    behind for every screenshot).

    :param addon_id: id of the addon in the legacy database.
    :returns: ``{'en': {sha1_hex: (link, deletehash), ...}}``; the inner
        dict is empty when the addon has no previews.
    """
    # Local import: the file does not import ``time`` at module level.
    import time

    print("Processing Screenshots ")
    # Limit to 5 screenshots
    previews = db.previews.filter(db.previews.addon_id == addon_id).limit(5)
    # For now default to en
    lang = 'en'
    new_screenshots = {lang: {}}
    for preview in previews:
        image_path = save_to_temp_file(preview.filedata)
        try:
            image_hash = get_img_hash(image_path)
            link, deletehash = upload_img_to_imgur(image_path)
        finally:
            # The temporary copy is only needed for hashing and uploading.
            os.remove(image_path)
        # Pause between uploads (presumably to stay within Imgur rate
        # limits -- TODO confirm).
        time.sleep(8)
        new_screenshots[lang][image_hash] = (link, deletehash)
    return new_screenshots
# Set up the log directories; one log file per addon id is appended there.
# ``exist_ok`` needs Python 3.2 onwards.
for _log_dir in ("faulty_addons/", "good_addons/"):
    os.makedirs(_log_dir, exist_ok=True)

import mongoengine as me

# Open the default mongoengine connection to the 'aslo' database.
me.connect('aslo')
def make_user_hash(user_info):
    """Build the developer dict for one legacy user record.

    :param user_info: object exposing ``firstname``, ``lastname``,
        ``homepage`` and ``email`` attributes.
    :returns: dict with the keys ``name``, ``page``, ``email`` and
        ``avatar``. ``page`` is ``None`` when the homepage is falsy and
        ``avatar`` is always the same fixed URL.
    """
    first_name = fix_encoding(user_info.firstname)
    last_name = fix_encoding(user_info.lastname)
    return {
        "name": first_name + ' ' + last_name,
        "page": user_info.homepage or None,
        "email": user_info.email,
        "avatar": 'https://avatars2.githubusercontent.com/u/3996398?v=4',
    }
def modify_locale_underscore(translation_entry):
    """Normalise one ``(locale, localized_string)`` pair.

    Dashes in the locale are replaced by underscores and the text is passed
    through ``fix_encoding``. A new tuple is returned because the incoming
    entry cannot be modified in place.

    :param translation_entry: indexable pair ``(locale, localized_string)``.
    :returns: ``(locale_with_underscores, fixed_string)``.
    """
    new_locale = translation_entry[0].replace("-", "_")
    localized_string = fix_encoding(translation_entry[1])
    return (new_locale, localized_string)
def convert_translations(translation_id, without_locale=False, singular=False):
    """Look up the legacy translations stored under ``translation_id``.

    :param translation_id: id in the ``translations`` table, or ``None``.
    :param without_locale: when true, return only ``localized_string`` rows
        (unmodified); otherwise return a dict keyed by locale.
    :param singular: only used with ``without_locale``; return the first
        matching row (or ``None``) instead of the list of all rows.
    :returns: ``None`` for a ``None`` id; a row or list of rows when
        ``without_locale`` is set; otherwise ``{locale: text}`` with locale
        dashes turned into underscores and the text encoding-fixed.
    """
    # Handle Null/None translation ids
    if translation_id is None:
        return None

    if not without_locale:
        rows = db.translations.with_entities(
            db.translations.locale, db.translations.localized_string
        ).filter(db.translations.id == translation_id).all()
        return dict(modify_locale_underscore(row) for row in rows)

    query = db.translations.with_entities(
        db.translations.localized_string
    ).filter(db.translations.id == translation_id)
    if singular:
        return query.first()
    return query.all()
def find_developers(addon_id):
    """Return the developer dicts of every user linked to an addon.

    :param addon_id: id of the addon in the legacy database.
    :returns: list of dicts as built by ``make_user_hash``, one per row of
        ``addons_users`` that references the addon.
    """
    user_id_rows = db.addons_users.with_entities(
        db.addons_users.user_id
    ).filter(db.addons_users.addon_id == addon_id).all()
    # Each row is a one-element tuple holding the user id.
    return [
        make_user_hash(db.users.filter(db.users.id == row[0]).one())
        for row in user_id_rows
    ]
def get_bundle_details(version_id):
    """Return ``(filename, created)`` of the file attached to a version.

    :param version_id: id of the version in the legacy database.
    :returns: the single matching ``(filename, created)`` row, or
        ``(None, None)`` when the version has no file.
    """
    matches = db.files.with_entities(
        db.files.filename, db.files.created
    ).filter(db.files.version_id == version_id)
    if matches.count() > 0:
        return matches.one()
    return None, None
# App version id is different from version id
def get_sugar_version(app_version_id):
    """Return the ``version`` field of the ``appversions`` row with this id."""
    app_version_row = db.appversions.filter(
        db.appversions.id == app_version_id).one()
    return app_version_row.version
def get_license(license_id):
    """Return the header line of a license text, or ``None`` if unknown.

    The license text is looked up through the ``licenses`` table and its
    translation. Only the first line is returned so that long license
    bodies are stripped. Unlike before, a missing or NULL translation no
    longer raises, and texts that use ``\\n`` line endings are also cut
    down to their first line.

    :param license_id: id in the ``licenses`` table, or ``None``.
    :returns: the stripped first line of the license text, or ``None`` when
        no usable text is found.
    """
    FALLBACK_LICENSE = None
    if license_id is None:
        return FALLBACK_LICENSE
    text_id = db.licenses.filter(db.licenses.id == license_id).one().text
    if text_id is None:
        return FALLBACK_LICENSE
    translation = convert_translations(
        text_id, without_locale=True, singular=True)
    # The translation row may be absent, or its text may be NULL.
    if translation is None or translation[0] is None:
        return FALLBACK_LICENSE
    # Strip license with long texts and return header only
    lines = translation[0].strip().splitlines()
    license_text = lines[0].strip() if lines else ""
    return license_text or FALLBACK_LICENSE
def flatten_singly_tuple_list(target_list):
    """Return a list holding the first element of every item.

    Used to turn one-column query results such as ``[(1,), (2,)]`` into
    ``[1, 2]``.
    """
    return [entry[0] for entry in target_list]
def get_sugar_info(version_id):
    """Build the ``sugar`` section for a release.

    ``is_web``, ``is_gtk3`` and ``has_old_toolbars`` are always ``False``.
    ``min_sugar_version`` is looked up through ``applications_versions``
    and falls back to ``0.0`` when the version has no such row.

    :param version_id: id of the version in the legacy database.
    :returns: dict with the four keys described above.
    """
    sugar_info = {
        'is_web': False,
        'is_gtk3': False,
        'has_old_toolbars': False,
    }
    compat_rows = db.applications_versions.filter(
        db.applications_versions.version_id == version_id)
    if compat_rows.count() > 0:
        min_version = get_sugar_version(compat_rows.one().min)
    else:
        min_version = 0.0
    sugar_info['min_sugar_version'] = min_version
    return sugar_info
def get_category_text(category_id):
    """Return the lower-cased en_US name of a category if it is popular.

    :param category_id: id in the ``categories`` table.
    :returns: the category name when it appears in ``POPULAR_CATEGORIES``,
        otherwise ``None`` (also when there is no en_US translation).
    """
    category_row = db.categories.filter(
        db.categories.id == category_id).one()
    translations = convert_translations(
        category_row.name, without_locale=False)
    # Get only en_US translations
    category = translations.get('en_US', "").lower()
    return category if category in POPULAR_CATEGORIES else None
def get_addon_categories(addon_id):
    """Return the popular categories of an addon as one string.

    :param addon_id: id of the addon in the legacy database.
    :returns: the category names accepted by ``get_category_text`` joined
        with single spaces; an empty string when none qualify.
    """
    category_rows = db.addons_categories.with_entities(
        db.addons_categories.category_id
    ).filter(db.addons_categories.addon_id == addon_id).all()
    kept = []
    for category_id in flatten_singly_tuple_list(category_rows):
        category = get_category_text(category_id)
        # Drop categories that were rejected (None).
        if category is not None:
            kept.append(category)
    return " ".join(kept)
def generate_old_download_url(bundle_name, addon_id):
    """Return the legacy activities.sugarlabs.org URL of a bundle.

    :param bundle_name: file name of the bundle.
    :param addon_id: id of the addon that owns the bundle.
    :returns: ``http://activities.sugarlabs.org/activities/<addon_id>/<bundle_name>``.
    """
    url_template = "http://activities.sugarlabs.org/activities/{}/{}"
    return url_template.format(addon_id, bundle_name)
def get_addon_releases_info(addon, process_screenshots=False):
    """Migrate every usable release of ``addon`` into the new database.

    For each version of the addon whose version number parses as a float
    (processed in ascending numeric order), a release dict is built and
    passed to ``activity_service.insert_activity``. Versions are skipped,
    with a line appended to ``faulty_addons/<addon_id>.log``, when the
    version number is not a float, when no bundle file exists, or when no
    license can be determined. Inserted versions are recorded in
    ``good_addons/<addon_id>.log``.

    :param addon: legacy addon row (reads ``id``, ``name``, ``summary``,
        ``homepage``, ``supporturl``, ``icontype``, ``icondata``, ``guid``
        and ``suggested_amount``).
    :param process_screenshots: when true, the addon's screenshots are
        uploaded once and attached to every release.
    :returns: list of the release dicts that were inserted.
    """
    addon_id = addon.id
    releases = []

    def log_skipped(version_id, version, reason):
        # Append one "skipped" line to this addon's faulty log.
        with open("faulty_addons/{}.log".format(addon_id), 'a') as f:
            print("Skipping version id {} and activity version {}. Reason: {} ".format(
                version_id, version, reason), file=f)

    def isFloat(el):
        # el is a (version_id, version, ...) row. Only conversion failures
        # are caught; a bare except would also swallow KeyboardInterrupt.
        try:
            float(el[1])
        except (TypeError, ValueError):
            log_skipped(el[0], el[1], "Invalid/non-float version number")
            return False
        return True

    version_info = db.versions.with_entities(
        db.versions.id, db.versions.version, db.versions.releasenotes,
        db.versions.license_id
    ).filter(db.versions.addon_id == addon_id).all()
    # Remove non float versions
    version_info = [el for el in version_info if isFloat(el)]
    # Sort by the float value of the version in Python (no custom ordering
    # in the query); sorted input avoids errors from activity_service.
    version_info = sorted(version_info, key=lambda el: float(el[1]))

    i18n_name = convert_translations(addon.name)
    i18n_summary = convert_translations(addon.summary)
    # Add placeholder in case of no summary
    if i18n_summary is None:
        i18n_summary = {'en': 'No Summary '}

    # Repository link: homepage first, then support url, then a default.
    homepage = convert_translations(
        addon.homepage, without_locale=True, singular=True)
    if homepage is not None:
        homepage = homepage[0]
    else:
        supporturl = convert_translations(
            addon.supporturl, without_locale=True, singular=True)
        if supporturl is not None:
            homepage = supporturl[0]
        else:
            homepage = "https://sugarlabs.org"
    # The stored string itself may still be NULL.
    repository = homepage if homepage is not None else "https://sugarlabs.org"

    developers = find_developers(addon_id)
    icon_type = addon.icontype
    icon_bin = addon.icondata
    # If there is no icon data, use the fallback icon
    if icon_bin is None:
        icon_type = FALLBACK_ICON_TYPE
        icon_bin = FALLBACK_ICON
    categories = get_addon_categories(addon_id)
    bundle_id = addon.guid
    screenshots = get_screenshots(addon_id) if process_screenshots else None
    # First look for the license in the suggested amount (the legacy data
    # apparently stores it there -- TODO confirm).
    suggested_license = addon.suggested_amount

    for version_id, version, releasenote, license_id in version_info:
        releasenote = convert_translations(
            releasenote, without_locale=True, singular=True)
        release_info = {
            'release': {},
            'screenshots': screenshots if process_screenshots else {},
            'sugar': get_sugar_info(version_id),
            'activity_version': version,
            'categories': categories,
        }
        bundle_name, release_info['release']['time'] = get_bundle_details(
            version_id)
        if bundle_name is None:
            log_skipped(version_id, version, "No bundle file found")
            continue

        # Prioritize suggested amount; only if it is None, get the license
        # from the license id
        if suggested_license is None:
            release_info['license'] = get_license(license_id)
        else:
            release_info['license'] = suggested_license
        release_info['download_url'] = generate_old_download_url(
            bundle_name=bundle_name, addon_id=addon_id)
        release_info['bundle_name'] = bundle_name
        release_info['i18n_name'] = i18n_name
        release_info['i18n_summary'] = i18n_summary
        release_info['developers'] = developers
        release_info['repository'] = repository
        release_info['icon_type'] = icon_type
        release_info['icon_bin'] = icon_bin
        release_info['bundle_id'] = bundle_id
        if releasenote is not None:
            release_info['release']['notes'] = fix_encoding(releasenote[0])
        else:
            release_info['release']['notes'] = " "

        # Skip Activities with unknown, undefined, and no licenses
        if release_info['license'] is None:
            log_skipped(version_id, version, "Unknown license")
            continue

        releases.append(release_info)
        with open("good_addons/{}.log".format(addon_id), 'a') as f:
            print("Added to Database . version id {} and activity version {}".format(
                version_id, version), file=f)
        activity_service.insert_activity(release_info)
    return releases
def extract_addon_id_from_log(log_file):
    """Return the addon id encoded in a log file name.

    The id is the file's base name without its extension, e.g.
    ``good_addons/123.log`` gives ``"123"``.
    """
    file_name = os.path.basename(log_file)
    stem, _extension = os.path.splitext(file_name)
    return stem
def patch_existing_addons_with_screenshots(skip_before_bundle_id=None):
    """Attach freshly uploaded screenshots to activities already in Mongo.

    For every stored activity the matching legacy addon is looked up by
    bundle id, its screenshots are uploaded, and the latest release plus
    all previous releases are updated with them.

    If fetching or uploading screenshots fails, a resume hint is printed
    and the process exits with status 1 (it used to exit with status 0,
    which reported success to the caller).

    :param skip_before_bundle_id: when given, activities listed before this
        bundle id are skipped, which allows resuming an interrupted run.
    :raises ValueError: if ``skip_before_bundle_id`` matches no activity.
    """
    activities = list(ActivityModel.objects())
    if skip_before_bundle_id:
        skip_index = [el.bundle_id for el in activities].index(
            skip_before_bundle_id)
        print("Skipping all activities before {}".format(skip_before_bundle_id))
        activities = activities[skip_index:]
    for activity in activities:
        bundle_id = activity.bundle_id
        # Find addon_id of bundle from old db
        addon_id = db.addons.filter(db.addons.guid == bundle_id).one().id
        print("Patching screenshots for {} -- {}".format(bundle_id, addon_id))
        # Fetch Screenshot from the Database
        try:
            screenshots = get_screenshots(addon_id)
        except Exception as error:
            # Top-level boundary: report where to resume, then stop.
            print("Stopped due to {} at {} .Resume by passing bundle_id and it will pick up from where it left".format(
                error, bundle_id))
            sys.exit(1)
        # Update screenshots
        activity.latest_release.update(set__screenshots=screenshots)
        # Update previous releases as well
        for old_release in activity.previous_releases:
            old_release.update(set__screenshots=screenshots)
if __name__ == "__main__":
    # Entry point. Three modes:
    #   --patch-existing   : add screenshots to activities already in Mongo
    #   --with-screenshots : re-process addons listed in good_addons/*.log,
    #                        uploading their screenshots
    #   (no flag)          : migrate every addon without screenshots
    import time  # only needed for the pause in --with-screenshots mode

    non_zero = 0
    zero = 0
    parser = argparse.ArgumentParser(
        description='Migrate old aslo data from Mysql to Mongo')
    # nargs="?" alone turns a bare flag into None, so "-ws" without a value
    # was silently ignored. const=True makes the bare flag truthy while
    # still accepting the old "-ws <value>" form.
    parser.add_argument('-ws', "--with-screenshots",
                        help='Uploads good addons screenshots',
                        nargs="?", const=True)
    parser.add_argument('-pe', "--patch-existing",
                        help="Patch existing Mongo database with screenshots (overrides if they already exist",
                        nargs="?", const=True)
    parser.add_argument('-b_id', "--bundle-id",
                        help="Bundle id, all activities before this bundle_id will be skipped",
                        nargs="?", default=None)
    args = vars(parser.parse_args())
    print(args)

    if args['patch_existing']:
        patch_existing_addons_with_screenshots(
            skip_before_bundle_id=args['bundle_id'])
    elif args['with_screenshots']:
        print("Processing good addons with screenshots")
        addons = [extract_addon_id_from_log(log_path)
                  for log_path in glob("good_addons/*.log")]
        for addon_id in addons:
            print("Addon id {}".format(addon_id))
            addon = db.addons.filter(db.addons.id == addon_id).one()
            result = get_addon_releases_info(addon, process_screenshots=True)
            print("Mandatory pause ...")
            time.sleep(12)
            if result:
                non_zero += 1
                print("Non zero release")
            else:
                zero += 1
                print("Zero release")
        print("Non-zero activities : {}".format(non_zero))
        print("Zero activities : {} ".format(zero))
    else:
        for addon in db.addons.all():
            print("Addon id {}".format(addon.id))
            result = get_addon_releases_info(addon, process_screenshots=False)
            if result:
                non_zero += 1
                print("Non zero release")
            else:
                zero += 1
                print("Zero release")
        print("Non-zero activities : {}".format(non_zero))
        print("Zero activities : {} ".format(zero))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment