simple chatgpt bot
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Source: https://cintaprogramming.com/2023/03/04/membuat-telegram-bot-memakai-api-chatgpt-di-aws-lambda/
# Note: you need to be using OpenAI Python v0.27.0 for the code below to work
# (it relies on the pre-1.0 openai.ChatCompletion interface).
# pip install openai==0.27.0 tiktoken python-telegram-bot
# export OPENAI_API_KEY=openaikey
# export ALLOWED_USERS=user1,user2,user3
# export BOT_TOKEN=telegram-bot-token
#
import openai | |
import logging | |
import sqlite3 | |
import json | |
import os | |
import tiktoken | |
from telegram import Update | |
from telegram.constants import ParseMode | |
from telegram.ext import filters, MessageHandler, ApplicationBuilder, ContextTypes, CommandHandler | |
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Telegram usernames permitted to talk to the bot, read from the
# comma-separated ALLOWED_USERS environment variable. An unset variable gives
# an empty list (nobody is allowed) instead of failing at import time with
# AttributeError on None; whitespace around names and empty entries are dropped
# so that "user1, user2" matches both users.
ALLOWED_USERS = [
    name.strip()
    for name in os.getenv("ALLOWED_USERS", "").split(",")
    if name.strip()
]
if not ALLOWED_USERS:
    logging.warning("ALLOWED_USERS is empty or unset; every user will be rejected")

# Token count of the stored conversation above which it is replaced by a summary.
LIMIT_TOKEN_COUNT_FOR_SUMMARY = 2000
def num_tokens_from_messages(messages, model="gpt-3.5-turbo-0301"):
    """Count the tokens a list of chat messages occupies for ``model``.

    Every message costs 4 framing tokens plus the encoded length of each of
    its values (one token less when it carries a ``name`` key); the list as a
    whole costs 2 further tokens for the primed assistant reply.

    Raises:
        NotImplementedError: for any model other than ``gpt-3.5-turbo-0301``.
    """
    try:
        tokenizer = tiktoken.encoding_for_model(model)
    except KeyError:
        # No encoding registered under this model name: use cl100k_base.
        tokenizer = tiktoken.get_encoding("cl100k_base")

    if model != "gpt-3.5-turbo-0301":  # note: future models may deviate from this
        raise NotImplementedError(
            f"num_tokens_from_messages() is not presently implemented for model {model}.\n"
            "See https://github.com/openai/openai-python/blob/main/chatml.md"
            " for information on how messages are converted to tokens."
        )

    def message_cost(message):
        # every message follows <im_start>{role/name}\n{content}<im_end>\n
        cost = 4
        for key, value in message.items():
            cost += len(tokenizer.encode(value))
            if key == "name":  # if there's a name, the role is omitted
                cost -= 1  # role is always required and always 1 token
        return cost

    # every reply is primed with <im_start>assistant, hence the extra 2
    return 2 + sum(message_cost(message) for message in messages)
# System prompt placed at the start of every conversation.
ROLE = """ You are a helpful assistant in a group family chat. Your output will be in markdown format. """

# Connection to the SQLite file that persists conversations.
conn = sqlite3.connect('chat.db')

# Ensure the chats table exists; each row maps a chat id to the JSON string
# holding that chat's message list.
conn.execute('''CREATE TABLE IF NOT EXISTS chats
(chatid INT PRIMARY KEY NOT NULL,
chatjson TEXT NOT NULL);''')
class ChatSystem:
    """Conversation state for one chat, persisted in the ``chats`` table.

    ``messages`` is a list of ``{"role": ..., "content": ...}`` dicts in the
    form handed to ``openai.ChatCompletion.create``. It is stored as a JSON
    string keyed by ``chatid``.
    """

    # Constants used to build the usage report returned by get_response().
    USD_PER_1K_TOKENS = 0.002
    IDR_PER_USD = 15200
    THB_PER_USD = 34.47

    _log = logging.getLogger(__name__)

    def __init__(self, chatid):
        """Bind to ``chatid`` and load its stored history, if any."""
        self.chatid = chatid
        self.messages = []
        self.load_chat()

    def load_chat(self):
        """Load the history from the database, or start a fresh one."""
        row = conn.execute(
            "SELECT chatjson FROM chats WHERE chatid = ?", (self.chatid,)
        ).fetchone()
        if row:
            self.messages = json.loads(row[0])
        else:
            # No stored row: begin with just the system prompt (not saved yet).
            self.messages = [{"role": "system", "content": ROLE}]

    def clear_chat(self):
        """Reset the history to the bare system prompt and persist it."""
        self.messages = [{"role": "system", "content": ROLE}]
        self.save_chat()

    def save_chat(self):
        """Write the current history to the database and commit."""
        conn.execute(
            "INSERT OR REPLACE INTO chats (chatid, chatjson) VALUES (?, ?)",
            (self.chatid, json.dumps(self.messages)),
        )
        conn.commit()

    def add_user_message(self, text):
        """Append ``text`` to the history as a user message (not persisted)."""
        self.messages.append({"role": "user", "content": text})

    def prune_messages(self):
        """Replace the whole history with a model-written summary of it.

        The summary is embedded in a new system message and persisted. Any
        exception raised by the OpenAI call propagates, leaving the history
        untouched.
        """
        # Build the request on a copy: if the API call fails, the
        # "summarize convo" instruction must not stay in the stored history.
        request = self.messages + [{"role": "user", "content": "summarize convo"}]
        completion = openai.ChatCompletion.create(
            model="gpt-3.5-turbo",
            messages=request,
        )
        summary = completion["choices"][0]["message"]["content"]
        self._log.info("Summary: %s", summary)
        # Drop every message; only the summary survives, inside the system prompt.
        self.messages = [
            {"role": "system", "content": ROLE + "\nThe conversation so far: " + summary}
        ]
        self.save_chat()

    def get_response(self, text):
        """Send ``text`` to the model and return ``(reply, usage_report)``.

        The history is summarized first when it exceeds
        ``LIMIT_TOKEN_COUNT_FOR_SUMMARY`` tokens. The user message and the
        assistant reply are appended to the history, which is then persisted.

        Returns:
            A tuple of the assistant's reply text and a string reporting the
            tokens used and their cost.
        """
        token_count = num_tokens_from_messages(self.messages)
        self._log.info("Token count %s", token_count)
        if token_count > LIMIT_TOKEN_COUNT_FOR_SUMMARY:
            self._log.info("Pruning messages %s", self.messages)
            self.prune_messages()

        user_message = {"role": "user", "content": text}
        # A caller may already have recorded this message through
        # add_user_message(); appending it again would send it to the model
        # (and store it) twice.
        if not self.messages or self.messages[-1] != user_message:
            self.messages.append(user_message)

        self._log.info("Sending %s", self.messages)
        completion = openai.ChatCompletion.create(
            model="gpt-3.5-turbo",
            messages=self.messages,
        )
        total_tokens = completion["usage"]["total_tokens"]
        reply = completion["choices"][0]["message"]["content"]
        self._log.info("Total tokens used: %s", total_tokens)

        self.messages.append({"role": "assistant", "content": reply})
        self.save_chat()
        return (reply, self._format_usage(total_tokens))

    @classmethod
    def _format_usage(cls, total_tokens):
        """Return the token count and its cost in USD, IDR and THB as text."""
        cost_usd = (total_tokens / 1000) * cls.USD_PER_1K_TOKENS
        cost_idr = cost_usd * cls.IDR_PER_USD
        cost_thb = cost_usd * cls.THB_PER_USD
        tokens_str = "Total tokens used: " + str(total_tokens)
        cost_str = (
            "ChatGPT Cost: $" + str(round(cost_usd, 4))
            + " / IDR " + str(round(cost_idr, 2))
            + " / THB " + str(round(cost_thb, 2))
        )
        return tokens_str + "\n\n" + cost_str
