Skip to content

Instantly share code, notes, and snippets.

@gerardroche
Created September 23, 2016 15:07
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gerardroche/ee5699cd6cbaab04b9e35a78d272f4c5 to your computer and use it in GitHub Desktop.
Save gerardroche/ee5699cd6cbaab04b9e35a78d272f4c5 to your computer and use it in GitHub Desktop.
Default/exec.py (Sublime Text 3124 Default Package)
import collections
import functools
import html
import os
import subprocess
import sys
import threading
import time
import sublime
import sublime_plugin
class ProcessListener(object):
    """Callback interface for objects that observe an AsyncProcess.

    Both hooks are no-ops here; subclasses override the ones they need.
    """

    def on_data(self, proc, data):
        """Hook invoked with a chunk of output read from ``proc``."""
        return None

    def on_finished(self, proc):
        """Hook invoked once the output of ``proc`` has been exhausted."""
        return None
class AsyncProcess(object):
    """
    Encapsulates subprocess.Popen, forwarding stdout and stderr to a supplied
    ProcessListener (each stream is read on its own thread)
    """

    def __init__(self, cmd, shell_cmd, env, listener, path="", shell=False):
        """
        Spawn the process and start one reader thread per output stream.

        "path" and "shell" are options in build systems.

        cmd       -- argument sequence, used when shell_cmd is not given (or
                     the platform has no dedicated shell_cmd branch)
        shell_cmd -- command string run through the platform shell
        env       -- mapping of extra environment variables layered over
                     os.environ; every value is passed through
                     os.path.expandvars
        listener  -- object providing on_data(proc, data) and
                     on_finished(proc)
        path      -- if non-empty, temporarily replaces os.environ["PATH"]
                     (after variable expansion) while the process is spawned
        shell     -- forwarded to Popen for the "cmd" style invocation

        Raises ValueError when neither command is given or shell_cmd is not
        a string. Anything raised by subprocess.Popen propagates to the
        caller; PATH is restored in that case too.
        """
        if not shell_cmd and not cmd:
            raise ValueError("shell_cmd or cmd is required")

        if shell_cmd and not isinstance(shell_cmd, str):
            raise ValueError("shell_cmd must be a string")

        self.listener = listener
        self.killed = False

        self.start_time = time.time()

        # Hide the console window on Windows
        startupinfo = None
        if os.name == "nt":
            startupinfo = subprocess.STARTUPINFO()
            startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW

        # Set temporary PATH to locate executable in cmd
        old_path = os.environ.get("PATH")
        if path:
            # The user decides in the build system whether he wants to append
            # $PATH or tuck it at the front:
            # "$PATH;C:\\new\\path", "C:\\new\\path;$PATH"
            os.environ["PATH"] = os.path.expandvars(path)

        try:
            # Built while the temporary PATH is in place so the child
            # inherits it as well.
            proc_env = os.environ.copy()
            proc_env.update(env)
            proc_env = {k: os.path.expandvars(v) for k, v in proc_env.items()}

            if shell_cmd and sys.platform == "win32":
                # Use shell=True on Windows, so shell_cmd is passed through
                # with the correct escaping
                popen_args, use_shell = shell_cmd, True
            elif shell_cmd and sys.platform == "darwin":
                # Use a login shell on OSX, otherwise the users expected env
                # vars won't be setup
                popen_args, use_shell = ["/bin/bash", "-l", "-c", shell_cmd], False
            elif shell_cmd and sys.platform == "linux":
                # Explicitly use /bin/bash on Linux, to keep Linux and OSX as
                # similar as possible. A login shell is explicitly not used
                # for linux, as it's not required
                popen_args, use_shell = ["/bin/bash", "-c", shell_cmd], False
            else:
                # Old style build system, just do what it asks
                popen_args, use_shell = cmd, shell

            self.proc = subprocess.Popen(
                popen_args,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                stdin=subprocess.PIPE,
                startupinfo=startupinfo,
                env=proc_env,
                shell=use_shell)
        finally:
            # Restore PATH even when Popen raised, so a failed build does not
            # leave the editor's own environment modified.
            if path:
                if old_path is None:
                    os.environ.pop("PATH", None)
                else:
                    os.environ["PATH"] = old_path

        if self.proc.stdout:
            threading.Thread(target=self.read_stdout).start()

        if self.proc.stderr:
            threading.Thread(target=self.read_stderr).start()

    def kill(self):
        """Stop the process (at most once) and detach the listener."""
        if self.killed:
            return

        self.killed = True
        if sys.platform == "win32":
            # terminate would not kill process opened by the shell cmd.exe,
            # it will only kill cmd.exe leaving the child running.
            # /T takes down the whole child tree, /F forces termination.
            startupinfo = subprocess.STARTUPINFO()
            startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
            subprocess.Popen(
                "taskkill /T /F /PID " + str(self.proc.pid),
                startupinfo=startupinfo)
        else:
            self.proc.terminate()
        # No further callbacks are delivered once killed
        self.listener = None

    def poll(self):
        """Return True while the process is still running."""
        return self.proc.poll() is None

    def exit_code(self):
        """Return the process' exit code, or None if it has not exited."""
        return self.proc.poll()

    def _forward_stream(self, stream, notify_finished):
        """Pump ``stream`` to the listener until EOF, then close it.

        When ``notify_finished`` is true the listener's on_finished is called
        after the stream has been closed.
        """
        fd = stream.fileno()
        while True:
            data = os.read(fd, 2**15)

            # Snapshot the listener: kill() may set it to None from another
            # thread between a check and the call.
            listener = self.listener

            if len(data) > 0:
                if listener:
                    listener.on_data(self, data)
            else:
                stream.close()
                if notify_finished and listener:
                    listener.on_finished(self)
                break

    def read_stdout(self):
        """Thread body: forward stdout, signalling on_finished at EOF."""
        self._forward_stream(self.proc.stdout, True)

    def read_stderr(self):
        """Thread body: forward stderr (no on_finished notification)."""
        self._forward_stream(self.proc.stderr, False)
class ExecCommand(sublime_plugin.WindowCommand, ProcessListener):
    """
    Window command that runs a build via AsyncProcess, streams its output
    into the "exec" output panel and shows matched errors as inline phantoms.
    """

    # Output is queued in strings of at most roughly this many characters
    BLOCK_SIZE = 2**14

    # Class-level defaults, kept only as a fallback for subclasses that
    # override __init__ without calling it on the base class. Regular
    # instances get their own state in __init__ below.
    text_queue = collections.deque()
    text_queue_proc = None
    text_queue_lock = threading.Lock()

    proc = None

    errs_by_file = {}
    phantom_sets_by_buffer = {}
    show_errors_inline = True

    def __init__(self, window):
        """Create per-window state.

        The queue, its lock and the error/phantom maps must not be shared
        between windows: with class-level objects, a build started in one
        window would clear or consume output queued by a build in another.
        """
        super().__init__(window)
        self.text_queue = collections.deque()
        self.text_queue_proc = None
        self.text_queue_lock = threading.Lock()
        self.proc = None
        self.errs_by_file = {}
        self.phantom_sets_by_buffer = {}
        self.show_errors_inline = True

    def run(
            self,
            cmd=None,
            shell_cmd=None,
            file_regex="",
            line_regex="",
            working_dir="",
            encoding="utf-8",
            env=None,
            quiet=False,
            kill=False,
            update_phantoms_only=False,
            hide_phantoms_only=False,
            word_wrap=True,
            syntax="Packages/Text/Plain text.tmLanguage",
            # Catches "path" and "shell"
            **kwargs):
        """
        Start a build, or perform one of the auxiliary actions.

        cmd / shell_cmd       -- the command to run (see AsyncProcess)
        file_regex/line_regex -- stored on the output view as
                                 result_file_regex / result_line_regex
        working_dir           -- directory to change into before running;
                                 defaults to the active file's directory
        encoding              -- encoding used to decode process output
        env                   -- extra environment variables (a mapping);
                                 the active view's "build_env" setting is
                                 merged on top
        quiet                 -- suppress status and "[Finished ...]" output
        kill                  -- cancel the running build instead of starting
        update_phantoms_only  -- only refresh inline error phantoms
        hide_phantoms_only    -- only hide inline error phantoms
        word_wrap / syntax    -- applied to the output view
        kwargs                -- forwarded to AsyncProcess
        """
        if update_phantoms_only:
            if self.show_errors_inline:
                self.update_phantoms()
            return
        if hide_phantoms_only:
            self.hide_phantoms()
            return

        # clear the text_queue
        with self.text_queue_lock:
            self.text_queue.clear()
            self.text_queue_proc = None

        if kill:
            if self.proc:
                self.proc.kill()
                self.proc = None
                self.append_string(None, "[Cancelled]")
            return

        if not hasattr(self, 'output_view'):
            # Try not to call get_output_panel until the regexes are assigned
            self.output_view = self.window.create_output_panel("exec")

        # Default to the current file's directory if no working directory
        # was given
        active_view = self.window.active_view()
        if working_dir == "" and active_view and active_view.file_name():
            working_dir = os.path.dirname(active_view.file_name())

        panel_settings = self.output_view.settings()
        panel_settings.set("result_file_regex", file_regex)
        panel_settings.set("result_line_regex", line_regex)
        panel_settings.set("result_base_dir", working_dir)
        panel_settings.set("word_wrap", word_wrap)
        panel_settings.set("line_numbers", False)
        panel_settings.set("gutter", False)
        panel_settings.set("scroll_past_end", False)
        self.output_view.assign_syntax(syntax)

        # Call create_output_panel a second time after assigning the above
        # settings, so that it'll be picked up as a result buffer
        self.window.create_output_panel("exec")

        self.encoding = encoding
        self.quiet = quiet

        self.proc = None
        if not self.quiet:
            if shell_cmd:
                print("Running " + shell_cmd)
            elif cmd:
                print("Running " + " ".join(cmd))
            sublime.status_message("Building")

        preferences = sublime.load_settings("Preferences.sublime-settings")
        if preferences.get("show_panel_on_build", True):
            self.window.run_command("show_panel", {"panel": "output.exec"})

        self.hide_phantoms()
        self.show_errors_inline = preferences.get("show_errors_inline", True)

        # Copy so the caller's mapping is never modified
        merged_env = dict(env) if env else {}
        if self.window.active_view():
            user_env = self.window.active_view().settings().get('build_env')
            if user_env:
                merged_env.update(user_env)

        # Change to the working dir, rather than spawning the process with it,
        # so that emitted working dir relative path names make sense
        if working_dir != "":
            os.chdir(working_dir)

        if shell_cmd:
            debug_lines = ["[shell_cmd: " + shell_cmd + "]\n"]
        else:
            debug_lines = ["[cmd: " + str(cmd) + "]\n"]
        debug_lines.append("[dir: " + str(os.getcwd()) + "]\n")
        if "PATH" in merged_env:
            debug_lines.append("[path: " + str(merged_env["PATH"]) + "]")
        else:
            # .get(): building the debug text must not fail when PATH is unset
            debug_lines.append("[path: " + str(os.environ.get("PATH", "")) + "]")
        self.debug_text = "".join(debug_lines)

        try:
            # Forward kwargs to AsyncProcess
            self.proc = AsyncProcess(cmd, shell_cmd, merged_env, self, **kwargs)

            with self.text_queue_lock:
                self.text_queue_proc = self.proc

        except Exception as e:
            self.append_string(None, str(e) + "\n")
            self.append_string(None, self.debug_text + "\n")
            if not self.quiet:
                self.append_string(None, "[Finished]")

    def is_enabled(self, kill=False, **kwargs):
        """The kill variant is only enabled while a process is running."""
        if kill:
            return (self.proc is not None) and self.proc.poll()
        else:
            return True

    def append_string(self, proc, str):
        """Queue ``str`` for the output panel on behalf of ``proc``.

        Text from any process other than the one currently owning the queue
        is discarded, and that process is killed.
        """
        with self.text_queue_lock:
            if proc != self.text_queue_proc:
                # a second call to exec has been made before the first one
                # finished, ignore it instead of intermingling the output.
                if proc:
                    proc.kill()
                return

            was_empty = not self.text_queue
            if was_empty:
                self.text_queue.append("")

            # Coalesce small writes into the last queued block
            available = self.BLOCK_SIZE - len(self.text_queue[-1])
            if len(str) < available:
                self.text_queue[-1] += str
            else:
                self.text_queue.append(str)

        # Only schedule the consumer when the queue was empty; otherwise one
        # is already pending.
        if was_empty:
            sublime.set_timeout(self.service_text_queue, 0)

    def service_text_queue(self):
        """Move one queued block into the output panel and refresh errors."""
        with self.text_queue_lock:
            if not self.text_queue:
                # this can happen if a new build was started, which will clear
                # the text_queue
                return

            characters = self.text_queue.popleft()
            is_empty = not self.text_queue

        self.output_view.run_command(
            'append',
            {'characters': characters, 'force': True, 'scroll_to_end': True})

        if self.show_errors_inline and '\n' in characters:
            errs_by_file = {}
            for file, line, column, text in self.output_view.find_all_results_with_text():
                errs_by_file.setdefault(file, []).append((line, column, text))
            self.errs_by_file = errs_by_file

            self.update_phantoms()

        if not is_empty:
            sublime.set_timeout(self.service_text_queue, 1)

    def finish(self, proc):
        """Report completion of ``proc`` in the panel and the status bar."""
        if not self.quiet:
            elapsed = time.time() - proc.start_time
            exit_code = proc.exit_code()
            if exit_code == 0 or exit_code is None:
                self.append_string(proc, "[Finished in %.1fs]" % elapsed)
            else:
                self.append_string(proc, "[Finished in %.1fs with exit code %d]\n" % (elapsed, exit_code))
                self.append_string(proc, self.debug_text)

        # A newer build has replaced this one; leave the status bar alone
        if proc != self.proc:
            return

        errs = self.output_view.find_all_results()
        if len(errs) == 0:
            sublime.status_message("Build finished")
        else:
            sublime.status_message("Build finished with %d errors" % len(errs))

    def on_data(self, proc, data):
        """ProcessListener hook: decode a chunk of output and queue it."""
        try:
            characters = data.decode(self.encoding)
        except Exception:
            characters = "[Decode error - output not " + self.encoding + "]\n"
            # NOTE(review): with proc set to None, append_string appears to
            # drop this message whenever a build owns the queue. Behaviour
            # kept as-is; confirm whether that is intended.
            proc = None

        # Normalize newlines, Sublime Text always uses a single \n separator
        # in memory.
        characters = characters.replace('\r\n', '\n').replace('\r', '\n')

        self.append_string(proc, characters)

    def on_finished(self, proc):
        """ProcessListener hook: hand completion over to set_timeout."""
        sublime.set_timeout(functools.partial(self.finish, proc), 0)

    def update_phantoms(self):
        """Show every known error as a phantom in the views of open files."""
        # String content is kept flush-left so the emitted markup is unchanged
        stylesheet = '''
<style>
div.error {
padding: 0.4rem 0 0.4rem 0.7rem;
margin: 0.2rem 0;
border-radius: 2px;
}
div.error span.message {
padding-right: 0.7rem;
}
div.error a {
text-decoration: inherit;
padding: 0.35rem 0.7rem 0.45rem 0.8rem;
position: relative;
bottom: 0.05rem;
border-radius: 0 2px 2px 0;
font-weight: bold;
}
html.dark div.error a {
background-color: #00000018;
}
html.light div.error a {
background-color: #ffffff18;
}
</style>
'''

        for file, errs in self.errs_by_file.items():
            view = self.window.find_open_file(file)
            if not view:
                continue

            buffer_id = view.buffer_id()
            phantom_set = self.phantom_sets_by_buffer.get(buffer_id)
            if phantom_set is None:
                phantom_set = sublime.PhantomSet(view, "exec")
                self.phantom_sets_by_buffer[buffer_id] = phantom_set

            phantoms = []
            for line, column, text in errs:
                # line and column are converted from 1-based to 0-based
                pt = view.text_point(line - 1, column - 1)
                phantoms.append(sublime.Phantom(
                    sublime.Region(pt, view.line(pt).b),
                    ('<body id=inline-error>' + stylesheet +
                        '<div class="error">' +
                        '<span class="message">' + html.escape(text, quote=False) + '</span>' +
                        '<a href=hide>' + chr(0x00D7) + '</a></div>' +
                        '</body>'),
                    sublime.LAYOUT_BELOW,
                    on_navigate=self.on_phantom_navigate))

            phantom_set.update(phantoms)

    def hide_phantoms(self):
        """Erase all "exec" phantoms and forget the recorded errors."""
        for file in self.errs_by_file:
            view = self.window.find_open_file(file)
            if view:
                view.erase_phantoms("exec")

        self.errs_by_file = {}
        self.phantom_sets_by_buffer = {}
        self.show_errors_inline = False

    def on_phantom_navigate(self, url):
        """Phantom link handler: any link hides all phantoms."""
        self.hide_phantoms()
class ExecEventListener(sublime_plugin.EventListener):
    """Asks the window's exec command to refresh phantoms on file load."""

    def on_load(self, view):
        """Run 'exec' in phantom-update mode for the view's window, if any."""
        window = view.window()
        if window is None:
            return
        window.run_command('exec', {'update_phantoms_only': True})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment