Skip to content

Instantly share code, notes, and snippets.

@deliro
Created February 7, 2017 16:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save deliro/d121df87b05ecffc7010e67b7231220b to your computer and use it in GitHub Desktop.
Save deliro/d121df87b05ecffc7010e67b7231220b to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import argparse
import csv
import os
import re
import subprocess
import sys
from collections import OrderedDict, namedtuple
from datetime import datetime, timedelta
# Immutable (start, end) pair; Subtitle stores timedelta values in both fields.
Timing = namedtuple('Timing', 'start end')


class Subtitle:
    """A single subtitle cue: its number, time span and text.

    Args:
        number: Cue identifier as found in the subtitle file (may be ``None``
            for placeholder cues created by other code in this file).
        timing: Two-item iterable ``(start, end)``; it is stored as a
            ``Timing`` namedtuple.
        text: The cue text.
    """

    __slots__ = ['number', 'timing', 'text']

    def __init__(self, number, timing, text):
        self.timing = Timing(*timing)
        self.number = number
        self.text = text

    def get_duration(self):
        """Return ``timing.end - timing.start``."""
        return self.timing.end - self.timing.start

    def __str__(self):
        return self.text

    def __repr__(self):
        return '{}(number={!r}, timing={!r}, text={!r})'.format(
            type(self).__name__, self.number, self.timing, self.text)
class SyncedSubtitle:
    """A pair of subtitles that belong together, one per track.

    Iterating over an instance yields ``sub_en`` and then ``sub_ru``, so a
    pair can be unpacked directly: ``en, ru = pair``.
    """

    def __init__(self, sub_en, sub_ru):
        self.sub_en = sub_en
        self.sub_ru = sub_ru

    def __iter__(self):
        yield self.sub_en
        yield self.sub_ru
class SubtitleParser:
    """Parse SRT-style subtitle data into ``Subtitle`` objects.

    Each block is expected to consist of a number line, a timing line
    (``HH:MM:SS,mmm --> HH:MM:SS,mmm``) and zero or more text lines; blocks
    are separated by blank lines.
    """

    @classmethod
    def parse_file(cls, srt_file):
        """Read *srt_file* and return the list of subtitles it contains."""
        with open(srt_file, 'r') as fp:
            return cls().parse(fp.read())

    def parse(self, srt_data):
        """Return a list of ``Subtitle`` objects parsed from *srt_data*.

        Empty blocks are skipped.
        """
        # Normalise line endings so data that did not pass through text-mode
        # open() (which already translates them) is split correctly too.
        normalized = srt_data.replace('\r\n', '\n').replace('\r', '\n')
        result = []
        # One or more blank (or whitespace-only) lines separate blocks.
        for block in re.split(r'\n(?:[ \t]*\n)+', normalized):
            parsed = self.parse_block(block)
            if parsed is not None:
                result.append(parsed)
        return result

    def parse_block(self, block):
        """Parse one block; return a ``Subtitle`` or ``None`` if it is blank.

        Raises:
            ValueError: If the block has no timing line or the timing line
                cannot be parsed.
        """
        if not block.strip():
            return None
        number, timing, text = self._split_block(block)
        return Subtitle(
            number=number, text=self.parse_text(text),
            timing=self.parse_timing(timing))

    def _split_block(self, block):
        """Split a block into ``(number, timing, text)`` raw strings.

        All lines after the timing line are kept (joined with newlines), so
        multi-line cues are not truncated to their first line.
        """
        lines = block.strip().split('\n')
        if len(lines) < 2:
            raise ValueError('Malformed subtitle block: {!r}'.format(block))
        number, timing, *text_lines = lines
        return number, timing, '\n'.join(text_lines)

    def parse_text(self, text):
        """Return *text* on one line, without leading dots or ``<...>`` tags."""
        # str.lstrip takes a set of characters, so '.' removes any run of
        # leading dots (the same effect the former '...' argument had).
        text = text.lstrip('.').replace('\n', ' ').strip()
        return re.sub(r'<[^>]*>', '', text)

    def parse_timing(self, timing):
        """Return ``(start, end)`` timedeltas parsed from a timing line.

        Raises:
            ValueError: If the line is not of the form ``start --> end``.
        """
        # Whitespace around the arrow is removed by parse_time.
        start, end = timing.split('-->')
        return self.parse_time(start), self.parse_time(end)

    def parse_time(self, time_str):
        """Convert ``HH:MM:SS,fff`` (or ``HH:MM:SS.fff``) to a timedelta."""
        time_str = time_str.strip().replace(',', '.')
        t = datetime.strptime(time_str, '%H:%M:%S.%f')
        return timedelta(hours=t.hour, minutes=t.minute,
                         seconds=t.second, microseconds=t.microsecond)
class SubtitleSquasher:
    """Pair the cues of two subtitle tracks and merge them into longer cards."""

    @classmethod
    def run(cls, *args, **kwargs):
        """Shortcut for ``cls().sync(*args, **kwargs)``."""
        return cls().sync(*args, **kwargs)

    def sync(self, en_subs, ru_subs, offset=None):
        """Pair every cue of *en_subs* with a cue of *ru_subs* and squash them.

        Args:
            en_subs: Cues that drive the pairing; each one produces a pair.
            ru_subs: Cues to pick partners from. Shifted in place when
                *offset* is given.
            offset: Optional number of seconds added to every *ru_subs* cue
                before pairing.

        Returns:
            The list of ``SyncedSubtitle`` objects produced by ``squash``.
        """
        sync_edge = timedelta(microseconds=200000)
        synced = []
        if offset:
            self.add_offset(ru_subs, timedelta(seconds=offset))
        for en_sub in en_subs:
            # max(starts) - min(ends) is the gap between the two cues; it is
            # negative when they overlap. The first candidate whose gap is
            # below sync_edge wins.
            ru_pair = next(
                (ru_sub for ru_sub in ru_subs
                 if (max(ru_sub.timing.start, en_sub.timing.start) -
                     min(ru_sub.timing.end, en_sub.timing.end)) < sync_edge),
                None)
            if ru_pair is None:
                # No partner found: use an empty placeholder with the same
                # timing so every pair has both sides.
                ru_pair = Subtitle(
                    timing=(en_sub.timing.start, en_sub.timing.end),
                    number=None, text='')
            synced.append(SyncedSubtitle(en_sub, ru_pair))
        return self.squash(synced)

    def add_offset(self, subs, offset):
        """Shift the timing of every cue in *subs* by *offset*, in place."""
        for sub in subs:
            # timing is a namedtuple and therefore immutable: assigning to
            # its fields raises AttributeError, so build a shifted copy.
            sub.timing = sub.timing._replace(
                start=sub.timing.start + offset,
                end=sub.timing.end + offset)

    def squash(self, synced):
        """Merge consecutive pairs into longer ``SyncedSubtitle`` cards.

        Pairs are buffered until both sides have accumulated at least five
        seconds of cue duration and the latest cue on each side ends a
        sentence; the buffer is then merged into one card. Whatever remains
        in the buffer at the end becomes the final card.
        """
        result, buffer_en, buffer_ru = [], [], []
        max_duration = timedelta(seconds=5)
        for item in synced:
            buffer_en.append(item.sub_en)
            buffer_ru.append(item.sub_ru)
            duration_en = sum((s.get_duration() for s in buffer_en), timedelta())
            duration_ru = sum((s.get_duration() for s in buffer_ru), timedelta())
            if (duration_en >= max_duration and duration_ru >= max_duration
                    and self.is_complete_text(item.sub_en.text)
                    and self.is_complete_text(item.sub_ru.text)):
                result.append(SyncedSubtitle(
                    self.merge(buffer_en),
                    self.merge(self.unique(buffer_ru))))
                buffer_en, buffer_ru = [], []
        if buffer_en and buffer_ru:
            result.append(SyncedSubtitle(
                self.merge(buffer_en),
                self.merge(self.unique(buffer_ru))))
        return result

    def merge(self, cards):
        """Combine *cards* into one ``Subtitle``.

        A single card is returned unchanged. Otherwise the result spans from
        the first card's start to the last card's end, its number is
        ``'<first>-<last>'`` and its text is the cards' texts joined by
        spaces.
        """
        if len(cards) == 1:
            return cards[0]
        number = '{}-{}'.format(cards[0].number, cards[-1].number)
        timing = (cards[0].timing.start, cards[-1].timing.end)
        text = ' '.join(c.text for c in cards)
        return Subtitle(number=number, timing=timing, text=text)

    def unique(self, subs):
        """Return *subs* without repeated ``number`` values, order preserved."""
        ids = set()
        result = []
        for sub in subs:
            if sub.number not in ids:
                result.append(sub)
                ids.add(sub.number)
        return result

    def is_complete_text(self, text):
        """Return True if *text* ends with ``.``, ``!`` or ``?`` but not ``...``."""
        return text.endswith(('.', '!', '?')) and not text.endswith('...')
class FFmpegError(Exception):
    """Raised when running an ffmpeg command reports an error."""
class MediaExtractor:
    """Extract screenshots from a video with ffmpeg and hand them out by time.

    Attributes are ``media_dir`` (directory the screenshots are written to)
    and ``screenshot_mapping`` (ordered mapping of ``n * 10`` to ``'n.jpg'``;
    ffmpeg is asked for ``fps=1/10``, so the keys are presumably seconds).
    """

    def __init__(self, media_dir):
        self.screenshot_mapping = OrderedDict()
        self.media_dir = media_dir

    def mkdir(self):
        """Create ``media_dir``; return False if it already exists."""
        try:
            os.mkdir(self.media_dir)
        except FileExistsError:
            return False
        return True

    def run_command(self, command, **kwargs):
        """Run *command* after filling its ``{name}`` fields from *kwargs*.

        The template is split into arguments first and each argument is
        formatted afterwards, so substituted values (file names) are passed
        to the program verbatim, without shell interpretation. Shell syntax
        in *command* itself (pipes, redirects) is therefore not supported.

        Raises:
            FFmpegError: If the program cannot be started, exits with a
                non-zero status, or writes anything to stderr.
        """
        import shlex  # local import: only this method needs it

        argv = [token.format(**kwargs) for token in shlex.split(command)]
        try:
            result = subprocess.run(argv, stderr=subprocess.PIPE)
        except OSError as err:
            raise FFmpegError(str(err)) from err
        if result.returncode != 0 or result.stderr:
            raise FFmpegError(result.stderr.decode('utf-8', errors='replace'))

    def extract_screenshots(self, source_file):
        """Fill ``screenshot_mapping`` with the screenshots of *source_file*.

        ffmpeg is only run when ``media_dir`` did not exist yet; otherwise
        the files already present in it are reused.
        """
        if self.mkdir():
            self.run_command(
                'ffmpeg -y -i "{source}" -vf scale=320:-1,fps=1/10 -qscale:v 10 -loglevel fatal "{output}"',
                source=source_file, output=os.path.join(self.media_dir, '%d.jpg'))
        # Count only ffmpeg's numbered outputs so unrelated files in the
        # directory do not shift the numbering.
        files_count = sum(
            1 for name in os.listdir(self.media_dir)
            if re.fullmatch(r'\d+\.jpg', name))
        # Files are numbered 1..files_count inclusive.
        self.screenshot_mapping = OrderedDict(
            (n * 10, '{}.jpg'.format(n)) for n in range(1, files_count + 1))

    def get_screenshot(self, start_time):
        """Return the first unused screenshot at or after *start_time*.

        The chosen entry is removed from ``screenshot_mapping`` so it is not
        handed out twice. Returns ``None`` when no entry qualifies.
        """
        seconds = start_time.total_seconds()
        for key, value in self.screenshot_mapping.items():
            if key >= seconds:
                # Safe to delete here: iteration stops immediately after.
                del self.screenshot_mapping[key]
                # NOTE(review): the result is '<media_dir>.<n>.jpg', not a
                # path inside media_dir -- confirm this naming is intended.
                return '{}.{}'.format(self.media_dir, value)
        return None
def main(args):
    """Write ``<args.name>.csv`` with one row per squashed subtitle pair.

    Each row holds the front text and the back text; when ``args.screenshots``
    is set, a third column holds an ``<img>`` tag for a screenshot (or an
    empty string when no screenshot is available for that card).

    Args:
        args: Parsed command-line namespace with ``front``, ``back``,
            ``name``, ``offset`` and ``screenshots`` attributes.
    """
    ru_subs = SubtitleParser.parse_file(args.front)
    en_subs = SubtitleParser.parse_file(args.back)
    # newline='' lets the csv module control line endings itself; without it
    # extra blank rows appear on platforms that translate '\n'.
    with open(args.name + '.csv', 'w', newline='') as output:
        writer = csv.writer(output)
        media = MediaExtractor(args.name)
        if args.screenshots:
            media.extract_screenshots(source_file=args.screenshots)
        for front, back in SubtitleSquasher.run(ru_subs, en_subs, offset=args.offset):
            row_data = [front.text, back.text if back else '']
            if args.screenshots:
                screenshot_file = media.get_screenshot(front.timing.start)
                # get_screenshot returns None when nothing is left; keep the
                # column but avoid emitting src="None".
                row_data.append(
                    '<img src="{}" />'.format(screenshot_file)
                    if screenshot_file else '')
            writer.writerow(row_data)
    sys.stdout.write('Done\n')
if __name__ == '__main__':
    # Command-line entry point: parse the options and build the deck.
    parser = argparse.ArgumentParser()
    # front, back and name are used unconditionally by main(), so they are
    # required; omitting them used to fail later with a TypeError.
    parser.add_argument('-f', '--front', dest='front', required=True,
                        help='Subtitle file for the front side of cards')
    parser.add_argument('-b', '--back', dest='back', required=True,
                        help='Subtitle file for the back side of cards')
    # float accepts everything int did and also fractional seconds.
    parser.add_argument('-o', '--offset', dest='offset', type=float,
                        help='Subtitle offset')
    parser.add_argument('-n', '--name', dest='name', required=True,
                        help='Deck name')
    parser.add_argument('-s', '--screenshots', dest='screenshots',
                        help='Video file to extract screenshots from')
    args = parser.parse_args()
    main(args)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment