Skip to content

Instantly share code, notes, and snippets.

@lukas-hetzenecker
Created May 15, 2018 09:15
Show Gist options
  • Star 6 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save lukas-hetzenecker/e8f8479275fb8df0ed24ab993bb02532 to your computer and use it in GitHub Desktop.
Save lukas-hetzenecker/e8f8479275fb8df0ed24ab993bb02532 to your computer and use it in GitHub Desktop.
import logging
import json
import google.oauth2.credentials
import time
from google.assistant.library import Assistant
from PyQt5.QtCore import pyqtProperty, pyqtSignal, pyqtSlot, QObject, QThread
from google.assistant.library.event import EventType
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class AssistantService(QObject):
    """Qt-facing wrapper around a background ``AssistantThread``.

    Events received from the thread are translated into the dedicated Qt
    signals declared below.
    """

    credentialsChanged = pyqtSignal(str)
    conversationTurnStarted = pyqtSignal()
    endOfUtterance = pyqtSignal()
    recognizingSpeechFinished = pyqtSignal(str, arguments=['text'])
    respondingFinished = pyqtSignal()

    def __init__(self, parent=None):
        super().__init__(parent)
        self._credentialFile = ""
        self._assistantThread = None

    @pyqtProperty('QString', notify=credentialsChanged)
    def credentials(self):
        """Path of the credentials file handed to ``AssistantThread``."""
        return self._credentialFile

    @credentials.setter
    def credentials(self, credentialFile):
        self._credentialFile = credentialFile
        self.credentialsChanged.emit(self._credentialFile)

    @pyqtSlot()
    def start(self):
        """Create and start the assistant thread.

        Does nothing (apart from logging) when a thread started earlier is
        still running, or when the thread cannot be constructed from the
        configured credentials file.
        """
        current = self._assistantThread
        if current is not None and current.isRunning():
            # Replacing the reference would drop a QThread object whose
            # thread is still executing.
            logger.warning("Assistant thread is already running")
            return

        def processEvent(event):
            logger.debug("got event %s", event)
            if event.type == EventType.ON_CONVERSATION_TURN_STARTED:
                self.conversationTurnStarted.emit()
            elif event.type == EventType.ON_END_OF_UTTERANCE:
                self.endOfUtterance.emit()
            elif event.type == EventType.ON_RECOGNIZING_SPEECH_FINISHED:
                self.recognizingSpeechFinished.emit(event.args['text'])
            elif event.type == EventType.ON_RESPONDING_FINISHED:
                self.respondingFinished.emit()

        try:
            # The thread's constructor opens and parses the credentials file.
            thread = AssistantThread(self._credentialFile)
        except (OSError, ValueError, TypeError):
            logger.exception("Could not load assistant credentials from %r",
                             self._credentialFile)
            return

        thread.receivedEvent.connect(processEvent)
        self._assistantThread = thread
        thread.start()

    @pyqtSlot()
    def startConversation(self):
        """Forward to the assistant thread, if one has been created."""
        if self._assistantThread is not None:
            self._assistantThread.startConversation()

    @pyqtSlot()
    def stopConversation(self):
        """Forward to the assistant thread, if one has been created."""
        if self._assistantThread is not None:
            self._assistantThread.stopConversation()
class AssistantThread(QThread):
    """Thread that runs the Google Assistant and republishes its events.

    Every event yielded by the assistant is emitted through
    ``receivedEvent``.
    """

    receivedEvent = pyqtSignal(object)

    # Seconds to wait before the assistant is started again.
    _RESTART_DELAY = 10

    def __init__(self, credentialFile, parent=None):
        """Load credentials from the JSON file at *credentialFile*.

        Raises whatever ``open``, ``json.load`` or the ``Credentials``
        constructor raise for an unreadable or malformed file.
        """
        super().__init__(parent)
        with open(credentialFile, 'r') as f:
            self._credentials = google.oauth2.credentials.Credentials(
                token=None, **json.load(f))
        self._assistant = None

    def run(self):
        """Run the assistant, restarting it after it stops or fails.

        Restarts happen in a loop rather than by calling ``run()``
        recursively, so the stack does not grow with each retry. The loop
        ends once interruption has been requested on this thread.
        """
        while not self.isInterruptionRequested():
            logger.debug("Starting assistant...")
            try:
                with Assistant(self._credentials,
                               'luhe-smart-mirror-assistant-hallway') as assistant:
                    self._assistant = assistant
                    for event in assistant.start():
                        self.receivedEvent.emit(event)
            except Exception:
                logger.exception("Assistant stopped with an error")
            finally:
                # The assistant is closed once the ``with`` block is left.
                self._assistant = None
            time.sleep(self._RESTART_DELAY)

    @pyqtSlot()
    def startConversation(self):
        """Start a conversation if the assistant is currently available."""
        # Read once: run() may reset the attribute from the worker thread.
        assistant = self._assistant
        if assistant is None:
            logger.warning("Assistant is not running; cannot start conversation")
            return
        assistant.start_conversation()

    @pyqtSlot()
    def stopConversation(self):
        """Stop the conversation if the assistant is currently available."""
        assistant = self._assistant
        if assistant is None:
            logger.warning("Assistant is not running; cannot stop conversation")
            return
        assistant.stop_conversation()
import logging
import asyncio
import websockets
import json
import ssl
import collections
from PyQt5.QtCore import pyqtProperty, pyqtSignal, pyqtSlot, QObject, QThread
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class HomeAssistantService(QObject):
    """Client for the home-assistant websocket API, exposed as a QObject.

    Keeps the latest state per entity id and notifies registered listener
    callables whenever a state is received. ``start()`` schedules the
    connection on the asyncio event loop.
    """

    hostnameChanged = pyqtSignal(str)
    portChanged = pyqtSignal(int)
    secureChanged = pyqtSignal(bool)
    passwordChanged = pyqtSignal(str)

    # Message ids used for the two requests sent right after connecting.
    _SUBSCRIBE_EVENTS_ID = 1
    _GET_STATES_ID = 2
    # Seconds to wait before connecting again after a session ended.
    _RECONNECT_DELAY = 10.0

    def __init__(self, parent=None):
        super().__init__(parent)
        self._hostname = "localhost"
        self._port = 8123
        self._secure = False
        self._password = ''
        self._websocket = None
        self._lastId = 0
        self._states = {}
        self._state_listeners = collections.defaultdict(list)

    @pyqtProperty(str, notify=hostnameChanged)
    def hostname(self):
        return self._hostname

    @hostname.setter
    def hostname(self, hostname):
        self._hostname = hostname
        self.hostnameChanged.emit(self._hostname)

    @pyqtProperty(int, notify=portChanged)
    def port(self):
        return self._port

    @port.setter
    def port(self, port):
        self._port = port
        self.portChanged.emit(self._port)

    @pyqtProperty(bool, notify=secureChanged)
    def secure(self):
        return self._secure

    @secure.setter
    def secure(self, secure):
        self._secure = secure
        self.secureChanged.emit(self._secure)

    @pyqtProperty(str, notify=passwordChanged)
    def password(self):
        return self._password

    @password.setter
    def password(self, password):
        self._password = password
        self.passwordChanged.emit(self._password)

    @pyqtSlot()
    def start(self):
        """Schedule the connection loop on the asyncio event loop."""
        asyncio.ensure_future(self._start())

    @pyqtSlot(str, str, 'QVariantMap')
    def callService(self, domain, service, service_data=None):
        """Send a ``call_service`` message with the next message id.

        The request is dropped with a warning while no connection is
        available. *service_data* defaults to an empty mapping.
        """
        websocket = self._websocket
        if websocket is None:
            logger.warning("Not connected to home-assistant; dropping call %s.%s",
                           domain, service)
            return
        self._lastId += 1
        asyncio.ensure_future(websocket.send(json.dumps({
            'id': self._lastId,
            'type': 'call_service',
            'domain': domain,
            'service': service,
            'service_data': {} if service_data is None else service_data,
        })))

    def add_state_listener(self, entity_id, listener):
        """Register *listener* for *entity_id*.

        Returns the last known state of the entity, or ``{}`` if none has
        been received yet.
        """
        self._state_listeners[entity_id].append(listener)
        return self._states.get(entity_id, {})

    def remove_state_listener(self, entity_id, listener):
        """Unregister *listener*; raises ``ValueError`` if it is not registered."""
        self._state_listeners[entity_id].remove(listener)

    def _update_state(self, entity_id, state):
        """Store *state* for *entity_id* and pass it to its listeners."""
        self._states[entity_id] = state
        # .get() keeps the defaultdict from creating an empty list for every
        # entity nobody listens to; the tuple() copy lets a listener
        # unregister itself while being notified.
        for listener in tuple(self._state_listeners.get(entity_id, ())):
            listener(state)

    async def _start(self):
        """Run websocket sessions forever, pausing between them.

        A session that ends for any reason, including an exception, is
        followed by a fixed delay and a new connection attempt.
        """
        while True:
            try:
                await self._run_session()
            except Exception:
                logger.exception("home-assistant websocket session failed")
            await asyncio.sleep(self._RECONNECT_DELAY)

    async def _run_session(self):
        """Connect, authenticate, subscribe and process messages until the end."""
        url = '%s://%s:%s/api/websocket' % (
            'wss' if self._secure else 'ws', self._hostname, self._port)
        logger.debug('Connecting to home-assistant websocket %s...', url)
        websocket = await websockets.connect(url)
        try:
            logger.debug('Connected to home-assistant websocket')
            auth_message = await websocket.recv()
            logger.debug(auth_message)
            if self._password:
                await websocket.send(json.dumps(
                    {'type': 'auth', 'api_password': self._password}))
                auth_message = await websocket.recv()
                logger.debug(auth_message)
            await websocket.send(json.dumps({
                'id': self._SUBSCRIBE_EVENTS_ID,
                'type': 'subscribe_events',
                'event_type': 'state_changed'}))
            await websocket.send(json.dumps(
                {'id': self._GET_STATES_ID, 'type': 'get_states'}))
            # Publish the socket only after the id counter is reset and the
            # initial requests are out, so callService() cannot reuse an id
            # or send before the handshake has finished.
            self._lastId = self._GET_STATES_ID
            self._websocket = websocket
            while True:
                data = await websocket.recv()
                if not data:
                    break
                self._handle_message(json.loads(data))
        finally:
            # Never leave a dead socket behind for callService() to use.
            self._websocket = None
            await websocket.close()

    def _handle_message(self, message):
        """Apply one decoded websocket message to the state cache."""
        if message.get('id') == self._GET_STATES_ID:
            # 'result' may be missing or None, e.g. when the request failed.
            for state in message.get('result') or ():
                self._update_state(state['entity_id'], state)
        elif message.get('type') == 'event':
            event = message['event']
            if event['event_type'] == 'state_changed':
                payload = event['data']
                self._update_state(payload['entity_id'], payload['new_state'])
class HomeAssistantStateListener(QObject):
    """Exposes the state of one entity of a ``HomeAssistantService`` to Qt.

    ``connection`` and ``entityId`` may be assigned in either order; the
    listener is registered with the service as soon as both are set.
    """

    entityIdChanged = pyqtSignal(str)
    connectionChanged = pyqtSignal(HomeAssistantService)
    stateChanged = pyqtSignal(str, arguments=['state'])

    def __init__(self, parent=None):
        super().__init__(parent)
        self._entityId = None
        self._connection = None
        self._state = {}

    @pyqtProperty(str, notify=entityIdChanged)
    def entityId(self):
        return self._entityId

    @entityId.setter
    def entityId(self, entityId):
        self._unsubscribe()
        self._entityId = entityId
        state = self._subscribe()
        self.entityIdChanged.emit(self._entityId)
        self._update(state)

    @pyqtProperty(HomeAssistantService, notify=connectionChanged)
    def connection(self):
        return self._connection

    @connection.setter
    def connection(self, connection):
        logger.debug("connection is now %s", connection)
        # Detach from the previous service before switching to the new one.
        self._unsubscribe()
        self._connection = connection
        self.connectionChanged.emit(self._connection)
        if self._entityId is not None:
            self._update(self._subscribe())

    @pyqtProperty(str, notify=stateChanged)
    def state(self):
        """The ``'state'`` entry of the last received state, else ``'unknown'``."""
        return self._state.get('state', 'unknown')

    def _subscribe(self):
        """Register with the service when possible and return the known state.

        Returns ``{}`` while the connection or the entity id is still unset.
        """
        if self._connection is None or self._entityId is None:
            return {}
        return self._connection.add_state_listener(self._entityId, self._update)

    def _unsubscribe(self):
        """Remove the registration made by ``_subscribe``, if there is one."""
        # A registration exists exactly when both values are set, because
        # both setters go through _unsubscribe()/_subscribe().
        if self._connection is not None and self._entityId is not None:
            self._connection.remove_state_listener(self._entityId, self._update)

    def _update(self, state):
        """Store *state* and announce the new state string."""
        # The service forwards 'new_state' unchanged; presumably it can be
        # None (e.g. for a removed entity), so fall back to an empty dict.
        self._state = state if state is not None else {}
        self.stateChanged.emit(self.state)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment