Skip to content

Instantly share code, notes, and snippets.

@ywangd

ywangd/psiclient.py

Last active Apr 10, 2020
Embed
What would you like to do?
Proof-of-Concept Client for Pythonista-Script-Index
"""
Basic client for Pythonista Script Index (https://github.com/ywangd/Pythonista-Script-Index)
See also: https://github.com/ywangd/psiclient
"""
import platform
import sys
import os
import urllib2
import json
import collections
import random
import copy
import shutil
import zipfile
import re
import console
import ui
# URL of the main script index (JSON); passed to MergedIndices as the remote index URL.
URL_MAIN_INDEX = 'https://raw.githubusercontent.com/ywangd/Pythonista-Script-Index/master/index.json'
# Resolved absolute directory that contains this script file.
SCRIPT_ROOT = os.path.realpath(os.path.abspath(os.path.dirname(__file__)))
# JSON file passed to MergedIndices as the record of locally installed scripts.
RECORD_FILE = os.path.expanduser('~/Documents/.psiclient.json')
class WgetException(Exception):
    """Raised by wget() when a download fails."""


class UnzipException(Exception):
    """Raised by unzip() when an archive is missing, invalid or corrupt."""


class PsicException(Exception):
    """Raised by MergedIndices operations for user-facing failures."""


class InvalidGistURLError(Exception):
    """Raised by download_gist() when the URL is not a recognised gist URL."""


class MultipleFilesInGistError(Exception):
    """Raised by download_gist() when a gist holds more than one Python file."""


class NoFilesInGistError(Exception):
    """Raised by download_gist() when a gist holds no Python file."""


class GistDownloadError(Exception):
    """Raised by download_gist() when gist metadata cannot be fetched or parsed."""
def is_gist(url):
    """Return a regex match object if *url* is a GitHub gist page URL, else None.

    The expected form is ``http(s)://gist.github.com/<user>/<gist id>``.
    Match groups are: 1 = optional ``s`` of the scheme, 2 = user name,
    3 = gist id. Callers may use the result as a boolean.
    """
    # Dots are escaped so they only match a literal '.', user names may
    # contain hyphens, and the gist id must be non-empty.
    return re.match(r'http(s?)://gist\.github\.com/([0-9a-zA-Z-]*)/([0-9a-f]+)', url)
def download_gist(gist_url):
    """Fetch the single Python file contained in a gist.

    Returns a 3-tuple ``(filename, content, nrevisions)`` where *nrevisions*
    is the number of entries in the gist's history.

    Raises:
        InvalidGistURLError: *gist_url* is not recognised by is_gist().
        GistDownloadError: the gist metadata could not be fetched or parsed.
        MultipleFilesInGistError: the gist has more than one Python file.
        NoFilesInGistError: the gist has no Python file.
    """
    # Reuse is_gist() so both functions agree on what a gist URL is;
    # group 3 of its match is the gist id.
    gist_id_match = is_gist(gist_url)
    if not gist_id_match:
        raise InvalidGistURLError()

    import requests

    json_url = 'https://api.github.com/gists/' + gist_id_match.group(3)
    try:
        gist_info = json.loads(requests.get(json_url).text)
        nrevisions = len(gist_info['history'])
        files = gist_info['files']
    except Exception:
        # Network failure, invalid JSON or an unexpected payload shape.
        # KeyboardInterrupt/SystemExit are deliberately not intercepted.
        raise GistDownloadError()

    py_files = [file_info for file_info in files.values()
                if file_info.get('language', None) == 'Python']
    if len(py_files) > 1:
        raise MultipleFilesInGistError()
    if not py_files:
        raise NoFilesInGistError()

    file_info = py_files[0]
    return file_info['filename'], file_info['content'], nrevisions
def wget(url, output_file=None):
    """Download *url* to a local file, printing progress to stdout.

    Args:
        url: address to download.
        output_file: destination path; defaults to the last ``/``-separated
            component of *url*, saved relative to the current directory.

    Raises:
        WgetException: the URL could not be opened or the download or write
            failed.
    """
    if not output_file:
        output_file = url.split('/')[-1]
    try:
        u = urllib2.urlopen(url)
        try:
            print('Opening: %s' % url)
            try:
                file_size = int(u.info().getheaders("Content-Length")[0])
            except (IndexError, ValueError):
                # Header missing or not numeric: size is unknown.
                file_size = 0
            if file_size:
                print("Save as: %s  (%s bytes)" % (output_file, file_size))
            else:
                print("Save as: %s " % output_file)
            with open(output_file, 'wb') as f:
                file_size_dl = 0
                block_sz = 8192
                while True:
                    buf = u.read(block_sz)
                    if not buf:
                        break
                    file_size_dl += len(buf)
                    f.write(buf)
                    if file_size:
                        status = r"%10d [%3.2f%%]" % (file_size_dl, file_size_dl * 100. / file_size)
                    else:
                        status = "%10d" % file_size_dl
                    # One progress line per block read.
                    print(status)
        finally:
            # Always release the connection, even when writing fails.
            u.close()
    except Exception:
        # Any failure is reported as WgetException, as before, but
        # KeyboardInterrupt/SystemExit are no longer swallowed.
        raise WgetException('not valid url')
def unzip(zfile, exdir=None):
    """Extract a zip archive into a directory.

    A leading directory common to all archive members is stripped, so the
    members of ``pkg/a.py`` and ``pkg/sub/b.py`` land directly in the
    destination as ``a.py`` and ``sub/b.py``.

    Args:
        zfile: path of the zip archive.
        exdir: destination directory; defaults to a sibling of *zfile* named
            after the archive (``x.zip`` -> ``x``, otherwise
            ``<name>_unzipped``). It is created when missing.

    Returns:
        List of paths (destination joined with the member name) for every
        extracted entry, directories included.

    Raises:
        UnzipException: the archive is missing, is not a zip file, is
            corrupt, contains a member that would be written outside the
            destination, or the destination exists and is not a directory.
    """
    if not os.path.isfile(zfile):
        raise UnzipException("%s: No such file" % zfile)

    # PK magic marker check; read as bytes so the comparison is exact.
    try:
        with open(zfile, 'rb') as f:
            pk_check = f.read(2)
    except (IOError, OSError):
        pk_check = b''
    if pk_check != b'PK':
        raise UnzipException("%s: does not appear to be a zip file" % zfile)

    base = os.path.basename(zfile)
    if base.lower().endswith('.zip'):
        altpath = os.path.splitext(base)[0]
    else:
        altpath = base + '_unzipped'
    altpath = os.path.join(os.path.dirname(zfile), altpath)
    location = exdir or altpath

    if os.path.exists(location):
        if not os.path.isdir(location):
            # Raise instead of terminating the interpreter from a helper.
            raise UnzipException("%s: destination is not a directory" % location)
    else:
        os.makedirs(location)

    location_root = os.path.realpath(location)
    files_extracted = []
    try:
        with open(zfile, 'rb') as zipfp:
            zipf = zipfile.ZipFile(zipfp)
            names = zipf.namelist()
            # check for a leading directory common to all files and remove it
            dirnames = [os.path.join(os.path.dirname(x), '') for x in names]
            common_dir = os.path.commonprefix(dirnames or ['/'])
            # commonprefix works per character; cut back to a whole directory
            # so that e.g. 'ab/' and 'ac/' do not yield the prefix 'a'.
            if not common_dir.endswith('/'):
                common_dir = os.path.join(os.path.dirname(common_dir), '')
            for name in names:
                fn = name
                if common_dir:
                    if fn.startswith(common_dir):
                        fn = fn.split(common_dir, 1)[-1]
                    elif fn.startswith('/' + common_dir):
                        fn = fn.split('/' + common_dir, 1)[-1]
                # Make absolute member names relative to the destination.
                fn = fn.lstrip('/')
                fn = os.path.join(location, fn)

                # Refuse members such as '../../x' that resolve outside the
                # destination: archives come from the network.
                resolved = os.path.realpath(fn)
                if resolved != location_root and \
                        not resolved.startswith(location_root + os.sep):
                    raise UnzipException(
                        "%s: member %s escapes destination" % (zfile, name))

                dirf = os.path.dirname(fn)
                if not os.path.exists(dirf):
                    os.makedirs(dirf)
                if fn.endswith('/'):  # A directory
                    if not os.path.exists(fn):
                        os.makedirs(fn)
                else:
                    data = zipf.read(name)
                    with open(fn, 'wb') as fp:
                        fp.write(data)
                files_extracted.append(fn)
    except UnzipException:
        raise
    except Exception:
        raise UnzipException("%s: zip file is corrupt" % zfile)
    return files_extracted
def top_dir(path):
    """Return the top-most component of *path*.

    For a relative path this is its first component (``'a/b/c'`` -> ``'a'``);
    a path without a directory part is returned unchanged. For an absolute
    path the root is returned instead of recursing forever, because
    ``os.path.dirname`` of the root is the root itself.
    """
    while True:
        parent = os.path.dirname(path)
        if parent == '' or parent == path:
            return path
        path = parent
def refreshable_url(url):
    """Return *url* with a random ``q=<int>`` query parameter added.

    The random value makes each request URL unique. The parameter is joined
    with ``&`` when *url* already has a query string and is inserted before
    any ``#fragment``, so the result stays a well-formed URL.
    """
    base, hash_sep, fragment = url.partition('#')
    separator = '&' if '?' in base else '?'
    return '%s%sq=%d%s%s' % (base, separator, random.randint(0, 99999),
                             hash_sep, fragment)
def dict_update(d, u):
    """Recursively merge mapping *u* into dict *d* and return *d*.

    Nested mappings are merged key by key rather than replaced wholesale;
    any other value in *u* overwrites the value in *d*. When *d* holds a
    non-mapping value under a key for which *u* has a mapping, that value is
    replaced by the merged mapping instead of raising.
    """
    try:
        from collections.abc import Mapping
    except ImportError:  # Python 2
        from collections import Mapping

    for key, value in u.items():
        if isinstance(value, Mapping):
            existing = d.get(key)
            if not isinstance(existing, Mapping):
                # Nothing mergeable under this key yet: start from empty.
                existing = {}
            d[key] = dict_update(existing, value)
        else:
            d[key] = value
    return d
def _version_key(version):
    """Turn a dotted version string into a tuple that compares numerically."""
    digits = [int(part) for part in version.split('.')]
    # Drop trailing zeros so that '1.2' and '1.2.0' compare equal.
    while digits and digits[-1] == 0:
        digits.pop()
    return tuple(digits)


def get_release_for_version(releases, ver_num=None):
    """Pick a release dict from *releases*.

    Args:
        releases: list of release dicts, each normally carrying a dotted
            numeric ``'version'`` string.
        ver_num: exact version string to look for; when None the release
            with the highest version is chosen (the first one wins a tie).

    Returns:
        The matching release dict, or None when *ver_num* is not found or
        *releases* is empty. If any release lacks version information the
        first release is returned, with its ``'version'`` set to None when
        it had none.

    Raises:
        ValueError: a version component is not an integer.
    """
    if ver_num is not None:
        for release in releases:
            if release.get('version') == ver_num:
                return release
        return None

    best_release = None
    best_key = None
    try:
        for release in releases:
            version = release['version']
            if version is None:
                raise KeyError('version')
            key = _version_key(version)
            # Compare whole versions, not individual components, so that
            # 2.0.0 beats 1.5.0.
            if best_key is None or key > best_key:
                best_release, best_key = release, key
    except KeyError:  # if no version info, just return the first release
        release = releases[0]
        release.setdefault('version', None)
        return release
    return best_release
class MergedIndices(object):
    """Remote script index merged with the local record of installed scripts.

    Attributes:
        remote_index_url: URL of the remote index JSON.
        record_file: path of the JSON file recording installed scripts.
        dest_dir: default installation directory.
        remote_index: parsed remote index (``{'scripts': {...}}``).
        local_indices: local record, keyed by index URL.
        merged_indices: remote index overlaid with the local record, keyed
            by index URL.
        merged_index: the entry of merged_indices for remote_index_url.
    """

    def __init__(self, remote_index_url, record_file, load_remote=True, dest_dir=None):
        """Load the remote index (unless *load_remote* is false) and the
        local record, then merge them. *dest_dir* defaults to
        ``~/Documents/bin`` and is created when missing."""
        self.remote_index_url = remote_index_url
        self.record_file = record_file
        self.dest_dir = os.path.expanduser(dest_dir if dest_dir else '~/Documents/bin')
        if not os.path.isdir(self.dest_dir):
            # makedirs also creates missing parent directories.
            os.makedirs(self.dest_dir)
        if load_remote:
            self.remote_index = self.load_remote_index()
        else:
            self.remote_index = {'scripts': {}}
        self.local_indices = self.load_local_record()
        self.merged_indices = None
        self.merged_index = None
        self.merge_remote_and_local()

    @staticmethod
    def _fetch_json(url):
        """Download *url* through a cache-busting URL and parse it as JSON."""
        response = urllib2.urlopen(refreshable_url(url))
        try:
            return json.loads(response.read())
        finally:
            response.close()

    def load_local_record(self):
        """Return the parsed record file, creating an empty one if missing."""
        if not os.path.exists(self.record_file):
            with open(self.record_file, 'w') as outs:
                outs.write('{}')
            return {}
        with open(self.record_file) as ins:
            return json.load(ins)

    def load_remote_index(self):
        """Download and parse the remote index."""
        return self._fetch_json(self.remote_index_url)

    def merge_remote_and_local(self):
        """Rebuild merged_indices/merged_index from deep copies of the remote
        index and the local record."""
        self.merged_indices = {self.remote_index_url: copy.deepcopy(self.remote_index)}
        dict_update(self.merged_indices, copy.deepcopy(self.local_indices))
        self.merged_index = self.merged_indices[self.remote_index_url]

    def save_local_record(self):
        """Write the local record to record_file as JSON."""
        with open(self.record_file, 'w') as outs:
            json.dump(self.local_indices, outs, indent=1)

    def do_list(self):
        """Return one dict per known script with the keys ``short_name``,
        ``description`` and ``installed``, sorted by index URL and name."""
        script_list = []
        for url in sorted(self.merged_indices):
            scripts = self.merged_indices[url]['scripts']
            for script_name in sorted(scripts):
                entry = scripts[script_name]
                script_list.append({"short_name": script_name,
                                    "description": entry.get('description', None),
                                    "installed": entry.get('installed', None)})
        return script_list

    def do_install(self, script_name, ver_num=None, dest_dir=None):
        """Download and install a script, then record it locally.

        Args:
            script_name: key of the script in the merged index.
            ver_num: version to install; None selects the newest release.
            dest_dir: installation directory; defaults to self.dest_dir.

        Returns:
            ``(script_name, installed)`` where *installed* is the dict
            stored in the local record (version, dest_dir, filetype, files).

        Raises:
            PsicException: unknown script, already installed, no matching
                release, unknown file type or unusable release URL.
        """
        scripts = self.merged_index['scripts']
        if script_name not in scripts:
            raise PsicException('%s: not found' % script_name)
        if scripts[script_name].get('installed', None):
            raise PsicException('%s: already installed' % script_name)

        # The part after '#' in meta_url selects a sub-object of the metadata.
        meta_url, tag = urllib2.splittag(scripts[script_name]['meta_url'])
        jmsg = self._fetch_json(meta_url)
        jmsg = jmsg[tag] if tag else jmsg
        release = get_release_for_version(jmsg['releases'], ver_num)
        if release is None:
            raise PsicException('%s: release not found' % script_name)

        url = release['url']
        if 'filetype' not in release:
            if url.endswith('.py') or is_gist(url):
                release['filetype'] = 'SingleFile'
            elif url.endswith('.zip'):
                release['filetype'] = 'ZippedFiles'
            else:
                raise PsicException('%s: unknown filetype' % script_name)

        dest_dir = os.path.expanduser(dest_dir) if dest_dir else self.dest_dir
        dest_dir = os.path.abspath(dest_dir)
        if not os.path.isdir(dest_dir):
            os.makedirs(dest_dir)

        full_name = jmsg.get('name', None)
        if full_name is None:
            full_name = script_name

        filetype = release['filetype']
        if filetype == 'SingleFile':
            files_installed = self._install_single_file(release, full_name, dest_dir)
        elif filetype == 'ZippedFiles':
            files_installed = self._install_zipped_files(release, full_name, dest_dir)
        else:
            raise PsicException('%s: unknown filetype' % script_name)

        installed = {
            'version': release.get('version'),
            'dest_dir': dest_dir,
            'filetype': filetype,
            'files': files_installed
        }
        if self.remote_index_url not in self.local_indices:
            self.local_indices[self.remote_index_url] = {'scripts': {}}
        self.local_indices[self.remote_index_url]['scripts'][script_name] = {'installed': installed}
        self.save_local_record()
        self.merge_remote_and_local()
        return script_name, installed

    @staticmethod
    def _install_single_file(release, full_name, dest_dir):
        """Install a one-file release into *dest_dir*; return the file names
        written, relative to *dest_dir*."""
        url = release['url']
        filename = full_name if full_name.endswith('.py') else (full_name + '.py')
        files_installed = []
        saved_dir = os.getcwd()
        try:
            # File names are recorded relative to dest_dir, so work inside it.
            os.chdir(dest_dir)
            if url.endswith('.py'):
                wget(url, output_file=filename)
                files_installed.append(filename)
                raw_match = re.match('http(s?)://raw.github.com/gist/', url)
                raw_match2 = re.match('http(s?)://gist.githubusercontent.com/([0-9a-zA-Z]*)/([0-9a-f]*)/raw/', url)
                if not raw_match and not raw_match2:
                    # Best effort: also fetch a companion .pyui file if the
                    # server has one next to the script.
                    try:
                        filename_pyui = filename + 'ui'
                        wget(url + 'ui', output_file=filename_pyui)
                        files_installed.append(filename_pyui)
                    except WgetException:
                        pass
            elif is_gist(url):
                try:
                    _, content, nrevisions = download_gist(url)
                except (InvalidGistURLError, MultipleFilesInGistError,
                        NoFilesInGistError, GistDownloadError):
                    raise PsicException('%s: Gist url error' % url)
                # The content is text decoded from JSON; encode explicitly so
                # non-ASCII characters do not fail on Python 2.
                import io
                with io.open(filename, 'w', encoding='utf-8') as outs:
                    outs.write(content)
                files_installed.append(filename)
                if release.get('version') is None:
                    release['version'] = '%d.0.0' % nrevisions
            else:
                raise PsicException('%s: invalid single file url' % url)
        finally:
            os.chdir(saved_dir)
        return files_installed

    @staticmethod
    def _install_zipped_files(release, full_name, dest_dir):
        """Download a zip release and extract it into ``dest_dir/full_name``;
        return the extracted paths relative to *dest_dir*."""
        import tempfile

        # gettempdir() honours TMPDIR when set and has a fallback otherwise.
        zip_path = os.path.join(tempfile.gettempdir(), 'files.zip')
        this_dest_dir = os.path.join(dest_dir, full_name)
        files_installed = []
        try:
            wget(release['url'], output_file=zip_path)
            if not os.path.isdir(this_dest_dir):
                os.mkdir(this_dest_dir)
            for f in unzip(zip_path, exdir=this_dest_dir):
                if f.strip() == '':
                    continue
                f = os.path.relpath(f, dest_dir)
                if f != '.':
                    files_installed.append(f)
        finally:
            # Do not leave the downloaded archive behind, even on failure.
            if os.path.exists(zip_path):
                os.remove(zip_path)
        return files_installed

    def do_remove(self, script_name):
        """Delete the files of an installed script and drop it from the
        local record.

        The script is looked up in the merged index, so an installed script
        can be removed even if it is absent from the remote index. Files
        that no longer exist are skipped.

        Returns:
            script_name.

        Raises:
            PsicException: the script is unknown or not installed.
        """
        entry = self.merged_index['scripts'].get(script_name)
        if entry is None:
            raise PsicException('%s: script not found' % script_name)
        installed = entry.get('installed', None)
        if installed is None:
            raise PsicException('%s: script not installed' % script_name)

        if os.path.isdir(installed['dest_dir']):
            saved_dir = os.getcwd()
            try:
                # Recorded file names are relative to the install directory.
                os.chdir(installed['dest_dir'])
                if installed['filetype'] == 'SingleFile':
                    for f in installed['files']:
                        if os.path.exists(f):
                            os.remove(f)
                elif installed['filetype'] == 'ZippedFiles':
                    removed_dirs = set()
                    for f in installed['files']:
                        this_dir = os.path.dirname(f)
                        if this_dir in removed_dirs or top_dir(this_dir) in removed_dirs:
                            continue
                        if this_dir != '':
                            if os.path.isdir(this_dir):
                                shutil.rmtree(this_dir)
                            removed_dirs.add(this_dir)
                        elif os.path.isdir(f):
                            shutil.rmtree(f)
                            removed_dirs.add(f)
                        elif os.path.exists(f):
                            os.remove(f)
            finally:
                os.chdir(saved_dir)

        self.local_indices[self.remote_index_url]['scripts'].pop(script_name, None)
        self.save_local_record()
        self.merge_remote_and_local()
        return script_name

    def do_info(self, script_name):
        """Return ``(script_name, info, installed)`` for a script.

        *info* is the script's metadata with defaults filled in for
        ``author``, ``email``, ``website``, ``long_description`` and
        ``releases`` (each release gets a ``version`` key). It is downloaded
        on first use and cached on the remote index entry. *installed* is
        the local install record or None.

        Raises:
            PsicException: the script is not in the remote index.
        """
        if script_name not in self.remote_index['scripts']:
            raise PsicException('%s: script not found' % script_name)
        entry = self.remote_index['scripts'][script_name]
        installed = self.merged_index['scripts'][script_name].get('installed', None)
        if 'info' in entry:
            return script_name, entry['info'], installed

        meta_url, tag = urllib2.splittag(entry['meta_url'])
        jmsg = self._fetch_json(meta_url)
        jmsg = jmsg[tag] if tag else jmsg
        jmsg.setdefault('author', '')
        jmsg.setdefault('email', '')
        jmsg.setdefault('website', '')
        jmsg.setdefault('long_description', 'N/A')
        for release in jmsg.setdefault('releases', []):
            release.setdefault('version', None)
        entry['info'] = jmsg
        return script_name, jmsg, installed
# Template for the script detail text. make_info_page_texts fills the six
# placeholders with: description, author, email, website, the available
# version line and the installed version line (empty when not installed).
info_templ = r"""%s
Author: %s
Email: %s
Website: %s
Status:
%s
%s
"""
# Passed as the duration argument of console.hud_alert (presumably seconds).
alert_duration = 1.0
if __name__ == '__main__':
    # Pythonista UI: a table of scripts inside a navigation view; tapping a
    # row pushes a detail page with an Install/Uninstall button.
    _ON_IPAD = platform.machine().startswith('iPad')

    # Detail page shown for a single script.
    desc_page = ui.View(flex='WH')
    desc_page.background_color = 0.8
    desc_tv = ui.TextView(flex='W')
    desc_page.add_subview(desc_tv)
    desc_tv.editable = False
    desc_tv.font = ('AppleSDGothicNeo-Regular', 16)
    desc_tv.border_width = 1

    # Main page: the list of scripts.
    psiclient = ui.View(flex='WH')
    psiclient.background_color = 'white'
    psiclient.name = 'Pythonista Script Index'
    psitable = ui.TableView(flex='WH')
    listsource = ui.ListDataSource(['Loading ...'])
    listsource.delete_enabled = False
    psitable.data_source = listsource
    psitable.delegate = listsource
    psiclient.add_subview(psitable)
    nav_view = ui.NavigationView(psiclient)
    nav_view.name = 'psiclient'

    if _ON_IPAD:
        psitable.width = 540
        psitable.height = 575
        nav_view.present('sheet')
    else:
        psitable.width, psitable.height = ui.get_screen_size()
        height_nudge = 110
        psitable.height -= height_nudge
        nav_view.present('fullscreen')

    merged_indices = MergedIndices(URL_MAIN_INDEX, RECORD_FILE, dest_dir='~/Documents/bin')
    # State of the script currently shown on the detail page.
    item_content = {}

    def make_items():
        """Build the table rows from the merged index."""
        items = []
        for item in merged_indices.do_list():
            status = 'installed' if item['installed'] else 'available'
            items.append({'title': '%s (%s)' % (item['short_name'], status),
                          'accessory_type': 'disclosure_indicator',
                          'short_name': item['short_name'],
                          # do_list always supplies the key, possibly as None,
                          # so fall back on falsiness rather than absence.
                          'description': item.get('description') or 'No description',
                          'installed': item['installed']})
        return items

    def make_button(title, action):
        """Create the centred Install/Uninstall button for the detail page."""
        btn = ui.Button(name='btn', title=' %s ' % title)
        btn.background_color = 'white'
        btn.tint_color = 'black'
        btn.border_width = 1
        btn.corner_radius = 5
        btn.font = (btn.font[0], 18)
        btn.action = action
        btn.size_to_fit()
        btn.x = (desc_page.width - btn.width) / 2
        btn.y = 320
        return btn

    def make_info_page_texts():
        """Fill the detail text view from item_content."""
        info = item_content['info']
        installed = item_content['installed']
        release = get_release_for_version(info['releases'])
        description = info['long_description'] or item_content['description']
        # A script without any release has no available version to show.
        rl_ver = (release['version'] if release else None) or 'N/A'
        if installed:
            installed_line = 'installed (version: %s)' % (installed['version'] or 'N/A')
        else:
            installed_line = ''
        desc_tv.text = info_templ % (description,
                                     info['author'],
                                     info['email'],
                                     info['website'],
                                     'available (version: %s)' % rl_ver,
                                     installed_line)

    def install_action(sender):
        """Install the current script and swap the button to Uninstall."""
        try:
            short_name = item_content['short_name']
            merged_indices.do_install(short_name)
            console.hud_alert('success', 'success', alert_duration)
            listsource.items = make_items()
            item_content['installed'] = merged_indices.merged_index['scripts'][short_name]['installed']
            btn = make_button('Uninstall', remove_action)
            sender.superview.add_subview(btn)
            sender.superview.remove_subview(sender)
            make_info_page_texts()
        except Exception as e:
            # Keep the UI alive, but leave a trace of what went wrong.
            print('install failed: %s' % e)
            console.hud_alert('failed', 'error', alert_duration)

    def remove_action(sender):
        """Uninstall the current script and swap the button to Install."""
        try:
            short_name = item_content['short_name']
            merged_indices.do_remove(short_name)
            console.hud_alert('success', 'success', alert_duration)
            listsource.items = make_items()
            item_content['installed'] = None
            btn = make_button('Install', install_action)
            sender.superview.add_subview(btn)
            sender.superview.remove_subview(sender)
            make_info_page_texts()
        except Exception as e:
            print('remove failed: %s' % e)
            console.hud_alert('failed', 'error', alert_duration)

    @ui.in_background
    def info_action():
        """Fetch the script details and populate the detail page."""
        try:
            _, info, installed = merged_indices.do_info(item_content['short_name'])
            item_content['info'] = info
            item_content['installed'] = installed
            make_info_page_texts()
        except Exception as e:
            # Without this the page would stay on 'Loading ...' forever.
            desc_tv.text = 'Failed to load script information: %s' % e
            return
        if installed:
            btn = make_button('Uninstall', remove_action)
        else:
            btn = make_button('Install', install_action)
        desc_page.add_subview(btn)

    def list_item_tapped(sender):
        """Open the detail page for the tapped row."""
        item_content.clear()
        row_item = sender.items[sender.selected_row]
        item_content['short_name'] = row_item['short_name']
        item_content['description'] = row_item['description']
        desc_page.name = row_item['short_name']
        desc_tv.height = 300  # desc_page.height / 2
        desc_tv.text = 'Loading ...'
        psiclient.navigation_view.push_view(desc_page)
        # Drop the button left over from the previously shown script;
        # iterate over a copy because subviews are removed in the loop.
        for sv in list(desc_page.subviews):
            if sv.name == 'btn':
                desc_page.remove_subview(sv)
        info_action()

    listsource.items = make_items()
    listsource.action = list_item_tapped
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.