# In-memory cache of ChatSystem objects, keyed by chat id.
chat_systems = {}


async def chat(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Answer a text message with the model's reply, then a usage report.

    Senders whose username is not in ``ALLOWED_USERS`` get a refusal message.
    Updates that carry no ``message`` or no sender are ignored.
    """
    # Imported here because only this handler needs it.
    from telegram.error import BadRequest

    message = update.message
    # update.message is None for updates that are not new messages (for
    # example edits); ignore those instead of failing with AttributeError.
    if message is None or message.from_user is None:
        return

    chat_id = update.effective_chat.id
    if message.from_user.username not in ALLOWED_USERS:
        await context.bot.send_message(chat_id=chat_id, text="You are not allowed to use this bot")
        return

    if chat_id not in chat_systems:
        chat_systems[chat_id] = ChatSystem(chat_id)
    chat_system = chat_systems[chat_id]

    # get_response() appends the user message to the history itself, so it
    # must not also be added with add_user_message() -- that stored and sent
    # every message twice.
    (response, cost) = chat_system.get_response(message.text)

    try:
        await context.bot.send_message(chat_id=chat_id, text=response, parse_mode=ParseMode.MARKDOWN)
    except BadRequest:
        # Presumably the reply contained Markdown Telegram could not parse;
        # deliver it as plain text rather than dropping it.
        await context.bot.send_message(chat_id=chat_id, text=response)
    await context.bot.send_message(chat_id=chat_id, text=cost, parse_mode=ParseMode.MARKDOWN)
# Telegram bot token, read from the BOT_TOKEN environment variable.
BOT_TOKEN = os.getenv("BOT_TOKEN")


async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Send a greeting to the chat the update came from."""
    target_chat = update.effective_chat.id
    greeting = "I'm a bot, please talk to me!"
    await context.bot.send_message(chat_id=target_chat, text=greeting)
async def reset(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Clear the conversation of the chat the update came from.

    The conversation is cleared whether it is cached in ``chat_systems`` or
    only stored in the database. Senders whose username is not in
    ``ALLOWED_USERS`` are refused, as in ``chat``.
    """
    chat_id = update.effective_chat.id

    # Same access rule as the chat handler: resetting wipes stored history,
    # so it is not open to users who may not talk to the bot.
    user = update.effective_user
    if user is None or user.username not in ALLOWED_USERS:
        await context.bot.send_message(chat_id=chat_id, text="You are not allowed to use this bot")
        return

    chat_system = chat_systems.pop(chat_id, None)
    if chat_system is None:
        # Not cached in this process (e.g. the bot was restarted), but the
        # history may still be in the database, from where the next message
        # would reload it.
        stored = conn.execute("SELECT 1 FROM chats WHERE chatid = ?", (chat_id,)).fetchone()
        if stored is None:
            await context.bot.send_message(chat_id=chat_id, text="No converstations found!")
            return
        chat_system = ChatSystem(chat_id)

    logging.info("reseting conversation")
    chat_system.clear_chat()
    await context.bot.send_message(chat_id=chat_id, text="Conversation reset!")
#note: make sure to use latest telegram-bot library | |
#activate the env | |
if __name__ == '__main__':
    # Build the bot application and register the handlers in the same order
    # as before: /start, /reset, then plain (non-command) text messages.
    application = ApplicationBuilder().token(BOT_TOKEN).build()
    handlers = (
        CommandHandler('start', start),
        CommandHandler('reset', reset),
        MessageHandler(filters.TEXT & (~filters.COMMAND), chat),
    )
    for handler in handlers:
        application.add_handler(handler)
    application.run_polling()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment