Created
September 23, 2018 23:31
-
-
Save vilkoz/22216d1c1ed25c80745f8e184b6dfe0c to your computer and use it in GitHub Desktop.
telegram-cli python ncurses gui
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys,os | |
import curses | |
import json | |
from subprocess import check_output | |
from draw_menu import clear, my_raw_input | |
from tcp_client import tgcli_send_command | |
class NcChat(): | |
def __init__(self, user): | |
self.user = user | |
self.stdscr = curses.initscr() | |
curses.cbreak() | |
curses.noecho() | |
self.stdscr.keypad(1) | |
# Clear and refresh the screen for a blank canvas | |
self.stdscr.clear() | |
self.stdscr.refresh() | |
# Start colors in curses | |
curses.start_color() | |
curses.init_pair(1, curses.COLOR_CYAN, curses.COLOR_BLACK) | |
curses.init_pair(2, curses.COLOR_BLACK, curses.COLOR_CYAN) | |
curses.init_pair(3, curses.COLOR_BLACK, curses.COLOR_WHITE) | |
self.selected = 0 | |
self.shift = 0 | |
self.item_filter = '' | |
self._get_chat_history() | |
def __enter__(self): | |
return self | |
def __exit__(self, exc_type, exc_value, traceback): | |
curses.nocbreak() | |
self.stdscr.keypad(0) | |
curses.echo() | |
curses.endwin() | |
def _get_chat_history(self): | |
history = tgcli_send_command("history " + self.user) | |
history = json.loads(history) | |
self.history = history | |
# self.items = [x['from']['print_name'] + ' : ' + x['text'] for x in history if 'text' in x] | |
self.items = [] | |
for x in history: | |
if 'text' in x: | |
s = x['text'] | |
elif 'media' in x: | |
s = '%s : %s' % (x['media']['type'], x['media'].get('caption', ' ')) | |
else: | |
s = '<<event: %s>>' % (x['event']) | |
self.items.append(x['from']['print_name'] + ' : ' + s) | |
def _handle_keypress(self, k): | |
self.h, self.w = self.stdscr.getmaxyx() | |
height = self.h | |
width = self.w | |
selected = self.selected | |
items = self.items | |
self.max_item_count = height - 4 | |
if k in [curses.KEY_DOWN, ord('j')]: | |
self.selected += 1 | |
elif k in [curses.KEY_UP, ord('k')]: | |
self.selected -= 1 | |
elif k == ord('i'): | |
self._send_message() | |
elif k == ord('l'): | |
self._show_selected_message() | |
self.selected = self.selected % len(items) | |
if self.selected == 0: | |
self.shift = 0 | |
if self.selected + self.start_y > self.end_y - 10: | |
self.shift = self.selected + self.start_y - (self.end_y - 10) | |
def _render_status_bar(self): | |
stdscr = self.stdscr | |
# Declaration of strings | |
statusbarstr = "Press 'q' to exit || Selected: {} | Shift {}".format(self.selected, self.shift) | |
# Rendering some text | |
whstr = "Chat with {} | Width: {}, Height: {}".format(self.user, self.w, self.h) | |
stdscr.addstr(0, 0, whstr, curses.color_pair(1)) | |
# Render status bar | |
stdscr.attron(curses.color_pair(3)) | |
stdscr.addstr(self.h-1, 0, statusbarstr) | |
stdscr.addstr(self.h-1, len(statusbarstr), " " * (self.w - len(statusbarstr) - 1)) | |
stdscr.attroff(curses.color_pair(3)) | |
@staticmethod | |
def _bytes_from_int(i): | |
b = '' | |
while i > 0: | |
b += chr(i & 0xFF) | |
i >>= 8 | |
return b | |
def _my_raw_input(self, r, c, prompt_string): | |
win = curses.newwin(3, curses.COLS - 1, r, c) | |
curses.echo() | |
curses.nocbreak() | |
win.addstr(0, 0, prompt_string) | |
i = [] | |
ch = win.getch() | |
while (ch != ord('\n')): | |
if ch != ord('\r'): | |
i.append(ch) | |
else: | |
l.pop() | |
ch = win.getch() | |
curses.noecho() | |
curses.cbreak() | |
del win | |
return ("".join([NcChat._bytes_from_int(x) for x in i])).encode('utf-8') | |
def _send_message(self): | |
text = self._my_raw_input(self.h - 2, 0, ' > ').decode('utf-8') | |
if text == '': | |
return | |
self.selected = 0 | |
self.shift = 0 | |
tgcli_send_command('msg ' + self.user + ' ' + text) | |
self._get_chat_history() | |
def _show_selected_message(self): | |
msg = self.history[self.selected] | |
win = curses.newwin(40, curses.COLS - 1, 10, 10) | |
if 'text' in msg: | |
win.addstr(0, 0, msg['text']) | |
elif 'media' in msg: | |
win.addstr(0, 0, json.dumps(msg['media'], indent=4)) | |
win.addstr(5, 0, "load") | |
c = win.getch() | |
if c == ord('l'): | |
s = tgcli_send_command('load_' + msg['media']['type'] + ' ' + msg['id']) | |
s = json.loads(s) | |
s = s['result'] | |
check_output('xdg-open ' + s, shell=True) | |
else: | |
try: | |
win.addstr(0, 0, json.dumps(msg, indent=4)) | |
except curses.error: | |
win.addstr(0, 0, 'event: %s' % (msg['event'])) | |
win.getch() | |
del win | |
def loop(self): | |
stdscr = self.stdscr | |
selected = self.selected | |
item_filter = self.item_filter | |
k = 0 | |
# Loop where k is the last character pressed | |
while (k != ord('q')): | |
# Initialization | |
stdscr.clear() | |
self.h, self.w = self.stdscr.getmaxyx() | |
height = self.h | |
width = self.w | |
self.start_y = int((2)) | |
self.end_y = int(height) - 2 | |
start_y = self.start_y | |
end_y = self.end_y | |
self._handle_keypress(k) | |
self._render_status_bar() | |
tmp_items = self.items | |
tmp_items = tmp_items[self.shift:self.max_item_count + self.shift] | |
for i, it in enumerate(tmp_items): | |
if i+self.shift == self.selected: | |
stdscr.attron(curses.color_pair(2)) | |
stdscr.attron(curses.A_BOLD) | |
if i + start_y < end_y: | |
stdscr.addstr(start_y + i, 1, it) | |
if len(it)+1 < width: | |
stdscr.addstr(start_y + i, len(it)+1, " " * (width - len(it) - 1)) | |
if i+self.shift == self.selected: | |
stdscr.attroff(curses.color_pair(2)) | |
stdscr.attroff(curses.A_BOLD) | |
# Refresh the screen | |
stdscr.refresh() | |
# Wait for next input | |
k = stdscr.getch() | |
if (k == ord('q')): | |
return None | |
if item_filter != '': | |
self.selected = self.items.index(tmp_items[self.selected]) | |
return self.selected |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from tcp_client import tgcli_send_command | |
from draw_menu import draw_menu | |
import json | |
from draw_chat import NcChat | |
def main(): | |
while 1: | |
dialogs = tgcli_send_command("dialog_list") | |
dialogs = json.loads(dialogs) | |
# print(dialogs) | |
dialogs = dialogs[::-1] | |
names = [x['print_name'] for x in dialogs] | |
# print(names) | |
selected = draw_menu(names) | |
if selected == None: | |
return | |
selected = dialogs[selected] | |
# print(selected) | |
with NcChat(selected['print_name']) as c: | |
c.loop() | |
if __name__ == "__main__": | |
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import socket | |
import sys | |
def tgcli_send_command(cmd): | |
# Create a TCP/IP socket | |
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
# Connect the socket to the port where the server is listening | |
server_address = ('localhost', 1337) | |
# print ('connecting to %s port %s' % server_address) | |
sock.connect(server_address) | |
try: | |
message = cmd + '\n' | |
# print ('sending "%s"' % message) | |
sock.sendall(message.encode('utf-8')) | |
# print ('sent') | |
# Look for the response | |
data = [] | |
b = b' ' | |
while '\n' not in b.decode(): | |
b = sock.recv(16) | |
# print("b: ", b) | |
data.append(b) | |
data = "".join([x.decode('utf-8') for x in data]) | |
# print("data: %s" % data) | |
data = data.replace("ANSWER", "") | |
# print("data: %s" % data) | |
amount_expected = int(data.split('\n')[0].strip()) | |
amount_received = 16 - (len("ANSWER") + 1 + len(str(amount_expected)) + 1) | |
# print("amount_expected: %s" % amount_expected) | |
res = data.split('\n')[1].encode('utf-8') | |
while amount_received < amount_expected: | |
data = sock.recv(16) | |
amount_received += len(data) | |
# print ('received "%s"' % data) | |
res += data | |
res = res.decode() | |
# print(res) | |
finally: | |
# print ('closing socket') | |
sock.close() | |
return res |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment