Skip to content

Instantly share code, notes, and snippets.

@peteroupc
Last active December 4, 2020 01:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save peteroupc/6c3276a988eae7429a39 to your computer and use it in GitHub Desktop.
Save peteroupc/6c3276a988eae7429a39 to your computer and use it in GitHub Desktop.
Reads metadata from an Ogg file
#!/usr/bin/ruby
# Written by Peter O.
# Any copyright to this work is released to the Public Domain.
# http://creativecommons.org/publicdomain/zero/1.0/
#
# If you like this, you should donate to Peter O.
# at: http://peteroupc.github.io/
#
# Usage:
# # takes an IO or IO-like object as a parameter,
# # assuming the object is at the beginning of the stream
# OggMetadata.getMetadata(io)
# # metadata is value returned from getMetadata
# OggMetadata.writeMetadata(metadata,inputFilename)
# OggMetadata.writeMetadata(metadata,inputFilename,outputFilename)
# OggMetadata.getAudioInfo(filename)
#
# Reads and rewrites the Vorbis comment header ("tags") of an Ogg Vorbis
# stream and reports basic audio information.
#
# Only the methods listed in the usage notes (getMetadata, writeMetadata,
# getAudioInfo) are meant as the public API.  The remaining methods are
# internal helpers; they are singleton methods, which a bare `private`
# keyword does not affect, so they remain callable as before.
module OggMetadata
  # Largest payload a single Ogg page can carry: 255 lacing values of 255.
  MAX_PAGE_PAYLOAD=255*255
  # Vendor string written into generated comment packets.
  VENDOR="OggMetadata"

  # Builds the 256-entry lookup table for the Ogg page checksum
  # (polynomial 0x04c11db7, processed most significant bit first).
  def self.crc32table()
    c=0x80000000
    ret=[0]
    i=1
    while i<256
      c=(c<<1)^(((c & 0x80000000)==0) ? 0 : 0x04c11db7)
      c&=0xFFFFFFFF
      # Entries i...(2*i) are derived from the entries already known
      i.times{|j| ret[i+j]=c^ret[j] }
      i<<=1
    end
    return ret
  end
  @@crc32table=self.crc32table()

  # Returns the checksum (as an integer) of the bytes of the given string.
  def self.crc32(bytes)
    ret=0
    bytes.each_byte{|b|
      ret=@@crc32table[b^(ret>>24)]^(ret<<8)
      ret&=0xFFFFFFFF
    }
    return ret
  end

  # Returns a copy of "str" tagged as binary, so that byte-oriented
  # slicing and concatenation never raise encoding errors.
  def self.binaryString(str)
    return str.dup.force_encoding("BINARY")
  end

  # Returns a copy of "str" tagged as UTF-8 if its bytes are valid UTF-8;
  # otherwise returns "str" itself, unchanged.
  def self.utf8IfValid(str)
    candidate=str.dup.force_encoding("UTF-8")
    return candidate.valid_encoding? ? candidate : str
  end

  # Returns a copy of the page (a string holding one complete Ogg page)
  # with its checksum field (bytes 0x16-0x19) recalculated.
  def self.rechecksum(page)
    # pageChecksum treats the checksum field as zero while calculating
    checksum=OggMetadata.pageChecksum(page)
    return page[0,0x16]+([checksum].pack("V"))+page[0x1A,page.length]
  end

  # Returns the page with its sequence number (bytes 0x12-0x15) set to
  # "seq".  The page is returned as is if it already has that number;
  # otherwise the checksum is recalculated as well.
  def self.resequencePage(page,seq)
    pageseq=page[0x12,4].unpack("V")[0]
    return page if pageseq==seq
    page=page[0,0x12]+([seq].pack("V"))+page[0x16,page.length]
    return rechecksum(page)
  end

  # Returns a new page made from "page" (a string holding one complete
  # Ogg page) without its first packet.  The new page gets the given
  # sequence number and granule position (low and high 32 bits).
  def self.removeFirstPacket(page,seqno,granularLow,granularHigh)
    pagebytes=page.bytes
    segmentCount=pagebytes[0x1a]
    lacingUsed=0
    packetSize=0
    # The first packet ends at the first lacing value below 255
    segmentCount.times{|i|
      lacing=pagebytes[0x1b+i]
      packetSize+=lacing
      lacingUsed+=1
      break if lacing!=0xFF
    }
    remaining=segmentCount-lacingUsed
    bodyStart=0x1b+segmentCount+packetSize
    # Once the first packet is gone the page begins with a fresh packet,
    # so the "continued packet" flag (0x01) must not stay set; otherwise
    # decoders would discard the packet that now comes first.
    headerType=pagebytes[5]&0xFE
    page=page[0,5]+
      ([headerType,granularLow,granularHigh].pack("CVV"))+
      page[0x0E,4]+
      ([seqno,0,remaining].pack("VVC"))+
      page[0x1b+lacingUsed,remaining]+
      page[bodyStart,page.length]
    return rechecksum(page)
  end

  # Writes "packet" to "fw" as one or more Ogg pages.
  # page - value returned by readPage for the page being replaced; its
  #   stream serial number and its first-page/last-page flags are reused.
  # packet - the packet's bytes.
  # seqno - sequence number for the first page written.
  # fw - object with a "write" method.
  # Returns the number of pages written.
  def self.encodePacket(page,packet,seqno,fw)
    # Work on bytes, not characters
    packet=binaryString(packet)
    sourceFlags=page[0][0]
    serial=page[0][3]
    offset=0
    needsContinuation=false
    pageCount=0
    while offset<packet.length || needsContinuation
      subsize=[MAX_PAGE_PAYLOAD,packet.length-offset].min
      lacing=[255]*(subsize/255)
      remainder=subsize%255
      # A lacing value below 255 (possibly 0) ends the packet.  It is
      # left out only if the segment table is already full, in which
      # case the packet is ended on an extra page.
      lacing.push(remainder) if remainder!=0 || lacing.length<255
      needsContinuation=(lacing.last==255)
      isFinalPage=(offset+subsize==packet.length) && !needsContinuation
      flags=0
      # Every page after the first continues the packet
      flags|=0x01 if pageCount>0
      # Only the first page may be the first page of the stream...
      flags|=0x02 if pageCount==0 && (sourceFlags&0x02)!=0
      # ...and only the final page may be the last page of the stream
      flags|=0x04 if isFinalPage && (sourceFlags&0x04)!=0
      pagedata="OggS"+[
        0,flags,0,0,serial,seqno,0,lacing.length].pack("CCVVVVVC")
      pagedata+=lacing.pack("C*")
      pagedata+=packet[offset,subsize] if subsize>0
      fw.write(rechecksum(pagedata))
      seqno+=1
      pageCount+=1
      offset+=subsize
    end
    return pageCount
  end

  # Returns "dest" if no file has that name; otherwise "dest" followed
  # by the first number, counting from 0, that yields an unused name.
  def self.getFreeFile(dest)
    fn=dest
    i=0
    while FileTest.exist?(fn)
      fn=dest+i.to_s
      i+=1
    end
    return fn
  end

  # Reads an Ogg page header from "f", leaving "f" positioned at the
  # start of the page's body.  Returns nil if "f" is not positioned at
  # a page's capture pattern; otherwise returns a three-element array:
  # [0] - header fields: flags, granule position (low, high 32 bits),
  #   stream serial number, sequence number, checksum, segment count.
  # [1] - list of [offset, size] pairs, one per packet or packet
  #   fragment in the body.  If the last packet continues on the next
  #   page, the part stored in this page is the last entry of the list.
  # [2] - total size of the body in bytes.
  def self.readPage(f)
    return nil if f.read(4)!="OggS"
    f.pos+=1 # skip the stream structure version
    data=f.read(22).unpack("CVVVVVC")
    segments=f.read(data[6]).unpack("C*")
    packets=[]
    totalSize=0
    packetStart=0
    packetSize=0
    segments.each{|lacing|
      totalSize+=lacing
      packetSize+=lacing
      if lacing!=0xFF
        packets.push([packetStart,packetSize])
        packetStart+=packetSize
        packetSize=0
      end
    }
    if !segments.empty? && segments.last==0xFF
      # The last packet is unfinished.  It must still be listed: the
      # callers' packet numbering counts it on this page and subtracts
      # one on the continuation page.
      packets.push([packetStart,packetSize])
    end
    return [data,packets,totalSize]
  end

  # Same as readPage, but raises an error instead of returning nil.
  def self.readRequiredPage(f)
    page=OggMetadata.readPage(f)
    raise "Invalid or truncated Ogg stream" if page==nil
    return page
  end

  # Returns the stream serial number of a page returned by readPage.
  def self.streamSerial(x)
    return x[0][3]
  end

  # Returns whether a page returned by readPage starts with the
  # continuation of a packet from the previous page.
  def self.isContinuation(x)
    return (x[0][0]&1)!=0
  end

  # Returns whether a page returned by readPage is flagged as the
  # last page of its stream.
  def self.isLastPage(x)
    return (x[0][0]&4)!=0
  end

  # Calculates the checksum of a page (a string holding one complete
  # Ogg page), treating the page's checksum field as zero.
  def self.pageChecksum(x)
    modpage=x[0,0x16]+([0].pack("V"))+x[0x1A,x.length]
    return OggMetadata.crc32(modpage)
  end

  # Copies the Ogg stream in "f" to "fw", replacing the second packet
  # (the comment packet) with "commentPacket" and renumbering pages.
  # Raises an error if the stream is invalid or truncated, contains
  # more than one logical bitstream, or has the comment packet in a
  # position other than the first in a page.
  def self.replaceCommentPacket(f,fw,commentPacket)
    packetIndex=-1
    desiredPacket=1
    serial=nil
    seqno=0
    while true
      pageStart=f.pos
      page=readRequiredPage(f)
      firstPacket=packetIndex+1
      isContinuation=OggMetadata.isContinuation(page)
      if isContinuation
        raise "continuation in first page" if packetIndex<0
        # The first entry of this page belongs to the previous packet
        firstPacket-=1
      end
      if serial!=nil && serial!=OggMetadata.streamSerial(page)
        raise "multiple logical bitstreams not supported"
      end
      serial=OggMetadata.streamSerial(page)
      endPagePos=f.pos
      pageSize=(endPagePos+page[2])-pageStart
      packetCount=page[1].length
      if desiredPacket>=firstPacket &&
         desiredPacket<(firstPacket+packetCount)
        if !isContinuation && packetCount==1
          # Single-packet page, not a continuation
          seqno+=OggMetadata.encodePacket(page,commentPacket,seqno,fw)
        elsif firstPacket==desiredPacket && packetCount>1
          # First packet is the desired packet
          f.pos=pageStart
          pagedata=f.read(pageSize)
          # Write the comment packet (if not a continuation, in which
          # case it was already written)
          if !isContinuation
            seqno+=OggMetadata.encodePacket(page,commentPacket,seqno,fw)
          end
          # Write the remaining packets
          pagedata=removeFirstPacket(pagedata,seqno,page[0][1],page[0][2])
          seqno+=1
          fw.write(pagedata)
        elsif isContinuation && firstPacket==desiredPacket
          # Single-packet continuation. This packet was already
          # written, so no need to write this page
        else
          raise "Not supported: comment packet not the first in a page"
        end
      else
        # Packet not represented in this page; copy the page
        f.pos=pageStart
        pagedata=f.read(pageSize)
        pagedata=resequencePage(pagedata,seqno)
        seqno+=1
        fw.write(pagedata)
      end
      packetIndex+=packetCount
      packetIndex-=1 if isContinuation
      f.pos=endPagePos+page[2]
      break if OggMetadata.isLastPage(page)
    end
  end

  # Converts a string to UTF-8.  If the conversion fails, returns the
  # string with each byte 0x80 or greater replaced by a question mark.
  def self.encodeUtf8(str)
    begin
      return str.encode("UTF-8")
    rescue
      # Fall back by replacing extended bytes with question marks
      ret=""
      str.each_byte{|b|
        ret+=(b>=0x80) ? "?" : b.chr
      }
      return ret
    end
  end

  # Builds a Vorbis comment packet (returned as a binary string).
  # hash - maps field names (strings or symbols) to a value or an array
  #   of values.  Names are converted to lowercase; values are converted
  #   to strings (nil and false become empty strings) and encoded as
  #   UTF-8; carriage returns and line feeds are removed.
  def self.makeCommentPacket(hash)
    entries=[]
    hash.each{|key,value|
      items=value.is_a?(Array) ? value : [value]
      items.each{|item|
        # Note: Falsy values such as nil are replaced with
        # an empty string
        text=encodeUtf8((item||"").to_s)
        entry=(key.to_s.downcase+"="+text).gsub(/[\r\n]/,"")
        entries.push(binaryString(entry))
      }
    }
    parts=[
      binaryString(3.chr+"vorbis"),
      [VENDOR.bytesize].pack("V"),
      binaryString(VENDOR),
      [entries.length].pack("V")
    ]
    entries.each{|entry|
      # The length prefix counts bytes, not characters
      parts.push([entry.bytesize].pack("V"))
      parts.push(entry)
    }
    parts.push(binaryString(1.chr)) # framing bit
    return binaryString(parts.join(""))
  end

  # Returns the bytes of the comment packet (always the second packet)
  # of the Ogg stream in "f".  Reads pages until a page flagged as the
  # last page is found.
  def self.getCommentPacket(f)
    packetIndex=-1
    commentPacket=String.new
    desiredPacket=1
    serial=nil
    while true
      page=readRequiredPage(f)
      firstPacket=packetIndex+1
      isContinuation=OggMetadata.isContinuation(page)
      if isContinuation
        raise "continuation in first page" if packetIndex<0
        firstPacket-=1
      end
      if serial!=nil && serial!=OggMetadata.streamSerial(page)
        raise "multiple logical bitstreams not supported"
      end
      serial=OggMetadata.streamSerial(page)
      endPagePos=f.pos
      if desiredPacket>=firstPacket &&
         desiredPacket<(firstPacket+page[1].length)
        # This page holds all or part of the comment packet
        packet=page[1][desiredPacket-firstPacket]
        f.pos=endPagePos+packet[0]
        commentPacket+=f.read(packet[1])
      end
      packetIndex+=page[1].length
      packetIndex-=1 if isContinuation
      f.pos=endPagePos+page[2]
      break if OggMetadata.isLastPage(page)
    end
    return commentPacket
  end

  # Returns a hash with the keys "channels", "samplerate", and
  # "duration" (in seconds, from the highest nonnegative granule
  # position found) for an Ogg Vorbis stream.
  # f - a file name, or an IO-like object positioned at the beginning
  #   of the stream.  An IO-like object is rewound to position 0 and
  #   read up to the page flagged as the last page.
  # Raises an error if the stream is invalid or truncated, is not
  # Vorbis, or contains more than one logical bitstream.
  def self.getAudioInfo(f)
    if f.is_a?(String)
      File.open(f,"rb"){|ff| return getAudioInfo(ff) }
    end
    page=readRequiredPage(f)
    raise "continuation in first page" if OggMetadata.isContinuation(page)
    f.pos+=1 # skip the packet type
    if f.read(6)!="vorbis"
      raise "Unsupported codec"
    end
    # Version, channels, sample rate, three bit rates, block sizes
    # (two 4-bit exponents), and framing flag
    data=f.read(23).unpack("VCVVVVCC")
    blockSize0=data[6]&15
    blockSize1=(data[6]>>4)&15
    if data[0]!=0 || data[1]==0 || data[2]==0 || data[7]==0 ||
       blockSize0<6 || blockSize0>13 ||
       blockSize1<6 || blockSize1>13 ||
       blockSize0>blockSize1
      raise "Invalid Vorbis header"
    end
    channels=data[1]
    samplerate=data[2]
    f.pos=0
    serial=nil
    maxGranulePos=0
    while true
      # Note: continuation pages are normal after the first page
      # (already checked above) and need no special handling here
      page=readRequiredPage(f)
      if serial!=nil && serial!=OggMetadata.streamSerial(page)
        raise "multiple logical bitstreams not supported"
      end
      granulePos=page[0][1]+(page[0][2]<<32)
      if (granulePos>>63)==0
        # Use only positive granule positions
        maxGranulePos=[maxGranulePos,granulePos].max
      end
      serial=OggMetadata.streamSerial(page)
      # Skip the page's body
      f.pos+=page[2]
      break if OggMetadata.isLastPage(page)
    end
    duration=maxGranulePos*1.0/samplerate
    return {"channels"=>channels,"samplerate"=>samplerate,"duration"=>duration}
  end

  # Writes a copy of the Ogg Vorbis file "fileInput" with its comments
  # replaced by "metadata" (see makeCommentPacket for the format).
  # fileOutput - name of the file to write; if omitted, "fileInput" is
  #   replaced.  The data is first written to an unused file name and
  #   then moved over "fileOutput" if that file already exists.
  def self.writeMetadata(metadata, fileInput, fileOutput=nil)
    fileOutput=fileInput if !fileOutput
    commentPacket=makeCommentPacket(metadata)
    freefile=getFreeFile(fileOutput)
    begin
      File.open(fileInput,"rb"){|f|
        File.open(freefile,"wb"){|fw|
          replaceCommentPacket(f,fw,commentPacket)
        }
      }
      if freefile!=fileOutput
        File.delete(fileOutput) rescue nil
        File.rename(freefile,fileOutput)
      end
    ensure
      # Remove the temporary file if it's still there (best effort)
      if freefile!=fileOutput
        File.delete(freefile) rescue nil
      end
    end
  end

  # Reads the comments of an Ogg Vorbis stream.
  # f - a file name, or an IO-like object positioned at the beginning
  #   of the stream.
  # Returns a hash mapping lowercase field names to a string, or to an
  # array of strings if the field occurs more than once.  Strings are
  # tagged as UTF-8 if they are valid UTF-8 and are otherwise returned
  # as binary strings.  Comments without an equals sign are ignored.
  def self.getMetadata(f)
    if f.is_a?(String)
      File.open(f,"rb"){|ff| return getMetadata(ff) }
    end
    comment=OggMetadata.getCommentPacket(f)
    if comment[1,6]!="vorbis"
      raise "Unsupported comment packet"
    end
    offset=7
    length=comment[offset,4].unpack("V")[0]
    offset+=4+length # skip vendor string
    commentLength=comment[offset,4].unpack("V")[0]
    offset+=4
    commentHash={}
    commentLength.times{
      length=comment[offset,4].unpack("V")[0]
      offset+=4
      keyvalue=comment[offset,length]
      offset+=length
      idx=keyvalue.index("=")
      next if idx==nil
      # Split while the data is still binary, then tag the parts as
      # UTF-8, so that a value read here can be written back unchanged
      key=utf8IfValid(keyvalue[0,idx].downcase)
      value=utf8IfValid(keyvalue[idx+1,keyvalue.length]||"")
      existing=commentHash[key]
      if existing
        commentHash[key]=[existing] if existing.is_a?(String)
        commentHash[key].push(value)
      else
        commentHash[key]=value
      end
    }
    framingBit=comment[offset,1].unpack("C")[0]
    raise "invalid framing bit" if framingBit==0
    return commentHash
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment