Skip to content

Instantly share code, notes, and snippets.

@sboily
Created December 2, 2019 20:32
Show Gist options
  • Save sboily/221ee47e50db5b433308bd1f88bc7231 to your computer and use it in GitHub Desktop.
Save sboily/221ee47e50db5b433308bd1f88bc7231 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import logging
import yaml
import requests
from concurrent.futures import ThreadPoolExecutor
from wazo_auth_client import Client as Auth
from wazo_calld_client import Client as Calld
from wazo_websocketd_client import Client as Websocketd
class Wazo:
    """Small helper that authenticates against a Wazo server and listens for events.

    Connection settings are read from the ``wazo`` section of a YAML file.
    Callbacks registered with :meth:`on` are handed to the websocket client
    when :meth:`run` starts the listener on a background thread.
    """

    def __init__(self, config_file):
        """Load settings from *config_file* and prepare (but do not open) clients.

        Raises:
            KeyError: if the ``wazo`` section or one of its expected keys
                (``host``, ``username``, ``password``, ``port``, ``backend``,
                ``application_uuid``) is missing from the file.
        """
        self.config = self._get_config(config_file)
        wazo = self.config['wazo']
        self.host = wazo['host']
        self.username = wazo['username']
        self.password = wazo['password']
        self.port = wazo['port']
        self.backend = wazo['backend']
        self.application_uuid = wazo['application_uuid']
        # Passed as ``expiration`` when requesting a token
        # (presumably seconds -- TODO confirm against wazo-auth).
        self.expiration = 3600

        # Clients are created lazily by _connect() / _get_token().
        self.calld = None
        self.auth = None
        self.ws = None

        # Event name -> callback; one callback per event (last one wins).
        self._callbacks = {}
        # Single worker: it runs the websocket listener off the caller's thread.
        self._threadpool = ThreadPoolExecutor(max_workers=1)

    def on(self, event, callback):
        """Register *callback* for *event*, replacing any previous one.

        Callbacks are forwarded to the websocket client only when the
        listener starts, so register them before calling :meth:`run`.
        """
        self._callbacks[event] = callback

    def run(self):
        """Authenticate, build the clients and start the websocket listener."""
        self._connect()

    def _get_config(self, config_file):
        """Return the parsed YAML content of *config_file* (``{}`` if empty)."""
        with open(config_file) as config:
            data = yaml.load(config, Loader=yaml.SafeLoader)
        return data or {}

    def _connect(self):
        """Fetch a token, create the calld/websocket clients, start listening."""
        print('Connection...')
        token = self._get_token()['token']
        # NOTE(review): verify_certificate=False presumably disables TLS
        # certificate checks -- confirm this is intended outside development.
        self.calld = Calld(
            self.host,
            token=token,
            prefix='api/calld',
            port=self.port,
            verify_certificate=False,
        )
        self.ws = Websocketd(self.host, token=token, verify_certificate=False)
        future = self._threadpool.submit(self._ws, self._callbacks)
        # Without this, an exception raised in the worker thread would stay
        # hidden inside the discarded future and never be reported.
        future.add_done_callback(self._log_ws_failure)
        print('Connected...')

    @staticmethod
    def _log_ws_failure(future):
        """Log the exception, if any, that ended the websocket listener."""
        if future.cancelled():
            return
        error = future.exception()
        if error is not None:
            logging.getLogger(__name__).error(
                'Websocket listener stopped: %s', error, exc_info=error
            )

    def _ws(self, events):
        """Register every ``event -> callback`` pair on the client, then run it."""
        for event, callback in events.items():
            self.ws.on(event, callback)
        self.ws.run()

    def _get_token(self):
        """Create the auth client and return a newly issued token payload."""
        self.auth = Auth(
            self.host,
            username=self.username,
            password=self.password,
            prefix='api/auth',
            port=self.port,
            verify_certificate=False,
        )
        return self.auth.token.new(self.backend, expiration=self.expiration)