Created
June 13, 2012 17:51
-
-
Save jam/2925485 to your computer and use it in GitHub Desktop.
A moderately ugly interactive shell script for the Plex framework
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/env python | |
__version__ = '0.1a1' | |
__script_exts__ = ['pscript', 'py'] | |
import os | |
import sys | |
import traceback | |
import cmd | |
import readline | |
import subprocess | |
import types | |
import optparse | |
try: | |
import cStringIO as StringIO | |
except: | |
import StringIO | |
class mode(object): | |
initializing = 'Initializing' | |
bootstrapped = 'Bootstrapped' | |
__mode__ = mode.initializing | |
if len(sys.argv) > 1 and sys.argv[1] == '---bootstrapped': | |
del sys.argv[1] | |
__mode__ = mode.bootstrapped | |
class ScriptModule(types.ModuleType): | |
def __init__(self, name, path, core, rel_path=None, mod_name=None): | |
types.ModuleType.__init__(self, name) | |
f = open(path, 'r') | |
src = f.read() | |
f.close() | |
code = compile(src, path, 'exec') | |
self.__dict__['__builtins__'] = dict(core.host.environment) | |
self.__dict__['__builtins__'].update(core.host.environment['__builtins__']) | |
self.__dict__['_current_code_path'] = os.path.dirname(path) | |
module_name = mod_name if mod_name else os.path.splitext(os.path.basename(path))[0] | |
if rel_path: | |
module_name = os.path.splitext(os.path.basename(rel_path))[0] + '.' + module_name | |
self.__dict__['__name__'] = module_name | |
self.__path__ = os.path.dirname(path) | |
#print "MODULE:", module_name | |
sys.modules[module_name] = self | |
exec(code) in self.__dict__ | |
def loader(path, core, existing_module=None, mod_name=None, parent=None): | |
class Loader(object): | |
def load_module(self, name): | |
if existing_module: | |
#print "Returning existing module", existing_module, name | |
return existing_module | |
lastname = name.split('.')[-1] | |
#print "\n\n\nNOW DOING", lastname, name | |
if name not in sys.modules: | |
sys.modules[name] = None | |
module = ScriptModule(name, path, core, mod_name = lastname) | |
sys.modules[name] = module | |
if '.' in name: | |
parent_name, child_name = name.rsplit('.', 1) | |
#print "SETTING MAGICAL THINGS" | |
#print parent_name, child_name | |
setattr(sys.modules[parent_name], child_name, module) | |
if parent: | |
#print "Setting parent %s of %s as %s" % (str(parent), str(module), parent.__name__) | |
module.__dict__['__builtins__'][parent.__name__] = parent | |
#print module.__dict__['__builtins__'] | |
return sys.modules[name] | |
return Loader() | |
class MetaImporter(object): | |
def __init__(self, core, *paths): | |
self._core = core | |
self._paths = paths | |
def find_module(self, fullname, paths=None): | |
try: | |
#print 'find_module', fullname, paths | |
if self._paths: | |
parts = fullname.split('.') | |
lastname = parts[-1] | |
#print 'find_module', fullname, lastname, path | |
#print lastname in sys.modules, fullname in sys.modules | |
if lastname in sys.modules: | |
return loader(None, self._core, sys.modules[lastname]) | |
elif fullname in sys.modules: | |
return loader(None, self._core, sys.modules[fullname]) | |
possible_paths = [] | |
valid_path = False | |
if paths != None: | |
for path in paths: | |
for p in self._paths: | |
#print p, path | |
if path.startswith(p): | |
valid_path = True | |
break | |
if paths != None and valid_path: | |
root_paths = paths | |
else: | |
root_paths = self._paths | |
for root_path in root_paths: | |
for ext in __script_exts__: | |
possible_paths.append(os.path.join(root_path, lastname + '.' + ext)) | |
possible_paths.append(os.path.join(root_path, lastname, 'main' + '.' + ext)) | |
possible_paths.append(os.path.join(root_path, lastname, '__init__' + '.' + ext)) | |
#for x in possible_paths: print x | |
parent = None | |
if len(parts) > 1: | |
def search_module_by_path(parts, candidates): | |
#print 'checking', parts | |
if parts[0] in candidates: | |
part = parts[0] | |
if len(parts) == 1: | |
#print "Found final part" | |
return candidates[part] | |
else: | |
#print "Checking next parts" | |
ret = search_module_by_path(parts[1:], candidates[part].__dict__) | |
if ret: | |
return ret | |
else: | |
return search_module_by_path(parts[1:], candidates[part].__dict__['__builtins__']) | |
parent = search_module_by_path(parts[:-1], sys.modules) | |
#print "PARENT:", parent | |
for pyp in possible_paths: | |
if os.path.exists(pyp): | |
return loader(pyp, self._core, mod_name=lastname, parent=parent) | |
except: | |
print "### An exception occurred attempting to import a code file." | |
print traceback.format_exc() | |
class PshAdditions(object): | |
system = os.system | |
class types(object): | |
module = ScriptModule | |
def __init__(self, core): | |
self._core = core | |
self.script_path = None | |
self.cd = core.storage.change_dir | |
@property | |
def cwd(self): | |
return self._core.storage.current_dir() | |
def cmd(self, *args): | |
cmd = subprocess.Popen( | |
args, | |
stdout=subprocess.PIPE | |
) | |
try: | |
return cmd.communicate() | |
except KeyboardInterrupt: | |
cmd.terminate() | |
def addlib(self, name): | |
lib = __import__(name) | |
#print lib | |
#print dir(lib) | |
self._core.host.environment[name] = name | |
class Psh(cmd.Cmd): | |
macros = dict( | |
about = ('print "The Plex Platform Interactive Shell is a command line interpreter that allows you to do fun things."', []), | |
) | |
system = os.system | |
def __init__(self, core, err=None): | |
cmd.Cmd.__init__(self, stdin=sys.stdin, stdout=sys.stdout) | |
self.prompt = '>>> ' | |
self.core = core | |
if err: | |
self.err =err | |
else: | |
self.err = None | |
def precmd(self, line): | |
try: | |
if len(line) == 0: | |
return line | |
elif line[0] == '#': | |
return line[1:] | |
elif line[0] == ':': | |
return 'path ' + line[1:] | |
elif line[0] == '$': | |
return 'system ' + line[1:].strip() | |
""" | |
try: | |
ret = self.core.host.execute(compile(line, '<shell>', 'single')) | |
except: | |
ret = None | |
tb_lines = traceback.format_exc().split('\n')[:-1] | |
del tb_lines[1:6] | |
if len(tb_lines) == 2: | |
del tb_lines[0] | |
print '!!! ' + '\n'.join(tb_lines) | |
""" | |
except: | |
print traceback.format_exc() | |
return line | |
def postcmd(self, stop, line): | |
if len(line): | |
print '' | |
pass | |
def default(self, line): | |
items = line.split(' ') | |
if items[0] in self.macros: | |
macro, arg_names = self.macros[items[0]] | |
arg_values = items[1:] | |
args = {} | |
for x in range(len(arg_names)): | |
name = arg_names[x] | |
if x < len(arg_values): | |
args[name] = arg_values[x] | |
else: | |
args[name] = '' | |
cmd = macro % args | |
self.core.host.execute(compile(cmd, '<macro>', 'single')) | |
else: | |
try: | |
ret = self.core.host.execute(compile(line, '<shell>', 'single')) | |
except: | |
ret = None | |
tb_lines = traceback.format_exc().split('\n')[:-1] | |
del tb_lines[1:6] | |
if len(tb_lines) == 2: | |
del tb_lines[0] | |
print '!!! ' + '\n'.join(tb_lines) | |
finally: | |
self.lastcmd = None | |
def do_path(self, line): | |
line = 'http://localhost:32400' + line | |
cmd = 'print HTTP.Request(\'%s\').content' % line | |
code = compile(cmd, '<request>', 'single') | |
self.core.host.execute(code) | |
def do_system(self, line): | |
os.system(line) | |
def do_success(self, line): | |
return None | |
def do_EOF(self, args): | |
print '' | |
sys.exit(0) | |
def do_log(self, count): | |
if self.err == None: | |
return None | |
if len(count) == 0: | |
count = 5 | |
self.err.seek(0) | |
log = self.err.readlines() | |
i = int(count) | |
if i >= 0: | |
lines = log[-i:] | |
else: | |
lines = log[:-i] | |
for line in lines: | |
if line.find(self.core.identifier) == 26: | |
line = line[11:19] + line[58:] | |
print line[:-1] | |
def do_run(self, line): | |
line = line.strip() | |
if self.core.storage.file_exists(line): | |
run_file(line, self.core) | |
def do_help(self, line): | |
line = line.strip() | |
src = "try:\n tempvar_help_message__ = getattr(%(line)s, '__help__')() if hasattr(%(line)s, '__help__') else getattr(%(line)s, '__doc__')\nexcept:\n tempvar_help_message__ = None\n" % dict(line=line) | |
try: | |
self.core.host.execute(compile(src, '<help>', 'single')) | |
ret = self.core.host.environment['tempvar_help_message__'] | |
except: | |
ret = None | |
finally: | |
if 'tempvar_help_message__' in self.core.host.environment: | |
del self.core.host.environment['tempvar_help_message__'] | |
if ret: | |
print '\n', ret, '\n' | |
else: | |
print "No help available for '%s'." % line | |
def completedefault(self, text, line, begidx, endidx): | |
print text, line, begidx, endidx | |
def run_file(filename, core): | |
#script = xpython.convert(core.storage.load(filename)) | |
try: | |
code = core.loader.load(filename, elevated=True, use_xpython=True) | |
core.host.execute(code) | |
except: | |
print traceback.format_exc() | |
def init_framework(bundle_path = None, **kwargs): | |
if bundle_path == None: | |
bundle_path = os.path.expanduser('~/Library/Application Support/Plex Media Server/Plug-ins/Framework.bundle') | |
library_paths = [ | |
['Contents', 'Resources', 'Versions', '2', 'Python'], | |
['Contents', 'Resources', 'Platforms', 'Shared', 'Libraries'], | |
['Contents', 'Resources', 'Platforms', 'MacOSX', 'i386', 'Libraries'], | |
] | |
for path_arr in library_paths: | |
sys.path.insert(0, os.path.join(bundle_path, *path_arr)) | |
import subsystem | |
import Framework | |
import config | |
plist = { | |
'CFBundleIdentifier' : 'com.plexapp.system.psh', | |
'PlexPluginCodePolicy' : 'Elevated', | |
} | |
config.console_logging = True | |
for kwarg in kwargs: | |
setattr(config, kwarg, kwargs[kwarg]) | |
core = Framework.core.FrameworkCore(None, os.path.join(bundle_path, 'Contents', 'Resources', 'Versions', '2'), config, plist=plist) | |
return core | |
def bootstrap(bundle_path = None): | |
if bundle_path == None: | |
bundle_path = os.path.expanduser('~/Library/Application Support/Plex Media Server/Plug-ins/Framework.bundle') | |
script_path = os.path.abspath(sys.argv[0]) | |
frameworks_path = os.path.join(bundle_path, 'Contents', 'Resources', 'Platforms', 'MacOSX', 'i386', 'Frameworks') | |
env = dict(os.environ) | |
env['DYLD_LIBRARY_PATH'] = frameworks_path | |
args = ['python2.5', script_path, '---bootstrapped'] | |
args.extend(sys.argv[1:]) | |
cmd = subprocess.Popen( | |
args, | |
stdin = sys.stdin, | |
stdout = sys.stdout, | |
env = env | |
) | |
try: | |
cmd.wait() | |
except KeyboardInterrupt: | |
cmd.terminate() | |
def run(bundle_path = None): | |
root_path = os.path.dirname(os.path.abspath(os.path.join(sys.argv[0], '..'))) | |
lib_path = os.path.join(root_path, 'lib', 'psh') | |
#print root_path | |
if os.path.exists(lib_path): | |
for root, dirs, files in os.walk(lib_path): | |
for f in files: | |
if f[-4:] == '.pyc': | |
fp = os.path.join(root, f) | |
os.unlink(fp) | |
sys.path.insert(0, lib_path) | |
if len(sys.argv) == 1: | |
print ("Plex Platform Interactive Shell v%s" % __version__), | |
abs_path = None | |
cur_path = os.getcwd() | |
else: | |
abs_path = os.path.abspath(sys.argv[1]) | |
cur_path = os.path.dirname(abs_path) | |
setattr(sys, '_stderr', sys.stderr) | |
setattr(sys, 'stderr', StringIO.StringIO()) | |
del sys.argv[0] | |
core = init_framework(do_chdir = False) | |
core.host.environment['cs'] = core.storage | |
core.host.environment['cn'] = core.networking | |
core.host.environment['cr'] = core.runtime | |
core.host.environment['cm'] = core.messaging | |
additions = PshAdditions(core) | |
additions.script_path = abs_path | |
additions.lib_path = lib_path | |
core.host.environment['psh'] = additions | |
sys.meta_path.append(MetaImporter(core, cur_path, lib_path)) | |
# Interactive shell | |
if len(sys.argv) == 0: | |
print " [Framework v%s]" % str(core.version) | |
psh = Psh(core, sys.stderr) | |
try: | |
psh.cmdloop() | |
except KeyboardInterrupt: | |
print "\nKeyboard Interrupt" | |
sys.exit(0) | |
else: | |
try: | |
run_file(abs_path, core) | |
if 'main' in core.host.environment: | |
core.host.execute(compile('main()', abs_path, 'exec')) | |
except Exception, e: | |
print "\n### An exception occurred while running the script." | |
print traceback.format_exc() | |
sys.exit(1) | |
sys.exit(0) | |
if __name__ == '__main__': | |
try: | |
sys.stdout.write("\x1b]2;psh\x07") | |
if __mode__ == mode.initializing: | |
bootstrap() | |
elif __mode__ == mode.bootstrapped: | |
run() | |
except Exception, e: | |
print "\n### An exception occurred in the interpreter's main method." | |
print traceback.format_exc() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment