Skip to content

Instantly share code, notes, and snippets.

@EmilHernvall
Created September 29, 2011 14:07
Show Gist options
  • Star 14 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save EmilHernvall/1250788 to your computer and use it in GitHub Desktop.
Save EmilHernvall/1250788 to your computer and use it in GitHub Desktop.
Encode data as sound, and retrieve it using an FFT
class GlobalFormat(object):
    """RIFF container header: the first 12 bytes of a WAVE file.

    Attributes:
        id: Chunk tag; 0x46464952 is the ASCII bytes "RIFF" read as a
            little-endian 32-bit integer.
        size: Value stored in the RIFF size field. Starts at 0; the caller
            assigns it before serializing.
        type: Form type; 0x45564157 is the ASCII bytes "WAVE" read as a
            little-endian 32-bit integer.
    """

    def __init__(self):
        self.id = 0x46464952  # RIFF
        self.size = 0
        self.type = 0x45564157  # WAVE

    def as_bin(self):
        """Return the header as 12 bytes: id, size, type as uint32 values."""
        # "<" pins little-endian byte order and standard 4-byte fields.
        # Without it struct uses the host's native order and sizes, which
        # only matches the RIFF on-disk layout on little-endian machines.
        return struct.pack("<III", self.id, self.size, self.type)
class FormatChunk(object):
    """The "fmt " chunk of a WAVE file (chunk header plus 16 bytes of fields).

    All attributes except ``id`` and ``blockAlign`` start at 0 and are
    expected to be filled in by the caller before calling ``as_bin``.

    Attributes:
        id: Chunk tag; 0x20746d66 is the ASCII bytes "fmt " read as a
            little-endian 32-bit integer.
        size: Value stored in the chunk size field.
        formatTag: Format code (packed as a signed 16-bit value).
        channels: Number of channels (unsigned 16-bit).
        bitsPerSample: Bits per sample (unsigned 16-bit).
        samplesPerSec: Sample rate (unsigned 32-bit).
        avgBytesPerSec: Byte rate (unsigned 32-bit).
        blockAlign: Bytes per sample frame (unsigned 16-bit); defaults to 4.
    """

    def __init__(self):
        self.id = 0x20746d66  # "fmt "
        self.size = 0
        self.formatTag = 0
        self.channels = 0
        self.bitsPerSample = 0
        self.samplesPerSec = 0
        self.avgBytesPerSec = 0
        self.blockAlign = 4

    def as_bin(self):
        """Return the chunk as 24 bytes in on-disk field order.

        Note the serialized order differs from the attribute order above:
        blockAlign is written before bitsPerSample.
        """
        # "<" pins little-endian byte order and standard field sizes instead
        # of whatever the host platform's native layout happens to be.
        return struct.pack(
            "<IIhHIIHH",
            self.id,
            self.size,
            self.formatTag,
            self.channels,
            self.samplesPerSec,
            self.avgBytesPerSec,
            self.blockAlign,
            self.bitsPerSample,
        )
import struct
import math
import os.path
# Encoder script: writes the bytes of `infile` into `filename` as a mono
# 16-bit PCM WAVE file.
#
# Audio layout: a 3 second 440 Hz sync tone, then two tones per input byte
# (low nibble first, then high nibble). Each tone lasts 1/denominator seconds
# and its pitch is frequencies[nibble].
infile = "sieve1.py"
filename = "out.wav"
sampleFreq = 8000
denominator = 8
frequencies = (262, 294, 330, 350, 392, 440, 494, 523, 587, 659, 698, 784, 880, 988, 1047, 1175)
amplitude = 3000


def _write_tone(out, frequency, sampleCount, sampleRate, peak):
    """Write sampleCount little-endian 16-bit samples of a sine tone to out."""
    # Phase advance per sample, in radians.
    step = 2*math.pi/(sampleRate/float(frequency))
    # The "1 +" offset keeps every sample in the range 0..2*peak.
    values = [int(peak * (1 + math.sin(step*float(i)))) for i in range(sampleCount)]
    # One pack/write per tone instead of one per sample.
    out.write(struct.pack("<%dh" % (sampleCount,), *values))


# Read the payload up front so the header sizes are derived from the bytes
# that are actually encoded. bytearray yields ints on Python 2 and 3 alike.
with open(infile, "rb") as infh:
    payload = bytearray(infh.read())
size = len(payload)

fmt = FormatChunk()
fmt.size = 0x10
fmt.formatTag = 1
fmt.channels = 1
fmt.bitsPerSample = 16
fmt.samplesPerSec = sampleFreq
# Bytes per sample frame: 2 for mono 16-bit audio (previously hard-coded to 4).
fmt.blockAlign = fmt.channels * fmt.bitsPerSample // 8
fmt.avgBytesPerSec = fmt.samplesPerSec * fmt.blockAlign

print("channels: %d" % (fmt.channels,))
print("samplesPerSec: %d" % (fmt.samplesPerSec,))
print("bytesPerSec: %d" % (fmt.avgBytesPerSec,))
print("bitsPerSample: %d" % (fmt.bitsPerSample,))

halfByteSampleLength = fmt.samplesPerSec // denominator
syncSampleCount = 3*fmt.samplesPerSec

# Every input byte produces TWO tones, so the payload contributes
# 2 * size * halfByteSampleLength samples on top of the sync tone.
# Only one channel is ever written; fmt.channels is fixed at 1 above.
dataLength = (syncSampleCount + 2*size*halfByteSampleLength) * fmt.blockAlign
# RIFF size field = everything after the first 8 bytes of the file:
# the 4-byte form type, the fmt chunk (8-byte header + body) and the data
# chunk (8-byte header + samples).
length = 4 + (8 + fmt.size) + (8 + dataLength)

riffHeader = GlobalFormat()
riffHeader.size = length

with open(filename, "wb") as fh:
    fh.write(riffHeader.as_bin())
    fh.write(fmt.as_bin())

    dataId = 0x61746164  # "data"
    fh.write(struct.pack("<II", dataId, dataLength))

    print("sync length: %d" % (3*fmt.avgBytesPerSec,))
    _write_tone(fh, 440, syncSampleCount, fmt.samplesPerSec, amplitude)

    print("half byte sample length: %d" % (halfByteSampleLength,))
    for m in payload:
        # Low nibble is emitted first, then the high nibble.
        for halfbyte in (m & 0xF, m >> 4):
            _write_tone(fh, frequencies[halfbyte], halfByteSampleLength,
                        fmt.samplesPerSec, amplitude)
def read_and_unpack(fh, format):
    """Read one struct-packed record from fh and return its fields as a tuple.

    Args:
        fh: Object with a ``read(n)`` method returning bytes.
        format: ``struct`` format string describing the record.

    Returns:
        The tuple produced by unpacking the bytes read.

    Raises:
        struct.error: If fh yields fewer bytes than the format requires
            (for example at end of file) or the format string is invalid.
    """
    layout = struct.Struct(format)
    raw = fh.read(layout.size)
    return layout.unpack(raw)
def get_maxfrequency(samples, norm, maxAcceptable=1200):
    """Return the label of the strongest spectral bin of ``samples``.

    Each FFT bin ``i`` is labelled ``int(norm * i)``. With
    ``norm = sampleRate / len(samples)`` the label is the bin's frequency in
    Hz, truncated to an integer.

    Args:
        samples: Non-empty sequence of real-valued samples.
        norm: Multiplier converting a bin index into its label.
        maxAcceptable: Bins whose label is >= this value are ignored.

    Returns:
        The integer label of the qualifying bin with the largest magnitude
        (the lowest such bin on ties), or 0 if no qualifying bin has a
        magnitude above zero.

    Raises:
        ValueError: Raised by NumPy when ``samples`` is empty.
    """
    # numpy.numarray was removed in NumPy 1.9; asarray is the supported call.
    # rfft returns only bins 0..N/2. For real input the remaining bins are
    # the mirror image of these, so they carry no extra information.
    magnitudes = numpy.abs(numpy.fft.rfft(numpy.asarray(samples)))

    # Same arithmetic as int(norm * i) for every bin index i.
    binLabels = (norm * numpy.arange(len(magnitudes))).astype(int)

    eligible = binLabels < maxAcceptable
    eligible[0] = False  # bin 0 is the DC component, never a tone

    scores = numpy.where(eligible, magnitudes, 0.0)
    # argmax returns the first maximum, matching a strict ">" linear scan.
    peak = int(numpy.argmax(scores))
    if scores[peak] <= 0:
        return 0
    return int(binLabels[peak])
def get_closest_index(frequencies, freq):
    """Return the index of the entry in ``frequencies`` nearest to ``freq``.

    Args:
        frequencies: Iterable of numbers to compare against.
        freq: The value to match.

    Returns:
        Index of the entry with the smallest absolute difference from
        ``freq`` (the first such entry on ties), or -1 if ``frequencies``
        is empty.
    """
    # None marks "nothing seen yet". The previous sentinel was an empty
    # tuple, which relied on Python 2's cross-type ordering and raises
    # TypeError on Python 3.
    bestDist = None
    bestIndex = -1
    for index, candidate in enumerate(frequencies):
        dist = abs(freq - candidate)
        if bestDist is None or dist < bestDist:
            bestDist = dist
            bestIndex = index
    return bestIndex
import struct
import math
import os.path
import numpy.numarray
import numpy.fft
# Decoder script: reads `filename`, checks the 3 second 440 Hz sync tone and
# then turns each pair of tones back into one byte (first tone = low nibble,
# second tone = high nibble).
filename = "out.wav"
sourceSampleFreq = 8000
denominator = 8
frequencies = (262, 294, 330, 350, 392, 440, 494, 523, 587, 659, 698, 784, 880, 988, 1047, 1175)

decodedChars = []

with open(filename, "rb") as fh:
    # "<" = little-endian with fixed field sizes, independent of the host.
    # The chunk ids are read but not validated.
    (riffId, riffSize, riffType) = read_and_unpack(fh, "<III")
    (chunkId, chunkSize, formatTag, channels, samplesPerSec,
     bytesPerSec, blockAlign, bitsPerSample) = read_and_unpack(fh, "<IIhHIIHH")

    print("channels: %d" % (channels,))
    print("samplesPerSec: %d" % (samplesPerSec,))
    print("bytesPerSec: %d" % (bytesPerSec,))
    print("bitsPerSample: %d" % (bitsPerSample,))

    # skip data chunk header
    fh.read(4)

    # read length
    # The length is only reported. Decoding below runs to end of file rather
    # than trusting this field.
    (length,) = read_and_unpack(fh, "<I")
    print("length: %d" % (length,))

    syncToneLength = 3*samplesPerSec
    print("sync length: %d" % (syncToneLength,))

    # One unpack call for the whole tone instead of one per sample.
    syncSamples = list(read_and_unpack(fh, "<%dh" % (syncToneLength,)))

    # 3 seconds of samples give bins 1/3 Hz apart, hence the 1/3 multiplier.
    syncFreq = get_maxfrequency(syncSamples, 1.0/3.0)
    if syncFreq == 440:
        print("Found valid 3 second syncbeep of 440hz")
    else:
        print("Invalid sync beep: %d" % (syncFreq,))
        # Non-zero status so callers can tell the decode failed. SystemExit
        # also works where the site-provided exit() helper is unavailable.
        raise SystemExit(1)

    halfByteSampleLength = samplesPerSec // denominator
    print("half byte sample length: %d" % (halfByteSampleLength,))

    toneFormat = "<%dh" % (halfByteSampleLength,)
    binWidth = samplesPerSec / float(halfByteSampleLength) if halfByteSampleLength else 0.0

    # A zero-length tone can never reach end of file, so skip the loop then.
    while halfByteSampleLength > 0:
        try:
            # Both tones of a byte must be present; a trailing partial byte
            # is dropped.
            tones = [list(read_and_unpack(fh, toneFormat)) for _ in range(2)]
        except struct.error:
            # Short read: end of the sample data.
            break

        halfByte = [
            get_closest_index(frequencies, get_maxfrequency(samples, binWidth))
            for samples in tones
        ]
        fullbyte = halfByte[0] | (halfByte[1] << 4)
        decodedChars.append(chr(fullbyte))

print("")
print("decoded data:")
print("".join(decodedChars))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment