#!/usr/bin/env python3
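# Downloader for recordings stored on the SD card of a Samsung IP camera
# (paths like /sd/ch00/img/<date>/AVI/ and /cgi-bin/stw.cgi). It fetches new
# AVI files, remuxes them to MP4 with ffmpeg, prunes days older than
# 'keep_days', and keeps a playlist.m3u per day. Example invocations (host,
# credentials and output directory are placeholders; see main() for details):
#   samsung_downloader.py <camera address> <admin username> <admin password> <output dir>
#   samsung_downloader.py update_playlists <output dir>
#   samsung_downloader.py reboot <camera address> <admin username> <admin password>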
import datetime
import fcntl
import glob
import html.parser
import logging
import os
import re
import shutil
import socket
import subprocess
import sys
import urllib.error
import urllib.parse
import urllib.request
logging.basicConfig(level=logging.NOTSET)
logger = logging.getLogger('main')
auth_handler = urllib.request.HTTPDigestAuthHandler()
options = {
    'address': None,
    'output_dir': '/media/hdd/rec',
    'keep_days': 30,
    'lock_file': '/tmp/samsung_downloader.lock',
    'socket_timeout': 10
}
class CautiousHTTPDigestAuthHandler(urllib.request.HTTPDigestAuthHandler):
    def http_error_auth_reqed(self, auth_header, host, req, headers):
        logger.warning('http_error_auth_reqed:%d', self.retried)
        if self.retried > 2:
            raise urllib.error.HTTPError(req.full_url, 401, 'digest auth failed', headers, None)
        # Return the retried response so a successful digest retry is propagated.
        return super(CautiousHTTPDigestAuthHandler, self).http_error_auth_reqed(auth_header, host, req, headers)
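
# Parses the camera's HTML directory listing: collects entry names from table
# cells, optionally filtered by extension, and skips entries that have a
# matching .tmp file (recordings still being written).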
class ListingParser(html.parser.HTMLParser):
    def __init__(self, extensions=None, *args, **kwargs):
        super(ListingParser, self).__init__(*args, **kwargs)
        self._is_tbody = False
        self._is_td = False
        self._is_a = False
        self._extensions = extensions
        self._tmp_entries = set()
        self.entries = set()

    def handle_starttag(self, tag, attrs):
        if tag == 'tbody':
            self._is_tbody = True
        elif self._is_tbody and tag == 'td':
            self._is_td = True
        elif self._is_td and tag == 'a' and 'href' in dict(attrs):
            target = dict(attrs)['href'].rstrip('/')
            if target != '..':
                name = target.split('.', 1)[0]
                if target.endswith('.tmp'):
                    self._tmp_entries.add(name)
                    if name in self.entries:
                        self.entries.remove(name)
                elif name not in self._tmp_entries:
                    if (
                        not self._extensions
                        or target.rsplit('.', 1)[-1] in self._extensions
                    ):
                        self.entries.add(name)

    def handle_endtag(self, tag):
        if tag == 'tbody':
            self._is_tbody = False
        elif self._is_tbody and tag == 'td':
            self._is_td = False

    def handle_data(self, data):
        pass

    def error(self, message):
        raise ValueError(message)
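
# Pulls the first and last timestamps out of the SAMI (.smi) subtitle file
# that accompanies each AVI; timestamps are the data chunks that follow the
# 'MD' markers in the subtitle stream.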
class SAMIParser(html.parser.HTMLParser):
    def __init__(self, *args, **kwargs):
        super(SAMIParser, self).__init__(*args, **kwargs)
        self._got_sync = False
        self._last_time = None
        self._got_br = 0
        self._got_md = False
        self.start_time = None
        self.end_time = None

    def finalize(self):
        self.end_time = self._last_time
        return self.start_time, self.end_time

    def handle_starttag(self, tag, attrs):
        if tag == 'sync':
            self._got_sync = True
        if self._got_sync:
            self._got_br = tag == 'br'

    def handle_endtag(self, tag):
        pass

    def handle_data(self, data):
        if self._got_md:
            self._last_time = data.strip()
            if self.start_time is None:
                self.start_time = self._last_time
        if self._got_br:
            self._got_md = data.strip() == 'MD'

    def error(self, message):
        raise ValueError(message)
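
# Takes a non-blocking exclusive lock on the lock file so only one copy of
# the script runs at a time.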
def check_single_instance():
    fp = os.open(options['lock_file'], os.O_CREAT | os.O_TRUNC | os.O_WRONLY, 0o666)
    try:
        fcntl.lockf(fp, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except IOError:
        return False
    return True
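
# Removes local per-day directories (named YYYYMMDD) older than 'keep_days'.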
def remove_old_dirs():
    today = datetime.datetime.now().date()
    for entry in os.listdir(options['output_dir']):
        abs_path = os.path.join(options['output_dir'], entry)
        if os.path.isdir(abs_path):
            try:
                date = datetime.datetime.strptime(entry, '%Y%m%d').date()
            except ValueError:
                logger.warning('remove_old_dirs:skipping:%s', entry)
                continue
            if today - date > datetime.timedelta(days=options['keep_days']):
                logger.debug('remove_old_dirs:shutil.rmtree("%s")', abs_path)
                shutil.rmtree(abs_path)
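
# Probes the camera once to read the digest realm from its 401 response, then
# registers the credentials and installs a global opener for later requests.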
def authenticate(path, username, password):
    logger.debug('authenticate:%s:%s:%s:%s', options['address'], path, username, '*' * len(password))
    url = 'http://%s/%s' % (options['address'], path.lstrip('/'))
    req = urllib.request.Request(url)
    realm = ''
    if not len(getattr(auth_handler.passwd, 'passwd', {})):
        try:
            urllib.request.urlopen(req)
        except urllib.error.HTTPError as e:
            if e.code != 401:
                raise e
            realm = re.compile(r'(realm)[:=][\s"]?([^",]+)"?').search(e.headers.get('WWW-Authenticate')).group(2)
            logger.debug('realm:%s', realm)
    auth_handler.add_password(realm, url, username, password)
    opener = urllib.request.build_opener(auth_handler)
    urllib.request.install_opener(opener)
def request(path, data=None):
    logger.debug('request:%s:%s', options['address'], path)
    url = 'http://%s/%s' % (options['address'], path.lstrip('/'))
    req = urllib.request.Request(url, data=data)
    return urllib.request.urlopen(req)
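
# Lists the per-date directories under /sd/ch00/img/ on the camera's SD card.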
def get_dirs():
    logger.debug('get_dirs:%s', options['address'])
    results = set()
    dir_response = request('/sd/ch00/img/')
    dir_parser = ListingParser()
    dir_parser.feed(dir_response.read().decode('utf-8'))
    for date_str in dir_parser.entries:
        results.add(date_str)
    return results
def get_basenames(dir_name):
    logger.debug('get_basenames:%s:%s', options['address'], dir_name)
    results = set()
    file_parser = ListingParser(extensions=['avi'])
    file_response = request('/sd/ch00/img/%s/AVI/' % dir_name)
    file_parser.feed(file_response.read().decode('utf-8'))
    for file_str in file_parser.entries:
        results.add(file_str)
    return results
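
# Fetches and parses the .smi file for a recording; returns a pair of
# (start, end) timestamp strings, or (None, None) if the file is missing.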
def get_smi(dir_name, name):
    logger.debug('get_smi:%s:%s:%s', options['address'], dir_name, name)
    try:
        response = request('/sd/ch00/img/%s/AVI/%s.smi' % (dir_name, name))
    except urllib.error.HTTPError as e:
        if e.code == 404:
            logger.warning('smi file "%s" not found', e.url)
            return None, None
        raise
    sami_parser = SAMIParser()
    sami_parser.feed(response.read().decode('utf-8'))
    return sami_parser.finalize()
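
# Collects file IDs that were already downloaded, taken from the trailing
# '_<id>' part of the local HHMMSS_HHMMSS_<id>.mp4 filenames.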
def get_existing_ids(dir_name):
    results = set()
    subdir = dir_name.replace('_', '')
    abs_dir_path = os.path.join(options['output_dir'], subdir)
    if not os.path.isdir(abs_dir_path):
        return results
    for abs_path in glob.glob(os.path.join(abs_dir_path, '*_*_*.mp4')):
        if abs_path.rsplit('.', 1)[-1] != 'tmp':
            results.add(abs_path.split('.')[0].rsplit('_', 1)[-1])
    logger.debug('get_existing_ids:%s:%s', dir_name, results)
    return results
def is_remote_dir_recent(dir_name):
    today = datetime.datetime.now().date()
    date = datetime.datetime.strptime(dir_name, '%Y_%m_%d').date()
    return today - date < datetime.timedelta(days=options['keep_days'])
def filter_existing_ids(dir_name, basenames):
    existing_ids = get_existing_ids(dir_name)
    return filter(
        lambda x: x.split('_', 1)[0] not in existing_ids,
        basenames
    )
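
# Downloads a single AVI into the output path; urlretrieve goes through the
# globally installed digest-auth opener.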
def download_avi(dir_name, name, output):
    logger.debug('download_avi:%s:%s:%s:%s', options['address'], dir_name, name, output)
    output_dir = os.path.dirname(output)
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    try:
        urllib.request.urlretrieve('http://%s/sd/ch00/img/%s/AVI/%s.avi' % (options['address'], dir_name, name), output)
    except urllib.error.HTTPError as e:
        if e.code == 404:
            logger.warning('avi file "%s" not found', e.url)
            return
        raise
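
# Stream-copies the source into the destination container with ffmpeg (no
# re-encoding); raises if ffmpeg exits with an error.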
def remux(src, dst):
    logger.debug('remux:%s:%s', src, dst)
    try:
        subprocess.run(
            [
                'ffmpeg',
                '-hide_banner',
                '-loglevel', 'error',
                '-i', src,
                '-codec', 'copy',
                dst
            ],
            stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True
        )
    except subprocess.CalledProcessError as e:
        logger.error('remux:failed:%s:%s', src, dst)
        for line in e.stderr.decode('utf-8').split('\n'):
            logger.critical('remux:ffmpeg:%s', line.strip())
        raise e
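
# Rewrites playlist.m3u in a day directory from the MP4 files found there.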
def update_playlist(dir_name):
    results = set()
    subdir = dir_name.replace('_', '')
    abs_dir_path = os.path.join(options['output_dir'], subdir)
    if not os.path.isdir(abs_dir_path):
        return
    for abs_path in glob.glob(os.path.join(abs_dir_path, '*_*_*.mp4')):
        if abs_path.rsplit('.', 1)[-1] != 'tmp':
            results.add(os.path.basename(abs_path))
    with open(os.path.join(abs_dir_path, 'playlist.m3u'), 'w') as f:
        for result in sorted(results):
            f.write(result)
            f.write('\n')
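
# Entry point. Modes: 'update_playlists <dir>' only regenerates playlists,
# 'reboot <address> <user> <password>' reboots the camera via its CGI, and
# '<address> <user> <password> <output dir>' downloads new recordings.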
def main():
    if len(sys.argv) == 3 and sys.argv[1] == 'update_playlists':
        options['output_dir'] = sys.argv[2]
        logger.debug('update_playlists:%s', options['output_dir'])
        for d in os.listdir(os.path.abspath(options['output_dir'])):
            update_playlist(d)
        exit(0)
    if len(sys.argv) == 5 and sys.argv[1] == 'reboot':
        _, _, address, username, password = sys.argv
        logger.debug('reboot:%s', address)
        options['address'] = address
        authenticate('/cgi-bin/stw.cgi', username, password)
        request('/cgi-bin/stw.cgi', b'%3CSetReboot%2F%3E')
        exit(0)
    if len(sys.argv) < 5:
        sys.stderr.write('usage: %s <camera address> <admin username> <admin password> <output directory>\n' % sys.argv[0])
        exit(1)
    _, address, username, password, output_dir = sys.argv
    options['address'] = address
    options['output_dir'] = output_dir
    avi_filepath = None
    h264_filepath = None
    mp4_filepath = None
    socket.setdefaulttimeout(options['socket_timeout'])
    if not check_single_instance():
        logger.warning('check_single_instance:false')
        return 0
    remove_old_dirs()
    try:
        authenticate('/sd/', username, password)
        for dir_name in sorted(filter(is_remote_dir_recent, get_dirs())):
            for name in sorted(filter_existing_ids(dir_name, get_basenames(dir_name))):
                file_id, date, time = name.split('_')
                if not file_id or not date or not time:
                    logger.warning('%s:no file_id or date or time', name)
                    continue
                start_datetime_str, end_datetime_str = get_smi(dir_name, name)
                if not start_datetime_str or not end_datetime_str:
                    logger.warning('%s:no start_datetime_str or end_datetime_str', name)
                    continue
                # Pad the recording window by one second on each side.
                start_time = (
                    datetime.datetime.strptime(
                        start_datetime_str, '%Y-%m-%d %H:%M:%S'
                    ) - datetime.timedelta(seconds=1)
                )
                end_time = (
                    datetime.datetime.strptime(
                        end_datetime_str, '%Y-%m-%d %H:%M:%S'
                    ) + datetime.timedelta(seconds=1)
                )
                abs_subdir = os.path.join(
                    os.path.abspath(options['output_dir']),
                    start_time.strftime('%Y%m%d')
                )
                basename = '%s_%s_%s' % (
                    start_time.strftime('%H%M%S'),
                    end_time.strftime('%H%M%S'),
                    file_id
                )
                abs_basepath = os.path.join(abs_subdir, basename)
                avi_filepath = '%s.avi' % abs_basepath
                h264_filepath = '%s.h264' % abs_basepath
                mp4_filepath = '%s.mp4' % abs_basepath
                # Download, extract the raw H.264 stream, then wrap it in MP4.
                download_avi(dir_name, name, avi_filepath)
                remux(avi_filepath, h264_filepath)
                os.path.exists(avi_filepath) and os.remove(avi_filepath)
                avi_filepath = None
                remux(h264_filepath, mp4_filepath)
                mp4_filepath = None
                os.path.exists(h264_filepath) and os.remove(h264_filepath)
                h264_filepath = None
                update_playlist(abs_subdir)
    except BaseException as e:
        # Clean up any partially written files before propagating the error.
        for filepath in (avi_filepath, h264_filepath, mp4_filepath):
            filepath and os.path.exists(filepath) and os.remove(filepath)
        if not isinstance(e, KeyboardInterrupt):
            raise e
if __name__ == '__main__':
    main()