Skip to content

Instantly share code, notes, and snippets.

@jamesmf
Last active February 18, 2020 05:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jamesmf/d651f15661af60fc7d98926553edec54 to your computer and use it in GitHub Desktop.
Save jamesmf/d651f15661af60fc7d98926553edec54 to your computer and use it in GitHub Desktop.
Mycroft as a Web of Things (WoT) Thing
from __future__ import division
from webthing import Action, Event, Property, SingleThing, Thing, Value, WebThingServer
from mycroft_bus_client import MessageBusClient, Message
import logging
import time
import uuid
class QuestionAction(Action):
    """Action named "question" that sends its query to the thing's bus client."""

    def __init__(self, thing, input_):
        """Initialise the action with a freshly generated random hex id.

        Args:
            thing: The thing this action belongs to; ``perform_action`` uses
                its ``client`` attribute.
            input_: Action input; ``perform_action`` reads its ``"query"`` key.
        """
        action_id = uuid.uuid4().hex
        Action.__init__(self, action_id, thing, "question", input_=input_)

    def perform_action(self):
        """Print the input, then emit a ``question:query`` message for it."""
        print(self.input)
        emit = self.thing.client.emit
        # The query text is sent under the "phrase" key of the message data.
        payload = {"phrase": self.input["query"]}
        emit(Message("question:query", data=payload))
class SpeakEvent(Event):
    """Event named "mycroftsaid" carrying the supplied data."""

    def __init__(self, thing, data):
        """Create the event.

        Args:
            thing: The thing the event is raised on.
            data: Payload passed through to ``Event`` as ``data``.
        """
        event_name = "mycroftsaid"
        Event.__init__(self, thing, event_name, data=data)
def make_thing(client):
    """Build the "Mycroft" thing with its property, action and event.

    Args:
        client: Object stored on the thing as ``thing.client``;
            ``QuestionAction.perform_action`` calls ``emit`` on it.

    Returns:
        The configured ``Thing``, exposing the ``MycroftResponse`` property,
        the ``question`` action and the ``mycroftsaid`` event.
    """
    mycroft = Thing(
        "test:mycroft:mycroft-hook-1234", "Mycroft", [], "A controller for Mycroft"
    )
    mycroft.client = client

    # String property, initialised to the empty string.
    response_metadata = {
        "@type": "String",
        "title": "Response",
        "type": "string",
        "description": "Latest response from Mycroft",
    }
    response_property = Property(
        mycroft, "MycroftResponse", Value(""), metadata=response_metadata
    )
    mycroft.add_property(response_property)

    # The "question" action takes an object with a required "query" string.
    question_input = {
        "type": "object",
        "required": ["query"],
        "properties": {"query": {"type": "string"}},
    }
    question_metadata = {
        "title": "Question",
        "description": "Send a query to Mycroft",
        "input": question_input,
    }
    mycroft.add_available_action("question", question_metadata, QuestionAction)

    said_metadata = {"description": "Mycroft Said:", "type": "string"}
    mycroft.add_available_event("mycroftsaid", said_metadata)

    return mycroft
def run_server():
    """Bridge the Mycroft message bus to a WebThing server on port 8888.

    Creates a ``MessageBusClient``, builds the thing with ``make_thing``,
    registers ``print_utterance`` for the ``question:query.response`` and
    ``speak`` messages, calls ``client.run_in_thread()`` and then starts a
    ``WebThingServer`` wrapping the single thing. A ``KeyboardInterrupt``
    raised while the server is starting/running stops the server.
    """
    client = MessageBusClient()
    thing = make_thing(client)

    def print_utterance(message):
        """Print a bus message and raise a ``SpeakEvent`` for its text.

        Uses ``message.data["answer"]`` when it is present and not ``None``;
        otherwise prints the raw data and falls back to
        ``message.data["utterance"]`` if that key exists.
        """
        # NOTE(review): the "utterance" check is assumed to belong to the
        # else branch (answer missing) - confirm the intended nesting.
        if message.data.get("answer") is not None:
            print('Mycroft said "{}"'.format(message.data.get("answer")))
            # thing.set_property("MycroftResponse", message.data.get("answer"))
            thing.add_event(SpeakEvent(thing, message.data.get("answer")))
        else:
            print(message.data)
            if "utterance" in message.data:
                print(message.data["utterance"], "utterance!")
                thing.add_event(SpeakEvent(thing, data=message.data["utterance"]))

    # The same handler serves both message types.
    client.on("question:query.response", print_utterance)
    client.on("speak", print_utterance)
    client.run_in_thread()
    # If adding more than one thing, use MultipleThings() with a name.
    # In the single thing case, the thing's name will be broadcast.
    server = WebThingServer(SingleThing(thing), port=8888)
    try:
        logging.info("starting the server")
        # Presumably blocks until interrupted - confirm against webthing docs.
        server.start()
    except KeyboardInterrupt:
        logging.info("stopping the server")
        server.stop()
        logging.info("done")
if __name__ == "__main__":
    # Script entry point: enable DEBUG-level logging (numeric level 10) with
    # timestamp, file:line and level in each record, then run the server.
    logging.basicConfig(
        format="%(asctime)s %(filename)s:%(lineno)s %(levelname)s %(message)s",
        level=logging.DEBUG,
    )
    run_server()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment