Skip to content

Instantly share code, notes, and snippets.

@jatindhankhar
Last active September 4, 2017 14:32
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jatindhankhar/91ea2d79674e6874f68b87e917e8353a to your computer and use it in GitHub Desktop.
Save jatindhankhar/91ea2d79674e6874f68b87e917e8353a to your computer and use it in GitHub Desktop.
A very messy script
# Requires Python 3.2 and onwards
import sqlsoup
import sys
import itertools
import os
from imgurpython import ImgurClient
import tempfile
import hashlib
from imgurpython import ImgurClient
from operator import is_not
from functools import partial
import argparse
from glob import glob
# Place it in inside aslo-v3 root folder for imports
from aslo.service import activity as activity_service
from aslo.models.activity import ActivityModel, DeveloperModel
from aslo.models.release import ReleaseModel
db = sqlsoup.SQLSoup("mysql://{}:{}@{}/{}".format("root",
"YourPassword", "localhost", "activities"))
IMGUR_CLIENT_ID = "XXXXXXXXXXXXXXXXXXXXX"
IMGUR_CLIENT_SECRET = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
imgur_client = ImgurClient(
IMGUR_CLIENT_ID, IMGUR_CLIENT_SECRET
)
POPULAR_CATEGORIES = ["programming", "robotics", "internet", "science", "maths",
"language", "geography", "documents", "music", "media", "art", "teacher", "system"]
FALLBACK_ICON = db.addons.first().icondata
FALLBACK_ICON_TYPE = db.addons.first().icontype
def save_to_temp_file(img_data):
file_location = tempfile.mktemp() + ".png"
with open(file_location, "wb") as f:
f.write(img_data)
return file_location
# https://stackoverflow.com/a/26492671/3455743
def fix_encoding(target_string):
# Ignore to suppress errros
if target_string is None:
return " "
try:
# MySql uses latin1 by default :(
return target_string.encode('latin1', 'ignore').decode('utf8')
except UnicodeError as e:
# If you cannot encode it, leave it as it is :(
return target_string
def upload_img_to_imgur(image_path):
print("Uploading image .. {}".format(image_path))
result = imgur_client.upload_from_path(image_path)
return (result['link'], result['deletehash'])
def get_img_hash(img_path, blocksize=2**20):
h = hashlib.sha1()
with open(img_path, 'rb') as f:
for chunk in iter(lambda: f.read(blocksize), b''):
h.update(chunk)
return str(h.hexdigest())
def get_screenshots(addon_id):
print("Processing Screenshots ")
# Limit to 5 screenshots
images = db.previews.filter(db.previews.addon_id == addon_id).limit(5)
new_screenshots = {}
# For now default to en
lang = 'en'
new_screenshots['en'] = {}
for image in images:
image = save_to_temp_file(image.filedata)
_hash = get_img_hash(image)
new_screenshots[lang][_hash] = {}
link, deletehash = upload_img_to_imgur(image)
import time
time.sleep(8)
new_screenshots[lang][_hash] = (link, deletehash)
return new_screenshots
# Setup log directories
# Python 3.2 onwards
os.makedirs("faulty_addons/", exist_ok=True)
os.makedirs("good_addons/", exist_ok=True)
import mongoengine as me
me.connect('aslo')
def make_user_hash(user_info):
user_hash = dict.fromkeys(['name', 'page', 'email', 'avatar'], None)
user_hash["name"] = fix_encoding(
user_info.firstname) + ' ' + fix_encoding(user_info.lastname)
user_hash["page"] = (user_info.homepage or None)
user_hash["email"] = user_info.email
user_hash["avatar"] = 'https://avatars2.githubusercontent.com/u/3996398?v=4'
return user_hash
def modify_locale_underscore(translation_entry):
# Both tuples and dict keys are immutable :(
new_locale, localized_string = translation_entry[0].replace(
"-", "_"), fix_encoding(translation_entry[1])
return (new_locale, localized_string)
def convert_translations(translation_id, without_locale=False, singular=False):
# Handle Null/None translation ids
if translation_id is None:
return None
if without_locale:
if singular:
translation = db.translations.with_entities(db.translations.localized_string).filter(
db.translations.id == translation_id).first()
else:
translation = db.translations.with_entities(db.translations.localized_string).filter(
db.translations.id == translation_id).all()
return translation
else:
translation = db.translations.with_entities(db.translations.locale, db.translations.localized_string).filter(
db.translations.id == translation_id).all()
translation = list(map(modify_locale_underscore, translation))
return dict(translation)
def find_developers(addon_id):
user_ids = db.addons_users.with_entities(db.addons_users.user_id).filter(
db.addons_users.addon_id == addon_id).all()
user_lists = []
for user in user_ids:
user_info = db.users.filter(db.users.id == user[0]).one()
user_lists.append(make_user_hash(user_info))
return user_lists
def get_bundle_details(version_id):
result = db.files.with_entities(db.files.filename, db.files.created).filter(
db.files.version_id == version_id)
if result.count() > 0:
return result.one()
else:
return None, None
# App version id is different from version id
def get_sugar_version(app_version_id):
return db.appversions.filter(db.appversions.id == app_version_id).one().version
def get_license(license_id):
FALLBACK_LICENSE = None
if license_id is None:
return FALLBACK_LICENSE
text_id = db.licenses.filter(db.licenses.id == license_id).one().text
if text_id is None:
return FALLBACK_LICENSE
# Strip license with long texts and return header only
license_text = convert_translations(text_id, without_locale=True, singular=True)[
0].strip().split("\r")[0].strip()
if not license_text:
return FALLBACK_LICENSE
else:
return license_text
def flatten_singly_tuple_list(target_list):
def single_out(el): return el[0]
return list(map(single_out, target_list))
def get_sugar_info(version_id):
sugar_info = dict.fromkeys(
['is_web', 'is_gtk3', 'has_old_toolbars'], False)
version_info = db.applications_versions.filter(
db.applications_versions.version_id == version_id)
if version_info.count() > 0:
sugar_info['min_sugar_version'] = get_sugar_version(
version_info.one().min)
else:
sugar_info['min_sugar_version'] = 0.0
return sugar_info
def get_category_text(category_id):
translation_id = db.categories.filter(
db.categories.id == category_id).one().name
# Get only en_US translations
category = convert_translations(
translation_id, without_locale=False).get('en_US', "").lower()
if category in POPULAR_CATEGORIES:
return category
def get_addon_categories(addon_id):
category_ids = db.addons_categories.with_entities(
db.addons_categories.category_id).filter(db.addons_categories.addon_id == addon_id).all()
category_ids = flatten_singly_tuple_list(category_ids)
categories = list(map(get_category_text, category_ids))
# Strip any None/Falsy values
categories = filter(partial(is_not, None), categories)
# Flattens a nested list and join them into space separated string
# Probably not the best way to it
return " " .join(list(categories))
def generate_old_download_url(bundle_name, addon_id):
return "http://activities.sugarlabs.org/activities/{}/{}".format(addon_id, bundle_name)
def get_addon_releases_info(addon, process_screenshots=False):
addon_id = addon.id
releases = []
def isFloat(el):
try:
float(el[1])
return True
except:
with open("faulty_addons/{}.log".format(addon_id), 'a') as f:
print("Skipping version id {} and activity version {}. Reason: Invalid/non-float version number ".format(
el[0], el[1]), file=f)
return False
version_info = db.versions.with_entities(db.versions.id,
db.versions.version, db.versions.releasenotes, db.versions.license_id).filter(db.versions.addon_id == addon_id).all()
# Remove non float versions
version_info = [el for el in version_info if isFloat(el)]
# Couldn't sqlachemy way of supplying custom function of order, there is one out there but I did a hack of sorting the result
# Sort by float value of versions
# We are sorting to avoid error by activity_service
version_info = sorted(version_info, key=lambda el: float(el[1]))
# Convert release notes/
i18n_name = convert_translations(addon.name)
i18n_summary = convert_translations(addon.summary)
# Add placeholder in case of no summary
if i18n_summary is None:
i18n_summary = {'en': 'No Summary '}
homepage = convert_translations(
addon.homepage, without_locale=True, singular=True)
# Handle empty links
if homepage is not None:
homepage = homepage[0]
# Else fallback to support url and then to emptyd
else:
supporturl = convert_translations(
addon.supporturl, without_locale=True, singular=True)
if supporturl is not None:
homepage = supporturl[0]
else:
homepage = "https://sugarlabs.org"
developers = find_developers(addon_id)
icon_type = addon.icontype
icon_bin = addon.icondata
categories = get_addon_categories(addon_id)
bundle_id = addon.guid
if process_screenshots:
screenshots = get_screenshots(addon_id)
# First look for license in the suggested amount
license = addon.suggested_amount
for version_id, version, releasenote, license_id in version_info:
release_info = {}
release_info['release'] = {}
release_info['screenshots'] = {}
releasenote = convert_translations(
releasenote, without_locale=True, singular=True)
release_info['sugar'] = get_sugar_info(version_id)
release_info['activity_version'] = version
release_info['categories'] = categories
bundle_name, release_info['release']['time'] = get_bundle_details(
version_id)
if bundle_name is None:
with open("faulty_addons/{}.log".format(addon_id), 'a') as f:
print("Skipping version id {} and activity version {}. Reason: No bundle file found ".format(
version_id, version), file=f)
continue
# Priortize suggested amount, if None then only get license from license id
if license is None:
release_info['license'] = get_license(license_id)
else:
release_info['license'] = license
release_info['download_url'] = generate_old_download_url(
bundle_name=bundle_name, addon_id=addon_id)
release_info['bundle_name'] = bundle_name
release_info['i18n_name'] = i18n_name
release_info['i18n_summary'] = i18n_summary
release_info['developers'] = developers
release_info['repository'] = homepage
if release_info['repository'] is None:
release_info['repository'] = "https://sugarlabs.org"
release_info['icon_type'] = icon_type
release_info['icon_bin'] = icon_bin
release_info['bundle_id'] = bundle_id
if process_screenshots:
release_info['screenshots'] = screenshots
if releasenote is not None:
release_info['release']['notes'] = fix_encoding(releasenote[0])
else:
release_info['release']['notes'] = " "
# If icon_bin use fallback Icon
if icon_bin is None:
release_info['icon_type'] = FALLBACK_ICON_TYPE
release_info['icon_bin'] = FALLBACK_ICON
# Skip Activities with unknown, undefined, and no licenses
if release_info['license'] is not None:
releases.append(release_info)
with open("good_addons/{}.log".format(addon_id), 'a') as f:
print("Added to Database . version id {} and activity version {}".format(
version_id, version), file=f)
# print(release_info)
activity_service.insert_activity(release_info)
else:
with open("faulty_addons/{}.log".format(addon_id), 'a') as f:
print("Skipping version id {} and activity version {}. Reason: Unknown license ".format(
version_id, version), file=f)
return releases
def extract_addon_id_from_log(log_file):
return os.path.splitext(os.path.basename(log_file))[0]
def patch_existing_addons_with_screenshots(skip_before_bundle_id=None):
activities = list(ActivityModel.objects())
if skip_before_bundle_id:
skip_index = [el.bundle_id for el in activities].index(skip_before_bundle_id)
print("Skipping all activities before {}".format(skip_before_bundle_id))
activities = activities[skip_index:]
for activity in activities:
bundle_id = activity.bundle_id
# Find addon_id of bundle from old db
addon_id = db.addons.filter(db.addons.guid == bundle_id).one().id
print("Patching screenshots for {} -- {}".format(bundle_id, addon_id))
# Fetch Screenshot from the Database
try:
screenshots = get_screenshots(addon_id)
except Exception as error:
print("Stopped due to {} at {} .Resume by passing bundle_id and it will pick up from where it left".format(
error, bundle_id))
sys.exit()
# Update screenshots
activity.latest_release.update(set__screenshots=screenshots)
# Update previous releases as well
for old_release in activity.previous_releases:
old_release.update(set__screenshots=screenshots)
if __name__ == "__main__":
non_zero = 0
zero = 0
parser = argparse.ArgumentParser()
parser = argparse.ArgumentParser(
description='Migrate old aslo data from Mysql to Mongo')
parser.add_argument('-ws', "--with-screenshots",
help='Uploads good addons screenshots',
nargs="?")
parser.add_argument('-pe', "--patch-existing",
help="Patch existing Mongo database with screenshots (overrides if they already exist",
nargs="?")
parser.add_argument('-b_id', "--bundle-id",
help="Bundle id, all activities before this bundle_id will be skipped",
nargs="?", default=None)
args=vars(parser.parse_args())
print(args)
if args['patch_existing']:
patch_existing_addons_with_screenshots(
skip_before_bundle_id = args['bundle_id'])
elif args['with_screenshots']:
print("Processing good addons with screenshots")
addons=list(map(extract_addon_id_from_log,
glob("good_addons/*.log")))
# print(len(addons))
for id in addons:
print("Addon id {}".format(id))
addon=db.addons.filter(db.addons.id == id).one()
result=get_addon_releases_info(addon, process_screenshots = True)
import time
print("Mandatory pause ...")
time.sleep(12)
# print(find_developers(addon.id))
if len(result) > 0:
non_zero=non_zero + 1
print("Non zero release")
else:
zero=zero + 1
print("Zero release")
print("Non-zero activities : {}".format(non_zero))
print("Zero activities : {} ".format(zero))
else:
for addon in db.addons.all():
print("Addon id {}".format(addon.id))
result=get_addon_releases_info(addon, process_screenshots = False)
# print(find_developers(addon.id))
if len(result) > 0:
non_zero=non_zero + 1
print("Non zero release")
else:
zero=zero + 1
print("Zero release")
print("Non-zero activities : {}".format(non_zero))
print("Zero activities : {} ".format(zero))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment