Skip to content

Instantly share code, notes, and snippets.

@dubhater
Last active August 29, 2015 13:58
Show Gist options
  • Save dubhater/9933829 to your computer and use it in GitHub Desktop.
Save dubhater/9933829 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
# The idea is that tfm will put nc matches (n[nc]cc) in the correct position a lot more often than in other positions.
# TODO:
# remove unneeded leftovers
# add a separate mode of operation, so to speak, which only searches for cycles without decimation overrides
# output some stats: number of sections analysed, number of sections modified, maybe number of sections with a particular pattern
# print a summary of the actions before doing anything ("analysing only the error log. using three n matches always/never/when vmetric is lower. dropping the first duplicate/the duplicate with higher vmetric.")
import argparse
#import copy
# NOTE: sections whose contents this parser understands may directly follow one
# another. Any other section must still be terminated by an empty line, because
# its lines are stored verbatim and could themselves look like "[...]".
def parse_yap(input_file):
    """Parse a Yatta project file into a dict of sections.

    The file is read as a sequence of ini-like sections: a "[HEADER]" line
    followed by data lines, normally terminated by an empty line.

    Returns a dict mapping each header line (brackets included) to a list:

    * "[MATCHES]", "[ORIGINALMATCHES]": the lines as strings
    * "[METRICS]": a list of ints per line
    * "[DECIMATE]", "[DECIMATEMETRICS]", "[POSTPROCESS]": one int per line
    * "[SECTIONS]": the int before the first comma of each line
    * "[ERROR LOG]": dicts {"start": int, "message": str}, split at the
      first space of each line
    * any other section: the lines as strings

    Raises ValueError if a numeric field cannot be converted, and whatever
    open() raises if the file cannot be read.
    """

    def parse_metrics(line):
        # order is n c p n c p
        return [int(value) for value in line.split()]

    def parse_section_start(line):
        # Only the part before the comma is kept. partition() leaves the
        # line intact when there is no comma, whereas slicing with the
        # result of find() would silently drop the last character.
        return int(line.partition(",")[0])

    def parse_error(line):
        start, _, message = line.partition(" ")
        return {"start": int(start), "message": message}

    # Header prefix -> converter applied to each data line of that section.
    line_parsers = (
        ("[MATCHES]", str),
        ("[ORIGINALMATCHES]", str),
        ("[METRICS]", parse_metrics),
        ("[DECIMATE]", int),
        ("[SECTIONS]", parse_section_start),
        ("[DECIMATEMETRICS]", int),
        ("[ERROR LOG]", parse_error),
        ("[POSTPROCESS]", int),
    )

    yap = {}
    entries = None  # list being filled; None while between sections
    parse_line = str
    known_section = False  # True while inside one of the sections above

    # The with block closes the file even if a line fails to parse.
    with open(input_file, "r") as f_in:
        for line in f_in:
            line = line.rstrip("\r\n")
            if not line:
                # An empty line ends the current section.
                entries = None
                continue

            is_header = line.startswith("[") and line.endswith("]")
            # Inside a known section a header can never be valid data, so it
            # starts the next section even without a separating empty line.
            if is_header and (entries is None or known_section):
                parse_line = next(
                    (parser for prefix, parser in line_parsers
                     if line.startswith(prefix)),
                    None,
                )
                known_section = parse_line is not None
                if not known_section:
                    parse_line = str
                entries = yap[line] = []
                continue

            # Lines outside of any section are ignored.
            if entries is not None:
                entries.append(parse_line(line))

    return yap
def get_sections_from_error_log(yap):
    """Return the start frames listed in the project's error log.

    Entries whose message starts with "Matching impossible (too short
    section)" are left out; all other entries contribute their "start"
    value, in log order.
    """
    too_short_prefix = "Matching impossible (too short section)"
    return [
        entry["start"]
        for entry in yap["[ERROR LOG]"]
        if not entry["message"].startswith(too_short_prefix)
    ]
# FIXME: the name is not entirely correct any more: it does more than just analysing sections
def analyse_sections(sections_to_analyse, yap, drop_high_vmetric, minimum_section_length):
    """Guess the telecine pattern position of each requested section.

    For every section the "n" match followed by a "c" match pairs in
    [ORIGINALMATCHES] are counted per position in the 5-frame cycle. The
    most frequent position becomes the pick if it holds at least 40% of the
    pairs and leads the runner-up by more than 10 percentage points.

    Parameters:
        sections_to_analyse: start frames; each must occur in yap["[SECTIONS]"].
        yap: parsed project. Its "[ERROR LOG]" list is appended to in place.
        drop_high_vmetric: if true, choose between dropping the frame at the
            picked position and the one after it by comparing
            metrics[i][3] with metrics[i + 1][4] over the section;
            otherwise always drop the frame at the picked position.
        minimum_section_length: sections for which end - start + 1 is below
            this are logged as too short and skipped.

    Returns a list of dicts with the keys "start", "length",
    "total nc matches", "positions", "my pick" (-1 when undecided), "drop"
    (only when a pick was made) and "yatta's pick" (cycle position of the
    first frame of the section found in [DECIMATE], or -1).

    Raises ValueError if a start frame is not in yap["[SECTIONS]"].
    """
    sections = yap["[SECTIONS]"]
    matches = yap["[ORIGINALMATCHES]"]
    metrics = yap["[METRICS]"]
    errorlog = yap["[ERROR LOG]"]
    frame_count = len(matches)
    # [DECIMATE] is only read here, so build a set once instead of scanning
    # the list for every frame.
    decimated = set(yap["[DECIMATE]"])

    def log_error(start, message):
        # Skip entries that are already logged, so running the script again
        # on its own output does not pile up duplicates.
        entry = {"start": start, "message": message}
        if entry not in errorlog:
            errorlog.append(entry)

    analysis = []
    for start in sections_to_analyse:
        section_index = sections.index(start)
        if section_index == len(sections) - 1:
            end = frame_count
        else:
            end = sections[section_index + 1]

        if end - start + 1 < minimum_section_length:
            log_error(start, "Matching impossible (too short section): {}".format(start))
            continue

        # The very last frame has no successor to pair it with.
        paired_frames = [i for i in range(start, end) if i != frame_count - 1]

        positions = [0, 0, 0, 0, 0]
        for i in paired_frames:
            if matches[i] == "n" and matches[i + 1] == "c":
                positions[i % 5] += 1
        total = sum(positions)

        analysed_section = {
            "start": start,
            "length": end - start,
            "total nc matches": total,
            "positions": positions,
        }

        # max() returns the first maximum, so ties go to the lowest position.
        best = max(range(5), key=positions.__getitem__)
        next_best = max((i for i in range(5) if i != best), key=positions.__getitem__)

        best_percent = 0
        next_best_percent = 0
        if total > 0:
            best_percent = positions[best] * 100 / total
            next_best_percent = positions[next_best] * 100 / total

        if best_percent >= 40.0 and best_percent - next_best_percent > 10.0:
            analysed_section["my pick"] = best
            drop = best
            if drop_high_vmetric:
                drop_n = drop_c = 0
                for i in paired_frames:
                    if i % 5 != best:
                        continue
                    vmetric_n = metrics[i][3]
                    vmetric_c = metrics[i + 1][4]
                    if vmetric_n > vmetric_c:
                        drop_n += 1
                    else:
                        drop_c += 1
                if drop_n <= drop_c:
                    drop = (best + 1) % 5
            analysed_section["drop"] = drop
        else:
            analysed_section["my pick"] = -1
            log_error(start, "Matching failure at: {}".format(start))

        analysed_section["yatta's pick"] = next(
            (i % 5 for i in range(start, end) if i in decimated), -1
        )
        analysis.append(analysed_section)

    return analysis
def cycle_has_decimation_override(decimate, frame):
    """Tell whether the 5-frame cycle containing `frame` has a frame in `decimate`.

    Cycles start at multiples of 5. Returns True as soon as any of the five
    frames of the cycle is found in `decimate`, False otherwise.
    """
    cycle_start = frame - frame % 5
    return any(
        candidate in decimate
        for candidate in range(cycle_start, cycle_start + 5)
    )
def guessed_patterns_to_yap(yap, analysis, three_n_matches):
    """Apply the guessed patterns to the project, modifying `yap` in place.

    For every analysed section with a pick ("my pick" != -1):

    * [MATCHES] is overwritten for the frames of the section with the
      pattern selected by the pick,
    * the frame at cycle position "drop" of every cycle is added to
      [DECIMATE], unless it is already listed or it lies in the first or
      last cycle of the section and that cycle already contains a
      decimated frame (a message is printed in that case),
    * "Matching failure at" entries for the section are removed from
      [ERROR LOG],
    * the last frame of the section is added to [POSTPROCESS] if its
      vmetric for the chosen match is more than twice its p vmetric.

    [DECIMATE] and [POSTPROCESS] are sorted at the end.

    Parameters:
        yap: parsed project with the sections named above and [METRICS].
        analysis: list of dicts with "my pick", and for picked sections
            "drop", "start" and "length".
        three_n_matches: "always" uses the patterns with three n matches;
            "maybe" decides per frame, using the metrics, whether the c
            match preceding the n matches becomes an n match; any other
            value uses the patterns with two n matches.
    """
    if three_n_matches == "always":
        patterns = ["nccnn", "nnccn", "nnncc", "cnnnc", "ccnnn"]
    else:
        patterns = ["ncccn", "nnccc", "cnncc", "ccnnc", "cccnn"]

    matches = yap["[MATCHES]"]
    decimate = yap["[DECIMATE]"]
    metrics = yap["[METRICS]"]
    postprocess = yap["[POSTPROCESS]"]
    errorlog = yap["[ERROR LOG]"]

    # Mirror of `decimate` for constant-time membership tests; kept in sync
    # with every append below.
    decimated = set(decimate)

    for section in analysis:
        pattern_index = section["my pick"]
        if pattern_index == -1:
            # we refused to pick a pattern for this section
            continue

        pattern = patterns[pattern_index]
        drop = section["drop"]  # cycle position of the frame to drop
        start = section["start"]
        length = section["length"]
        last = start + length - 1

        first_cycle_end = start // 5 * 5 + 5
        last_cycle_start = last // 5 * 5

        # TODO: don't touch frames in the [NODECIMATE] section
        # set matches and decimation overrides
        for i in range(start, start + length):
            if three_n_matches == "maybe" and pattern[i % 5] == "c" and pattern[(i + 1) % 5] == "n":
                vmetric_c = metrics[i][4]
                vmetric_n = metrics[i][3]
                matches[i] = "n" if vmetric_n < vmetric_c else "c"
            else:
                matches[i] = pattern[i % 5]

            if i % 5 != drop:
                continue

            # The first and the last cycle may be shared with a neighbouring
            # section. ">=" so that the first frame of the last cycle
            # (drop position 0) is treated as part of that cycle too.
            in_boundary_cycle = i < first_cycle_end or i >= last_cycle_start
            cycle_start = i - i % 5
            if in_boundary_cycle and any(
                frame in decimated for frame in range(cycle_start, cycle_start + 5)
            ):
                print("Two frames should be decimated in the cycle at {}.".format(i))
            elif i not in decimated:
                # never list the same frame twice
                decimate.append(i)
                decimated.add(i)

        # Remove the section from the error log. The list is rebuilt in place
        # because removing items while iterating over it skips entries.
        errorlog[:] = [
            err for err in errorlog
            if not (err["start"] == start and err["message"].startswith("Matching failure at"))
        ]

        # if the last frame has a much higher vmetric with c/n match than with p match, flag it for postprocessing
        if pattern[last % 5] == "c":
            vmetric_cn = metrics[last][4]
        else:
            vmetric_cn = metrics[last][3]
        vmetric_p = metrics[last][5]
        # the factor of 2 is an arbitrary threshold
        if vmetric_cn > vmetric_p * 2 and last not in postprocess:
            postprocess.append(last)

    # sort decimate and postprocess
    decimate.sort()
    postprocess.sort()
def get_sorting_key(asdf):
    """Sort key for analysed sections: the value stored under "length"."""
    section_length = asdf["length"]
    return section_length
def print_summary(analysis, yap):
    """Print a per-section report followed by overall agreement statistics.

    `analysis` is sorted in place, longest section first. For each section
    the length, start, per-position nc match counts with percentages, and
    both picks are printed. Afterwards the number of sections with a pick
    is printed and, if at least one section has both picks, how often the
    two picks agree.
    """
    section_starts = yap["[SECTIONS]"]
    analysis.sort(key=get_sorting_key, reverse=True)

    print("\n")
    print("Number of sections analysed: {}".format(len(analysis)))
    print("Total number of sections: {}\n".format(len(section_starts)))

    confident = 0   # sections where we made a pick
    comparable = 0  # sections where both picks exist
    agreements = 0  # comparable sections where the picks are equal

    for entry in analysis:
        print("Section length: {}".format(entry["length"]))
        print("Section start: {}".format(entry["start"]))

        nc_total = entry["total nc matches"]
        for position in range(5):
            count = entry["positions"][position]
            share = count * 100 / nc_total if nc_total > 0 else 0
            print("Number of nc matches in position {}: {} ({:.3f}%)".format(position, count, share))
        print("Total number of nc matches found in this section: {}".format(nc_total))

        mine = entry["my pick"]
        yattas = entry["yatta's pick"]
        print("My pick: {}".format(mine))
        print("Yatta's pick: {}".format(yattas))

        both_picked = mine != -1 and yattas != -1
        if both_picked and mine != yattas:
            print("Disagree.")
        print("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~")

        if both_picked:
            comparable += 1
            if mine == yattas:
                agreements += 1
        if mine != -1:
            confident += 1

    print("Number of sections where we're confident enough about the pattern: {}.".format(confident))
    if comparable > 0:
        print("")
        print("We agreed with yatta in {} out of {} sections ({:.3f}%).".format(agreements, comparable, agreements * 100 / comparable))
def write_yap(output_file, yap):
    """Write the project dict `yap` to `output_file` in Yatta's ini-like format.

    Each section is written as its header line, one line per entry and a
    terminating empty line. Entries are formatted per section:

    * "[DECIMATE]", "[DECIMATEMETRICS]", "[POSTPROCESS]": str(entry)
    * "[SECTIONS]": the entry followed by ",0"
    * "[METRICS]": the values of the entry joined by single spaces
    * "[ERROR LOG]": "<start> <message>"
    * anything else: the entry itself, which must be a string

    An existing file is overwritten.
    """
    plain_sections = ("[DECIMATE]", "[DECIMATEMETRICS]", "[POSTPROCESS]")

    # The with block closes the file even if formatting an entry fails.
    with open(output_file, "w") as f_out:
        for section, entries in yap.items():
            f_out.write(section + "\n")
            if section in plain_sections:
                lines = (str(entry) for entry in entries)
            elif section == "[SECTIONS]":
                # FIXME: Store the preset associated with each section
                # instead of assuming it's always 0.
                lines = (str(entry) + ",0" for entry in entries)
            elif section == "[METRICS]":
                lines = (" ".join(str(metric) for metric in entry) for entry in entries)
            elif section == "[ERROR LOG]":
                lines = (str(entry["start"]) + " " + entry["message"] for entry in entries)
            else:
                lines = entries
            f_out.writelines(line + "\n" for line in lines)
            f_out.write("\n")
# Command line entry point: parse the project, analyse the chosen sections,
# apply the guessed patterns and write the result back.
parser = argparse.ArgumentParser(description="Guess telecine patterns.")
parser.add_argument(
    "-i", "--input-file",
    required=True,
    help="Yatta project to process.",
)
parser.add_argument(
    "-o", "--output-file",
    help="The input file will be overwritten if this is not used.",
)
parser.add_argument(
    "--three-n-matches",
    choices=["always", "never", "maybe"],
    default="never",
    help="Use three n matches instead of two, i.e. ccnnn instead of cccnn. Use in case of field blending. (default: %(default)s)",
)
parser.add_argument(
    "--drop-high-vmetric",
    action="store_true",
    help="Drop the duplicate with consistently higher vmetric (more likely to be combed) instead of always dropping the first duplicate. Use in case of field blending.",
)
parser.add_argument(
    "--errors-only",
    action="store_true",
    help="Only consider sections where Yatta's pattern guidance failed.",
)
parser.add_argument(
    "--minimum-section-length",
    type=int,
    default=10,
    help="Sections shorter than this will be skipped. (default: %(default)s)",
)
args = parser.parse_args()

yap = parse_yap(args.input_file)

# All sections by default; only those from the error log with --errors-only.
sections = yap["[SECTIONS]"]
if args.errors_only:
    sections = get_sections_from_error_log(yap)

analysis = analyse_sections(sections, yap, args.drop_high_vmetric, args.minimum_section_length)
guessed_patterns_to_yap(yap, analysis, args.three_n_matches)

# Without --output-file the input project is overwritten.
write_yap(args.input_file if args.output_file is None else args.output_file, yap)
# The summary report is currently disabled.
#print_summary(analysis, yap)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment