Skip to content

Instantly share code, notes, and snippets.

@nocarryr
Last active March 14, 2022 23:34
Show Gist options
  • Save nocarryr/33d9eabc8a4ca6034c26fdd3b0dd0a7d to your computer and use it in GitHub Desktop.
Save nocarryr/33d9eabc8a4ca6034c26fdd3b0dd0a7d to your computer and use it in GitHub Desktop.
Experimenting with the Birddog REST API
#! /usr/bin/env python3
import typing as tp
import enum
from dataclasses import dataclass
import json
from bs4 import BeautifulSoup
import requests
import click
class OperationMode(enum.Enum):
encode = enum.auto()
decode = enum.auto()
class AudioOutput(enum.Enum):
DecodeMain = enum.auto()
DecodeComms = enum.auto()
DecodeLoop = enum.auto()
class VideoOutput(enum.Enum):
sdi = enum.auto()
hdmi = enum.auto()
LowLatency = enum.auto()
NormalMode = enum.auto()
@dataclass
class AudioOutputSetup:
input_gain: int
output_gain: int
output_select: AudioOutput
@classmethod
def from_api(cls, data: tp.Dict) -> 'AudioOutputSetup':
kw = dict(
input_gain=int(data['AnalogAudioInGain']),
output_gain=int(data['AnalogAudioOutGain']),
output_select=getattr(AudioOutput, data['AnalogAudiooutputselect']),
)
return cls(**kw)
@dataclass
class DeviceSettings:
operation_mode: OperationMode
video_output: VideoOutput
audio_setup: AudioOutputSetup
def to_form_data(self) -> tp.Dict:
form_data = {
'mode': self.operation_mode.name,
'vid12g_loop_if': self.video_output.name,
'AnalogAudioInGain': self.audio_setup.input_gain,
'AnalogAudioOutGain': self.audio_setup.output_gain,
'AnalogAudiooutputselect': self.audio_setup.output_select.name,
}
return form_data
@dataclass
class NdiSource:
name: str
address: tp.Optional[str] = None
index: tp.Optional[int] = None
is_current: tp.Optional[bool] = False
def format(self):
if self.is_current:
prefix = '-->'
else:
prefix = ' '
ix = self.index
if ix is None:
ix = ' '
else:
ix = f'{ix:2d}'
return f'{prefix} [{ix}] "{self.name}" ({self.address})'
def format_url(base_url: str, api_method: str) -> str:
base_url = base_url.rstrip('/')
if not base_url.endswith(':8080'):
base_url = f'{base_url}:8080'
if '://' not in base_url:
base_url = f'http://{base_url}'
return f'{base_url}/{api_method}'
class AuthClient:
def __init__(self, base_url: str, password: str = 'birddog'):
if '://' not in base_url:
base_url = f'http://{base_url}'
base_url = base_url.rstrip('/')
self.base_url = base_url
self.password = password
self.session = requests.Session()
self.settings = None
self._logged_in = False
def format_url(self, *paths) -> str:
full_path = '/'.join(paths)
return f'{self.base_url}/{full_path}'
def get(self, *paths):
if not self._logged_in:
self._login()
url = self.format_url(*paths)
r = self.session.get(url)
r.raise_for_status()
return r
def post(self, *paths, data=None):
if not self._logged_in:
self._login()
url = self.format_url(*paths)
if data is not None:
r = self.session.post(url, data=data)
else:
r = self.session.post(url)
r.raise_for_status()
return r
def get_settings(self):
r = self.get('videoset')
soup = BeautifulSoup(r.content, 'html5lib')
mode_form = soup.find(id='mod_sel')
op_mode = None
for input_id in ['encode', 'decode']:
in_el = mode_form.find(id=input_id)
if 'checked' in in_el.attrs:
op_mode = input_id
assert op_mode is not None
op_mode = getattr(OperationMode, op_mode)
vout = None
for input_id in ['sdi', 'hdmi']:
in_el = mode_form.find(id=input_id)
if 'checked' in in_el.attrs:
vout = input_id
assert vout is not None
vout = getattr(VideoOutput, vout)
audio_setup = get_audio_setup(self.base_url)
self.settings = DeviceSettings(
operation_mode=op_mode,
video_output=vout,
audio_setup=audio_setup,
)
return self.settings
def set_operation_mode(self, mode: OperationMode):
settings = self.get_settings()
settings.operation_mode = mode
form_data = settings.to_form_data()
self.post('videoset', data=form_data)
def set_video_output(self, video_output: VideoOutput):
settings = self.get_settings()
settings.video_output = video_output
form_data = settings.to_form_data()
self.post('videoset', data=form_data)
def refresh_sources(self):
self.post('videoset', data={'add_new_sources':'new_sources'})
def _logout(self):
url = self.format_url('logout')
r = self.session.post(url)
r.raise_for_status()
self._logged_in = False
def _login(self):
url = self.format_url('login')
r = self.session.post(url, data={'auth_password':self.password})
r.raise_for_status()
self._logged_in = True
def __enter__(self):
self._login()
return self
def __exit__(self, *args):
try:
if self._logged_in:
self._logout
finally:
self.session.close()
self.session = requests.Session()
def get(base_url: str, api_method: str, params=None) -> tp.Union[bytes, tp.Dict]:
url = format_url(base_url, api_method)
if params is not None:
r = requests.get(url, params=params)
else:
r = requests.get(url)
r.raise_for_status()
try:
content = r.json()
except json.JSONDecodeError:
content = r.content
return content
def post(
base_url: str, api_method: str,
data: tp.Optional[tp.Dict] = None, timeout: tp.Optional[float] = None,
form_encoded: bool = False,
) -> tp.Union[bytes, tp.Dict]:
if data is None:
kw = {'headers':{'Accept':'text'}}
elif form_encoded:
kw = {'headers':{'Accept':'text'}, 'data':data}
else:
kw = {
'headers':{'Content-Type': 'application/json'},
'data':json.dumps(data),
}
if timeout is not None:
kw['timeout'] = timeout
url = format_url(base_url, api_method)
r = requests.post(url, **kw)
r.raise_for_status()
try:
content = r.json()
except json.JSONDecodeError:
content = r.content
return content
def get_hostname(base_url: str) -> str:
content = get(base_url, 'hostname')
if isinstance(content, bytes):
content = content.decode()
return content
def reboot(base_url: str):
# Send a request to make sure the device is on the network
# (since we're ignoring the reboot response)
h = get_hostname(base_url)
try:
r = get(base_url, 'reboot')
except requests.ConnectionError:
pass
def restart(base_url: str):
# Send a request to make sure the device is on the network
# (since we're ignoring the reboot response)
h = get_hostname(base_url)
try:
r = get(base_url, 'restart')
except requests.ConnectionError:
raise
def get_mode(base_url: str) -> OperationMode:
content = get(base_url, 'operationmode')
if isinstance(content, dict):
try:
mode = content['mode']
except KeyError:
print(f'{content=}')
raise
else:
mode = content.decode()
return getattr(OperationMode, mode)
def set_mode(base_url: str, mode: tp.Union[str, OperationMode]):
if isinstance(mode, str):
mode = getattr(OperationMode, mode)
with AuthClient(base_url) as client:
client.set_operation_mode(mode)
def get_audio_setup(base_url: str) -> AudioOutputSetup:
content = get(base_url, 'analogaudiosetup')
return AudioOutputSetup.from_api(content)
# TODO: The devices respond to this api endpoint with a 404. Firmware bug?
def get_video_output(base_url: str) -> VideoOutput:
with AuthClient(base_url) as client:
settings = client.get_settings()
return settings.video_output
def set_video_output(base_url: str, video_output: tp.Union[str, VideoOutput]):
if isinstance(video_output, str):
video_output = getattr(VideoOutput, video_output)
with AuthClient(base_url) as client:
client.set_video_output(video_output)
def list_sources(base_url: str) -> tp.Iterable[NdiSource]:
current_src = get_source(base_url)
content = get(base_url, 'List')
for i, key in enumerate(content.keys()):
val = content[key]
is_current = key == current_src.name
yield NdiSource(name=key, address=val, index=i, is_current=is_current)
def get_source(base_url: str) -> NdiSource:
content = get(base_url, 'connectTo')
return NdiSource(name=content['sourceName'])
def set_source(base_url: str, source: tp.Union[str, NdiSource, int]):
if isinstance(source, int):
source_index = source
sources = {src.index:src for src in list_sources(base_url)}
source = sources[source_index]
if isinstance(source, NdiSource):
source = source.name
content = post(base_url, 'connectTo', {'sourceName':source})
assert content == b'success'
def refresh_sources(base_url: str):
with AuthClient(base_url) as client:
client.refresh_sources()
@click.group()
@click.argument('url', type=str)
@click.pass_context
def cli(ctx, **kwargs):
ctx.ensure_object(dict)
ctx.obj.update({k:v for k,v in kwargs.items()})
@cli.command()
@click.pass_context
def hostname(ctx):
h = get_hostname(ctx.obj['url'])
hurl = f'http://{h}.local'
click.echo(f'{h} -> {hurl}')
@cli.command(name='reboot')
@click.pass_context
def cli_reboot(ctx):
reboot(ctx.obj['url'])
click.echo('Device rebooting')
@cli.command(name='restart')
@click.pass_context
def cli_restart(ctx):
restart(ctx.obj['url'])
click.echo('Video system restarting')
@cli.group()
@click.pass_context
def mode(ctx):
pass
@mode.command(name='get')
@click.pass_context
def cli_mode_get(ctx):
current_mode = get_mode(ctx.obj['url'])
click.echo(f'Current Mode: "{current_mode.name}"')
@mode.command(name='set')
@click.argument('new_mode', type=click.Choice(['encode', 'decode']))
@click.pass_context
def cli_mode_set(ctx, new_mode):
set_mode(ctx.obj['url'], new_mode)
click.echo(f'Mode set to "{new_mode}". Rebooting device...')
reboot(ctx.obj['url'])
@cli.group(name='audio')
@click.pass_context
def cli_audio(ctx):
pass
@cli_audio.command(name='get')
@click.pass_context
def cli_audio_get(ctx):
audio = get_audio_setup(ctx.obj['url'])
click.echo(str(audio))
# TODO: Disabled as part of :func:`get_video_output` and :func:`set_video_output` issues
@cli.group(name='output')
@click.pass_context
def cli_output(ctx):
pass
@cli_output.command(name='get')
@click.pass_context
def cli_output_get(ctx):
output = get_video_output(ctx.obj['url'])
click.echo(f'Video output: "{output.name}"')
@cli_output.command(name='set')
@click.argument('mode', type=click.Choice(['sdi', 'hdmi']))
@click.pass_context
def cli_output_set(ctx, mode):
set_video_output(ctx.obj['url'], mode)
click.echo(f'Video output set to "{mode.name}"')
@cli.group()
@click.pass_context
def source(ctx):
pass
@source.command(name='refresh')
@click.pass_context
def source_refresh(ctx):
click.echo('Refreshing...')
refresh_sources(ctx.obj['url'])
click.echo('Available Sources:')
for src in list_sources(ctx.obj['url']):
click.echo(src.format())
@source.command(name='current')
@click.pass_context
def current_source(ctx):
src = get_source(ctx.obj['url'])
click.echo(f'Current Source:')
click.echo(src.format())
@source.command(name='list')
@click.pass_context
def cli_list_sources(ctx):
click.echo('Available Sources:')
for src in list_sources(ctx.obj['url']):
click.echo(src.format())
@source.command(name='set')
@click.argument('source', type=str)
@click.pass_context
def cli_set_source(ctx, source):
if source.isdigit():
source = int(source)
set_source(ctx.obj['url'], source)
for src in list_sources(ctx.obj['url']):
click.echo(src.format())
if __name__ == '__main__':
cli()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment