Skip to content

Instantly share code, notes, and snippets.

@tompng
Created January 10, 2024 14:56
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tompng/1ed800cea20154ec29a28c189622b604 to your computer and use it in GitHub Desktop.
Save tompng/1ed800cea20154ec29a28c189622b604 to your computer and use it in GitHub Desktop.
class PseudoTerminalIO
def initialize(fiber)
@fiber = fiber
@buffer = []
end
def close
@buffer << nil
@closed = true
end
def read(one)
raise ArgumentError, 'Only supports read(1)' unless one == 1
loop do
Fiber.yield if @buffer.empty? && !@closed
byte_or_time = @buffer.shift
case byte_or_time
when Integer
return byte_or_time
when Float
next
when nil
return -1
end
end
end
def wait_readable(timeout)
loop do
Fiber.yield if @buffer.empty? && !@closed
case @buffer.first
when Integer
return self
when Float
timeout -= @buffer.shift
return nil if timeout < 0
when nil # closed
return nil
end
end
end
def current_screen(color: false)
rendered_lines = Reline.core.line_editor.instance_variable_get(:@rendered_screen_cache)
return unless rendered_lines
screen = rendered_lines.map do |items|
item_ids = []
items.each_with_index { |(x, w), i| item_ids.fill(i, x, w) }
(0...item_ids.size).chunk { |i| item_ids[i] || -1 }.map do |id, chunk|
if id == -1
' ' * chunk.size
else
x, _w, text = items[id]
Reline::Unicode.take_range(text, chunk[0] - x, chunk.size)
end
end.join
end
unless color
screen.map! do |line|
line.gsub(/\e\[[0-9;]*m/, '').rstrip
end
end
screen
end
def write(text)
raise 'closed' if @closed
text.each_byte { |byte| @buffer << byte }
@fiber.resume
end
def wait(time)
@buffer << time.to_f
@fiber.resume
end
end
def assert_readmultiline(prompt: '', add_hist: false, termination_proc:, expected: :unspecified, &block)
action = -> { Reline.readmultiline(prompt, add_hist, &termination_proc) }
result = with_pseudo_terminal(action, &block)
assert_equal(expected, result) unless expected == :unspecified
end
def assert_readline(prompt: '', add_hist: false, expected: :unspecified)
action = -> { Reline.readline(prompt, add_hist) }
result = with_pseudo_terminal(action, &block)
assert_equal(expected, result) unless expected == :unspecified
end
def with_pseudo_terminal(block)
original_output = Reline.instance_variable_get(:@output)
original_input = Reline::GeneralIO.class_variable_get(:@@input)
original_getc = Reline::GeneralIO.method(:getc)
Reline::GeneralIO.singleton_class.remove_method(:getc)
Reline::GeneralIO.define_singleton_method :getc do |timeout_second|
buf = Reline::GeneralIO.class_variable_get(:@@buf)
return buf.shift unless buf.empty?
input = Reline::GeneralIO.class_variable_get(:@@input)
input.read(1) if input.wait_readable(timeout_second)
end
require 'stringio'
Reline.output = StringIO.new
result = :not_ended
io = nil
fiber = Fiber.new do
result = block.call
io.close
end
io = PseudoTerminalIO.new(fiber)
Reline::GeneralIO.input = io
fiber.resume
yield io
io.close
result
ensure
Reline::GeneralIO.input = original_input
Reline.output = original_output
Reline::GeneralIO.singleton_class.remove_method(:getc)
Reline::GeneralIO.define_singleton_method(:getc, original_getc)
end
Reline.autocompletion = true
Reline::HISTORY << 'abc'
Reline.completion_proc = ->(target,_pre,_post){100.times.map{|i|"a#{i}"}.select{_1.start_with? target}}
t=Time.now
1000.times do
assert_readmultiline(expected: "a\nb", prompt: '>', termination_proc: ->*{true}) do |io|
io.write "a\C-r\e"
assert_equal io.current_screen, ["(reverse-i-search)`': a"]
io.wait 0.49999999999
assert_equal io.current_screen, ["(reverse-i-search)`': a"]
io.wait 0.00000000002
assert_equal io.current_screen, [">a"]
io.write "\C-e\e\nb"
assert_equal io.current_screen, [">a", ">b"]
io.write "\n"
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment