Skip to content

Instantly share code, notes, and snippets.

@jam
Created June 13, 2012 17:51
Show Gist options
  • Save jam/2925485 to your computer and use it in GitHub Desktop.
Save jam/2925485 to your computer and use it in GitHub Desktop.
A moderately ugly interactive shell script for the Plex framework
#! /usr/bin/env python
# Version string interpolated into the interactive shell's start-up banner.
__version__ = '0.1a1'
# File extensions tried, in this order, when the import hook looks for a
# script file that matches an imported module name.
__script_exts__ = ['pscript', 'py']
import os
import sys
import traceback
import cmd
import readline
import subprocess
import types
import optparse
try:
import cStringIO as StringIO
except:
import StringIO
class mode(object):
    """Labels for the two phases this script can run in.

    ``initializing`` is the first launch, before the script has re-executed
    itself; ``bootstrapped`` marks the re-executed child process.
    """
    initializing, bootstrapped = 'Initializing', 'Bootstrapped'
# Work out which phase this process is in. The re-launched child is started
# with '---bootstrapped' as its first argument; that marker is stripped from
# sys.argv so the rest of the script only ever sees the user's arguments.
__mode__ = mode.initializing
if sys.argv[1:2] == ['---bootstrapped']:
    del sys.argv[1]
    __mode__ = mode.bootstrapped
class ScriptModule(types.ModuleType):
    """Module object whose body is a script file run against the host environment.

    Constructing an instance reads and compiles the file at ``path``,
    registers the module in ``sys.modules`` and then executes the script with
    the module's ``__dict__`` as its global namespace.
    """

    def __init__(self, name, path, core, rel_path=None, mod_name=None):
        """Load, register and execute the script at ``path``.

        name     -- name handed to ``types.ModuleType``.
        path     -- file system path of the script source.
        core     -- framework core; ``core.host.environment`` supplies the
                    names made visible to the script.
        rel_path -- optional path whose base name (extension removed) is
                    prepended, dot-separated, to the module name.
        mod_name -- optional explicit module name; defaults to the base name
                    of ``path`` without its extension.

        Any error raised while reading, compiling or executing the script
        propagates to the caller.
        """
        types.ModuleType.__init__(self, name)

        # try/finally so the handle is released even when read() fails
        # (the ``with`` statement is not available on Python 2.5).
        script_file = open(path, 'r')
        try:
            src = script_file.read()
        finally:
            script_file.close()
        code = compile(src, path, 'exec')

        namespace = self.__dict__

        # The script's builtins are a copy of the host environment with the
        # environment's own '__builtins__' mapping merged on top.
        script_builtins = dict(core.host.environment)
        script_builtins.update(core.host.environment['__builtins__'])
        namespace['__builtins__'] = script_builtins
        namespace['_current_code_path'] = os.path.dirname(path)

        module_name = mod_name or os.path.splitext(os.path.basename(path))[0]
        if rel_path:
            module_name = os.path.splitext(os.path.basename(rel_path))[0] + '.' + module_name
        namespace['__name__'] = module_name
        # NOTE(review): __path__ is a plain string here, whereas the import
        # protocol describes it as a list of directories -- confirm before
        # relying on it for sub-module lookups.
        self.__path__ = os.path.dirname(path)

        # Register before executing so the script can import itself by name.
        sys.modules[module_name] = self

        # Tuple form of exec: the same as ``exec code in namespace`` on
        # Python 2, and also valid as a function call on Python 3.
        exec(code, namespace)
def loader(path, core, existing_module=None, mod_name=None, parent=None):
    """Build a loader object (exposing ``load_module``) for one script file.

    path            -- script file to load; unused when ``existing_module``
                       is given.
    core            -- framework core handed on to ``ScriptModule``.
    existing_module -- module to return as-is instead of loading anything.
    mod_name        -- accepted for interface compatibility; the module name
                       is always derived from the name passed to
                       ``load_module``.
    parent          -- optional parent module, exposed to the loaded script
                       through its builtins under ``parent.__name__``.
    """
    class Loader(object):
        def load_module(self, name):
            """Return the module for ``name``, loading the script if needed."""
            if existing_module is not None:
                return existing_module
            if name in sys.modules:
                return sys.modules[name]

            lastname = name.split('.')[-1]
            # ScriptModule also registers itself under the short name, so
            # remember whether that entry is ours to remove on failure.
            alias_preexisting = lastname in sys.modules

            # Placeholder entry while the script body runs.
            sys.modules[name] = None
            loaded = False
            try:
                module = ScriptModule(name, path, core, mod_name=lastname)
                loaded = True
            finally:
                if not loaded:
                    # A failed load must not leave the placeholder (or a
                    # half-initialised module) behind, otherwise later
                    # imports of the same name silently get a broken entry.
                    sys.modules.pop(name, None)
                    if not alias_preexisting:
                        sys.modules.pop(lastname, None)
            sys.modules[name] = module

            if '.' in name:
                # Make ``package.child`` attribute access work.
                parent_name, child_name = name.rsplit('.', 1)
                setattr(sys.modules[parent_name], child_name, module)
            if parent:
                module.__dict__['__builtins__'][parent.__name__] = parent
            return sys.modules[name]

    return Loader()
class MetaImporter(object):
    """Finder for ``sys.meta_path`` that maps imports onto script files.

    Modules are searched for beneath the root directories given at
    construction time, trying every extension in ``__script_exts__`` and, for
    each, the layouts ``<name>.<ext>``, ``<name>/main.<ext>`` and
    ``<name>/__init__.<ext>``.
    """

    def __init__(self, core, *paths):
        self._core = core
        self._paths = paths

    def find_module(self, fullname, paths=None):
        """Return a loader for ``fullname`` or ``None`` when nothing matches.

        fullname -- dotted module name being imported.
        paths    -- optional directories supplied by the import machinery;
                    used instead of the configured roots when at least one of
                    them lies under a configured root.

        Errors raised during the search are reported on stdout and treated as
        "not found"; ``KeyboardInterrupt`` and ``SystemExit`` are not
        intercepted.
        """
        try:
            if not self._paths:
                return None

            parts = fullname.split('.')
            lastname = parts[-1]

            # Reuse a module that is already loaded under the short or the
            # full name. A ``None`` entry in sys.modules is only a marker, not
            # a module, so it is ignored rather than handed to a loader.
            for registered_name in (lastname, fullname):
                registered = sys.modules.get(registered_name)
                if registered is not None:
                    return loader(None, self._core, registered)

            candidates = self._candidate_files(lastname, paths)

            parent = None
            if len(parts) > 1:
                parent = self._search_module_by_path(parts[:-1], sys.modules)

            for candidate in candidates:
                if os.path.exists(candidate):
                    return loader(candidate, self._core, mod_name=lastname, parent=parent)
        except Exception:
            print("### An exception occurred attempting to import a code file.")
            print(traceback.format_exc())
        return None

    def _candidate_files(self, lastname, paths):
        """List every file path that could hold the module ``lastname``."""
        root_paths = self._paths
        if paths is not None:
            # NOTE(review): ScriptModule stores __path__ as a single string;
            # if that value arrives here, iterating it yields characters and
            # this check will normally fail -- confirm the intended shape.
            for path in paths:
                if any(path.startswith(root) for root in self._paths):
                    root_paths = paths
                    break

        found = []
        for root_path in root_paths:
            for ext in __script_exts__:
                found.append(os.path.join(root_path, lastname + '.' + ext))
                found.append(os.path.join(root_path, lastname, 'main' + '.' + ext))
                found.append(os.path.join(root_path, lastname, '__init__' + '.' + ext))
        return found

    def _search_module_by_path(self, parts, candidates):
        """Walk ``parts`` through nested namespaces and return the final object.

        Each step looks in the matched object's ``__dict__`` first and falls
        back to that object's ``__builtins__`` mapping. Returns ``None`` when
        the first part is not present in ``candidates``.
        """
        part = parts[0]
        if part not in candidates:
            return None
        if len(parts) == 1:
            return candidates[part]
        ret = self._search_module_by_path(parts[1:], candidates[part].__dict__)
        if ret:
            return ret
        return self._search_module_by_path(parts[1:], candidates[part].__dict__['__builtins__'])
class PshAdditions(object):
    """Helper object published to scripts as ``psh`` in the host environment."""

    # Run a shell command line; same contract as os.system.
    system = os.system

    class types(object):
        # Type used for modules loaded from script files.
        module = ScriptModule

    def __init__(self, core):
        self._core = core
        # Filled in by the launcher; stays None when no script was given.
        self.script_path = None
        self.cd = core.storage.change_dir

    @property
    def cwd(self):
        """Current directory as reported by the framework's storage layer."""
        return self._core.storage.current_dir()

    def cmd(self, *args):
        """Run ``args`` as a child process and capture its standard output.

        Returns the ``(stdout, stderr)`` tuple from ``communicate()``; stderr
        is not piped, so that element is ``None``. When interrupted with
        Ctrl-C the child is asked to terminate and ``None`` is returned.
        """
        proc = subprocess.Popen(args, stdout=subprocess.PIPE)
        try:
            return proc.communicate()
        except KeyboardInterrupt:
            # Popen.terminate() was only added in Python 2.6, while this
            # script re-launches itself under python2.5; fall back to
            # sending SIGTERM by hand there.
            if hasattr(proc, 'terminate'):
                proc.terminate()
            else:
                import signal
                os.kill(proc.pid, signal.SIGTERM)

    def addlib(self, name):
        """Import ``name`` and expose it to scripts in the host environment.

        As with a plain ``import a.b`` statement, a dotted name binds the
        top-level package (which is what ``__import__`` returns).
        """
        lib = __import__(name)
        # Store the imported module itself; binding the name string would
        # leave scripts with a useless str instead of the library.
        self._core.host.environment[name.split('.')[0]] = lib
class Psh(cmd.Cmd):
    """Interactive command loop that runs each input line in the framework host.

    Input that is not a built-in command is compiled and executed through
    ``core.host.execute``. Prefix shortcuts handled by ``precmd``:

        #text   run ``text`` with the leading '#' removed
        :path   same as ``path <path>``
        $cmd    same as ``system <cmd>``
    """

    # name -> (source template, ordered argument names). The template is
    # %-formatted with a dict of the supplied arguments before execution.
    macros = dict(
        about = ('print "The Plex Platform Interactive Shell is a command line interpreter that allows you to do fun things."', []),
    )
    system = os.system

    def __init__(self, core, err=None):
        """core -- framework core; err -- optional file-like log buffer read by ``log``."""
        cmd.Cmd.__init__(self, stdin=sys.stdin, stdout=sys.stdout)
        self.prompt = '>>> '
        self.core = core
        self.err = err or None

    def precmd(self, line):
        """Expand the single-character prefix shortcuts before dispatch."""
        if not line:
            return line
        first, rest = line[0], line[1:]
        if first == '#':
            return rest
        if first == ':':
            return 'path ' + rest
        if first == '$':
            return 'system ' + rest.strip()
        return line

    def postcmd(self, stop, line):
        """Print a blank separator line after every non-empty command."""
        if line:
            print('')
        # Hand the handler's stop flag back to cmdloop() as the cmd.Cmd
        # contract expects, instead of always discarding it.
        return stop

    def default(self, line):
        """Run a macro, or execute ``line`` as source code in the host."""
        items = line.split(' ')
        if items[0] in self.macros:
            macro, arg_names = self.macros[items[0]]
            arg_values = items[1:]
            # Missing trailing arguments are substituted with ''.
            args = {}
            for index, arg_name in enumerate(arg_names):
                if index < len(arg_values):
                    args[arg_name] = arg_values[index]
                else:
                    args[arg_name] = ''
            source = macro % args
            self.core.host.execute(compile(source, '<macro>', 'single'))
        else:
            try:
                self.core.host.execute(compile(line, '<shell>', 'single'))
            except (Exception, KeyboardInterrupt):
                # Ctrl-C and ordinary errors are reported and the shell keeps
                # running; SystemExit is deliberately allowed to propagate.
                tb_lines = traceback.format_exc().split('\n')[:-1]
                # Drop the traceback entries at positions 1-5 so the report
                # starts closer to the user's own input.
                del tb_lines[1:6]
                if len(tb_lines) == 2:
                    del tb_lines[0]
                print('!!! ' + '\n'.join(tb_lines))
            finally:
                # Stops an empty input line from re-running this statement.
                self.lastcmd = None

    def do_path(self, line):
        """Request ``line`` from the server on localhost:32400 and print the body."""
        url = 'http://localhost:32400' + line
        # %r yields a correctly escaped string literal, so paths containing
        # quotes or backslashes no longer corrupt the generated statement.
        source = 'print HTTP.Request(%r).content' % url
        code = compile(source, '<request>', 'single')
        self.core.host.execute(code)

    def do_system(self, line):
        """Run ``line`` through the operating system shell."""
        os.system(line)

    def do_success(self, line):
        """Do nothing."""
        return None

    def do_EOF(self, args):
        """Leave the shell on end-of-file (Ctrl-D)."""
        print('')
        sys.exit(0)

    def do_log(self, count):
        """Print lines from the captured log buffer.

        A non-negative ``count`` prints the last ``count`` lines (default 5;
        note that 0 selects the whole log because of ``log[-0:]``); a negative
        value prints the first ``abs(count)`` lines.
        """
        if self.err is None:
            return None
        if not count:
            count = 5
        try:
            wanted = int(count)
        except ValueError:
            # A typo must not take the whole shell down.
            print("### 'log' expects an integer line count, got %r." % count)
            return None

        self.err.seek(0)
        log = self.err.readlines()
        if wanted >= 0:
            lines = log[-wanted:]
        else:
            lines = log[:-wanted]

        for line in lines:
            # NOTE(review): fixed column offsets -- presumably they cut a
            # framework log line down to a time stamp plus message; confirm
            # against the framework's log format.
            if line.find(self.core.identifier) == 26:
                line = line[11:19] + line[58:]
            # Strip only a real trailing newline; a final unterminated line
            # keeps its last character.
            if line.endswith('\n'):
                line = line[:-1]
            print(line)

    def do_run(self, line):
        """Run the script file named by ``line`` if the storage layer finds it."""
        line = line.strip()
        if self.core.storage.file_exists(line):
            run_file(line, self.core)

    def do_help(self, line):
        """Show ``__help__()`` or ``__doc__`` of the expression given in ``line``."""
        line = line.strip()
        # The expression is evaluated inside the host environment; the result
        # is passed back through a temporary variable in that environment.
        src = "try:\n tempvar_help_message__ = getattr(%(line)s, '__help__')() if hasattr(%(line)s, '__help__') else getattr(%(line)s, '__doc__')\nexcept:\n tempvar_help_message__ = None\n" % dict(line=line)
        try:
            self.core.host.execute(compile(src, '<help>', 'single'))
            ret = self.core.host.environment['tempvar_help_message__']
        except Exception:
            ret = None
        finally:
            if 'tempvar_help_message__' in self.core.host.environment:
                del self.core.host.environment['tempvar_help_message__']
        if ret:
            print('\n%s \n' % ret)
        else:
            print("No help available for '%s'." % line)

    def completedefault(self, text, line, begidx, endidx):
        """Offer no completions for arguments of unknown commands."""
        # Debug trace kept from the original implementation.
        print('%s %s %s %s' % (text, line, begidx, endidx))
        # cmd.Cmd indexes into the returned value, so it must be a list.
        return []
def run_file(filename, core):
    """Load the script ``filename`` through the framework and execute it.

    The script is loaded with ``elevated=True`` and ``use_xpython=True`` and
    run via ``core.host.execute``. Errors and Ctrl-C are reported as a
    traceback on stdout and swallowed; ``SystemExit`` is allowed to propagate
    so a script can still end the process with an exit status.
    """
    #script = xpython.convert(core.storage.load(filename))
    try:
        code = core.loader.load(filename, elevated=True, use_xpython=True)
        core.host.execute(code)
    except (Exception, KeyboardInterrupt):
        print(traceback.format_exc())
def init_framework(bundle_path = None, **kwargs):
    """Put the framework bundle's libraries on ``sys.path`` and build a core.

    bundle_path -- location of ``Framework.bundle``; defaults to the per-user
                   Plex Media Server plug-in directory.
    kwargs      -- each keyword is copied onto the framework's ``config``
                   module as an attribute before the core is created.

    Returns the new ``Framework.core.FrameworkCore`` instance.
    """
    if bundle_path is None:
        bundle_path = os.path.expanduser('~/Library/Application Support/Plex Media Server/Plug-ins/Framework.bundle')

    resources = os.path.join(bundle_path, 'Contents', 'Resources')
    version_root = os.path.join(resources, 'Versions', '2')

    # Each directory is pushed to the front in turn, so the last one listed
    # ends up first on sys.path.
    for library_dir in (
        os.path.join(version_root, 'Python'),
        os.path.join(resources, 'Platforms', 'Shared', 'Libraries'),
        os.path.join(resources, 'Platforms', 'MacOSX', 'i386', 'Libraries'),
    ):
        sys.path.insert(0, library_dir)

    # These modules only become importable after the sys.path changes above.
    import subsystem
    import Framework
    import config

    config.console_logging = True
    for option, value in kwargs.items():
        setattr(config, option, value)

    plist = {
        'CFBundleIdentifier' : 'com.plexapp.system.psh',
        'PlexPluginCodePolicy' : 'Elevated',
    }
    return Framework.core.FrameworkCore(None, version_root, config, plist=plist)
def bootstrap(bundle_path = None):
    """Re-run this script under ``python2.5`` with the bundle's frameworks loadable.

    The child gets ``DYLD_LIBRARY_PATH`` pointing at the bundle's MacOSX/i386
    ``Frameworks`` directory and '---bootstrapped' as its first argument,
    followed by the arguments this process received. The call blocks until
    the child exits; on Ctrl-C the child is terminated instead.

    bundle_path -- location of ``Framework.bundle``; defaults to the per-user
                   Plex Media Server plug-in directory.
    """
    if bundle_path is None:
        bundle_path = os.path.expanduser('~/Library/Application Support/Plex Media Server/Plug-ins/Framework.bundle')

    child_env = dict(os.environ)
    child_env['DYLD_LIBRARY_PATH'] = os.path.join(
        bundle_path, 'Contents', 'Resources', 'Platforms', 'MacOSX', 'i386', 'Frameworks')

    command = ['python2.5', os.path.abspath(sys.argv[0]), '---bootstrapped'] + sys.argv[1:]

    child = subprocess.Popen(
        command,
        stdin = sys.stdin,
        stdout = sys.stdout,
        env = child_env
    )
    try:
        child.wait()
    except KeyboardInterrupt:
        child.terminate()
def run(bundle_path = None):
    """Entry point of the bootstrapped process: start the shell or run a script.

    With no script argument an interactive ``Psh`` shell is started; otherwise
    the script named by the first argument is executed and, if it defines
    ``main`` in the host environment, ``main()`` is called. The process always
    ends through ``sys.exit`` (status 1 when running the script raised).

    bundle_path -- location of ``Framework.bundle``, forwarded to
                   ``init_framework`` (``None`` selects its default).
    """
    root_path = os.path.dirname(os.path.abspath(os.path.join(sys.argv[0], '..')))
    lib_path = os.path.join(root_path, 'lib', 'psh')
    if os.path.exists(lib_path):
        # Remove stale byte-code from the bundled library tree.
        for folder, dirs, files in os.walk(lib_path):
            for filename in files:
                if filename.endswith('.pyc'):
                    os.unlink(os.path.join(folder, filename))
        sys.path.insert(0, lib_path)

    if len(sys.argv) == 1:
        # Python 2 print statement; the trailing comma keeps the cursor on
        # this line so the framework version can be appended further down.
        print ("Plex Platform Interactive Shell v%s" % __version__),
        abs_path = None
        cur_path = os.getcwd()
    else:
        abs_path = os.path.abspath(sys.argv[1])
        cur_path = os.path.dirname(abs_path)

    # Capture stderr in memory (the shell's ``log`` command reads it back);
    # the real stream stays reachable as sys._stderr.
    sys._stderr = sys.stderr
    sys.stderr = StringIO.StringIO()
    del sys.argv[0]

    # Forward bundle_path instead of silently ignoring the parameter.
    core = init_framework(bundle_path, do_chdir = False)

    # Short aliases for the core's subsystems inside the host environment.
    environment = core.host.environment
    environment['cs'] = core.storage
    environment['cn'] = core.networking
    environment['cr'] = core.runtime
    environment['cm'] = core.messaging

    additions = PshAdditions(core)
    additions.script_path = abs_path
    additions.lib_path = lib_path
    environment['psh'] = additions

    sys.meta_path.append(MetaImporter(core, cur_path, lib_path))

    if len(sys.argv) == 0:
        # Interactive shell
        print(" [Framework v%s]" % str(core.version))
        psh = Psh(core, sys.stderr)
        try:
            psh.cmdloop()
        except KeyboardInterrupt:
            print("\nKeyboard Interrupt")
        sys.exit(0)
    else:
        try:
            run_file(abs_path, core)
            if 'main' in core.host.environment:
                core.host.execute(compile('main()', abs_path, 'exec'))
        except Exception:
            print("\n### An exception occurred while running the script.")
            print(traceback.format_exc())
            sys.exit(1)
        sys.exit(0)
if __name__ == '__main__':
    try:
        # xterm-style OSC 2 escape sequence: sets the terminal title to "psh".
        sys.stdout.write("\x1b]2;psh\x07")
        # First launch re-executes the script; the re-executed child does
        # the real work. Any other mode value does nothing.
        entry_points = {
            mode.initializing: bootstrap,
            mode.bootstrapped: run,
        }
        entry_point = entry_points.get(__mode__)
        if entry_point is not None:
            entry_point()
    except Exception:
        print("\n### An exception occurred in the interpreter's main method.")
        print(traceback.format_exc())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment