Created
April 18, 2017 20:46
-
-
Save mdchaney/07d391a2bb8eca9db3073ac40949a78f to your computer and use it in GitHub Desktop.
Pure Ruby parser for ID3 v2.3 tags
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env ruby
# By Michael Chaney, mdchaney@michaelchaney.com
# Copyright 2017, Michael Chaney Consulting Corporation, All Rights Reserved
#
# Released under GPL v2 License or MIT License
#
# Simple parser for ID3 v2.3 tags in MP3 files. Provide the filename
# on the command line.
# Ruby encodings indexed by the ID3v2 text-encoding byte that prefixes
# encoded text frames.
ENCODINGS = [
  Encoding::ISO_8859_1, # $00 - ISO-8859-1
  Encoding::UTF_16,     # $01 - UTF-16 with a byte-order mark
  Encoding::UTF_16BE,   # $02 - UTF-16BE without a byte-order mark (ID3v2.4)
  Encoding::UTF_8       # $03 - UTF-8 (ID3v2.4)
].freeze

# String terminators as raw bytes: two nulls for the UTF-16 encodings,
# one null for the single-byte encodings.
DOUBLE_NULLS = [0, 0].pack('CC').freeze
SINGLE_NULL = [0].pack('C').freeze
# Converts raw frame text to UTF-8.
#
# encoding_byte - ID3 text-encoding byte used as an index into ENCODINGS.
#                 nil or an out-of-range value falls back to ISO-8859-1
#                 instead of raising.
# string        - raw bytes of the text, or nil.
#
# Returns a new UTF-8 String; '' when string is nil. The argument is not
# modified. Undecodable bytes are replaced rather than raising, and trailing
# NUL terminators are removed.
def correct_encoding(encoding_byte, string)
  return '' unless string

  source_encoding = (encoding_byte && ENCODINGS[encoding_byte]) || Encoding::ISO_8859_1
  # Work on a copy: force_encoding retags the receiver in place.
  text = string.dup.force_encoding(source_encoding)
  utf8 = text.encode(Encoding::UTF_8, invalid: :replace, undef: :replace)
  # UTF-8 -> UTF-8 encode is a no-op, so scrub handles bad bytes for byte $03.
  utf8.scrub.sub(/\0+\z/, '')
end
# Decodes a frame payload laid out as: encoding byte, then text.
#
# raw_frame_payload - raw bytes of the frame body.
#
# Returns the text converted by correct_encoding, or '' when the payload is
# nil or empty (there is no encoding byte to read in that case).
def get_encoded_text(raw_frame_payload)
  return '' if raw_frame_payload.nil? || raw_frame_payload.empty?

  encoding_byte, payload = raw_frame_payload.unpack('C a*')
  correct_encoding(encoding_byte, payload)
end
# Splits data at the first string terminator for the given text encoding.
#
# For encoding bytes 1 and 2 (UTF-16) the terminator is two zero bytes
# starting on an even offset, so zero bytes inside a 16-bit code unit are not
# mistaken for the end of the string. For every other value the terminator is
# a single zero byte.
#
# Returns [head, rest] as binary strings with the terminator removed; rest is
# empty when no terminator is found.
def split_terminated_string(encoding_byte, data)
  bytes = data.to_s.b
  if [1, 2].include?(encoding_byte)
    offset = 0
    while offset + 1 < bytes.bytesize
      break if bytes.getbyte(offset).zero? && bytes.getbyte(offset + 1).zero?
      offset += 2
    end
    return [bytes, ''.b] if offset + 1 >= bytes.bytesize

    [bytes.byteslice(0, offset), bytes.byteslice(offset + 2, bytes.bytesize)]
  else
    offset = bytes.index("\0")
    return [bytes, ''.b] unless offset

    [bytes.byteslice(0, offset), bytes.byteslice(offset + 1, bytes.bytesize)]
  end
end

# Parses a payload laid out as: encoding byte, terminated description, text.
#
# Returns [description, text], both converted by correct_encoding using the
# payload's encoding byte.
def get_encoded_description_and_text(raw_frame_payload)
  encoding_byte, body = raw_frame_payload.unpack('C a*')
  encoding_byte ||= 0 # empty payload: no encoding byte present
  description, payload = split_terminated_string(encoding_byte, body)
  [correct_encoding(encoding_byte, description), correct_encoding(encoding_byte, payload)]
end

# Parses a payload laid out as: encoding byte, 3-byte language code,
# terminated description, text.
#
# Returns [language, description, text]. language is tagged ISO-8859-1;
# description and text are converted by correct_encoding.
def get_encoded_language_description_and_text(raw_frame_payload)
  encoding_byte, language, body = raw_frame_payload.unpack('C a3 a*')
  encoding_byte ||= 0
  description, payload = split_terminated_string(encoding_byte, body)
  [
    language.to_s.force_encoding(Encoding::ISO_8859_1),
    correct_encoding(encoding_byte, description),
    correct_encoding(encoding_byte, payload)
  ]
end

# Parses an APIC payload laid out as: encoding byte, null-terminated MIME
# type, picture-type byte, terminated description, picture data.
#
# Returns [mime_type, picture_type, description, picture]. picture is the raw
# binary data, untouched even when it starts with a zero byte.
def parse_apic(raw_frame_payload)
  encoding_byte, mime_type, picture_type, body = raw_frame_payload.unpack('C Z* C a*')
  encoding_byte ||= 0
  description, picture = split_terminated_string(encoding_byte, body)
  [
    mime_type.to_s.force_encoding(Encoding::ISO_8859_1),
    picture_type,
    correct_encoding(encoding_byte, description),
    picture
  ]
end
# Parses the 10-byte ID3v2 tag header (or footer).
#
# header - 10 raw bytes: 3-byte identifier, major version, minor version,
#          flags byte, 4-byte synchsafe size.
#
# Returns a Hash with :id, :major_version, :minor_version, :flags
# (:unsynchronization, :extended_header, :experimental), :raw_flags (the flag
# bits as a String, most significant bit first) and :size.
def parse_id3v2_header(header)
  # B8 yields the bits MSB-first, so index 0 is bit 7 of the flags byte,
  # which is where the specification places the first flag.
  id, major_version, minor_version, raw_flags, *size_bytes = header.unpack('A3 C C B8 C4')

  # Synchsafe integer: only the low 7 bits of each byte carry data, most
  # significant byte first. Computed arithmetically so the result does not
  # depend on the host byte order.
  size = size_bytes.reduce(0) { |total, byte| (total << 7) | (byte & 0x7F) }

  flags = {
    unsynchronization: raw_flags[0] == '1',
    extended_header: raw_flags[1] == '1',
    experimental: raw_flags[2] == '1'
  }

  {
    id: id,
    major_version: major_version,
    minor_version: minor_version,
    flags: flags,
    raw_flags: raw_flags,
    size: size
  }
end
# Parses the ID3v2.3 extended header found at the start of the tag body.
#
# ext_header - raw bytes starting at the extended header: 4-byte size,
#              2 flag bytes, 4-byte padding size, then an optional 4-byte CRC.
#
# Returns a Hash with :size (as stored in the header), :flags (:has_crc32),
# :raw_flags (flag bits as a String, most significant bit first), :crc32
# (Integer, or nil when the CRC flag is clear or the bytes are missing) and
# :padding_size.
def parse_id3v2_extended_header(ext_header)
  # B16 yields bits MSB-first; the CRC flag is bit 7 of the first flag byte.
  # The trailing N reads the bytes at offset 10, used only when the flag is set.
  size, raw_flags, padding_size, crc_value = ext_header.unpack('N B16 N N')
  has_crc32 = raw_flags[0] == '1'

  {
    size: size,
    flags: { has_crc32: has_crc32 },
    raw_flags: raw_flags,
    crc32: has_crc32 ? crc_value : nil,
    padding_size: padding_size
  }
end
# Combines the parsed tag header with the optional extended header and
# isolates the raw frame data.
#
# header - the 10 raw header bytes.
# tag    - the tag body that follows the header (nil is treated as empty).
#
# Returns a Hash with :header (raw), :extended_header (raw bytes or nil),
# :parsed_header (header info merged with extended-header info) and
# :raw_frames (the frame bytes, without extended header and declared padding).
def parse_headers_and_frames(header, tag)
  tag ||= String.new
  header_info = parse_id3v2_header(header)

  if header_info[:flags][:extended_header]
    ext_header_info = parse_id3v2_extended_header(tag)
    header_info[:flags].merge!(ext_header_info[:flags])
    header_info[:extended_header_present] = true
    header_info[:extended_header_size] = ext_header_info[:size]
    header_info[:extended_header_raw_flags] = ext_header_info[:raw_flags]
    header_info[:padding_size] = ext_header_info[:padding_size]
    header_info[:crc32] = ext_header_info[:crc32]

    # The ID3v2.3 size field does not count its own 4 bytes, so the extended
    # header occupies size + 4 bytes of the tag body.
    ext_length = ext_header_info[:size] + 4
    extended_header = tag.slice(0, ext_length)
    # Clamp so corrupt size/padding values yield an empty string, not nil.
    frames_length = [tag.size - ext_length - ext_header_info[:padding_size], 0].max
    raw_frames = tag.slice(ext_length, frames_length) || String.new
  else
    header_info[:extended_header_present] = false
    header_info[:padding_size] = 0
    header_info[:crc32] = nil
    extended_header = nil
    raw_frames = tag
  end

  { header: header, extended_header: extended_header, parsed_header: header_info, raw_frames: raw_frames }
end
# Locates an ID3v2 tag in a file and returns it split into header and frames.
#
# A tag at the beginning of the file is preferred. Otherwise the end of the
# file is checked for a '3DI' footer, which sits just before a 128-byte
# ID3v1 'TAG' block when one is present. The ID3v1 data itself is ignored.
#
# filename - path of the file to inspect.
#
# Returns the Hash from parse_headers_and_frames when a tag is found,
# otherwise a Hash with :error, :valid => false and :filename.
def extract_id3v2_from_mp3(filename)
  File.open(filename, 'rb') do |file|
    chunk = file.read(10)
    if chunk.nil? || chunk.size < 10
      return { error: "Not an MP3 file", valid: false, filename: filename }
    end

    # A v2 tag at the start: read the rest of it and return it all.
    if chunk.start_with?('ID3')
      id3_header_info = parse_id3v2_header(chunk)
      tag = file.read(id3_header_info[:size]) || String.new
      return parse_headers_and_frames(chunk, tag)
    end

    file_size = file.size

    # Only look for a v1 tag when the file is long enough to hold one;
    # seeking before the start of the file raises Errno::EINVAL.
    has_v1 = false
    if file_size >= 128
      file.seek(-128, IO::SEEK_END)
      has_v1 = (file.read(3) == 'TAG')
    end

    # A v2 footer is the last 10 bytes before the v1 tag (or before EOF).
    footer_offset = has_v1 ? 138 : 10
    if file_size >= footer_offset
      file.seek(-footer_offset, IO::SEEK_END)
      chunk = file.read(10)
      if chunk && chunk.size == 10 && chunk.start_with?('3DI')
        id3_header_info = parse_id3v2_header(chunk)
        tag_start = file_size - footer_offset - id3_header_info[:size]
        if tag_start >= 0
          file.seek(tag_start, IO::SEEK_SET)
          tag = file.read(id3_header_info[:size]) || String.new
          return parse_headers_and_frames(chunk, tag)
        end
      end
    end
  end

  { error: "No tag found", valid: false, filename: filename }
end
# Parses a frame header and the optional extra bytes selected by its flags.
#
# frame - raw bytes starting at the frame: 4-byte frame ID, 4-byte size,
#         2 flag bytes, then up to 6 further bytes that are consulted only
#         when the compression, encryption or grouping flags are set.
#
# Returns a Hash with :frame_id, :size, :additional_size (number of extra
# header bytes implied by the flags), :flags, :raw_flags (flag bits as a
# String, most significant bit first), :decompressed_size, :encryption_byte
# and :grouping_identity (each nil when its flag is clear).
def parse_id3v2_frame_header(frame)
  # B16 yields bits MSB-first: indices 0..2 are the top bits of the first
  # flag byte, indices 8..10 the top bits of the second.
  frame_id, size, raw_flags = frame.unpack('A4 N B16')
  extra_bytes = frame.slice(10, 6) || String.new
  additional_size = 0

  flags = {
    tag_alter_preservation: raw_flags[0] == '1',
    file_alter_preservation: raw_flags[1] == '1',
    read_only: raw_flags[2] == '1',
    compression: raw_flags[8] == '1',
    encryption: raw_flags[9] == '1',
    grouping_identity: raw_flags[10] == '1'
  }

  # The extra fields appear in this fixed order, each only if its flag is set.
  decompressed_size = nil
  if flags[:compression]
    additional_size += 4
    decompressed_size = extra_bytes.slice!(0, 4).unpack('N').first
  end

  encryption_byte = nil
  if flags[:encryption]
    additional_size += 1
    encryption_byte = extra_bytes.slice!(0, 1).unpack('C').first
  end

  grouping_identity = nil
  if flags[:grouping_identity]
    additional_size += 1
    grouping_identity = extra_bytes.slice!(0, 1).unpack('C').first
  end

  {
    frame_id: frame_id,
    size: size,
    additional_size: additional_size,
    flags: flags,
    raw_flags: raw_flags,
    decompressed_size: decompressed_size,
    encryption_byte: encryption_byte,
    grouping_identity: grouping_identity
  }
end
# Prints a human-readable summary of one frame to standard output.
#
# raw_frame_header - the 10 raw frame-header bytes.
# raw_frame        - the frame body; any extra header bytes indicated by the
#                    frame flags are removed from it in place.
#
# Returns nil.
def parse_id3v2_frame(raw_frame_header, raw_frame)
  header = parse_id3v2_frame_header(raw_frame_header + raw_frame)
  puts "Frame #{header[:frame_id]}, #{header[:raw_flags]}, #{header[:size]} bytes"

  # Drop the flag-dependent extra header bytes so only the payload remains.
  raw_frame.slice!(0, header[:additional_size]) if header[:additional_size] > 0

  language = description = text = nil
  # Branch order matters: the specific IDs must be tested before the
  # catch-all T... and W... patterns.
  case header[:frame_id]
  when 'TXXX', 'IPLS'
    description, text = get_encoded_description_and_text(raw_frame)
  when /T.../
    text = get_encoded_text(raw_frame)
  when 'USLT', 'COMM'
    language, description, text = get_encoded_language_description_and_text(raw_frame)
  when 'USER', 'UFID'
    # Recognised, but nothing is decoded for these frames.
  when 'WXXX'
    text = get_encoded_text(raw_frame)
  when /W.../
    text = raw_frame
  when 'APIC'
    mime_type, picture_type, description, _picture = parse_apic(raw_frame)
    puts " MIME Type: #{mime_type}"
    puts " Picture Type: #{picture_type}"
  else
    puts "Unknown frame id: #{header[:frame_id]}"
  end

  puts " Language: #{language}" if language
  puts " Description: #{description}" if description
  # Plain substring test: a UTF-8 regexp would raise on binary URL text.
  if text && text.include?("\n")
    puts " Text:"
    puts " " + text.gsub("\n", "\n ")
  else
    puts " Text: #{text}"
  end
  nil
end
# Command-line entry point: print the tag header, then every frame.
unless ARGV.count == 1
  STDERR.puts "Usage: #{$0} filename"
  exit 1
end

info = extract_id3v2_from_mp3(ARGV.first)

if !info || !info[:header]
  STDERR.puts "Some sort of error"
  STDERR.puts " #{info[:error]}" if info && info[:error]
  exit 1
end

# All parsed header fields, including the extended-header ones, live under
# :parsed_header rather than at the top level of info.
parsed_header = info[:parsed_header]
header_flags = parsed_header[:flags]

puts "Found ID3v2 tag 2.#{parsed_header[:major_version]}.#{parsed_header[:minor_version]}, #{parsed_header[:size]} bytes"
puts " (trailing tag)" if parsed_header[:id] == '3DI'

puts "Raw flags: #{parsed_header[:raw_flags]}"
puts "Flags:"
[
  [:unsynchronization, 'Unsynchronization'],
  [:extended_header, 'Extended Header'],
  [:experimental, 'Experimental']
].each do |key, label|
  puts " #{label} (#{header_flags[key] ? 'x' : ' '})"
end

if parsed_header[:extended_header_present]
  puts "Extended header size: #{parsed_header[:extended_header_size]}"
  puts "Extended raw flags: #{parsed_header[:extended_header_raw_flags]}"
  puts " CRC data present (#{header_flags[:has_crc32] ? 'x' : ' '})"
  puts "CRC32: #{parsed_header[:crc32]}" if parsed_header[:crc32]
  puts "Padding size: #{parsed_header[:padding_size]}"
end

remaining_raw_frames = info[:raw_frames] || String.new
until remaining_raw_frames.empty?
  # Only zero bytes left: this is padding, not another frame.
  break if remaining_raw_frames =~ /\A\0+\z/

  raw_frame_header = remaining_raw_frames.slice!(0, 10)
  parsed_frame_header = raw_frame_header.size == 10 ? parse_id3v2_frame_header(raw_frame_header) : nil

  if parsed_frame_header && parsed_frame_header[:frame_id] && parsed_frame_header[:size] > 0
    raw_frame = remaining_raw_frames.slice!(0, parsed_frame_header[:size])
    parse_id3v2_frame(raw_frame_header, raw_frame)
  else
    # Report the bytes that failed to parse, including the header already
    # removed from the buffer.
    junk = raw_frame_header + remaining_raw_frames
    raise "Invalid junk (#{junk.size} bytes) found: #{junk.slice(0, 10)}"
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment