Created
July 12, 2024 06:12
-
-
Save apage43/3dba2ba4c4077d2a7231c05832473b9e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse | |
from collections import Counter | |
import os | |
from struct import unpack | |
from dataclasses import dataclass | |
@dataclass
class Mp3Frame:
    """One MPEG audio frame plus the fields decoded from its 4-byte header."""

    data: bytes  # complete frame: the 4 header bytes followed by the payload
    bitrate: int  # bits per second
    sampling_rate: int  # Hz
    version: float  # MPEG version: 1, 2 or 2.5
    layer: int  # MPEG layer: 1, 2 or 3


# Header bit patterns -> MPEG version / layer (the missing pattern is reserved).
_MPEG_VERSIONS = {0b00: 2.5, 0b10: 2, 0b11: 1}
_MPEG_LAYERS = {0b01: 3, 0b10: 2, 0b11: 1}

# Bitrates in kbit/s, indexed by the 4-bit bitrate index. Index 0 means
# "free format" and index 15 is invalid, so only 15 entries are listed.
_V1_L1 = (0, 32, 64, 96, 128, 160, 192, 224, 256, 288, 320, 352, 384, 416, 448)
_V1_L2 = (0, 32, 48, 56, 64, 80, 96, 112, 128, 160, 192, 224, 256, 320, 384)
_V1_L3 = (0, 32, 40, 48, 56, 64, 80, 96, 112, 128, 160, 192, 224, 256, 320)
_V2_L1 = (0, 32, 48, 56, 64, 80, 96, 112, 128, 144, 160, 176, 192, 224, 256)
_V2_L23 = (0, 8, 16, 24, 32, 40, 48, 56, 64, 80, 96, 112, 128, 144, 160)

# The bitrate table depends on both the version and the layer.
_BITRATES_KBPS = {
    (1, 1): _V1_L1, (1, 2): _V1_L2, (1, 3): _V1_L3,
    (2, 1): _V2_L1, (2, 2): _V2_L23, (2, 3): _V2_L23,
    (2.5, 1): _V2_L1, (2.5, 2): _V2_L23, (2.5, 3): _V2_L23,
}

# Sampling rates in Hz, indexed by the 2-bit sampling-rate index (3 is reserved).
_SAMPLING_RATES = {
    1: (44100, 48000, 32000),
    2: (22050, 24000, 16000),
    2.5: (11025, 12000, 8000),
}


def _parse_frame_header(header_bytes):
    """Decode a 4-byte MPEG audio frame header.

    Returns ``(frame_size, bitrate, sampling_rate, version, layer)`` where
    ``frame_size`` is the total frame length in bytes (header included), or
    ``None`` when the bytes are not a usable frame header: missing sync word,
    reserved version/layer/sampling-rate, invalid bitrate index, or
    free-format bitrate (whose frame length cannot be derived from the
    header alone).
    """
    header, = unpack('>I', header_bytes)

    # Every frame starts with 11 set bits; without this check arbitrary data
    # (e.g. a trailing ID3v1 "TAG" block) would be decoded as a frame.
    if (header >> 21) != 0x7FF:
        return None

    version = _MPEG_VERSIONS.get((header >> 19) & 0b11)
    layer = _MPEG_LAYERS.get((header >> 17) & 0b11)
    if version is None or layer is None:
        return None

    bitrate_table = _BITRATES_KBPS[(version, layer)]
    bitrate_index = (header >> 12) & 0b1111
    # Index 0 (free format) would yield a zero-length frame and a negative
    # read size further down, which reads the rest of the file; reject it.
    if bitrate_index == 0 or bitrate_index >= len(bitrate_table):
        return None
    bitrate = bitrate_table[bitrate_index] * 1000

    rates = _SAMPLING_RATES[version]
    sampling_rate_index = (header >> 10) & 0b11
    if sampling_rate_index >= len(rates):
        return None
    sampling_rate = rates[sampling_rate_index]

    padding_bit = (header >> 9) & 0b1
    if layer == 1:
        # Layer I frames are counted in 4-byte slots.
        frame_size = (12 * bitrate // sampling_rate + padding_bit) * 4
    elif layer == 3 and version != 1:
        # MPEG-2/2.5 Layer III carries 576 samples per frame (half of MPEG-1).
        frame_size = 72 * bitrate // sampling_rate + padding_bit
    else:
        frame_size = 144 * bitrate // sampling_rate + padding_bit
    return frame_size, bitrate, sampling_rate, version, layer


def mp3_frames(file_handle):
    """Yield consecutive ``Mp3Frame`` objects read from *file_handle*.

    *file_handle* must be a binary file object positioned at the first frame
    header. Iteration stops silently at end of file, at the first header
    that cannot be decoded, or when the last frame is truncated; no
    resynchronisation is attempted.
    """
    while True:
        header_bytes = file_handle.read(4)
        if len(header_bytes) < 4:
            return
        parsed = _parse_frame_header(header_bytes)
        if parsed is None:
            return
        frame_size, bitrate, sampling_rate, version, layer = parsed
        payload_size = frame_size - 4
        payload = file_handle.read(payload_size)
        if len(payload) < payload_size:
            return
        yield Mp3Frame(header_bytes + payload, bitrate, sampling_rate, version, layer)
def seek_past_id3_tag(file_handle):
    """Position *file_handle* just after a leading ID3v2 tag, if there is one.

    Reads the first 10 bytes from the handle's current position (expected to
    be the start of the file). When they form an ID3v2 header, the handle is
    moved to the absolute offset of the first byte following the tag,
    including the optional 10-byte footer of ID3v2.4 tags. Otherwise the
    handle is rewound to offset 0.
    """
    header = file_handle.read(10)
    if len(header) < 10 or not header.startswith(b'ID3'):
        file_handle.seek(0)
        return

    major_version = header[3]
    flags = header[5]

    # The tag size is a 28-bit "synchsafe" integer: four bytes, 7 bits each.
    size = 0
    for byte in header[6:10]:
        size = (size << 7) | (byte & 0x7F)

    total_size = size + 10  # the size field excludes the 10-byte header
    if major_version >= 4 and flags & 0x10:
        # ID3v2.4 "footer present" flag: a 10-byte footer follows the tag
        # body and is not counted in the size field either.
        total_size += 10
    file_handle.seek(total_size)
def process_input_files(input_files, output_file, uncommon_dir):
    """Compare the MP3 frames of several files and write the requested outputs.

    Args:
        input_files: Iterable of paths to the MP3 files to compare.
        output_file: If truthy, path of a file that receives the frames of
            the first input that occur (byte-for-byte) in every input, in
            the order they appear in the first input.
        uncommon_dir: If truthy, directory (created when missing) that
            receives one ``unique_frames_<n>.mp3`` per input (1-based, in
            input order) holding the frames found in that input only.

    Raises:
        ValueError: If ``output_file`` is given but there are no inputs.
    """
    # Raw frame bytes for each input, in file order.
    per_file_frames = []
    for file_path in input_files:
        with open(file_path, 'rb') as f:
            seek_past_id3_tag(f)
            per_file_frames.append([frame.data for frame in mp3_frames(f)])

    if output_file:
        # Explicit check rather than ``assert``, which is stripped under -O.
        if not per_file_frames:
            raise ValueError('at least one input file is required to compute common frames')
        first_file_frames = per_file_frames[0]
        common_frames = set(first_file_frames).intersection(*per_file_frames[1:])
        with open(output_file, 'wb') as out_f:
            out_f.writelines(
                data for data in first_file_frames if data in common_frames
            )

    if uncommon_dir:
        os.makedirs(uncommon_dir, exist_ok=True)
        # Count the number of FILES containing each frame. Counting raw
        # occurrences would drop a frame that repeats within a single file
        # (e.g. silence) even though no other file contains it.
        files_containing = Counter(
            data for frames in per_file_frames for data in set(frames)
        )
        for index, frames in enumerate(per_file_frames, start=1):
            unique_path = os.path.join(uncommon_dir, f"unique_frames_{index}.mp3")
            with open(unique_path, 'wb') as out_f:
                out_f.writelines(
                    data for data in frames if files_containing[data] == 1
                )
def main(argv=None):
    """Command-line entry point: parse arguments and process the input files.

    Args:
        argv: Optional list of argument strings. When ``None`` (the default)
            the arguments are taken from ``sys.argv[1:]``, so the function
            can also be driven programmatically.
    """
    parser = argparse.ArgumentParser(description='Process MP3 frames from multiple input files.')
    parser.add_argument('input_files', nargs='+', type=str, help='Input MP3 file paths')
    parser.add_argument('--output', type=str, help='Output file path to write common frames')
    parser.add_argument('--uncommon', type=str, help='Output directory for unique frames per file')
    args = parser.parse_args(argv)
    process_input_files(args.input_files, args.output, args.uncommon)


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment