Skip to content

Instantly share code, notes, and snippets.

@VincentK16
Last active December 22, 2021 06:16
Show Gist options
  • Save VincentK16/79776687d7fd683b11f9ed39b71f88cd to your computer and use it in GitHub Desktop.
Save VincentK16/79776687d7fd683b11f9ed39b71f88cd to your computer and use it in GitHub Desktop.
Alpha Mini with IBM Watson Assistant + Robot Actions
# To run this code, make sure that you:
# 1. Have installed the "alphamini" and "ibm-watson" Python modules
# 2. Have changed the IBM Watson Assistant parameters to your own:
# a. API key
# b. Assistant URL
# c. Assistant ID
# 3. Have set your Alpha Mini serial number
# Follow the code below to see where the changes are needed.
import asyncio
import json
import os
from typing import Counter

from ibm_cloud_sdk_core.authenticators import IAMAuthenticator
from ibm_watson import AssistantV2
import mini.mini_sdk as MiniSdk
from mini.apis.api_action import PlayAction, PlayActionResponse
from mini.apis.api_observe import ObserveSpeechRecognise
from mini.apis.api_sound import StartPlayTTS
from mini.apis.base_api import MiniApiResultType
from mini.dns.dns_browser import WiFiDevice
from mini.pb2.codemao_speechrecognise_pb2 import SpeechRecogniseResponse

#from test.test_connect import test_connect, shutdown
#from test.test_connect import test_get_device_by_name, test_start_run_program
# Watson Assistant connection settings. Set WATSON_API_KEY and
# WATSON_SERVICE_URL in the environment to use your own service without
# editing this file; the literals are the script's original values and are
# only used as a fallback when the variables are unset.
# NOTE(review): a real API key should not be committed to source control.
WATSON_API_KEY = os.environ.get(
    'WATSON_API_KEY', '2mLWS_yLl13ZgCy7WT_G6YFXJmCskbUUEb078vTIfZ2O')
WATSON_SERVICE_URL = os.environ.get(
    'WATSON_SERVICE_URL',
    'https://api.us-south.assistant.watson.cloud.ibm.com/')

authenticator = IAMAuthenticator(WATSON_API_KEY)
assistant = AssistantV2(
    version='2020-09-24',
    authenticator=authenticator
)
assistant.set_service_url(WATSON_SERVICE_URL)
async def test_play_action(y):
    """Play a named robot action and verify that it succeeded.

    Args:
        y: Name of the action to play. The available names can be obtained
            with the GetActionList API:
            http://docs.ubtrobot.com/alphamini/python-sdk-en/mini.apis.api_action.html#mini.apis.api_action.GetActionList

    Raises:
        AssertionError: If the request did not finish with
            ``MiniApiResultType.Success``, if the response is missing or is
            not a ``PlayActionResponse``, or if the response reports failure.
    """
    block: PlayAction = PlayAction(action_name=y)
    # response: PlayActionResponse
    (resultType, response) = await block.execute()
    print(f'test_play_action result:{response}')

    # Raise explicitly instead of using ``assert`` so the checks are not
    # stripped when Python runs with -O. AssertionError is kept so that
    # anything already catching it keeps working.
    if resultType != MiniApiResultType.Success:
        raise AssertionError('test_play_action timeout')
    if response is None or not isinstance(response, PlayActionResponse):
        raise AssertionError('test_play_action result unavailable')
    if not response.isSuccess:
        raise AssertionError('play_action failed')
async def test_connect(dev: WiFiDevice) -> bool:
    """Open a connection to the given robot.

    Thin wrapper that forwards ``dev`` to ``MiniSdk.connect``.

    Args:
        dev (WiFiDevice): The device to connect to.

    Returns:
        bool: The result of ``MiniSdk.connect`` (whether the connection
        succeeded).
    """
    connected = await MiniSdk.connect(dev)
    return connected
# Disconnect and release resources
async def shutdown():
    """Leave the robot program and free the SDK resources.

    Awaits ``MiniSdk.quit_program()`` first and ``MiniSdk.release()`` second.
    """
    # Step 1: quit the program that was entered on the robot.
    await MiniSdk.quit_program()
    # Step 2: release the SDK resources.
    await MiniSdk.release()
async def test_get_device_by_name(serial_suffix: str = "1644", timeout: int = 10):
    """Search for a robot by the tail of its serial number.

    The serial number is printed on the back of the robot. Only its trailing
    characters need to be given; any length works, but more than 5 characters
    is recommended for an accurate match.

    Args:
        serial_suffix: Trailing characters of the robot's serial number.
            Defaults to "1644", the value this script was written with;
            pass your own robot's suffix.
        timeout: Search timeout in seconds. Defaults to 10.

    Returns:
        WiFiDevice: Contains the robot name, ip, port and other information.
        The caller treats a falsy result as "no robot found".
    """
    result: WiFiDevice = await MiniSdk.get_device_by_name(serial_suffix, timeout)
    print(f"test_get_device_by_name result:{result}")
    return result
async def test_start_run_program():
    """Put the robot into programming mode and start the welcome routine.

    Awaits ``MiniSdk.enter_program()``, then schedules two background tasks
    on the running event loop: a spoken greeting and the "action_014"
    action. The tasks are only scheduled here, not awaited.

    Returns:
        None:
    """
    await MiniSdk.enter_program()

    greeting = "Hello guest, welcome to Marina Bay Sands Singapore Hotel! How can I help you?"
    welcome_action = "action_014"

    asyncio.create_task(__tts(greeting))
    asyncio.create_task(test_play_action(welcome_action))
async def __tts(x):
    """Speak ``x`` on the robot through text-to-speech.

    The text is echoed to stdout before the request is sent, and the
    result of the request is printed afterwards.

    Args:
        x: The text to be spoken.
    """
    print(x)
    tts_request = StartPlayTTS(text=x)
    outcome = await tts_request.execute()
    print(f'tes_play_tts: {outcome}')
async def open_hour():
    """Speak the fixed opening-hours announcement and print the result."""
    announcement = "We are open from 10 in the morning to 6 in the evening"
    tts_request = StartPlayTTS(text=announcement)
    outcome = await tts_request.execute()
    print(f'tes_play_tts: {outcome}')
# Test listening for speech recognition
# Keyword found in Watson's reply -> robot action to play alongside the speech.
# Checked in this order; every matching keyword triggers its action.
_REPLY_ACTIONS = (
    ("day", "action_019"),
    ("time", "Surveillance_006"),
    ("confirm", "action_012"),
    ("safe", "action_018"),
)


def _reply_text(response):
    """Collect the speakable text of a Watson Assistant message result.

    Args:
        response: The dict returned by ``assistant.message(...).get_result()``.

    Returns:
        str: The ``text`` of every item in ``response["output"]["generic"]``
        that has one, joined with single spaces; ``""`` if there is none.
    """
    generic = response.get("output", {}).get("generic", [])
    return ' '.join(item["text"] for item in generic if item.get("text"))


async def test_speech_recognise(assistant_id='b197f077-ca47-44ca-9ea2-fcf0f9215a8e'):
    """Forward recognised speech to Watson Assistant and act on the reply.

    Creates a Watson Assistant session, then registers a speech-recognition
    observer on the robot. Each recognised utterance is sent to the
    assistant; the reply text is spoken through TTS, and every keyword of
    ``_REPLY_ACTIONS`` contained in the reply triggers its robot action.

    The observer keeps running after this coroutine returns, so the caller
    must keep the event loop alive.

    Args:
        assistant_id: ID of the Watson Assistant to talk to. Defaults to the
            ID this script was written with; pass your own.
    """
    create = assistant.create_session(assistant_id=assistant_id).get_result()
    print(create)
    session_id = create["session_id"]

    # Speech-recognition observer
    observe: ObserveSpeechRecognise = ObserveSpeechRecognise()

    # The event loop only holds weak references to tasks, so keep a strong
    # reference to each fire-and-forget task until it has finished.
    background_tasks = set()

    def spawn(coro):
        task = asyncio.create_task(coro)
        background_tasks.add(task)
        task.add_done_callback(background_tasks.discard)

    # Handler fields available on msg:
    # SpeechRecogniseResponse.text
    # SpeechRecogniseResponse.isSuccess
    # SpeechRecogniseResponse.resultCode
    def handler(msg: SpeechRecogniseResponse):
        print(f'=======handle speech recognise:{msg}')
        print(msg.text.lower())
        response = assistant.message(
            assistant_id=assistant_id,
            session_id=session_id,
            input={'message_type': 'text', 'text': str(msg.text)},
        ).get_result()
        text = _reply_text(response)
        if not text:
            # The assistant returned no text items; nothing to say or act on.
            return
        spawn(__tts(text))
        for keyword, action_name in _REPLY_ACTIONS:
            if keyword in text:
                spawn(test_play_action(action_name))

    observe.set_handler(handler)
    # Start listening
    observe.start()
    await asyncio.sleep(0)
if __name__ == '__main__':
    MiniSdk.set_robot_type(MiniSdk.RobotType.EDU)
    # Every step runs on the same event loop, so fetch it once.
    loop = asyncio.get_event_loop()
    device: WiFiDevice = loop.run_until_complete(test_get_device_by_name())
    if not device:
        print('No robot found, exiting.')
    elif not loop.run_until_complete(test_connect(device)):
        # Without a connection the remaining steps cannot succeed.
        print('Could not connect to the robot, exiting.')
    else:
        try:
            loop.run_until_complete(test_start_run_program())
            loop.run_until_complete(test_speech_recognise())
            # An event observer has been registered, so the event loop
            # must be kept alive with run_forever().
            loop.run_forever()
        except KeyboardInterrupt:
            # Ctrl-C is the normal way to stop the script.
            pass
        finally:
            # Always disconnect and release the robot, including on Ctrl-C.
            loop.run_until_complete(shutdown())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment