Skip to content

Instantly share code, notes, and snippets.

@rwols
Created December 9, 2019 10:22
Show Gist options
  • Save rwols/932c3bf601c732e1e183d53f30bd7be7 to your computer and use it in GitHub Desktop.
Save rwols/932c3bf601c732e1e183d53f30bd7be7 to your computer and use it in GitHub Desktop.
from .async_io import schedule
import asyncio
import os
import sys
import time
import codecs
import signal
import html
import sublime
import sublime_plugin
class ProcessListener:
def on_data(self, proc, data):
pass
def on_finished(self, proc):
pass
class AsyncProcess:
"""
Encapsulates asyncio.subprocess.Popen, forwarding stdout and stderr to a
supplied ProcessListener (on the asyncio event loop)
"""
def __init__(self, listener):
self.listener = listener
self.killed = False
self.start_time = time.time()
async def start(self, cmd, shell_cmd, env, path="", shell=False):
""" "path" and "shell" are options in build systems """
if not shell_cmd and not cmd:
raise ValueError("shell_cmd or cmd is required")
if shell_cmd and not isinstance(shell_cmd, str):
raise ValueError("shell_cmd must be a string")
# Hide the console window on Windows
startupinfo = None
if os.name == "nt":
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
# Set temporary PATH to locate executable in cmd
if path:
old_path = os.environ["PATH"]
# The user decides in the build system whether he wants to append
# $PATH or tuck it at the front: "$PATH;C:\\new\\path",
# "C:\\new\\path;$PATH"
os.environ["PATH"] = os.path.expandvars(path)
proc_env = os.environ.copy()
proc_env.update(env)
for k, v in proc_env.items():
proc_env[k] = os.path.expandvars(v)
if sys.platform == "win32":
preexec_fn = None
else:
preexec_fn = os.setsid
if shell_cmd:
if sys.platform == "win32":
# Use shell=True on Windows, so shell_cmd is passed through
# with the correct escaping
cmd = shell_cmd
shell = True
elif sys.platform == "darwin":
# Use a login shell on OSX, otherwise the users expected env
# vars won't be setup
cmd = ["/usr/bin/env", "bash", "-l", "-c", shell_cmd]
shell = False
elif sys.platform == "linux":
# Explicitly use /bin/bash on Linux, to keep Linux and OSX as
# similar as possible. A login shell is explicitly not used for
# linux, as it's not required
cmd = ["/usr/bin/env", "bash", "-c", shell_cmd]
shell = False
self.proc = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT,
stdin=asyncio.subprocess.DEVNULL,
startupinfo=startupinfo,
env=proc_env,
preexec_fn=preexec_fn,
shell=shell)
if path:
os.environ["PATH"] = old_path
await self.read_fileno(self.proc.stdout)
def kill(self):
if not self.killed:
self.killed = True
if sys.platform == "win32":
# terminate would not kill process opened by the shell cmd.exe,
# it will only kill cmd.exe leaving the child running
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
subprocess.Popen(
"taskkill /PID %d /T /F" % self.proc.pid,
startupinfo=startupinfo)
else:
os.killpg(self.proc.pid, signal.SIGTERM)
self.proc.terminate()
def poll(self):
return self.proc.returncode is None
def exit_code(self):
return self.proc.returncode
async def read_fileno(self, file):
decoder = \
codecs.getincrementaldecoder(self.listener.encoding)('replace')
while True:
data = decoder.decode(await file.read(2**16))
data = data.replace('\r\n', '\n').replace('\r', '\n')
if len(data) > 0 and not self.killed:
self.listener.on_data(self, data)
else:
self.listener.on_finished(self)
break
class ExecAsyncCommand(sublime_plugin.WindowCommand, ProcessListener):
OUTPUT_LIMIT = 2 ** 27
def __init__(self, window):
super().__init__(window)
self.proc = None
self.errs_by_file = {}
self.annotation_sets_by_buffer = {}
self.show_errors_inline = True
self.output_view = None
def run(
self,
cmd=None,
shell_cmd=None,
file_regex="",
line_regex="",
working_dir="",
encoding="utf-8",
env={},
quiet=False,
kill=False,
update_annotations_only=False,
hide_annotations_only=False,
word_wrap=True,
syntax="Packages/Text/Plain text.tmLanguage",
# Catches "path" and "shell"
**kwargs):
print("Running async version of exec command")
if update_annotations_only:
if self.show_errors_inline:
self.update_annotations()
return
if hide_annotations_only:
self.hide_annotations()
return
if kill:
if self.proc:
self.proc.kill()
return
if self.output_view is None:
# Try not to call get_output_panel until the regexes are assigned
self.output_view = self.window.create_output_panel("exec")
# Default the to the current files directory if no working directory
# was given
if (working_dir == "" and
self.window.active_view() and
self.window.active_view().file_name()):
working_dir = os.path.dirname(self.window.active_view().file_name())
self.output_view.settings().set("result_file_regex", file_regex)
self.output_view.settings().set("result_line_regex", line_regex)
self.output_view.settings().set("result_base_dir", working_dir)
self.output_view.settings().set("word_wrap", word_wrap)
self.output_view.settings().set("line_numbers", False)
self.output_view.settings().set("gutter", False)
self.output_view.settings().set("scroll_past_end", False)
self.output_view.assign_syntax(syntax)
# Call create_output_panel a second time after assigning the above
# settings, so that it'll be picked up as a result buffer
self.window.create_output_panel("exec")
self.encoding = encoding
self.quiet = quiet
self.proc = None
if not self.quiet:
if shell_cmd:
print("Running " + shell_cmd)
elif cmd:
cmd_string = cmd
if not isinstance(cmd, str):
cmd_string = " ".join(cmd)
print("Running " + cmd_string)
sublime.status_message("Building")
preferences_settings = \
sublime.load_settings("Preferences.sublime-settings")
show_panel_on_build = \
preferences_settings.get("show_panel_on_build", True)
if show_panel_on_build:
self.window.run_command("show_panel", {"panel": "output.exec"})
self.hide_annotations()
self.show_errors_inline = \
preferences_settings.get("show_errors_inline", True)
merged_env = env.copy()
if self.window.active_view():
user_env = self.window.active_view().settings().get('build_env')
if user_env:
merged_env.update(user_env)
# Change to the working dir, rather than spawning the process with it,
# so that emitted working dir relative path names make sense
if working_dir != "":
os.chdir(working_dir)
self.debug_text = ""
if shell_cmd:
self.debug_text += "[shell_cmd: " + shell_cmd + "]\n"
else:
self.debug_text += "[cmd: " + str(cmd) + "]\n"
self.debug_text += "[dir: " + str(os.getcwd()) + "]\n"
if "PATH" in merged_env:
self.debug_text += "[path: " + str(merged_env["PATH"]) + "]"
else:
self.debug_text += "[path: " + str(os.environ["PATH"]) + "]"
self.output_size = 0
self.should_update_annotations = False
try:
self.proc = AsyncProcess(self)
# Forward kwargs to AsyncProcess
schedule(self.proc.start(cmd, shell_cmd, merged_env, **kwargs))
except Exception as e:
self.write(str(e) + "\n")
self.write(self.debug_text + "\n")
if not self.quiet:
self.write("[Finished]")
def is_enabled(self, kill=False, **kwargs):
if kill:
return (self.proc is not None) and self.proc.poll()
else:
return True
def write(self, characters):
self.output_view.run_command(
'append',
{'characters': characters, 'force': True, 'scroll_to_end': True})
# Updating annotations is expensive, so batch it to the main thread
def annotations_check():
errs = self.output_view.find_all_results_with_text()
errs_by_file = {}
for file, line, column, text in errs:
if file not in errs_by_file:
errs_by_file[file] = []
errs_by_file[file].append((line, column, text))
self.errs_by_file = errs_by_file
self.update_annotations()
self.should_update_annotations = False
if not self.should_update_annotations:
if self.show_errors_inline and characters.find('\n') >= 0:
self.should_update_annotations = True
sublime.set_timeout(lambda: annotations_check())
def on_data(self, proc, data):
if proc != self.proc:
return
# Truncate past the limit
if self.output_size >= self.OUTPUT_LIMIT:
return
self.write(data)
self.output_size += len(data)
if self.output_size >= self.OUTPUT_LIMIT:
self.write('\n[Output Truncated]\n')
def on_finished(self, proc):
if proc != self.proc:
return
if proc.killed:
self.write("\n[Cancelled]")
elif not self.quiet:
elapsed = time.time() - proc.start_time
exit_code = proc.exit_code()
if exit_code == 0 or exit_code is None:
self.write("[Finished in %.1fs]" % elapsed)
else:
self.write("[Finished in %.1fs with exit code %d]\n" %
(elapsed, exit_code))
self.write(self.debug_text)
if proc.killed:
sublime.status_message("Build cancelled")
else:
errs = self.output_view.find_all_results()
if len(errs) == 0:
sublime.status_message("Build finished")
else:
sublime.status_message("Build finished with %d errors" %
len(errs))
def update_annotations(self):
stylesheet = '''
<style>
#annotation-error {
background-color: color(var(--background) blend(#fff 95%));
}
html.dark #annotation-error {
background-color: color(var(--background) blend(#fff 95%));
}
html.light #annotation-error {
background-color: color(var(--background) blend(#000 85%));
}
a {
text-decoration: inherit;
}
</style>
'''
for file, errs in self.errs_by_file.items():
view = self.window.find_open_file(file)
if view:
selection_set = []
content_set = []
line_err_set = []
for line, column, text in errs:
pt = view.text_point(line - 1, column - 1)
if (line_err_set and
line == line_err_set[len(line_err_set) - 1][0]):
line_err_set[len(line_err_set) - 1][1] += (
"<br>" + html.escape(text, quote=False))
else:
pt_b = pt + 1
if view.classify(pt) & sublime.CLASS_WORD_START:
pt_b = view.find_by_class(
pt,
forward=True,
classes=(sublime.CLASS_WORD_END))
if pt_b <= pt:
pt_b = pt + 1
selection_set.append(
sublime.Region(pt, pt_b))
line_err_set.append(
[line, html.escape(text, quote=False)])
for text in line_err_set:
content_set.append(
'<body>' + stylesheet +
'<div class="error" id=annotation-error>' +
'<span class="content">' + text[1] + '</span></div>' +
'</body>')
view.add_regions(
"exec",
selection_set,
scope="invalid",
annotations=content_set,
flags=(sublime.DRAW_SQUIGGLY_UNDERLINE |
sublime.DRAW_NO_FILL | sublime.DRAW_NO_OUTLINE),
on_close=self.hide_annotations)
def hide_annotations(self, url=""):
for file, errs in self.errs_by_file.items():
view = self.window.find_open_file(file)
if view:
view.erase_regions("exec")
view.hide_popup()
self.errs_by_file = {}
self.annotation_sets_by_buffer = {}
self.show_errors_inline = False
class ExecEventListener(sublime_plugin.EventListener):
def on_load(self, view):
w = view.window()
if w is not None:
w.run_command('exec_async', {'update_annotations_only': True})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment