Skip to content

Instantly share code, notes, and snippets.

@amn41
Created August 31, 2017 07:09
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save amn41/bdfef63a233b28cf6b2d751be3c5e699 to your computer and use it in GitHub Desktop.
Save amn41/bdfef63a233b28cf6b2d751be3c5e699 to your computer and use it in GitHub Desktop.
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
import logging
from flask import Blueprint, request, jsonify
import requests
from rasa_dm.channels.channel import UserMessage, OutputChannel
from rasa_dm.channels.rest import HttpInputComponent
logger = logging.getLogger(__name__)
class CustomBot(OutputChannel):
"""A bot that uses a custom channel to communicate."""
def __init__(self, access_token):
# you probably need a token to connect to your channel?
self.access_token = access_token
def send_text_message(self, recipient_id, message):
# you probably use http to send a messgage
url="http://my-custom-endpoint/message"
requests.post(
url,
headers={"Auth-token": self.access_token},
message
)
class CustomInput(HttpInputComponent):
def __init__(self, access_token):
self.access_token = access_token
self.out_channel = CustomBot(self.access_token)
def blueprint(self, on_new_message):
custom_webhook = Blueprint('custom_webhook', __name__)
@custom_webhook.route("/", methods=['GET'])
def health():
return jsonify({"status": "ok"})
@custom_webhook.route("/webhook", methods=['POST'])
def receive():
payload = request.json
sender = payload["sender"]
text = payload["message"]
on_new_message(UserMessage(text, self.out_channel, sender))
return "success"
return custom_webhook
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment