Created
March 8, 2024 03:55
-
-
Save JonLech/f9f21ea927b623da8377f753cdfd43f1 to your computer and use it in GitHub Desktop.
Dahua VCS-SH30 speaker tool
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# SPDX-License-Identifier: MIT | |
# SPDX-FileCopyrightText: 2024 Jon Lech Johansen <jon@nanocrew.net> | |
import mimetypes | |
import requests | |
import argparse | |
import json | |
import sys | |
import os | |
from getpass import getpass | |
parser = argparse.ArgumentParser(description="Dahua VCS-SH30 speaker tool") | |
parser.add_argument("-a", "--addr", required=True, help="IP Address of VCS-SH30") | |
parser.add_argument("-u", "--username", required=False, default="admin") | |
parser.add_argument("-p", "--password", required=False) | |
group = parser.add_mutually_exclusive_group(required=True) | |
group.add_argument("--info", action="store_true", help="Show device info") | |
group.add_argument( | |
"--list", | |
action="store_true", | |
help="List audio files on device storage", | |
) | |
group.add_argument("--play", type=int, help="Play audio file by ID") | |
group.add_argument("--stop", action="store_true", help="Stop playback") | |
group.add_argument("--delete", type=int, help="Delete audio file by ID") | |
group.add_argument("--push", type=str, help="Upload MP3 file") | |
args = parser.parse_args() | |
if args.password is None: | |
args.password = getpass() | |
API_TMPL = f"http://{args.addr}/prod-api" | |
PRG_TMPL = f"{API_TMPL}/program" | |
def handle_error(res): | |
if res.status_code != requests.codes.ok: | |
print(f"HTTP status {res.status_code}: {res.reason}") | |
sys.exit(1) | |
else: | |
rj = res.json() | |
code = rj.get("code") | |
message = rj.get("message") | |
if code is not None and code != requests.codes.ok: | |
print(f"API error {code}: {message}") | |
sys.exit(1) | |
def list_files(): | |
res = requests.get(f"{PRG_TMPL}/info", headers=headers) | |
files = res.json().get("data", {}).get("files", []) | |
return (res, {file["id"]: file["name"] for file in files}) | |
res = requests.post( | |
f"{API_TMPL}/uer/login", | |
json={"username": args.username, "password": args.password}, | |
) | |
handle_error(res) | |
token = res.json().get("data", {}).get("token") | |
if token is None: | |
print("Login failure: token is missing.") | |
sys.exit(1) | |
headers = {"X-Token": token} | |
res = None | |
if args.info: | |
res = requests.get(f"{API_TMPL}/device/info", headers=headers) | |
info = res.json().get("data") or {} | |
for key in info: | |
print(f"{key}: {info[key]}") | |
if args.list: | |
res, map = list_files() | |
if map: | |
print(json.dumps(map, indent=2)) | |
if args.delete: | |
res = requests.post( | |
f"{PRG_TMPL}/delete", | |
json={"ids": [args.delete]}, | |
headers=headers, | |
) | |
if args.push: | |
if not os.path.exists(args.push): | |
print(f"File does not exist: {args.push}") | |
sys.exit(1) | |
AUDIO_MPEG = "audio/mpeg" | |
mt = mimetypes.guess_type(args.push) | |
if mt is None or mt[0] != AUDIO_MPEG: | |
print(f"File does not appear to be an MP3: {args.push}") | |
sys.exit(1) | |
name = os.path.basename(args.push) | |
with open(args.push, "rb") as f: | |
files = {"file": (name, f, AUDIO_MPEG)} | |
res = requests.post(f"{PRG_TMPL}/upload", files=files, headers=headers) | |
handle_error(res) | |
res, map = list_files() | |
if map: | |
map = {v: k for k, v in map.items()} | |
fid = map.get(name) | |
if fid is not None: | |
print(json.dumps({fid: name}, indent=2)) | |
if args.play: | |
res = requests.post( | |
f"{PRG_TMPL}/start", | |
json={"id": args.play}, | |
headers=headers, | |
) | |
if args.stop: | |
res = requests.post( | |
f"{PRG_TMPL}/stop", | |
headers=headers, | |
) | |
if res is not None: | |
handle_error(res) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment