Skip to content

Instantly share code, notes, and snippets.

@a96lex
Created February 14, 2022 13:05
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save a96lex/5036473d3501bbd5321a1acfb3030f73 to your computer and use it in GitHub Desktop.
Save a96lex/5036473d3501bbd5321a1acfb3030f73 to your computer and use it in GitHub Desktop.
import json
import logging
import re
import sys
from uuid import uuid4

import aiosonic
def sanitize(out: list) -> str:
    """Return the first element of *out* with ``<...>`` tags removed.

    Args:
        out: A list whose first element is the text to clean. Only
            ``out[0]`` is inspected; any further elements are ignored.

    Returns:
        The first element with every ``<...>`` span stripped, or the
        literal string ``"Not text"`` when *out* is empty or its first
        element is empty/``None``.
    """
    # Guard the empty list too: previously ``out[0]`` raised IndexError
    # before the "Not text" fallback could apply.
    text = out[0] if out else None
    if not text:
        return "Not text"
    # Non-greedy match so each tag is removed on its own instead of
    # swallowing everything between the first "<" and the last ">".
    return re.sub(r"<.+?>", "", text)
class AsyncVoiceflow2go:
    """Small async client for Voiceflow's general-runtime ``interact`` endpoint.

    The HTTP client is created lazily on the first request and then reused
    for every later call made through the same instance.
    """

    # Placeholder until the first request; then an aiosonic.HTTPClient is
    # stored on the instance (a pre-set client is used as-is).
    client = None

    # Diagnostics go through logging (DEBUG) instead of printing to stdout.
    _log = logging.getLogger(__name__)

    async def __request(self, **kwargs):
        """Perform one HTTP request through aiosonic and return its body.

        Args:
            **kwargs: Forwarded unchanged to ``self.client.request``.

        Returns:
            Whatever ``response.content()`` yields for the response.

        Raises:
            Exception: If the response status code is anything but 200.
        """
        if not self.client:
            self.client = aiosonic.HTTPClient()
        response = await self.client.request(**kwargs)
        # Read the body before checking the status so that it can be
        # included in the error message.
        content = await response.content()
        if response.status_code != 200:
            raise Exception(
                f"voiceflow got ERROR code {response.status_code}: {content}"
            )
        return content

    async def get_voiceflow_answer(self, api_key: str, user_id: str, question: str):
        """Send *question* to Voiceflow and convert the reply to a response dict.

        Args:
            api_key: Value sent as the ``Authorization`` header.
            user_id: Identifier interpolated into the request URL path.
            question: The user's utterance. The word "stop" (any casing)
                short-circuits without contacting Voiceflow.

        Returns:
            A dict with a ``"text"`` key holding all ``text`` trace messages
            joined by single spaces. When the reply contains ``choice``
            buttons or a ``no-reply`` trace, a ``"hook"`` key is added that
            lists the button names (and ``"skip"`` for ``no-reply``) as
            selectable options.

        Raises:
            Exception: If the HTTP call fails (see ``__request``) or the
                reply cannot be parsed; the parsing error is chained as
                ``__cause__``.
        """
        if question.lower() == "stop":
            return {"text": "STOP INTENT REQUESTED"}

        data = {"request": {"type": "text", "payload": question}}
        self._log.debug("voiceflow request: %s", data)
        # NOTE(review): the nested dict is handed to aiosonic via `data=`;
        # confirm that it is sent in the encoding the endpoint expects.
        raw = await self.__request(
            method="POST",
            url=f"https://general-runtime.voiceflow.com/state/user/{user_id}/interact",
            headers={"Authorization": api_key},
            data=data,
        )

        try:
            traces = json.loads(raw)
            self._log.debug("voiceflow reply: %s", traces)

            messages = []
            suggestions = []
            for trace in traces:
                trace_type = trace["type"]
                payload = trace.get("payload")
                if trace_type == "text":
                    messages.append(payload["message"])
                elif trace_type == "choice":
                    suggestions.extend(
                        button["name"] for button in payload["buttons"]
                    )
                elif trace_type == "no-reply":
                    suggestions.append("skip")

            # Joining avoids the trailing space that per-message
            # concatenation used to leave at the end of the text.
            response = {"text": " ".join(messages)}
            if suggestions:
                response["hook"] = {
                    "inputType": "options",
                    "hookname": "voiceflowOptions",
                    "inputs": [{"text": name} for name in suggestions],
                }
        except Exception as e:
            # Keep the original error attached so the traceback shows the
            # real cause (bad JSON, missing key, ...).
            raise Exception(f"Could not retrieve answer from voiceflow: {e}") from e
        return response
if __name__ == "__main__":
    # Manual smoke test: <script> <user_id> <question>
    # The API key is read from the VOICEFLOW_SUBSCRIPTION_KEY env variable.
    import asyncio
    import os
    from pprint import pprint as p

    # Fail with a usage hint instead of a bare IndexError.
    if len(sys.argv) < 3:
        sys.exit(f"usage: {sys.argv[0]} <user_id> <question>")
    user_id = sys.argv[1]
    question = sys.argv[2]

    async def main():
        """Ask Voiceflow one question and pretty-print the response dict."""
        api_key = os.getenv("VOICEFLOW_SUBSCRIPTION_KEY")
        if not api_key:
            # Without a key the Authorization header would be None.
            sys.exit("VOICEFLOW_SUBSCRIPTION_KEY is not set")
        voiceflow = AsyncVoiceflow2go()
        content = await voiceflow.get_voiceflow_answer(
            api_key=api_key, user_id=user_id, question=question
        )
        p(content)

    # asyncio.run creates and closes the loop itself; get_event_loop() is
    # deprecated when no loop is running.
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment