Skip to content

Instantly share code, notes, and snippets.

@soravux
Created May 25, 2014 04:50
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save soravux/6929ed6105a9b25a51f4 to your computer and use it in GitHub Desktop.
Save soravux/6929ed6105a9b25a51f4 to your computer and use it in GitHub Desktop.
Number station decoding example for blog post.
from scipy.io.wavfile import read, write
from scipy.signal import find_peaks_cwt, firwin, lfilter, medfilt, resample, hamming
from scipy.optimize import fmin
import numpy as np
from matplotlib import pyplot as plt
import argparse
class Tone:
def __init__(self, begin_time=0.0, end_time=0.0, harmonics=None):
# Unreadable but quick, cheap & dirty trick
self.__dict__.update(**locals())
def __repr__(self):
return "{begin_time}-{end_time}: {sorted_harmonics}".format(
sorted_harmonics=sorted(self.harmonics.items()),
**self.__dict__
)
main_harm = lambda x: max(x.harmonics.items(), key=lambda y: y[1])
main_freq = lambda x: float(main_harm(x)[0])
def getTones(data, rate):
"""Get list of tones recognized in the data array."""
norm_data = data.astype('double') / ( 2 ** 15 - 1)
x_axis = np.arange(len(norm_data)) / rate
Pxx, ori_freqs, t, x = plt.specgram(data, NFFT=1024, noverlap=0, Fs=rate)
tones = []
for blk_idx in range(len(Pxx[0])):
fft_block = np.array(list(zip(*Pxx))[blk_idx])[:60]
fft_block, freqs = resample(fft_block, 10*len(fft_block), ori_freqs[:60], window='hamming')
peak_idx = find_peaks_cwt(fft_block[:len(fft_block)/1.8], np.arange(3, 20))
peak_idx = np.array(peak_idx)
if len(peak_idx) and np.max(fft_block[peak_idx]) > 100.0:
tone = Tone(
blk_idx * 512 / 44100.0,
((blk_idx + 1) * 512 - 1) / 44100.0,
{ freqs[freq]: fft_block[freq] for freq in peak_idx },
)
tones.append(tone)
return tones
def mergeTones(tones, freqThr=0.06):
"""
Merge tones: begin and end times adjusted and average their harmonics.
Two tones are considered the same if their main (most powerful) frequency
are in a freqThr% difference.
Input list must be ordered by time.
"""
if not tones:
return []
retVal = [tones[0]]
for tone in tones[1:]:
# If frequencies in both signals are near and proeminent
for harm_freq, harm_ampli in tone.harmonics.items():
near_powerful = harm_ampli > 0.5 * main_harm(retVal[-1])[1]
if not near_powerful:
continue
pdiff = abs(main_freq(retVal[-1]) - harm_freq) / main_freq(retVal[-1])
same_tone = pdiff < freqThr
if same_tone:
retVal[-1].end_time = tone.end_time
break
else:
retVal.append(tone)
# Remove tones that are too small
tmp = []
for tone in retVal:
if tone.end_time - tone.begin_time > 0.02:
tmp.append(tone)
retVal = tmp
return retVal
tones_to_key_number = lambda x: list(int(round(12.0 * np.log2(main_freq(tone) / 440.0) + 49)) for tone in x)
if __name__ == '__main__':
rate, data = read('out.wav')
tones = getTones(data, rate)
merged_tones = mergeTones(tones)
notes = tones_to_key_number(merged_tones)
print(notes)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment