Skip to content

Instantly share code, notes, and snippets.

@rwols
Created December 10, 2019 21:13
Show Gist options
  • Save rwols/2f24cdf9be284ca57614cbf45bf0fb40 to your computer and use it in GitHub Desktop.
Save rwols/2f24cdf9be284ca57614cbf45bf0fb40 to your computer and use it in GitHub Desktop.
from contextlib import contextmanager
from typing import Dict
from typing import Generator
from typing import Optional
import asyncio
import asyncio.events
import asyncio.unix_events
import heapq
import logging
import sublime
import sublime_plugin
import sys
import threading
logging.basicConfig(level=logging.DEBUG)
@contextmanager
def wrapped_print(prefix: str, begin: str, end: str) -> Generator:
print(f"{prefix}: {begin}")
yield
print(f"{prefix}: {end}")
# Minimum number of _scheduled timer handles before cleanup of
# cancelled handles is performed.
_MIN_SCHEDULED_TIMER_HANDLES = 100
# Minimum fraction of _scheduled timer handles that are cancelled
# before cleanup of cancelled handles is performed.
_MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5
def _format_handle(handle):
cb = handle._callback
if isinstance(getattr(cb, '__self__', None), asyncio.tasks.Task):
# format the task
return repr(cb.__self__)
else:
return str(handle)
class SublimeEventLoop(asyncio.unix_events.SelectorEventLoop):
def _do_iteration(self) -> None:
if self._stopping:
self._cleanup()
return
try:
self._run_once()
sublime.set_timeout(self._do_iteration, 10)
except Exception:
self._cleanup()
def _cleanup(self) -> None:
self._thread_id = None
asyncio.events._set_running_loop(None)
self._set_coroutine_origin_tracking(False)
sys.set_asyncgen_hooks(*self.old_agen_hooks)
self._stopping = False
def run_forever(self) -> None:
self._check_closed()
if self.is_running():
raise RuntimeError('This event loop is already running')
if asyncio.events._get_running_loop() is not None:
raise RuntimeError(
'Cannot run the event loop while another loop is running')
self._set_coroutine_origin_tracking(self._debug)
self._thread_id = threading.get_ident()
self.old_agen_hooks = sys.get_asyncgen_hooks()
sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
finalizer=self._asyncgen_finalizer_hook)
asyncio.events._set_running_loop(self)
sublime.set_timeout(self._do_iteration, 10)
def _run_once(self) -> None:
"""Run one full iteration of the event loop.
This calls all currently ready callbacks, polls for I/O,
schedules the resulting callbacks, and finally schedules
'call_later' callbacks.
"""
sched_count = len(self._scheduled)
if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
self._timer_cancelled_count / sched_count >
_MIN_CANCELLED_TIMER_HANDLES_FRACTION):
# Remove delayed calls that were cancelled if their number
# is too high
new_scheduled = []
for handle in self._scheduled:
if handle._cancelled:
handle._scheduled = False
else:
new_scheduled.append(handle)
heapq.heapify(new_scheduled)
self._scheduled = new_scheduled
self._timer_cancelled_count = 0
else:
# Remove delayed calls that were cancelled from head of queue.
while self._scheduled and self._scheduled[0]._cancelled:
self._timer_cancelled_count -= 1
handle = heapq.heappop(self._scheduled)
handle._scheduled = False
event_list = self._selector.select(0)
self._process_events(event_list)
# Handle 'later' callbacks that are ready.
end_time = self.time() + self._clock_resolution
while self._scheduled:
handle = self._scheduled[0]
if handle._when >= end_time:
break
handle = heapq.heappop(self._scheduled)
handle._scheduled = False
self._ready.append(handle)
# This is the only place where callbacks are actually *called*.
# All other places just add them to ready.
# Note: We run all currently scheduled callbacks, but not any
# callbacks scheduled by callbacks run this time around --
# they will be run the next time (after another I/O poll).
# Use an idiom that is thread-safe without using locks.
ntodo = len(self._ready)
for i in range(ntodo):
handle = self._ready.popleft()
if handle._cancelled:
continue
if self._debug:
try:
self._current_handle = handle
t0 = self.time()
handle._run()
dt = self.time() - t0
if dt >= self.slow_callback_duration:
logging.warning('Executing %s took %.3f seconds',
_format_handle(handle), dt)
finally:
self._current_handle = None
else:
handle._run()
handle = None # Needed to break cycles when an exception occurs.
__loop: Optional[asyncio.AbstractEventLoop] = None
def plugin_loaded() -> None:
global __loop
if __loop is None:
try:
__loop = asyncio.get_running_loop()
print("loop: already running")
return
except RuntimeError:
pass
with wrapped_print("loop", "starting", "started"):
__loop = SublimeEventLoop()
__loop.set_debug(True)
__loop.run_forever()
GLYPHS = ("-", "\\", "|", "/", "-", "\\", "|", "/")
async def activity_monitor(view: sublime.View, key: str) -> None:
index = 0
try:
while True:
view.set_status(key, f"[{GLYPHS[index]}]")
index += 1
index %= len(GLYPHS)
await asyncio.sleep(0.1)
except Exception:
pass
finally:
try:
view.erase_status(key)
except Exception:
pass
class ActivityMonitorCommand(sublime_plugin.TextCommand):
keys: Dict[str, asyncio.Task] = {}
def run(self, edit: sublime.Edit, key: str) -> None:
task = self.keys.pop(key, None)
if task is None:
task = asyncio.create_task(activity_monitor(self.view, key))
self.keys[key] = task
else:
task.cancel()
class RunExternalProgramCommand(sublime_plugin.TextCommand):
async def run_async(self, cmd: str, stdin_input: str) -> None:
process = await asyncio.create_subprocess_shell(
cmd,
stdout=asyncio.subprocess.PIPE,
stdin=asyncio.subprocess.PIPE)
encoding = "UTF-8"
errs = "replace"
inputbytes = stdin_input.encode(encoding, errs)
stdout_data, _ = await process.communicate(inputbytes)
sublime.message_dialog(stdout_data.decode(encoding, errs))
def run(self, edit: sublime.Edit, cmd: str, stdin_input: str) -> None:
asyncio.get_running_loop().create_task(self.run_async(cmd, stdin_input))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment