Skip to content

Instantly share code, notes, and snippets.

@jskinner
Last active April 27, 2020 07:22
Show Gist options
  • Save jskinner/4dfa6d86f848e1e1e583883507369b67 to your computer and use it in GitHub Desktop.
Save jskinner/4dfa6d86f848e1e1e583883507369b67 to your computer and use it in GitHub Desktop.
import sublime, sublime_plugin
import re
import threading
# This is an example plugin that simulates a completion interaction with an
# LSP server, while correctly apply any edits that are received from the
# server. This is trickier than you'd expect, due to the async nature the
# server communication, the async auto complete system in Sublime Text, and
# the use of row+col offsets within the LSP protocol.
#
# Note this uses APIs that are only available in Sublime Text 4069+
#
# For reference, this plugin adds a single completion with a trigger of
# "hello", that when run replaces all "#include" occurrences in the buffer
# with smiley faces
# Apply a sequence of edits to the text buffer.
#
# Each edit is specified as a tuple of (begin_offset, end_offset, text)
#
# `when` is a token that indicates which version of the text buffer the regions
# are in relation to.
# `transform_region_from` translates these regions into the coordinate space
# of the current buffer.
class ApplyEditsCommand(sublime_plugin.TextCommand):
def run(self, edit_token, when, edits):
for e in edits:
begin, end, text = e
r = self.view.transform_region_from(sublime.Region(begin, end), when)
self.view.replace(edit_token, r, text)
# Represents a point in a TextBuffer in row+col format, as opposed to Sublime
# Text's native character offset
class RcPoint():
__slots__ = ['row', 'col']
def __init__(self, row, col):
self.row = row
self.col = col
def __repr__(self):
return str(self.row) + ":" + str(self.col)
def __eq__(x, y):
return x.row == y.row and x.col == y.col
def __lt__(x, y):
if x.row == y.row:
return x.col < y.col
else:
return x.row < y.row
def __le__(x, y):
if x.row == y.row:
return x.col <= y.col
else:
return x.row <= y.row
def __gt__(x, y):
if x.row == y.row:
return x.col > y.col
else:
return x.row > y.row
def __ge__(x, y):
if x.row == y.row:
return x.col >= y.col
else:
return x.row >= y.row
# Equivalent to a sublime.Region, but using row+col offsets, rather than
# character offsets
class RcRegion():
__slots__ = ['a', 'b']
def __init__(self, a: RcPoint, b: RcPoint):
self.a = a
self.b = b
def __repr__(self):
return str(self.a) + ", " + str(self.b)
def __eq__(x, y):
return x.a == y.a and x.b == y.b
# Convert an offset into a string into a row+column offset into the string.
# Used by simulate_server() only
def offset_to_rowcol(s: str, offset: int) -> RcPoint:
row = 0
col = 0
for c in s[:offset]:
if c == '\n':
row += 1
col = 0
else:
col += 1
return RcPoint(row,col)
# Acts in a similar manner to a LSP server: takes the current contents of the
# text buffer, spends some time thinking, and produces a sequence of edits to
# that buffer
def simulate_server(contents: str, request):
# calculate all the edits
edits = []
for m in re.finditer("#include", contents):
begin = offset_to_rowcol(contents, m.start())
end = offset_to_rowcol(contents, m.end())
edits.append((RcRegion(begin, end), "😀"))
# send the edits back to the client with a simulated delay
sublime.set_timeout(lambda: request.fill_completions(edits), 500)
# Given an RcRegion, and a change to the text buffer, transform the region
# into the coordinate space after that change has been applied.
#
# For example, if the r.a is row 10, column 5, and a newline is inserted at
# the beginning of the file, then the resulting region has its r.a set to row
# 11, column 5.
#
# Note that this mirrors the algorithm that View.transform_region_from uses
# internally
def transform_region(r: RcRegion, change: sublime.TextChange) -> RcRegion:
begin = RcPoint(change.a.row, change.a.col)
end = RcPoint(change.b.row, change.b.col)
num_added_newlines = change.str.count("\n")
if num_added_newlines > 0:
num_chars_on_last_line = len(change.str) - change.str.rfind("\n")
else:
num_chars_on_last_line = len(change.str)
if r.a >= begin and r.a <= end:
r.a = begin
elif r.a > end:
# handle the erased region
if r.a.row == end.row:
if begin.row == end.row:
r.a.col -= end.col - begin.col
else:
r.a.col -= end.col
else:
r.a.row -= (end.row - begin.row)
# handle the added text
if r.a.row == end.row:
r.a.row += num_added_newlines
r.a.col += num_chars_on_last_line
else:
r.a.row += num_added_newlines
if r.b >= begin and r.b < end:
r.b = begin
elif r.b >= end:
# handle the erased region
if r.b.row == end.row:
if begin.row == end.row:
r.b.col -= end.col - begin.col
else:
r.b.col -= end.col
else:
r.b.row -= (end.row - begin.row)
# handle the added text
if r.b.row == end.row:
r.b.row += num_added_newlines
r.b.col += num_chars_on_last_line
else:
r.b.row += num_added_newlines
return r
def check_transform_region():
assert(transform_region(RcRegion(RcPoint(10, 0), RcPoint(10, 0)), sublime.TextChange(
sublime.HistoricPosition(-1, 5, 0, 0, 0),
sublime.HistoricPosition(-1, 6, 0, 0, 0),
"")) == RcRegion(RcPoint(9, 0), RcPoint(9, 0)))
assert(transform_region(RcRegion(RcPoint(10, 0), RcPoint(10, 0)), sublime.TextChange(
sublime.HistoricPosition(-1, 5, 0, 0, 0),
sublime.HistoricPosition(-1, 6, 0, 0, 0),
"abc")) == RcRegion(RcPoint(9, 0), RcPoint(9, 0)))
assert(transform_region(RcRegion(RcPoint(10, 0), RcPoint(10, 0)), sublime.TextChange(
sublime.HistoricPosition(-1, 5, 0, 0, 0),
sublime.HistoricPosition(-1, 6, 0, 0, 0),
"\n")) == RcRegion(RcPoint(10, 0), RcPoint(10, 0)))
assert(transform_region(RcRegion(RcPoint(10, 0), RcPoint(10, 0)), sublime.TextChange(
sublime.HistoricPosition(-1, 15, 0, 0, 0),
sublime.HistoricPosition(-1, 16, 0, 0, 0),
"")) == RcRegion(RcPoint(10, 0), RcPoint(10, 0)))
assert(transform_region(RcRegion(RcPoint(10, 0), RcPoint(10, 0)), sublime.TextChange(
sublime.HistoricPosition(-1, 10, 0, 0, 0),
sublime.HistoricPosition(-1, 10, 0, 0, 0),
"xx")) == RcRegion(RcPoint(10, 0), RcPoint(10, 2)))
check_transform_region()
# Takes an edit in row+col format, and returns an edit in character offset
# format
def row_col_edit_to_offset_edit(view: sublime.View, e):
r, text = e
# convert from row_col into a text point
begin = view.text_point(r.a.row, r.a.col)
end = view.text_point(r.b.row, r.b.col)
return (begin, end, text)
# Represents an outstanding request to the 'server' (a thread running
# simulate_server() in this case).
#
# The important aspect here is self.changes_since_sent, which represents all
# the changes to the text buffer since the request was sent. This allows the
# response from the server to be adjusted to the current coordinate space of
# the buffer
class Request():
def __init__(self, listener, view, completions):
self.listener = listener
self.view = view
self.changes_since_sent = []
self.completions = sublime.CompletionList()
# Called when we get a response from the 'server'
def fill_completions(self, edits):
# The server has sent us a series of edits it wants to be made when
# the completion command is run, however the user may have edited the
# buffer since the request want sent. Adjust the response from the
# server to account for these edits
for c in self.changes_since_sent:
edits = [(transform_region(r, c), text) for r, text in edits]
# The edits are now relative to the current document, and could be
# applied now. However, we don't want to apply them now, we want to
# apply them only when the user selects the completion. Because
# further edits to the buffer may occur between now and when the user
# selects the completion, grab a token that represents the current
# state of the buffer, so the edits can be transformed a second time
# when the user does select the completion.
change_id = self.view.change_id()
# Transform the edits from row+col representation, to the simpler
# character offset representation
normal_edits = [row_col_edit_to_offset_edit(self.view, e) for e in edits]
# Create the completion item, indicating which command to run, and its
# arguments. Note that `change_id` and `normal_edits` are both simple
# types that can be JSON encoded
item = sublime.CompletionItem.command_completion(
trigger="hello",
command="apply_edits",
args={"when":change_id, "edits": normal_edits})
# The request has completed, so remove ourself from
# HelloCompletions.requests
self.listener.requests.remove(self)
self.completions.set_completions([item], 0)
class AsyncEditsListener(sublime_plugin.EventListener):
def __init__(self):
self.requests = []
def on_query_completions(self, view, prefix, locations):
req = Request(self, view, sublime.CompletionList())
# Send the contents of the current view to the simulated server. When
# the response is ready, Request.fill_completions()) will be called
contents = view.substr(sublime.Region(0, view.size()))
t = threading.Thread(
target=simulate_server,
args=(contents, req))
t.start()
self.requests.append(req)
return req.completions
def on_text_changed(self, view, changes):
# Record the changes in all pending requests
for r in self.requests:
if r.view.view_id == view.view_id:
r.changes_since_sent.extend(changes)
@rwols
Copy link

rwols commented Apr 26, 2020

But maybe we'd need some language-server-specific handling for such things.

Perhaps, but it should be a last resort :)

But this feels like it becomes off topic here.

I'll stop here.

@jskinner
Copy link
Author

I've touched on some of the items raised here in sublimehq/sublime_text#3296

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment