# Source: GitHub Gist by @apage43 (gist ID 3dba2ba4c4077d2a7231c05832473b9e).
# Created July 12, 2024 06:12.
import argparse
from collections import Counter
import os
from struct import unpack
from dataclasses import dataclass
@dataclass
class Mp3Frame:
    """One MPEG audio frame together with the values decoded from its header."""

    # Raw bytes of the frame: the 4-byte header followed by the frame body.
    data: bytes
    # Bitrate in bits per second.
    bitrate: int
    # Sampling rate in Hz.
    sampling_rate: int
    # MPEG version: 1, 2 or 2.5.
    version: float
    # MPEG layer: 1, 2 or 3.
    layer: int
def mp3_frames(file_handle):
    """Yield consecutive MPEG audio frames read from a binary file object.

    Reading starts at the current position of ``file_handle``. Iteration ends
    silently at end of file, at the first 4-byte header that fails validation
    (no frame sync, reserved version/layer/sampling-rate value, invalid or
    free-format bitrate), or when a frame body is truncated. No attempt is
    made to resynchronise after invalid data.

    Args:
        file_handle: Binary file-like object providing ``read(n)``.

    Yields:
        Mp3Frame: the raw frame bytes (header included) plus the bitrate in
        bits per second, the sampling rate in Hz, the MPEG version (1, 2 or
        2.5) and the layer (1, 2 or 3).
    """
    # Bitrates in kbit/s, indexed by the 4-bit bitrate field and keyed by
    # layer. Index 0 means "free format"; index 15 is invalid and is absent
    # from the tables. The bitrate depends on both the version and the layer.
    mpeg1_kbps = {
        1: [0, 32, 64, 96, 128, 160, 192, 224, 256, 288, 320, 352, 384, 416, 448],
        2: [0, 32, 48, 56, 64, 80, 96, 112, 128, 160, 192, 224, 256, 320, 384],
        3: [0, 32, 40, 48, 56, 64, 80, 96, 112, 128, 160, 192, 224, 256, 320],
    }
    # MPEG-2 and MPEG-2.5 share one set of tables.
    mpeg2_kbps = {
        1: [0, 32, 48, 56, 64, 80, 96, 112, 128, 144, 160, 176, 192, 224, 256],
        2: [0, 8, 16, 24, 32, 40, 48, 56, 64, 80, 96, 112, 128, 144, 160],
        3: [0, 8, 16, 24, 32, 40, 48, 56, 64, 80, 96, 112, 128, 144, 160],
    }
    # Sampling rates in Hz keyed by version; index 3 is reserved.
    sampling_rates = {
        1: [44100, 48000, 32000],
        2: [22050, 24000, 16000],
        2.5: [11025, 12000, 8000],
    }
    versions = {0b00: 2.5, 0b10: 2, 0b11: 1}
    layers = {0b01: 3, 0b10: 2, 0b11: 1}

    def read_frame_header(file_handle):
        """Read and decode one 4-byte frame header.

        Returns ``(frame_size, header_bytes, bitrate, sampling_rate, version,
        layer)``, or ``None`` when fewer than 4 bytes are available or the
        header is not a usable frame header.
        """
        header_bytes = file_handle.read(4)
        if len(header_bytes) < 4:
            return None
        header, = unpack('>I', header_bytes)

        # Every frame starts with 11 set bits (frame sync); without this
        # check arbitrary data would be decoded as if it were a header.
        if (header >> 21) & 0x7FF != 0x7FF:
            return None

        version = versions.get((header >> 19) & 0b11)
        if version is None:
            return None
        layer = layers.get((header >> 17) & 0b11)
        if layer is None:
            return None

        kbps_table = (mpeg1_kbps if version == 1 else mpeg2_kbps)[layer]
        bitrate_index = (header >> 12) & 0b1111
        if bitrate_index >= len(kbps_table):
            return None
        bitrate = kbps_table[bitrate_index] * 1000
        if bitrate == 0:
            # Free-format streams do not encode their frame length in the
            # header; computing a size from bitrate 0 would give 0 or 1
            # bytes and a negative read length for the body.
            return None

        rates = sampling_rates[version]
        sampling_rate_index = (header >> 10) & 0b11
        if sampling_rate_index >= len(rates):
            return None
        sampling_rate = rates[sampling_rate_index]

        padding_bit = (header >> 9) & 0b1
        if layer == 1:
            # Layer I frames are measured in 4-byte slots.
            frame_size = (12 * bitrate // sampling_rate + padding_bit) * 4
        elif layer == 3 and version != 1:
            # MPEG-2/2.5 Layer III carries 576 samples per frame, half of
            # MPEG-1, so the length coefficient is 72 rather than 144.
            frame_size = 72 * bitrate // sampling_rate + padding_bit
        else:
            frame_size = 144 * bitrate // sampling_rate + padding_bit
        return frame_size, header_bytes, bitrate, sampling_rate, version, layer

    while True:
        frame_info = read_frame_header(file_handle)
        if frame_info is None:
            break
        frame_size, header_bytes, bitrate, sampling_rate, version, layer = frame_info
        # The header has already been consumed; read the rest of the frame.
        frame_data = file_handle.read(frame_size - 4)
        if len(frame_data) < frame_size - 4:
            break
        yield Mp3Frame(header_bytes + frame_data, bitrate, sampling_rate, version, layer)
def seek_past_id3_tag(file_handle):
    """Position ``file_handle`` just past a leading ID3v2 tag, if present.

    Reads 10 bytes from the current position. If they do not start with
    ``b'ID3'`` (or fewer than 10 bytes are available), the handle is rewound
    to offset 0. Otherwise the handle is moved to the absolute offset of the
    first byte after the tag, so the function is meant to be called with the
    handle at the start of the file.

    Args:
        file_handle: Seekable binary file-like object.
    """
    header = file_handle.read(10)
    if len(header) < 10 or header[:3] != b'ID3':
        file_handle.seek(0)
        return

    major_version = header[3]
    flags = header[5]

    # The tag size is a 28-bit "synchsafe" integer: four bytes contributing
    # seven bits each, most significant byte first.
    size = 0
    for byte in header[6:10]:
        size = (size << 7) | (byte & 0x7F)

    # The size field excludes the 10-byte header itself.
    total_size = size + 10
    # ID3v2.4 tags may end with a 10-byte footer (flag bit 0x10) that is not
    # counted in the size field either.
    if major_version >= 4 and flags & 0x10:
        total_size += 10
    file_handle.seek(total_size)
def process_input_files(input_files, output_file, uncommon_dir):
    """Compare the MP3 frames of several files and write the requested outputs.

    Each input file is opened in binary mode, a leading ID3v2 tag is skipped
    with ``seek_past_id3_tag`` and the frames are read with ``mp3_frames``.
    Frames are compared by their raw bytes.

    Args:
        input_files: Iterable of paths of the MP3 files to read.
        output_file: If truthy, path that receives the frames of the first
            input file that also occur in every other input file, in their
            original order.
        uncommon_dir: If truthy, directory (created when missing) that
            receives one ``unique_frames_<n>.mp3`` per input file (numbered
            from 1 in input order) holding the frames that occur in no other
            input file.

    Raises:
        ValueError: If ``output_file`` is given but ``input_files`` is empty.
    """
    all_frames = []
    common_frames = None

    for file_path in input_files:
        with open(file_path, 'rb') as f:
            seek_past_id3_tag(f)
            frame_data = [frame.data for frame in mp3_frames(f)]
        all_frames.append(frame_data)
        if common_frames is None:
            common_frames = set(frame_data)
        else:
            common_frames &= set(frame_data)

    if output_file:
        # Explicit check instead of ``assert``: assertions are stripped when
        # Python runs with -O, which would turn this into an obscure failure.
        if not all_frames:
            raise ValueError('at least one input file is required to write common frames')
        with open(output_file, 'wb') as out_f:
            out_f.writelines(
                frame_data for frame_data in all_frames[0]
                if frame_data in common_frames
            )

    if uncommon_dir:
        os.makedirs(uncommon_dir, exist_ok=True)
        # Count in how many *files* each frame occurs (hence ``set``), so a
        # frame repeated inside a single file is still unique to that file.
        files_containing = Counter(
            frame for frames in all_frames for frame in set(frames)
        )
        for i, frames in enumerate(all_frames, start=1):
            # Separate name so the ``output_file`` parameter is not clobbered.
            unique_path = os.path.join(uncommon_dir, f"unique_frames_{i}.mp3")
            with open(unique_path, 'wb') as out_f:
                out_f.writelines(
                    frame_data for frame_data in frames
                    if files_containing[frame_data] == 1
                )
def main():
    """Command-line entry point: parse the arguments and run the comparison."""
    cli = argparse.ArgumentParser(
        description='Process MP3 frames from multiple input files.',
    )
    cli.add_argument('input_files', nargs='+', type=str, help='Input MP3 file paths')

    # Both optional outputs are plain string paths.
    optional_outputs = (
        ('--output', 'Output file path to write common frames'),
        ('--uncommon', 'Output directory for unique frames per file'),
    )
    for flag, help_text in optional_outputs:
        cli.add_argument(flag, type=str, help=help_text)

    options = cli.parse_args()
    process_input_files(options.input_files, options.output, options.uncommon)
# Run the command-line interface only when executed as a script.
if __name__ == "__main__":
    main()
# (GitHub page footer: sign up or sign in on GitHub to comment on this gist.)