|
from PIL import Image, ImageDraw, ImageFont |
|
from uuid import uuid4 |
|
import subprocess |
|
import time |
|
import sqlite3 |
|
import telegram |
|
import telegram.ext |
|
import html |
|
import os |
|
|
|
sticker_dump = 1 |
|
error_dump = 2 |
|
|
|
def draw(text): |
|
# print("trying to draw", text) |
|
fn = str(int(time.time())) |
|
subprocess.call(['node', 'gen.js', text, fn]) |
|
subprocess.call(['cwebp', fn + ".png", "-o", fn + ".webp"]) |
|
os.remove(fn + ".png") |
|
# print("finished", fn + ".webp") |
|
return fn + ".webp" |
|
|
|
|
|
def get_db(text): |
|
con = sqlite3.connect("./data.sqlite") |
|
cur = con.cursor() |
|
d = cur.execute("select * from cache where `text` = ?", (text, )).fetchone() |
|
if d is None: |
|
return None |
|
return d[1] |
|
con.commit() |
|
cur.close() |
|
con.close() |
|
|
|
def set_db(text, val): |
|
if get_db(text) is None: |
|
con = sqlite3.connect("./data.sqlite") |
|
#cur = con.cursor() |
|
con.execute("insert into cache (`text`, `id`) values (?, ?)", (text, val)) |
|
con.commit() |
|
#cur.close() |
|
con.close() |
|
|
|
|
|
def get_sticker_id(bot, text, dest=sticker_dump): |
|
# print("requesting", text) |
|
db = get_db(text) |
|
# print("db", db) |
|
if db: |
|
# print("retriving from db", db) |
|
return db |
|
p = draw(text) |
|
m = bot.send_sticker(dest, open(p, 'rb')) |
|
# print("msg", m) |
|
os.remove(p) |
|
set_db(text, m.sticker.file_id) |
|
# print("just sent", m.sticker.file_id) |
|
if dest != sticker_dump: |
|
return m.sticker.file_id, True |
|
return m.sticker.file_id |
|
|
|
|
|
def start(bot, update): |
|
update.message.reply_text("""Text Sticker. |
|
|
|
I'm an inline bot. I make stickers with texts. Quote the text directly, and I will generate stickers for you. |
|
|
|
<b>How to use</b> |
|
1. @ me |
|
2. Type your words |
|
3. Wait until the circle stop turning |
|
4. Tap/click the first sticker (or where it supposed to be) |
|
|
|
If there's no pop-up showing, try again a few seconds later, or check if it's in the public anon dump. If it's not anywhere, I'm probably down. |
|
|
|
<b>Tips</b> |
|
You have to make line breaks manually, or everything will be squeezed in one line. |
|
|
|
Start a message with <code>serif`</code> to generate stickers in serif fonts. |
|
|
|
Major languages are supported by Noto Fonts by Google. Emoji is supported by Noto Color Emoji (before Android O). <em>(Pudding faces Rocks!)</em> |
|
|
|
All unique stickers are saved in the <a href="https://t.me/joinchat/example">Public Anonymous Dump of @big_text_bot</a> as a essential component for this bot to work. |
|
""", disable_web_page_preview=True, |
|
parse_mode="HTML", reply_markup=telegram.InlineKeyboardMarkup([[telegram.InlineKeyboardButton("Try me out", switch_inline_query_current_chat="Hello,\nWorld!")], [telegram.InlineKeyboardButton("Share with friends", switch_inline_query="Hello,\nWorld!")]])) |
|
|
|
|
|
def inlinequery(bot, update): |
|
# print(update) |
|
query = update.inline_query.query |
|
query = html.unescape(query.replace('<br/>', '\n')) |
|
fid = get_sticker_id(bot, query) if query else '' |
|
# print(fid) |
|
results = [telegram.InlineQueryResultCachedSticker(str(uuid4()), sticker_file_id=fid)] if fid else [] |
|
# print("results", results) |
|
try: |
|
u = update.inline_query.answer(results) |
|
# print(update, u) |
|
except Exception as e: |
|
# print(e) |
|
error(bot, update, repr(e) + repr(u)) |
|
|
|
def gen(bot, update, args): |
|
# update.message.reply_text("generating...") |
|
query = " ".join(args) |
|
# print("generating", query) |
|
fid = get_sticker_id(bot, query, update.message.chat.id) |
|
if type(fid) == tuple: |
|
return |
|
# print("sticker id", fid) |
|
update.message.reply_sticker(fid) |
|
|
|
def error(bot, update, error): |
|
if str(error) == "Bad Request: QUERY_ID_INVALID" or str(error) == "Timed out": |
|
return |
|
print("update: %s\nerror: %s" % (update, error)) |
|
bot.send_message(error_dump, 'Update <pre>%s</pre> caused error <pre>%s</pre>' % (html.escape(str(update)), html.escape(str(error))), parse_mode="HTML") |
|
|
|
|
|
def main(): |
|
token = "BOT_TOKEN_GOES_HERE" |
|
u = telegram.ext.Updater(token) |
|
d = u.dispatcher |
|
d.add_handler(telegram.ext.InlineQueryHandler(inlinequery)) |
|
d.add_handler(telegram.ext.CommandHandler('start', start)) |
|
d.add_handler(telegram.ext.CommandHandler('help', start)) |
|
d.add_handler(telegram.ext.CommandHandler('gen', gen, pass_args=True)) |
|
d.add_error_handler(error) |
|
# u.start_polling() |
|
u.start_webhook(listen='0.0.0.0', port=1849, url_path='telegram/' + token) |
|
u.bot.setWebhook(url='https://labs.1a23.com/telegram/' + token) |
|
u.idle() |
|
|
|
main() |