Skip to content

Instantly share code, notes, and snippets.

@shivampip
Created October 11, 2019 13:35
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save shivampip/25199c331b3ae1843dd21194edc0baad to your computer and use it in GitHub Desktop.
Save shivampip/25199c331b3ae1843dd21194edc0baad to your computer and use it in GitHub Desktop.
Telegram Connector for Rasa (Modified)
import logging
import json
from copy import deepcopy
from sanic import Blueprint, response
from sanic.request import Request
from telegram import (
Bot,
InlineKeyboardButton,
Update,
InlineKeyboardMarkup,
KeyboardButton,
ReplyKeyboardMarkup,
)
from typing import Dict, Text, Any, List, Optional
from rasa.core.channels.channel import InputChannel, UserMessage, OutputChannel
from rasa.core.constants import INTENT_MESSAGE_PREFIX, USER_INTENT_RESTART
# Module-level logger used by the Telegram channel classes below.
logger = logging.getLogger(__name__)
class TelegramOutput(Bot, OutputChannel):
    """Output channel for Telegram.

    Inherits from both the Telegram ``Bot`` client and ``OutputChannel``, so
    every ``send_*`` coroutine defined here delegates to a ``Bot`` method
    called on ``self``.
    """

    @classmethod
    def name(cls):
        """Return the channel identifier, ``"telegram"``."""
        return "telegram"

    def __init__(self, access_token):
        """Initialise the parent classes with ``access_token``."""
        super().__init__(access_token)

    async def send_text_message(
        self, recipient_id: Text, text: Text, **kwargs: Any
    ) -> None:
        """Send ``text`` to ``recipient_id``, one message per paragraph.

        The text is split on blank lines (``"\\n\\n"``) and each part is sent
        as its own message.
        """
        for message_part in text.split("\n\n"):
            self.send_message(recipient_id, message_part)

    async def send_image_url(
        self, recipient_id: Text, image: Text, **kwargs: Any
    ) -> None:
        """Send ``image`` to ``recipient_id`` via ``send_photo``."""
        self.send_photo(recipient_id, image)

    @staticmethod
    def _inline_button(button: Dict[Text, Any]) -> InlineKeyboardButton:
        """Build one inline button from a ``title``/``payload`` dict."""
        return InlineKeyboardButton(
            button["title"], callback_data=button["payload"]
        )

    async def send_text_with_buttons(
        self,
        recipient_id: Text,
        text: Text,
        buttons: List[Dict[Text, Any]],
        button_type: Optional[Text] = "inline",
        **kwargs: Any
    ) -> None:
        """Sends a message with keyboard.

        For more information: https://core.telegram.org/bots#keyboards

        :button_type inline: horizontal inline keyboard
        :button_type vertical: vertical inline keyboard
        :button_type reply: reply keyboard

        Any other ``button_type`` is logged as an error and nothing is sent.
        """
        if button_type == "inline":
            # A single row holding every button.
            reply_markup = InlineKeyboardMarkup(
                [[self._inline_button(b) for b in buttons]]
            )
        elif button_type == "vertical":
            # One row per button.
            reply_markup = InlineKeyboardMarkup(
                [[self._inline_button(b)] for b in buttons]
            )
        elif button_type == "reply":
            button_list = []
            for bttn in buttons:
                # A nested list becomes one keyboard row; a plain dict gets
                # a row of its own.
                if isinstance(bttn, list):
                    button_list.append(
                        [KeyboardButton(s["title"]) for s in bttn]
                    )
                else:
                    button_list.append([KeyboardButton(bttn["title"])])
            reply_markup = ReplyKeyboardMarkup(
                button_list, resize_keyboard=True, one_time_keyboard=True
            )
        else:
            logger.error(
                "Trying to send text with buttons for unknown "
                "button type %s",
                button_type,
            )
            return

        self.send_message(recipient_id, text, reply_markup=reply_markup)

    async def send_custom_json(
        self, recipient_id: Text, json_message: Dict[Text, Any], **kwargs: Any
    ) -> None:
        """Dispatch a custom payload to the matching ``send_*`` method.

        ``json_message`` is deep-copied, so the caller's dict is not mutated.
        An optional ``chat_id`` key overrides ``recipient_id``. For each entry
        in the dispatch table whose keys are all present with non-``None``
        values, those values are popped and passed positionally (in the listed
        order) to the named method; whatever remains in the payload is
        forwarded as keyword arguments.
        """
        json_message = deepcopy(json_message)
        recipient_id = json_message.pop("chat_id", recipient_id)

        invoice_rest = (
            "payload",
            "provider_token",
            "start_parameter",
            "currency",
            "prices",
        )
        # Insertion order matters: the venue entry must be tried before the
        # plain location entry, because a match pops its keys and thereby
        # prevents the shorter key set from matching as well.
        send_functions = {
            ("text",): "send_message",
            ("photo",): "send_photo",
            ("audio",): "send_audio",
            ("document",): "send_document",
            ("sticker",): "send_sticker",
            ("video",): "send_video",
            ("video_note",): "send_video_note",
            ("animation",): "send_animation",
            ("voice",): "send_voice",
            ("media",): "send_media_group",
            ("latitude", "longitude", "title", "address"): "send_venue",
            ("latitude", "longitude"): "send_location",
            ("phone_number", "first_name"): "send_contact",
            ("game_short_name",): "send_game",
            ("action",): "send_chat_action",
            ("title", "description") + invoice_rest: "send_invoice",
            # Legacy misspelling, kept so existing payloads still dispatch.
            ("title", "decription") + invoice_rest: "send_invoice",
        }

        for params, method_name in send_functions.items():
            if all(json_message.get(p) is not None for p in params):
                args = [json_message.pop(p) for p in params]
                api_call = getattr(self, method_name)
                api_call(recipient_id, *args, **json_message)
class TelegramInput(InputChannel):
    """Telegram input channel.

    Exposes a Sanic blueprint with a health check, a webhook registration
    route and the webhook route that turns incoming updates into
    ``UserMessage`` objects.
    """

    @classmethod
    def name(cls):
        """Return the channel identifier, ``"telegram"``."""
        return "telegram"

    @classmethod
    def from_credentials(cls, credentials):
        """Build the channel from a credentials mapping.

        Reads the ``access_token``, ``verify`` and ``webhook_url`` keys.
        Empty or missing credentials are reported through
        ``raise_missing_credentials_exception``.
        """
        if not credentials:
            cls.raise_missing_credentials_exception()

        return cls(
            credentials.get("access_token"),
            credentials.get("verify"),
            credentials.get("webhook_url"),
        )

    def __init__(self, access_token, verify, webhook_url, debug_mode=True):
        """Store the channel configuration.

        :param access_token: token handed to ``TelegramOutput``.
        :param verify: value compared against the bot's ``username`` on
            every webhook POST.
        :param webhook_url: URL registered through ``setWebhook``.
        :param debug_mode: when true, exceptions raised while handling a
            message are re-raised after being logged.
        """
        self.access_token = access_token
        self.verify = verify
        self.webhook_url = webhook_url
        self.debug_mode = debug_mode

    @staticmethod
    def _is_location(message):
        """Return the message's location (truthy when one is attached)."""
        return message.location

    @staticmethod
    def _is_user_message(message):
        """Return the message's text (truthy for a non-empty text message)."""
        return message.text

    @staticmethod
    def _is_photo(message):
        """Return the message's photo list (truthy when photos are attached)."""
        return message.photo

    @staticmethod
    def _is_button(update):
        """Return the update's callback query (truthy for button presses)."""
        return update.callback_query

    @staticmethod
    def _photo_to_text(message):
        """Encode a photo message as a ``/receive_image`` command string."""
        # Uses the last entry of the photo list; presumably the largest
        # available size -- confirm against the Telegram documentation.
        photo_file = message.photo[-1].get_file()
        data = {
            "r_image_url": str(photo_file.file_path),
            "r_image_size": photo_file.file_size,
            "r_image_id": photo_file.file_id,
        }
        return "/receive_image{}".format(json.dumps(data))

    def _message_to_text(self, message):
        """Convert a message into the text forwarded to the handler.

        Returns ``None`` when the message is not text, a location or a photo.
        """
        if self._is_user_message(message):
            return message.text.replace("/bot", "")
        if self._is_location(message):
            return '{{"lng":{0}, "lat":{1}}}'.format(
                message.location.longitude, message.location.latitude
            )
        if self._is_photo(message):
            text = self._photo_to_text(message)
            logger.debug("Received image message: %s", text)
            return text
        return None

    def blueprint(self, on_new_message):
        """Create the Sanic blueprint serving this channel.

        :param on_new_message: coroutine awaited with a ``UserMessage`` for
            every supported incoming update.
        :return: the configured ``Blueprint``.
        """
        telegram_webhook = Blueprint("telegram_webhook", __name__)
        out_channel = self.get_output_channel()

        @telegram_webhook.route("/", methods=["GET"])
        async def health(request: Request):
            return response.json({"status": "ok"})

        @telegram_webhook.route("/set_webhook", methods=["GET", "POST"])
        async def set_webhook(request: Request):
            succeeded = out_channel.setWebhook(self.webhook_url)
            if succeeded:
                logger.info("Webhook Setup Successful")
                return response.text("Webhook setup successful")
            logger.warning("Webhook Setup Failed")
            return response.text("Invalid webhook")

        @telegram_webhook.route("/webhook", methods=["GET", "POST"])
        async def message(request: Request):
            # Only POST requests are processed; other methods are simply
            # acknowledged so the handler always returns a response.
            if request.method != "POST":
                return response.text("success")

            if out_channel.get_me()["username"] != self.verify:
                logger.debug("Invalid access token, check it matches Telegram")
                return response.text("failed")

            update = Update.de_json(request.json, out_channel)
            if update is None:
                # Nothing could be parsed from the request body.
                return response.text("success")

            if self._is_button(update):
                msg = update.callback_query.message
                text = update.callback_query.data
            else:
                msg = update.message
                # Updates without a ``message`` have nothing to convert.
                text = None if msg is None else self._message_to_text(msg)
                if text is None:
                    return response.text("success")

            if msg is None:
                # A callback query without an attached message gives no chat
                # to reply to.
                return response.text("success")

            sender_id = msg.chat.id
            try:
                await on_new_message(
                    UserMessage(
                        text, out_channel, sender_id, input_channel=self.name()
                    )
                )
                # A restart is followed by "/start" to begin a new session.
                if text == (INTENT_MESSAGE_PREFIX + USER_INTENT_RESTART):
                    await on_new_message(
                        UserMessage(
                            "/start",
                            out_channel,
                            sender_id,
                            input_channel=self.name(),
                        )
                    )
            except Exception as e:
                logger.error("Exception when trying to handle message.%s", e)
                logger.debug(e, exc_info=True)
                if self.debug_mode:
                    raise

            return response.text("success")

        return telegram_webhook

    def get_output_channel(self) -> TelegramOutput:
        """Create a ``TelegramOutput`` and register ``webhook_url`` on it."""
        channel = TelegramOutput(self.access_token)
        channel.setWebhook(self.webhook_url)
        return channel
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment