Skip to content

Instantly share code, notes, and snippets.

@ThiefMaster
Last active September 29, 2015 13:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ThiefMaster/11269741 to your computer and use it in GitHub Desktop.
Save ThiefMaster/11269741 to your computer and use it in GitHub Desktop.
Small FUSE filesystem that maps TV series into subfolders so e.g. XBMC picks them up properly. Any file matching _EPISODE_RE will show up as a symlink in a folder named after the series. If there's a .srt file with the same name, another symlink is "created".
import logging
import os
import re
import sys
from errno import ENOENT, EINVAL
from stat import S_IFDIR, S_IFLNK
from fuse import FUSE, Operations, LoggingMixIn, FuseOSError
EPISODE_REGEXES = (re.compile(r'^(?P<series>.+?)\.? - S0*\d+E0*\d+ - .+\.mkv$'),
re.compile(r'^(?P<series>.+?)\.+S0*\d+E0*\d+.+\.mkv$'),
re.compile(r'^(?P<series>.+?)\.+0*\d+x0*\d+.+\.mkv$'))
NAME_MAP = {'Doctor Who': 'Doctor Who (2005)',
'The Flash': 'The Flash (2014)',
'Atlantis': 'Atlantis (2013)'}
def _iter_series(path):
for name in os.listdir(path):
for i, regex in enumerate(EPISODE_REGEXES):
m = regex.match(name)
if m:
series = m.group('series')
if i > 0:
series = series.replace('.', ' ')
series = series.replace('_', ' ')
series = series.title()
series = NAME_MAP.get(series, series)
yield os.path.join(path, name), name, series
def _get_series_names(path):
return {x[2] for x in _iter_series(path)}
def _get_episodes_by_series(root, series):
return [x for x in _iter_series(root) if x[2] == series]
def _get_episode_by_filename(root, series, filename):
episodes = _get_episodes_by_series(root, series)
for path, name, data in episodes:
if name == filename:
return path, name, data
return None
class SeriesFuse(Operations):
def __init__(self, root):
self.root = root
def getattr(self, path, fh=None):
# / is always a folder containing only folders
if path == '/':
return dict(st_mode=(S_IFDIR | 0o755), st_nlink=len(self.readdir(path, fh)))
segments = path[1:].split('/')
# /foo is either a folder containing only symlinks or ENOENT
if len(segments) == 1:
folder = path[1:]
if folder in _get_series_names(self.root):
mtime = int(max(os.path.getmtime(item[0]) for item in _get_episodes_by_series(self.root, folder)))
return dict(st_mode=(S_IFDIR | 0o755), st_nlink=2, st_mtime=mtime)
# /foo/bar is always a symlink
elif len(segments) == 2:
return dict(st_mode=(S_IFLNK | 0o777), st_nlink=1)
raise FuseOSError(ENOENT)
def readdir(self, path, fh):
default_files = {'.', '..'}
# / contains the list of series
if path == '/':
return default_files | _get_series_names(self.root)
segments = path[1:].split('/')
# /foo contains the episodes and possibly subtitles
if len(segments) == 1:
mkvs = set()
srts = set()
for item in _get_episodes_by_series(self.root, segments[0]):
# item is (path, name, data)
mkvs.add(item[1])
subpath = os.path.splitext(item[0])[0] + '.srt'
if os.path.exists(subpath):
srts.add(os.path.basename(subpath))
return default_files | mkvs | srts
raise FuseOSError(ENOENT)
def readlink(self, path):
if path == '/':
raise FuseOSError(EINVAL)
segments = path[1:].split('/')
if len(segments) == 2:
basename, ext = os.path.splitext(segments[1])
if ext == '.srt':
episode = _get_episode_by_filename(self.root, segments[0], basename + '.mkv')
if episode:
return os.path.splitext(episode[0])[0] + '.srt'
else:
episode = _get_episode_by_filename(self.root, segments[0], segments[1])
if episode:
return episode[0]
raise FuseOSError(EINVAL)
def main():
try:
mountpoint = sys.argv[1]
root = sys.argv[2]
except IndexError:
print('Usage: {} <mountpoint> <root>'.format(sys.argv[0]))
sys.exit(1)
FUSE(SeriesFuse(root), mountpoint, allow_other=True)
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment