# Source: GitHub gist rwols/2f24cdf9be284ca57614cbf45bf0fb40
# NOTE: GitHub flagged this file as containing bidirectional Unicode text that
# may be interpreted or compiled differently than it appears; review it in an
# editor that reveals hidden Unicode characters.
from contextlib import contextmanager
from typing import Dict
from typing import Generator
from typing import Optional
from typing import Set
import asyncio
import asyncio.events
import asyncio.unix_events
import heapq
import logging
import sys
import threading

import sublime
import sublime_plugin
# Configure the root logger at import time so that records of level DEBUG
# and above are emitted.
logging.basicConfig(level=logging.DEBUG)
@contextmanager
def wrapped_print(
    prefix: str, begin: str, end: str
) -> Generator[None, None, None]:
    """Print ``"<prefix>: <begin>"`` on entry and ``"<prefix>: <end>"`` on exit.

    The closing message is printed only when the ``with`` body completes
    without raising; if the body raises, the exception propagates and the
    closing message is skipped.

    Args:
        prefix: Label placed in front of both messages.
        begin: Text printed before the ``with`` body runs.
        end: Text printed after the ``with`` body finishes normally.
    """
    print(f"{prefix}: {begin}")
    yield
    print(f"{prefix}: {end}")
# Minimum number of _scheduled timer handles before cleanup of
# cancelled handles is performed.
_MIN_SCHEDULED_TIMER_HANDLES = 100

# Minimum fraction of _scheduled timer handles that are cancelled
# before cleanup of cancelled handles is performed.
_MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5
def _format_handle(handle):
    """Return a printable description of *handle* for log messages.

    When the handle's callback is a bound method of an asyncio task, the
    task's ``repr`` is returned; otherwise ``str(handle)`` is returned.
    """
    owner = getattr(handle._callback, '__self__', None)
    if isinstance(owner, asyncio.tasks.Task):
        # Format the task rather than the handle that wraps it.
        return repr(owner)
    return str(handle)
class SublimeEventLoop(asyncio.unix_events.SelectorEventLoop): | |
def _do_iteration(self) -> None: | |
if self._stopping: | |
self._cleanup() | |
return | |
try: | |
self._run_once() | |
sublime.set_timeout(self._do_iteration, 10) | |
except Exception: | |
self._cleanup() | |
def _cleanup(self) -> None: | |
self._thread_id = None | |
asyncio.events._set_running_loop(None) | |
self._set_coroutine_origin_tracking(False) | |
sys.set_asyncgen_hooks(*self.old_agen_hooks) | |
self._stopping = False | |
def run_forever(self) -> None: | |
self._check_closed() | |
if self.is_running(): | |
raise RuntimeError('This event loop is already running') | |
if asyncio.events._get_running_loop() is not None: | |
raise RuntimeError( | |
'Cannot run the event loop while another loop is running') | |
self._set_coroutine_origin_tracking(self._debug) | |
self._thread_id = threading.get_ident() | |
self.old_agen_hooks = sys.get_asyncgen_hooks() | |
sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook, | |
finalizer=self._asyncgen_finalizer_hook) | |
asyncio.events._set_running_loop(self) | |
sublime.set_timeout(self._do_iteration, 10) | |
def _run_once(self) -> None: | |
"""Run one full iteration of the event loop. | |
This calls all currently ready callbacks, polls for I/O, | |
schedules the resulting callbacks, and finally schedules | |
'call_later' callbacks. | |
""" | |
sched_count = len(self._scheduled) | |
if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and | |
self._timer_cancelled_count / sched_count > | |
_MIN_CANCELLED_TIMER_HANDLES_FRACTION): | |
# Remove delayed calls that were cancelled if their number | |
# is too high | |
new_scheduled = [] | |
for handle in self._scheduled: | |
if handle._cancelled: | |
handle._scheduled = False | |
else: | |
new_scheduled.append(handle) | |
heapq.heapify(new_scheduled) | |
self._scheduled = new_scheduled | |
self._timer_cancelled_count = 0 | |
else: | |
# Remove delayed calls that were cancelled from head of queue. | |
while self._scheduled and self._scheduled[0]._cancelled: | |
self._timer_cancelled_count -= 1 | |
handle = heapq.heappop(self._scheduled) | |
handle._scheduled = False | |
event_list = self._selector.select(0) | |
self._process_events(event_list) | |
# Handle 'later' callbacks that are ready. | |
end_time = self.time() + self._clock_resolution | |
while self._scheduled: | |
handle = self._scheduled[0] | |
if handle._when >= end_time: | |
break | |
handle = heapq.heappop(self._scheduled) | |
handle._scheduled = False | |
self._ready.append(handle) | |
# This is the only place where callbacks are actually *called*. | |
# All other places just add them to ready. | |
# Note: We run all currently scheduled callbacks, but not any | |
# callbacks scheduled by callbacks run this time around -- | |
# they will be run the next time (after another I/O poll). | |
# Use an idiom that is thread-safe without using locks. | |
ntodo = len(self._ready) | |
for i in range(ntodo): | |
handle = self._ready.popleft() | |
if handle._cancelled: | |
continue | |
if self._debug: | |
try: | |
self._current_handle = handle | |
t0 = self.time() | |
handle._run() | |
dt = self.time() - t0 | |
if dt >= self.slow_callback_duration: | |
logging.warning('Executing %s took %.3f seconds', | |
_format_handle(handle), dt) | |
finally: | |
self._current_handle = None | |
else: | |
handle._run() | |
handle = None # Needed to break cycles when an exception occurs. | |
# The event loop adopted or created by plugin_loaded(); None until then.
__loop: Optional[asyncio.AbstractEventLoop] = None


def plugin_loaded() -> None:
    """Ensure an asyncio event loop is running for this plugin.

    Does nothing when a loop has already been recorded. Otherwise an
    already-running loop is adopted if there is one; failing that, a
    ``SublimeEventLoop`` is created with debug mode enabled and started.
    """
    global __loop
    if __loop is not None:
        return
    try:
        # Only this call signals "no running loop" via RuntimeError.
        __loop = asyncio.get_running_loop()
    except RuntimeError:
        pass
    else:
        print("loop: already running")
        return
    with wrapped_print("loop", "starting", "started"):
        __loop = SublimeEventLoop()
        __loop.set_debug(True)
        __loop.run_forever()
# Spinner frames shown in the status bar, in display order.
GLYPHS = ("-", "\\", "|", "/", "-", "\\", "|", "/")


async def activity_monitor(view: sublime.View, key: str) -> None:
    """Animate a spinner in *view*'s status under *key* until cancelled.

    Every 0.1 seconds the status entry is set to the next frame of
    ``GLYPHS`` wrapped in square brackets. The status entry is erased when
    the coroutine ends, whether through cancellation or failure.

    Args:
        view: The view whose status entry is updated.
        key: The status key that is set and finally erased.
    """
    index = 0
    try:
        while True:
            view.set_status(key, f"[{GLYPHS[index]}]")
            index = (index + 1) % len(GLYPHS)
            await asyncio.sleep(0.1)
    except asyncio.CancelledError:
        # Cancellation is how the spinner is stopped. Re-raise explicitly so
        # it also propagates on Python 3.7, where CancelledError still
        # derives from Exception and would be caught by the handler below.
        raise
    except Exception:
        # Report unexpected failures instead of hiding them.
        logging.exception("activity monitor for status key %r failed", key)
    finally:
        try:
            view.erase_status(key)
        except Exception:
            # Best-effort cleanup: record the failure but do not raise.
            logging.debug("could not erase status key %r", key, exc_info=True)
class ActivityMonitorCommand(sublime_plugin.TextCommand):
    """Toggle the status spinner for a given status key."""

    # Spinner tasks indexed by status key. This is a class attribute, so the
    # mapping is shared by every instance of the command.
    # NOTE(review): entries are keyed by status key only, not by view --
    # confirm that sharing across views is intended.
    keys: Dict[str, asyncio.Task] = {}

    def run(self, edit: sublime.Edit, key: str) -> None:
        """Start the spinner for *key*, or cancel it when it is running.

        Args:
            edit: Edit token supplied to the command; unused.
            key: Status key identifying the spinner to toggle.
        """
        task = self.keys.pop(key, None)
        if task is not None and not task.done():
            task.cancel()
            return
        # Either no task was recorded, or the recorded one has already
        # finished: start a new spinner rather than cancelling a dead task.
        self.keys[key] = asyncio.create_task(activity_monitor(self.view, key))
class RunExternalProgramCommand(sublime_plugin.TextCommand):
    """Run a shell command asynchronously and show its stdout in a dialog."""

    # Strong references to tasks that are still running. The event loop only
    # keeps weak references to tasks, so an otherwise unreferenced task could
    # be garbage-collected before it finishes.
    _pending_tasks: Set[asyncio.Task] = set()

    async def run_async(self, cmd: str, stdin_input: str) -> None:
        """Run *cmd* through the shell, feed it *stdin_input*, show stdout.

        Input is encoded and output decoded as UTF-8 with the ``replace``
        error handler. Only stdout is captured.

        Args:
            cmd: Shell command line to execute.
            stdin_input: Text written to the process's standard input.
        """
        # NOTE: cmd is handed to the shell verbatim; only invoke this
        # command with trusted command strings.
        process = await asyncio.create_subprocess_shell(
            cmd,
            stdout=asyncio.subprocess.PIPE,
            stdin=asyncio.subprocess.PIPE)
        encoding = "UTF-8"
        errs = "replace"
        inputbytes = stdin_input.encode(encoding, errs)
        stdout_data, _ = await process.communicate(inputbytes)
        sublime.message_dialog(stdout_data.decode(encoding, errs))

    def run(self, edit: sublime.Edit, cmd: str, stdin_input: str) -> None:
        """Schedule ``run_async`` on the running event loop.

        Args:
            edit: Edit token supplied to the command; unused.
            cmd: Shell command line to execute.
            stdin_input: Text written to the process's standard input.

        Raises:
            RuntimeError: If no event loop is currently running.
        """
        task = asyncio.get_running_loop().create_task(
            self.run_async(cmd, stdin_input))
        # Hold the task until it completes, then drop the reference.
        self._pending_tasks.add(task)
        task.add_done_callback(self._pending_tasks.discard)
# (GitHub page footer -- sign-up / sign-in prompt -- is not part of the program.)