Skip to content

Instantly share code, notes, and snippets.

@rwols
Last active December 9, 2019 12:33
Show Gist options
  • Save rwols/514f072648d7c51241da11d5e58efa33 to your computer and use it in GitHub Desktop.
Save rwols/514f072648d7c51241da11d5e58efa33 to your computer and use it in GitHub Desktop.
from sublime import CompletionItem
from sublime import CompletionList
from sublime import Edit
from sublime import message_dialog
from sublime import View
from sublime_plugin import EventListener
from sublime_plugin import TextCommand
from sublime_plugin import ViewEventListener
from threading import Thread
from typing import Awaitable
from typing import Callable
from typing import Dict
from typing import List
from typing import Optional
from typing import Generator
from contextlib import contextmanager
import asyncio
__loop: Optional[asyncio.AbstractEventLoop] = None
__thread: Optional[Thread] = None
@contextmanager
def wrapped_print(prefix: str, begin: str, end: str) -> Generator:
print(f"{prefix}: {begin}")
yield
print(f"{prefix}: {end}")
def plugin_loaded() -> None:
with wrapped_print("loop", "starting", "started"):
global __loop
global __thread
__loop = asyncio.new_event_loop()
__thread = Thread(target=__loop.run_forever)
__thread.start()
def __shutdown() -> None:
for task in asyncio.Task.all_tasks():
task.cancel()
asyncio.get_event_loop().stop()
def plugin_unloaded() -> None:
with wrapped_print("loop", "stopping", "stopped"):
global __loop
global __thread
if __loop and __thread:
__loop.call_soon_threadsafe(__shutdown)
__thread.join()
__loop.run_until_complete(__loop.shutdown_asyncgens())
__loop.close()
__loop = None
__thread = None
def blocking_greet() -> None:
print("hello from a blocking function")
async def greet_twice() -> None:
print("hello one!")
await asyncio.sleep(1)
print("hello two!")
async def greet_twice_cancellation() -> None:
print("hello world!")
try:
await asyncio.sleep(1)
print("hello yet again world!")
except asyncio.CancelledError:
print("oh no! I was cancelled! Goodbye world!")
def blocking_compute(n: int) -> int:
print("compute: start", n)
import time
time.sleep(2) # zzz... working very hard
print("compute: done", n)
return n // 2
async def compute_in_threadpool(view: View, point: int, *args: int) -> None:
loop = asyncio.get_event_loop()
coros = (loop.run_in_executor(None, blocking_compute, arg) for arg in args)
results = await asyncio.gather(*coros, loop=loop)
if view and view.is_valid():
view.show_popup(", ".join(map(str, results)), 0, point)
GLYPHS = ("-", "\\", "|", "/", "-", "\\", "|", "/")
async def activity_monitor(view: View, key: str) -> None:
index = 0
try:
while True:
view.set_status(key, f"[{GLYPHS[index]}]")
index += 1
index %= len(GLYPHS)
await asyncio.sleep(0.1)
except Exception:
pass
finally:
try:
view.erase_status(key)
except Exception:
pass
def schedule(coro: Awaitable) -> None:
global __loop
if __loop:
__loop.call_soon_threadsafe(asyncio.ensure_future, coro)
class MyEventListener(EventListener):
def on_selection_modified(self, view: View) -> None:
if view.settings().get("is_widget", False):
return
schedule(greet_twice_cancellation())
# def on_hover(self, view: View, point: int, hover_zone: int) -> None:
# if view.settings().get("is_widget", False):
# return
# coro = compute_in_threadpool(
# view, point, 2, 4, 8, 16, 32, 64, 128, 256)
# schedule(coro)
class ActivityMonitorCommand(TextCommand):
keys: Dict[str, asyncio.Task] = {}
async def run_async(self, key: str) -> None:
task = self.keys.pop(key, None)
if task is None:
task = asyncio.create_task(activity_monitor(self.view, key))
self.keys[key] = task
else:
task.cancel()
def run(self, edit: Edit, key: str) -> None:
schedule(self.run_async(key))
class RunExternalProgramCommand(TextCommand):
async def run_async(self, cmd: str, stdin_input: str) -> None:
process = await asyncio.create_subprocess_shell(
cmd,
stdout=asyncio.subprocess.PIPE,
stdin=asyncio.subprocess.PIPE)
encoding = "UTF-8"
errs = "replace"
inputbytes = stdin_input.encode(encoding, errs)
stdout_data, _ = await process.communicate(inputbytes)
message_dialog(stdout_data.decode(encoding, errs))
def run(self, edit: Edit, cmd: str, stdin_input: str) -> None:
schedule(self.run_async(cmd, stdin_input))
class MyViewEventListener(ViewEventListener):
def on_query_completions(
self,
prefix: str,
locations: List[int]
) -> Optional[CompletionList]:
if len(locations) != 1:
return None
promise = CompletionList()
coro = self.on_query_completions_async(prefix, locations[0], promise)
schedule(coro)
return promise
async def on_query_completions_async(
self,
prefix: str,
location: int,
promise: CompletionList
) -> None:
await asyncio.sleep(1) # fetching completions from far away ...
snip = CompletionItem.snippet_completion
promise.set_completions([
snip("foo", "${1:foo}", "Does the foo"),
snip("bar", "${1:bar}", "Does the bar"),
snip("baz", "${1:baz}", "Does the baz")
])
async def __wrapper(f: Callable[[], None], timeout_ms: int) -> None:
if timeout_ms > 0:
await asyncio.sleep(timeout_ms / 1000)
f()
def set_timeout_async(f: Callable[[], None], timeout_ms: int = 0) -> None:
schedule(__wrapper(f, timeout_ms))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment