-
-
Save Fingercomp/b4ca856bab31e8f8c2488f85012c24d5 to your computer and use it in GitHub Desktop.
A Stem client
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
local buffer = require("buffer") | |
local com = require("component") | |
local comp = require("computer") | |
local event = require("event") | |
local inet = require("internet") | |
local term = require("term") | |
local text = require("text") | |
local gpu = com.gpu | |
local w, h = gpu.getResolution() | |
--- Print a line of text into the scrolling log area above the bottom row.
--
-- The text is split into screen rows by `text.wrappedLines`, rows 1..h-1 are
-- shifted up by that many rows, and the new rows are drawn directly above
-- row `h`, which is never touched (the input prompt is drawn there).
--
-- @param color foreground color to draw the text with
-- @param line  text to print
local function writeLine(color, line)
  -- Change the foreground only when it differs, remembering the previous
  -- value so it can be restored once drawing is done.
  local oldFg
  if gpu.getForeground() ~= color then
    oldFg = gpu.setForeground(color)
  end

  -- Wrap exactly once and keep the rows: the row count is needed for the
  -- scroll, which has to happen before anything is drawn.
  local rows = {}
  for row in text.wrappedLines(line, w + 1, w + 1) do
    rows[#rows + 1] = row
  end
  local count = #rows

  -- Scroll the log area (everything except the bottom row) up.
  gpu.copy(1, 1, w, h - 1, 0, -count)

  local blank = (" "):rep(w)
  for i, row in ipairs(rows) do
    local y = h - count + i - 1
    gpu.set(1, y, blank) -- wipe what the scroll left behind on this row
    gpu.set(1, y, row)
  end

  if oldFg then
    gpu.setForeground(oldFg)
  end
end
--- Stem client.
--
-- `newClient(address, channels, connectionTimeout, readTimeout, maxReconnects)`
-- returns a client object. All methods live in the shared metatable below;
-- fields and methods prefixed with `__` are internal.
--
-- Wire format, as implemented by `__craftPacket` / `readOne`: every packet is
-- a 2-byte big-endian length followed by that many bytes, the first of which
-- is the opcode.
local newClient do
  local meta = {
    __index = {
      -- Opcode byte for each packet type.
      __opcodes = {
        message = 0,
        subscribe = 1,
        unsubscribe = 2,
        ping = 3,
        pong = 4,
      },

      --- Serialize a packet: 2-byte big-endian length, opcode byte, payload.
      -- @param opcode numeric opcode (see `__opcodes`)
      -- @param data   payload string
      -- @return the encoded packet as a string
      __craftPacket = function(self, opcode, data)
        return (">s2"):pack(string.char(opcode) .. data)
      end,

      --- Decode a packet body (length prefix already stripped).
      -- @param packet opcode byte followed by the payload
      -- @return an event table produced by the matching entry of `__parsers`
      -- Raises an error if the opcode has no parser.
      __parsePacket = function(self, packet)
        local opcode, data = (">I1"):unpack(packet), packet:sub(2)
        local parser = self.__parsers[opcode]
        if not parser then
          -- Fail with a readable message instead of "attempt to call a nil
          -- value".
          error(("unknown opcode: %d"):format(opcode))
        end
        return parser(data)
      end,

      --- Publish `message` to `channel`.
      -- The channel name is encoded with a 1-byte length prefix.
      send = function(self, channel, message)
        return self:write(self:__craftPacket(
          self.__opcodes.message,
          (">s1"):pack(channel) .. message
        ))
      end,

      --- Ask the server to start delivering messages from `channel`.
      subscribe = function(self, channel)
        return self:write(self:__craftPacket(
          self.__opcodes.subscribe,
          (">s1"):pack(channel)
        ))
      end,

      --- Ask the server to stop delivering messages from `channel`.
      unsubscribe = function(self, channel)
        return self:write(self:__craftPacket(
          self.__opcodes.unsubscribe,
          (">s1"):pack(channel)
        ))
      end,

      --- Send a ping packet carrying `message` as its payload.
      ping = function(self, message)
        return self:write(self:__craftPacket(self.__opcodes.ping, message))
      end,

      --- Send a pong packet carrying `message` as its payload.
      pong = function(self, message)
        return self:write(self:__craftPacket(self.__opcodes.pong, message))
      end,

      --- Open the connection and subscribe to every configured channel.
      --
      -- Polls `finishConnect` until it reports success, raising an error if
      -- it reports failure or `__connectionTimeout` elapses first. On success
      -- stores the raw socket in `__socket` and a buffered stream around it
      -- in `__buffer`.
      connect = function(self)
        local socketStream = assert(inet.socket(self.__address))
        local socket = socketStream.socket -- raw socket behind the stream
        local start = comp.uptime()

        while true do
          -- Handled here as: truthy = connected, false = still connecting,
          -- nil (plus optional message) = failed.
          local status, err = socket.finishConnect()

          if status then
            break
          end

          if status == nil then
            socket.close() -- do not leak the half-open socket
            error(("connection failed: %s"):format(err or "unknown error"))
          end

          if comp.uptime() >= start + self.__connectionTimeout then
            socket.close()
            error("connection failed: timed out")
          end

          os.sleep(0.05)
        end

        self.__socket = socket
        self.__buffer = buffer.new("rwb", socketStream)
        self.__buffer:setTimeout(self.__readTimeout)
        -- NOTE(review): presumably turns buffering off so writes reach the
        -- socket immediately; confirm against the buffer library.
        self.__buffer:setvbuf("no", 512)

        for _, channel in ipairs(self.__channels) do
          self:subscribe(channel)
        end
      end,

      --- Write raw bytes to the connection; raises an error on failure.
      write = function(self, data)
        return assert(self.__buffer:write(data))
      end,

      --- Try to read and dispatch a single packet.
      --
      -- The 2-byte length header is read with the buffer timeout set to 0; a
      -- read error ending in "timeout" means nothing is pending. Once a
      -- header has been read, the body is read with the normal read timeout.
      -- Ping packets are answered with a pong before the callback runs.
      --
      -- @param callback function(client, packet) invoked for every packet
      -- @return true if a packet was processed, nothing if none was pending
      -- Raises an error on any other read or parse failure.
      readOne = function(self, callback)
        self.__buffer:setTimeout(0)
        local status, head, err = pcall(self.__buffer.read, self.__buffer, 2)
        self.__buffer:setTimeout(self.__readTimeout)

        if not status then
          -- `head` holds the error value here; it is only safe to pattern
          -- match when it is a string.
          if type(head) == "string" and head:match("timeout$") then
            return
          end
          error(head, 0) -- re-raise unchanged
        end

        -- A nil result without a reason is treated as end of stream.
        local length = (">I2"):unpack(assert(head, err or "unexpected end of stream"))

        local body, reason = self.__buffer:read(length)
        local packet = self:__parsePacket(assert(body, reason or "unexpected end of stream"))

        if packet.type == "ping" then
          self:pong(packet.message)
        end

        callback(self, packet)

        return true
      end,

      --- Inner event loop; any error escaping it is handled by `run`.
      --
      -- Waits up to `__readTimeout` for an "internet_ready" signal or the
      -- stop signal. When this client's socket has data, drains all pending
      -- packets; in every other case (timeout, stop signal, another socket's
      -- signal) checks that the connection is still up.
      __run = function(self, callback)
        while self.__running do
          -- The hyphen in the stop signal name is escaped, which suggests the
          -- names are matched as Lua patterns.
          local e, _, id = event.pullMultiple(
            self.__readTimeout,
            "internet_ready",
            "stem%-client::stop"
          )

          if e == "internet_ready" and id == self.__socket.id() then
            while self:readOne(callback) do
              -- A packet came through: the connection works again.
              self.__reconnectCount = 0
            end
          else
            self:ensureConnected()
          end
        end
      end,

      --- Request the loop started by `run` to finish.
      -- Also pushes the stop signal so a blocked `__run` wakes up.
      stop = function(self)
        self.__running = false
        event.push("stem-client::stop")
      end,

      --- Raise an error unless the socket reports that it is connected.
      -- @return the truthy status reported by `finishConnect`
      ensureConnected = function(self)
        local status, err = self.__socket.finishConnect()

        if status == false then
          error("not yet connected")
        end

        return assert(status, err or "unknown error")
      end,

      --- Connect and process packets until `stop` is called.
      --
      -- Errors raised while the loop runs trigger a reconnect. After
      -- `__maxReconnects` consecutive failed attempts the client shuts down.
      --
      -- @param callback function(client, packet) invoked for every packet
      -- @return nothing on a clean stop or if the client is already running;
      --         nil plus an error message once the reconnect limit is reached
      -- Raises an error if the initial connection attempt fails.
      run = function(self, callback)
        if self.__running then
          return
        end

        self:connect()
        self.__running = true

        while self.__running do
          local status, err = pcall(self.__run, self, callback)

          if not status then
            if self.__reconnectCount >= self.__maxReconnects then
              -- Giving up: leave the client stopped and the connection
              -- closed, so nothing leaks and `run` may be called again.
              self.__running = false
              pcall(self.__buffer.close, self.__buffer) -- best-effort cleanup

              return nil, ("connection lost: %s; reconnect limit is reached"):format(err or "unknown error")
            end

            self.__reconnectCount = self.__reconnectCount + 1
            self.__buffer:close()

            if not pcall(self.connect, self) then
              -- The reconnect failed: close whatever is still referenced and
              -- wait a little before the next attempt.
              if self.__socket then
                self.__socket.close()
              end

              if self.__buffer then
                self.__buffer:close()
              end

              os.sleep(1)
            end
          end
        end

        self.__buffer:close()
      end,
    },
  }

  -- Payload decoders keyed by opcode. Each takes the packet payload (opcode
  -- byte removed) and returns an event table with a `type` field.
  meta.__index.__parsers = {
    [meta.__index.__opcodes.message] = function(data)
      -- Payload: channel name with a 1-byte length prefix, then the message.
      local channel, idx = (">s1"):unpack(data)

      return {
        type = "message",
        channel = channel,
        message = data:sub(idx),
      }
    end,

    [meta.__index.__opcodes.subscribe] = function(data)
      return {
        type = "subscribe",
        channel = (">s1"):unpack(data),
      }
    end,

    [meta.__index.__opcodes.unsubscribe] = function(data)
      return {
        type = "unsubscribe",
        channel = (">s1"):unpack(data),
      }
    end,

    [meta.__index.__opcodes.ping] = function(data)
      return {
        type = "ping",
        message = data,
      }
    end,

    [meta.__index.__opcodes.pong] = function(data)
      return {
        type = "pong",
        message = data,
      }
    end,
  }

  --- Create a client. No connection is made until `run` or `connect`.
  -- @param address           server address handed to `inet.socket`
  -- @param channels          array of channel names subscribed on connect
  -- @param connectionTimeout how long `connect` waits (same unit as `comp.uptime`)
  -- @param readTimeout       buffer read timeout and event wait timeout
  -- @param maxReconnects     reconnect attempts allowed before `run` gives up
  function newClient(address, channels, connectionTimeout, readTimeout, maxReconnects)
    local obj = {
      __address = address,
      __channels = channels,
      __connectionTimeout = connectionTimeout,
      __readTimeout = readTimeout,
      __maxReconnects = maxReconnects,

      __socket = nil, -- set by `connect`
      __buffer = nil, -- set by `connect`
      __running = false,
      __reconnectCount = 0,
    }

    return setmetatable(obj, meta)
  end
end
-- The single command-line argument is the channel to join.
local channel = ...

-- Write `message` to stderr and terminate with the given exit status.
local function bail(message, status)
  io.stderr:write(message)
  os.exit(status)
end

if not channel then
  bail("Usage: stem <channel>\n", 1)
end

-- Channel names travel with a one-byte length prefix (">s1" in `send` and
-- `subscribe`), so they must be 1 to 255 bytes long.
local nameLength = #channel
if nameLength == 0 or nameLength >= 256 then
  bail("Invalid channel name\n", 2)
end

local client = newClient(
  "stem.fomalhaut.me:5733", -- server address
  {channel},                -- channels subscribed on connect
  10,                       -- connection timeout
  10,                       -- read timeout
  5                         -- maximum number of reconnects
)
-- Prompt on the bottom screen row and send every entered line to the channel.
-- Returns when input ends (io.read yields nothing) or when sending fails, in
-- which case the error is shown in the log first.
local function inputLoop()
  while true do
    term.setCursor(1, h)
    io.write("← ")

    local line = io.read()
    if not line then
      return
    end

    local ok, reason = pcall(client.send, client, channel, line)
    if not ok then
      writeLine(0xff0000, ("Got error while sending: %s"):format(reason or "unknown error"))
      return
    end
  end
end

-- Input runs in its own thread; once it is over, the client is told to stop.
require("thread").create(function()
  inputLoop()
  client:stop()
end)
-- Run the client until it is stopped, printing every event it delivers.
-- `run` returns nil plus a message only when the reconnect limit is reached;
-- on a clean stop it returns nothing.
local _, failure = client:run(function(_, evt)
  if evt.type == "message" then
    writeLine(0x66ff00, "→ " .. evt.message)
  elseif evt.type == "ping" or evt.type == "pong" then
    -- Show the ping/pong payload as hex. Assigning to a single local drops
    -- gsub's second result (the substitution count).
    local hex = evt.message:gsub(".", function(c)
      return ("%02x"):format(c:byte())
    end)
    writeLine(0xa5a5a5, "Ping: " .. hex)
  end
end)

if failure then
  -- Report a lost connection instead of exiting silently with status 0.
  writeLine(0xff0000, tostring(failure))
  os.exit(3)
end

os.exit(0)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment