Skip to content

Instantly share code, notes, and snippets.

@gene1wood
Created May 10, 2020 20:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gene1wood/965baa710afb88e986feb57735f4ce13 to your computer and use it in GitHub Desktop.
Save gene1wood/965baa710afb88e986feb57735f4ce13 to your computer and use it in GitHub Desktop.
Script to trigger a Subsonic media rescan with locking
#!/bin/env python
import sys
import requests
import os
import os.path
import time
import logging
import subprocess
import json
def trigger_scan(subsonic, squeeze, timeout=30):
    """Ask the configured media servers to start a library rescan.

    Connection details are read from the module-level ``config`` dict
    (keys ``base_subsonic_url``, ``subsonic_username``,
    ``subsonic_password`` and ``base_squeeze_url``).

    Args:
        subsonic: When true, log in to Subsonic and request a rescan.
        squeeze: When true, request a rescan from the squeeze server
            (port 9000 of ``base_squeeze_url``).
        timeout: Seconds to wait on each HTTP request before giving up.
            Without a timeout a stalled server would block this script
            indefinitely.

    Returns:
        A list with the HTTP status code of each rescan request that was
        made, Subsonic first, then squeeze.

    Raises:
        requests.exceptions.RequestException: If a request fails or
            times out.
    """
    result = []
    # The context manager closes pooled connections even when a request raises.
    with requests.Session() as s:
        if subsonic:
            logging.debug("Logging in to subsonic")
            r = s.post('%s/j_acegi_security_check' % config['base_subsonic_url'],
                       data={'j_username': config['subsonic_username'],
                             'j_password': config['subsonic_password'],
                             'submit': 'Log+in'},
                       timeout=timeout)
            # NOTE(review): the status code alone may not reveal a rejected
            # login (the server may answer with a redirect); it is logged
            # only as a troubleshooting aid.
            logging.debug("Subsonic login returned HTTP %s", r.status_code)
            logging.debug("Telling subsonic to rescan")
            r = s.get('%s/musicFolderSettings.view?scanNow' % config['base_subsonic_url'],
                      timeout=timeout)
            result.append(r.status_code)
        if squeeze:
            logging.debug("Telling squeeze to rescan")
            r = s.get('%s:9000/settings/index.html?p0=rescan' % config['base_squeeze_url'],
                      timeout=timeout)
            result.append(r.status_code)
    return result
def touch(fname, times=None):
    """Create *fname* if it does not exist and update its timestamps.

    Args:
        fname: Path of the file to touch.
        times: Optional ``(atime, mtime)`` tuple passed to ``os.utime``;
            ``None`` sets both to the current time.

    Raises:
        IOError/OSError: If the file cannot be opened for appending even
            on the fallback attempt.
    """
    try:
        with open(fname, 'a'):
            os.utime(fname, times)
    except (IOError, OSError):
        # Setting the timestamps failed; fall back to making sure the file
        # at least exists. Only filesystem errors are handled here so that
        # programming errors (e.g. a malformed ``times``) and interrupts
        # are no longer silently swallowed.
        open(fname, 'a').close()
# ---------------------------------------------------------------------------
# Script body.
#
# Intended to be run whenever the media library changes. It rate-limits
# rescans to one per TIME_DELTA seconds: if a scan ran recently, a single
# follow-up run is queued with at(1) instead. Directories created with
# os.mkdir() serve as locks/markers because mkdir fails if the directory
# already exists.
# ---------------------------------------------------------------------------
logging.basicConfig(
    filename='/var/log/rescan_subsonic_media.log',
    level=logging.DEBUG,
    format='%(asctime)s %(message)s')

# ``config`` is read as a module-level global by trigger_scan().
with open('/etc/rescan_subsonic_media_config.json') as f:
    config = json.load(f)

LAST_SCAN_FILENAME = '/var/local/rescan_subsonic_media'
SCAN_LOCK = '/var/local/rescan_subsonic_media.lock'
QUEUED_SCAN_TRIGGERED_DIRNAME = '/var/local/rescan_subsonic_media.trigger_queued'
TIME_DELTA = 60 * 5  # 5 minutes
DRY_RUN = False
TRIGGERED_KEYWORD = 'triggered'

try:
    last_scan = os.path.getmtime(LAST_SCAN_FILENAME)
    # We could instead look for the LastScanned entry in
    # /var/subsonic/subsonic.properties which contains epoch milliseconds
except OSError:
    # No timestamp file yet (or it is unreadable): behave as if a scan has
    # never run so that one is started now.
    logging.error("Unexpected error: %s" % sys.exc_info()[0])
    last_scan = 0

time_since_last_scan = time.time() - last_scan
logging.debug("Time since last scan at %s is %s"
              % (last_scan, time_since_last_scan))

triggered = False
if len(sys.argv) > 1 and sys.argv[1] == TRIGGERED_KEYWORD:
    logging.debug("This is a triggered run called by a scheduled at task")
    try:
        os.rmdir(QUEUED_SCAN_TRIGGERED_DIRNAME)
    except OSError:
        # The marker may already be gone (e.g. removed by hand); that must
        # not prevent the queued scan from running.
        logging.debug("Queue marker %s could not be removed"
                      % QUEUED_SCAN_TRIGGERED_DIRNAME)
    triggered = True

if (time_since_last_scan > TIME_DELTA) or triggered:
    # Over TIME_DELTA has elapsed
    # or this is a triggered call
    # Rescan
    try:
        os.mkdir(SCAN_LOCK)
    except OSError:
        if os.path.isdir(SCAN_LOCK):
            logging.debug("Skipping rescan as %s exists implying a rescan is "
                          "already running" % SCAN_LOCK)
        else:
            logging.error("Skipping rescan as lock %s could not be created"
                          % SCAN_LOCK)
    else:
        try:
            touch(LAST_SCAN_FILENAME)
            result = "dryrun" if DRY_RUN else trigger_scan(True, False)
            logging.info("Scanning %s" % result)
        except Exception:
            # A failed scan request is logged rather than raised, but must
            # not be mistaken for "lock already held".
            logging.exception("Rescan failed")
        finally:
            # Always release the lock; otherwise one failed scan would
            # block every later run.
            os.rmdir(SCAN_LOCK)
else:
    # Not enough time has elapsed since the last scan
    try:
        os.mkdir(QUEUED_SCAN_TRIGGERED_DIRNAME)
        queue_run = True
    except OSError:
        queue_run = False
    if queue_run:
        # Though a rescan has already been started, since another filesystem
        # change has occurred, we'll queue up a run for 4 minutes from now
        command = ('/usr/bin/python %s %s'
                   % (os.path.abspath(__file__), TRIGGERED_KEYWORD))
        if not isinstance(command, bytes):
            # The pipes are opened in binary mode, so Python 3 requires
            # bytes here; on Python 2 ``str`` already is ``bytes``.
            command = command.encode()
        try:
            process = subprocess.Popen(
                'at now + 4 minute'.split(),
                stdout=subprocess.PIPE, stdin=subprocess.PIPE,
                stderr=subprocess.PIPE)
            stderr_data = process.communicate(input=command)[1]
            queued = process.returncode == 0
        except OSError as e:
            # e.g. the ``at`` binary is not installed
            stderr_data = e
            queued = False
        if queued:
            logging.debug("Skipping scan but queuing one for 4 minutes")
        else:
            logging.error("Unable to queue a scan with at: %s" % stderr_data)
            # Drop the marker again so that a later run can retry queuing.
            os.rmdir(QUEUED_SCAN_TRIGGERED_DIRNAME)
    else:
        logging.debug("Skipping scan. One's already queued")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment