Skip to content

Instantly share code, notes, and snippets.

@vilkoz
Created September 23, 2018 23:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save vilkoz/22216d1c1ed25c80745f8e184b6dfe0c to your computer and use it in GitHub Desktop.
Save vilkoz/22216d1c1ed25c80745f8e184b6dfe0c to your computer and use it in GitHub Desktop.
telegram-cli python ncurses gui
import sys,os
import curses
import json
from subprocess import check_output
from draw_menu import clear, my_raw_input
from tcp_client import tgcli_send_command
class NcChat():
def __init__(self, user):
self.user = user
self.stdscr = curses.initscr()
curses.cbreak()
curses.noecho()
self.stdscr.keypad(1)
# Clear and refresh the screen for a blank canvas
self.stdscr.clear()
self.stdscr.refresh()
# Start colors in curses
curses.start_color()
curses.init_pair(1, curses.COLOR_CYAN, curses.COLOR_BLACK)
curses.init_pair(2, curses.COLOR_BLACK, curses.COLOR_CYAN)
curses.init_pair(3, curses.COLOR_BLACK, curses.COLOR_WHITE)
self.selected = 0
self.shift = 0
self.item_filter = ''
self._get_chat_history()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
curses.nocbreak()
self.stdscr.keypad(0)
curses.echo()
curses.endwin()
def _get_chat_history(self):
history = tgcli_send_command("history " + self.user)
history = json.loads(history)
self.history = history
# self.items = [x['from']['print_name'] + ' : ' + x['text'] for x in history if 'text' in x]
self.items = []
for x in history:
if 'text' in x:
s = x['text']
elif 'media' in x:
s = '%s : %s' % (x['media']['type'], x['media'].get('caption', ' '))
else:
s = '<<event: %s>>' % (x['event'])
self.items.append(x['from']['print_name'] + ' : ' + s)
def _handle_keypress(self, k):
self.h, self.w = self.stdscr.getmaxyx()
height = self.h
width = self.w
selected = self.selected
items = self.items
self.max_item_count = height - 4
if k in [curses.KEY_DOWN, ord('j')]:
self.selected += 1
elif k in [curses.KEY_UP, ord('k')]:
self.selected -= 1
elif k == ord('i'):
self._send_message()
elif k == ord('l'):
self._show_selected_message()
self.selected = self.selected % len(items)
if self.selected == 0:
self.shift = 0
if self.selected + self.start_y > self.end_y - 10:
self.shift = self.selected + self.start_y - (self.end_y - 10)
def _render_status_bar(self):
stdscr = self.stdscr
# Declaration of strings
statusbarstr = "Press 'q' to exit || Selected: {} | Shift {}".format(self.selected, self.shift)
# Rendering some text
whstr = "Chat with {} | Width: {}, Height: {}".format(self.user, self.w, self.h)
stdscr.addstr(0, 0, whstr, curses.color_pair(1))
# Render status bar
stdscr.attron(curses.color_pair(3))
stdscr.addstr(self.h-1, 0, statusbarstr)
stdscr.addstr(self.h-1, len(statusbarstr), " " * (self.w - len(statusbarstr) - 1))
stdscr.attroff(curses.color_pair(3))
@staticmethod
def _bytes_from_int(i):
b = ''
while i > 0:
b += chr(i & 0xFF)
i >>= 8
return b
def _my_raw_input(self, r, c, prompt_string):
win = curses.newwin(3, curses.COLS - 1, r, c)
curses.echo()
curses.nocbreak()
win.addstr(0, 0, prompt_string)
i = []
ch = win.getch()
while (ch != ord('\n')):
if ch != ord('\r'):
i.append(ch)
else:
l.pop()
ch = win.getch()
curses.noecho()
curses.cbreak()
del win
return ("".join([NcChat._bytes_from_int(x) for x in i])).encode('utf-8')
def _send_message(self):
text = self._my_raw_input(self.h - 2, 0, ' > ').decode('utf-8')
if text == '':
return
self.selected = 0
self.shift = 0
tgcli_send_command('msg ' + self.user + ' ' + text)
self._get_chat_history()
def _show_selected_message(self):
msg = self.history[self.selected]
win = curses.newwin(40, curses.COLS - 1, 10, 10)
if 'text' in msg:
win.addstr(0, 0, msg['text'])
elif 'media' in msg:
win.addstr(0, 0, json.dumps(msg['media'], indent=4))
win.addstr(5, 0, "load")
c = win.getch()
if c == ord('l'):
s = tgcli_send_command('load_' + msg['media']['type'] + ' ' + msg['id'])
s = json.loads(s)
s = s['result']
check_output('xdg-open ' + s, shell=True)
else:
try:
win.addstr(0, 0, json.dumps(msg, indent=4))
except curses.error:
win.addstr(0, 0, 'event: %s' % (msg['event']))
win.getch()
del win
def loop(self):
stdscr = self.stdscr
selected = self.selected
item_filter = self.item_filter
k = 0
# Loop where k is the last character pressed
while (k != ord('q')):
# Initialization
stdscr.clear()
self.h, self.w = self.stdscr.getmaxyx()
height = self.h
width = self.w
self.start_y = int((2))
self.end_y = int(height) - 2
start_y = self.start_y
end_y = self.end_y
self._handle_keypress(k)
self._render_status_bar()
tmp_items = self.items
tmp_items = tmp_items[self.shift:self.max_item_count + self.shift]
for i, it in enumerate(tmp_items):
if i+self.shift == self.selected:
stdscr.attron(curses.color_pair(2))
stdscr.attron(curses.A_BOLD)
if i + start_y < end_y:
stdscr.addstr(start_y + i, 1, it)
if len(it)+1 < width:
stdscr.addstr(start_y + i, len(it)+1, " " * (width - len(it) - 1))
if i+self.shift == self.selected:
stdscr.attroff(curses.color_pair(2))
stdscr.attroff(curses.A_BOLD)
# Refresh the screen
stdscr.refresh()
# Wait for next input
k = stdscr.getch()
if (k == ord('q')):
return None
if item_filter != '':
self.selected = self.items.index(tmp_items[self.selected])
return self.selected
import sys,os
import curses
def clear(stdscr):
curses.nocbreak()
stdscr.keypad(0)
curses.echo()
curses.endwin()
def my_raw_input(stdscr, r, c, prompt_string):
win = curses.newwin(3, curses.COLS - 1, r, c)
curses.echo()
win.addstr(0, 0, prompt_string)
# win.refresh()
i = win.getstr(0, len(prompt_string), 65)
del win
# curses.noecho()
return i
def draw_menu(items):
stdscr = curses.initscr()
curses.cbreak()
curses.noecho()
stdscr.keypad(1)
k = 0
cursor_x = 0
cursor_y = 0
# Clear and refresh the screen for a blank canvas
stdscr.clear()
stdscr.refresh()
# Start colors in curses
curses.start_color()
curses.init_pair(1, curses.COLOR_CYAN, curses.COLOR_BLACK)
curses.init_pair(2, curses.COLOR_BLACK, curses.COLOR_CYAN)
curses.init_pair(3, curses.COLOR_BLACK, curses.COLOR_WHITE)
selected = 0
shift = 0
item_filter = ''
# Loop where k is the last character pressed
while (k != ord('l')):
# Initialization
stdscr.clear()
height, width = stdscr.getmaxyx()
max_item_count = height - 4
if k in [curses.KEY_DOWN, ord('j')]:
cursor_y = cursor_y + 1
selected += 1
elif k in [curses.KEY_UP, ord('k')]:
cursor_y = cursor_y - 1
selected -= 1
elif k == curses.KEY_RIGHT:
cursor_x = cursor_x + 1
elif k == curses.KEY_LEFT:
cursor_x = cursor_x - 1
elif k == ord('i'):
item_filter = my_raw_input(stdscr, height - 2, 0, ' > ').decode('utf-8')
selected = 0
shift = 0
cursor_x = max(0, cursor_x)
cursor_x = min(width-1, cursor_x)
cursor_y = max(0, len(items))
cursor_y = min(len(items)-1, cursor_y)
selected = selected % len(items)
start_y = int((2))
end_y = int(height) - 2
if selected == 0:
shift = 0
if selected + start_y > end_y - 10:
shift = selected + start_y - (end_y - 10)
# Declaration of strings
statusbarstr = "Press 'q' to exit | Pos: {}, {} | Selected: {} | Shift {} | Items: {} | Max items: {}".format(cursor_x, cursor_y, selected, shift, len(items), max_item_count)
# Centering calculations
# Rendering some text
whstr = "Width: {}, Height: {}".format(width, height)
stdscr.addstr(0, 0, whstr, curses.color_pair(1))
# Render status bar
stdscr.attron(curses.color_pair(3))
stdscr.addstr(height-1, 0, statusbarstr)
stdscr.addstr(height-1, len(statusbarstr), " " * (width - len(statusbarstr) - 1))
stdscr.attroff(curses.color_pair(3))
tmp_items = items
if item_filter != '':
tmp_items = [x for x in items if item_filter in x]
selected = selected % len(tmp_items)
if selected == 0:
shift = 0
if selected + start_y > end_y - 10:
shift = selected + start_y - (end_y - 10)
tmp_items = tmp_items[shift:max_item_count + shift]
for i, it in enumerate(tmp_items):
if i+shift == selected:
stdscr.attron(curses.color_pair(2))
stdscr.attron(curses.A_BOLD)
if i + start_y < end_y:
stdscr.addstr(start_y + i, 1, it)
if len(it) < width:
stdscr.addstr(start_y + i, len(it)+1, " " * (width - len(it) - 1))
if i+shift == selected:
stdscr.attroff(curses.color_pair(2))
stdscr.attroff(curses.A_BOLD)
# Refresh the screen
stdscr.refresh()
# Wait for next input
k = stdscr.getch()
if (k == ord('q')):
clear(stdscr)
return None
clear(stdscr)
if item_filter != '':
selected = items.index(tmp_items[selected])
return selected
def main():
# curses.wrapper(draw_menu)
draw_menu(["gopa", "sobaka", "chlen"])
if __name__ == "__main__":
main()
from tcp_client import tgcli_send_command
from draw_menu import draw_menu
import json
from draw_chat import NcChat
def main():
while 1:
dialogs = tgcli_send_command("dialog_list")
dialogs = json.loads(dialogs)
# print(dialogs)
dialogs = dialogs[::-1]
names = [x['print_name'] for x in dialogs]
# print(names)
selected = draw_menu(names)
if selected == None:
return
selected = dialogs[selected]
# print(selected)
with NcChat(selected['print_name']) as c:
c.loop()
if __name__ == "__main__":
main()
import socket
import sys
def tgcli_send_command(cmd):
# Create a TCP/IP socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# Connect the socket to the port where the server is listening
server_address = ('localhost', 1337)
# print ('connecting to %s port %s' % server_address)
sock.connect(server_address)
try:
message = cmd + '\n'
# print ('sending "%s"' % message)
sock.sendall(message.encode('utf-8'))
# print ('sent')
# Look for the response
data = []
b = b' '
while '\n' not in b.decode():
b = sock.recv(16)
# print("b: ", b)
data.append(b)
data = "".join([x.decode('utf-8') for x in data])
# print("data: %s" % data)
data = data.replace("ANSWER", "")
# print("data: %s" % data)
amount_expected = int(data.split('\n')[0].strip())
amount_received = 16 - (len("ANSWER") + 1 + len(str(amount_expected)) + 1)
# print("amount_expected: %s" % amount_expected)
res = data.split('\n')[1].encode('utf-8')
while amount_received < amount_expected:
data = sock.recv(16)
amount_received += len(data)
# print ('received "%s"' % data)
res += data
res = res.decode()
# print(res)
finally:
# print ('closing socket')
sock.close()
return res
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment