Skip to content

Instantly share code, notes, and snippets.

@enginebai

enginebai/bot.py

Last active Jun 20, 2017
Embed
What would you like to do?
Chatbot implement
#!/usr/bin/python3
# -*- encoding: utf-8 -*-
import configparser
import time
from enum import Enum
from nlp_parser import parse_sentence, Intent
from message import Messager, QuickReply, GenericElement, ActionButton, ButtonType
import config
import api
__author__ = "Engine Bai"
class Bot(object):
def __init__(self, sender, locale="zh_TW"):
self.sender = sender
self.locale = locale
self._config = configparser.ConfigParser()
self._config.read("res/strings")
self._reset_context()
self._context_intent = Intent.HELP.name
self._messager = Messager(config.access_token)
def receive_message(self, message_payload):
current_intent = parse_sentence(message_payload["message"])
if current_intent == self._context_intent: # don't handle duplicate intent
return
else:
self._context_intent = current_intent
self._handle_intent(message_payload)
def receive_postback(self, message_payload):
current_intent = message_payload["postback"]["payload"]
if current_intent == self._context_intent:
return
else:
self._context_intent = current_intent
self._handle_intent(message_payload)
def _handle_intent(self, message_payload):
print("Handle intent", self.sender, self._context_intent, message_payload)
if self._context_intent == Intent.HELP.name or self._context_intent == Intent.CONFUSED.name:
self.send_help()
elif self._context_intent == Intent.PROJECTS.name:
projects = api.data["projects"]
project_list = []
for project_id in projects.keys():
project = projects[project_id]
project_list.append(GenericElement(project["title"], project["description"],
config.api_root + project["image_url"], [
ActionButton(ButtonType.POSTBACK,
self._get_string("button_more"),
# Payload用Intent本身作為開頭
payload=Intent.PROJECTS.name + project_id)
]))
self._messager.send_generic(self.sender, project_list)
elif self._context_intent == Intent.ARTICLES.name:
self.send_link_list("articles")
elif self._context_intent == Intent.ADVANTAGES.name or self._context_intent == Intent.PERSONALITY.name:
contents = api.data[self._context_intent.lower()]
for content in contents:
self.send_contents(content)
self.next()
elif
# 略...
pass
else:
if self._context_intent is None:
self._context_intent = Intent.CONFUSED.name
self.send_help()
# 直接從Payload來的
elif self._context_intent.startswith(Intent.PROJECTS.name):
project = api.data[Intent.PROJECTS.name.lower()][self._context_intent.replace(Intent.PROJECTS.name, "")]
for detail in project["detail"]:
self.send_contents(detail)
self.next()
def send_help(self, restart=False):
"""
傳送幫助資訊
:param restart:
:return:
"""
self._messager.send_quick_replies(self.sender,
self._get_string("title_help_restart")
if restart else self._get_string("title_help"), [
QuickReply(self._get_string("button_works_primary"),
Intent.WORKS_PRIMARY.name),
QuickReply(self._get_string("button_works_secondary"),
Intent.WORKS_SECONDARY.name),
QuickReply(self._get_string("button_advantages"), Intent.ADVANTAGES.name),
QuickReply(self._get_string("button_personality"),
Intent.PERSONALITY.name),
QuickReply(self._get_string("button_contact_me"), Intent.CONTACT_ME.name)
])
def short_break(self, sleep=1):
"""
每一個句子之間的停頓
:param sleep: 停頓秒數,預設為1秒
:return:
"""
self._messager.typing(self.sender)
time.sleep(sleep)
def next(self):
"""
在每一個段落之後,可以短暫停留然後繼續問說下一步
:return:
"""
self.short_break(2)
self.send_help(True)
def send_link_list(self, data_payload):
"""
傳送列表資料
:param data_payload:
:return:
"""
data = api.data[data_payload]
items_list = []
for item_id in data.keys():
item = data[item_id]
links = item["link"]
action_buttons = []
for i in range(len(links)):
action_buttons.append(ActionButton(ButtonType.WEB_URL, self._get_string("button_link") +
(str(i + 1) if len(links) > 1 else ""), links[i]))
items_list.append(GenericElement(item["title"], item["description"],
config.api_root + item["image_url"], action_buttons))
self._messager.send_generic(self.sender, items_list)
self.next()
def send_contents(self, detail):
"""
傳送一般語句和圖片或連結
:param detail:
:return:
"""
self.short_break()
if "message" in detail:
self._messager.send_text(self.sender, detail["message"])
elif "image_url" in detail and "link" not in detail:
self._messager.send_image(self.sender, config.api_root + detail["image_url"])
elif "title" in detail and "link" in detail:
self._messager.send_generic(self.sender, [
GenericElement(detail["title"], detail["subtitle"], config.api_root + detail["image_url"], [
ActionButton(ButtonType.WEB_URL, detail["button"], detail["link"])
])])
def _reset_context(self):
self._current_task = None
self._current_data = None
self._context_intent = Intent.HELP.name
def _get_string(self, key):
"""
從文字檔取得句子
:param key:
:return:
"""
return self._config[self.locale][key]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.