Skip to content

Instantly share code, notes, and snippets.

@cppxaxa
Created December 10, 2023 11:36
Show Gist options
  • Save cppxaxa/61703da440c004c0a926ec8fbfaea85a to your computer and use it in GitHub Desktop.
Save cppxaxa/61703da440c004c0a926ec8fbfaea85a to your computer and use it in GitHub Desktop.
A Telegram chatbot that responds back from a GGUF LLM Intel neural chat running on local host machine with some buttons for list items
# llama_cpp_python==0.2.20
# requests==2.23.0
# timeout_decorator==0.5.0
import json
import re
import uuid
import requests
import time
import urllib
from llama_cpp import Llama
import threading
from urllib.parse import quote
from timeout_decorator import timeout, TimeoutError
# Telegram Bot API credentials/endpoint (token issued by @BotFather).
TOKEN = "<Telegram token from TG BotFather>"
URL = "https://api.telegram.org/bot{}/".format(TOKEN)
# Per-chat rolling history: chat_id -> alternating list of user/bot messages.
chat_history_map = {}
# Loaded in main(); stays None until the GGUF model is initialized.
llm: Llama = None
def get_url(url):
    """Perform an HTTP GET and return the response body decoded as UTF-8."""
    resp = requests.get(url)
    return resp.content.decode("utf8")
def get_json_from_url(url):
    """GET *url* and parse the response body as JSON."""
    return json.loads(get_url(url))
def get_updates(offset=None):
    """Long-poll the Bot API getUpdates endpoint.

    offset, when given, acknowledges all earlier updates so they are
    not redelivered. Returns the parsed JSON response.
    """
    endpoint = URL + "getUpdates?timeout=100"
    if offset:
        endpoint = endpoint + "&offset={}".format(offset)
    return get_json_from_url(endpoint)
def get_last_update_id(updates):
    """Return the highest update_id present in a getUpdates response."""
    return max(int(item["update_id"]) for item in updates["result"])
@timeout(800)
def call_llm(message_list):
    """Build a few-shot Q/A prompt from the chat history and run the LLM.

    message_list alternates user question / bot answer, starting with a
    question. Returns the model's new answer text, stripped. Raises
    timeout_decorator.TimeoutError if inference exceeds 800 seconds.
    """
    global llm
    q_template = "Q: {{question_without_mark}}? "
    a_template = "A: {{answer}} <END> "
    answer_placeholder = "A: "
    # Few-shot preamble steering the model toward short, pointwise answers.
    prompt = (
        "Q: What do you do? A: I'm an intelligent machine that responds to "
        "questions by being concise and I try to respond in points. <END> "
        "Q: What is 2+2? A: 4 <END> "
        "Q: What is the state of water in room temperature? A: Liquid. <END> "
    )
    mode = 'q'
    for message in message_list:
        if mode == 'q':
            # Drop a trailing "?" (the template re-adds one). Guarded with
            # endswith: the original `strip()[-1]` raised IndexError on an
            # empty or whitespace-only message.
            question = message
            stripped = message.strip()
            if stripped.endswith("?"):
                question = stripped[:-1]
            prompt = prompt + q_template.replace("{{question_without_mark}}", question)
            mode = 'a'
        else:
            prompt = prompt + a_template.replace("{{answer}}", message)
            mode = 'q'
    if mode == 'a':
        # History ends on a question: cue the model to produce the answer.
        prompt = prompt + answer_placeholder
    output = llm(
        prompt,
        max_tokens=4000,
        stop=["Q:", "<END>"],
        echo=True
    )
    # echo=True makes the model return prompt + completion; slice off the prompt.
    result = output["choices"][0]["text"][len(prompt):].strip()
    return result
class log_thread(threading.Thread):
    """Watchdog thread that periodically reports liveness to a Telegram chat.

    Runs until stop() is called, printing locally and sending an
    "[INFO] Working" message to the chat every two minutes.
    """

    def __init__(self, thread_name, thread_ID, tgchat):
        super().__init__()
        self.thread_name = thread_name
        self.thread_ID = thread_ID
        self.tgchat = tgchat
        # Polled by run(); set by stop() to end the loop.
        self.should_stop = False

    def run(self):
        while not self.should_stop:
            print(str(self.thread_name) + " " + str(self.thread_ID))
            send_message("[INFO] Working", self.tgchat)
            time.sleep(120)

    def stop(self):
        print("Stopping")
        self.should_stop = True
def process_conversation(text_messages, tgchat):
    """Run the LLM on the chat history while a watchdog posts liveness pings.

    Returns the model's reply text; re-raises TimeoutError from call_llm.
    The watchdog is always stopped, whether the call succeeds or fails.
    """
    watchdog = log_thread(f"tgchat:{tgchat}", 1000, tgchat)
    watchdog.start()
    try:
        return call_llm(text_messages)
    finally:
        watchdog.stop()
def echo_all(updates):
    """Dispatch every update in a getUpdates response.

    Plain text messages and inline-keyboard button presses are both fed
    through the LLM; the reply is sent back with a fresh keyboard of
    follow-up questions. History is kept per chat, capped at the last
    two question/answer pairs.
    """
    global chat_history_map
    for update in updates["result"]:
        if "message" in update:
            # Plain text message from the user.
            text = update["message"]["text"]
            tgchat = update["message"]["chat"]["id"]
            chat_history = [] if update["message"]["chat"]["id"] not in chat_history_map else chat_history_map[update["message"]["chat"]["id"]]
            chat_history.append(text)
            try:
                result = process_conversation(chat_history, tgchat)
                chat_history.append(result)
                chat_history = chat_history[-4:]  # Last 2 message pairs.
                chat_history_map[update["message"]["chat"]["id"]] = chat_history
                reply_markup = get_reply_markup(result)
                send_message(result, tgchat, reply_markup)
            except TimeoutError:
                print("Process timed out")
                send_message("[ERROR] Process timed out", tgchat)
        elif "callback_query" in update:
            # Inline-keyboard button press: the callback data is a random id,
            # so recover the button's label from the original message markup.
            callback_query = update["callback_query"]
            message = callback_query["message"]
            chat_history = [] if message["chat"]["id"] not in chat_history_map else chat_history_map[message["chat"]["id"]]
            tgchat = message["chat"]["id"]
            button_id = callback_query["data"]
            inline_keyboard = message["reply_markup"]["inline_keyboard"]
            button_text = None
            for callback_list in inline_keyboard:
                for callback in callback_list:
                    if callback["callback_data"] == button_id:
                        button_text = callback["text"]
            if button_text:
                # Treat the button label as the user's next question.
                chat_history.append(button_text)
                try:
                    result = process_conversation(chat_history, tgchat)
                    if result.strip() != "":
                        chat_history.append(result)
                        chat_history = chat_history[-4:]  # Last 2 message pairs.
                        chat_history_map[message["chat"]["id"]] = chat_history
                        reply_markup = get_reply_markup(result)
                        send_message(result, tgchat, reply_markup)
                    else:
                        send_message("[ERROR] We don't have any response for you", tgchat)
                except TimeoutError:
                    print("Process timed out")
                    send_message("[ERROR] Process timed out", tgchat)
            else:
                send_message("[ERROR] Unable to understand input", tgchat)
        else:
            # Update type we don't handle (edits, channel posts, etc.).
            pass
def get_last_chat_id_and_text(updates):
    """Return (text, chat_id) of the newest message in a getUpdates response."""
    last = updates["result"][len(updates["result"]) - 1]
    return (last["message"]["text"], last["message"]["chat"]["id"])
def send_message(text, chat_id, reply_markup=None):
    """Send *text* to *chat_id* via the Bot API, optionally with inline keyboard markup."""
    encoded = urllib.parse.quote_plus(text)
    endpoint = URL + "sendMessage?text={}&chat_id={}".format(encoded, chat_id)
    if reply_markup:
        endpoint += "&reply_markup={}".format(reply_markup)
    get_url(endpoint)
def get_reply_markup_basic_newline(input_string):
    """Build an inline-keyboard JSON payload from the lines of a reply.

    Short dot-free lines become "Explain me <line>?" buttons with random
    callback ids. Returns None when the text has fewer than two lines.
    """
    lines = input_string.strip().split("\n")
    if len(lines) < 2:
        print("Found lines less than 2")
        return None
    questions = []
    for raw in lines:
        # Length is checked on the raw line, matching the original filter.
        if not (3 < len(raw) < 45):
            continue
        item = raw.strip()
        if item.endswith("."):
            item = item[:-1]
        # Lines still containing a dot (sentences, numbers) are skipped.
        if "." in item:
            continue
        questions.append(f"Explain me {item.strip()}?")
    rows = [
        [{'text': quote(question), 'callback_data': str(uuid.uuid4()).replace('-', '_')}]
        for question in questions
    ]
    return json.dumps({'inline_keyboard': rows})
def get_reply_markup_numbered_list(input_string):
    """Build an inline keyboard from a numbered list in the reply text.

    Each "N. item" entry becomes a "What is <item>" button with a random
    callback id. Returns None when no numbered list is found.
    """
    items = re.findall(r'\d+\.\s+(.*?)(?=\d+\.|\Z)', input_string, re.DOTALL)
    if not items:
        return None
    questions = []
    for item in items:
        question = f"What is {item.strip()}".strip()
        # Drop one "?" when the item already ended with a question mark.
        if question.endswith("??"):
            question = question[:-1]
        questions.append(question)
    rows = [
        [{'text': quote(question), 'callback_data': str(uuid.uuid4()).replace('-', '_')}]
        for question in questions
    ]
    return json.dumps({'inline_keyboard': rows})
def get_reply_markup(input_string):
    """Build keyboard markup: prefer the numbered-list form, fall back to
    line-based buttons, else None."""
    for builder in (get_reply_markup_numbered_list, get_reply_markup_basic_newline):
        markup = builder(input_string)
        if markup:
            return markup
    return None
def main():
    """Load the GGUF model and run the Telegram long-polling loop forever."""
    global llm
    last_update_id = None
    # n_ctx=4000 matches the max_tokens budget used in call_llm.
    llm = Llama(model_path="./neural-chat-7b-v3-1.Q8_0.gguf", n_ctx=4000)
    while True:
        updates = get_updates(last_update_id)
        if len(updates["result"]) > 0:
            # Advance the offset past the newest update so getUpdates
            # does not redeliver what we are about to process.
            last_update_id = get_last_update_id(updates) + 1
            echo_all(updates)
        time.sleep(0.5)
if __name__ == '__main__':
    # Commented-out manual send_message/get_reply_markup debug snippet removed.
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment