Last active
February 16, 2024 20:03
-
-
Save EnisBerk/0c8d987ccbb3b62aab066437d68e4b36 to your computer and use it in GitHub Desktop.
Export WAMD metadata from wav files to csv.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
Export WAMD metadata from wav files to csv. | |
usage: | |
python3 wamd2csv.py --folder /path/to/folder --csv_file /path/to/output.csv | |
Adapted from https://github.com/riggsd/guano-py/blob/master/bin/wamd2guano.py
""" | |
from __future__ import print_function | |
# from math import e | |
import sys | |
import time | |
import os | |
import os.path | |
# from re import sub | |
import sys | |
import chunk | |
import struct | |
from datetime import datetime, tzinfo, timedelta | |
# from pprint import pprint | |
import csv | |
from glob import glob | |
import argparse | |
# try to import tqdm for progress bar | |
try:
    from tqdm import tqdm  # type: ignore
except ImportError:

    def tqdm(iterable, *args, **kwargs):
        """Minimal text progress bar used when the real ``tqdm`` is missing.

        Yields every item of *iterable* unchanged and, after each item,
        rewrites a single status line on stdout.

        Args:
            iterable: Items to iterate over. It does not need to support
                ``len()``; generators are accepted.
            *args: Accepted for call compatibility with ``tqdm`` and ignored.
            **kwargs: Only ``total`` (expected number of items) is honoured;
                every other keyword (e.g. ``desc``) is ignored.

        Yields:
            The items of *iterable*, in order.
        """
        total = kwargs.get('total')
        if total is None:
            try:
                total = len(iterable)
            except TypeError:
                # Unsized iterable: no percentage or ETA can be computed.
                total = None
        start_time = time.time()
        for i, item in enumerate(iterable, start=1):
            yield item
            elapsed_time = time.time() - start_time
            if total:
                avg_time = elapsed_time / i
                # Clamp in case a caller-supplied total was too small.
                remaining_time = avg_time * max(total - i, 0)
                progress = min(i / total, 1.0)
                filled = int(progress * 20)
                progress_bar = '#' * filled + ' ' * (20 - filled)
                sys.stdout.write(
                    "\r{:.1f}% [{}] - elapsed: {:.2f}s - remaining: {:.2f}s".format(
                        progress * 100, progress_bar, elapsed_time,
                        remaining_time))
            else:
                sys.stdout.write(
                    "\r{} items - elapsed: {:.2f}s".format(i, elapsed_time))
            sys.stdout.flush()
        # Finish the status line so later output starts on a fresh line.
        sys.stdout.write("\n")
# Shared zero duration returned by `tzoffset.dst`.
_ZERO = timedelta(0)

# Python 2 compatibility aliases: on Python 3 both names simply mean `str`.
if sys.version_info[0] > 2:
    unicode = str  # type: ignore
    basestring = str  # type: ignore


# Local stand-in for `from guano import tzoffset`.
class tzoffset(tzinfo):  # pylint: disable=C0103:invalid-name
    """
    Fixed-offset concrete timezone implementation.

    `offset` should be numeric hours or an ISO format string like '-07:00'
    or '-07'. `None` (the default) means a zero offset.
    """

    def __init__(self, offset=None):
        if offset is None:
            offset = 0
        elif isinstance(offset, basestring):
            # The sign applies to the whole offset, so '-07:30' is -7.5 hours.
            # Read it from the text because int('-00') loses it.
            text = offset.strip()
            negative = text.startswith('-')
            parts = text.lstrip('+-').split(':')
            if len(parts) == 1:
                magnitude = int(parts[0])
            else:
                magnitude = int(parts[0]) + int(parts[1]) / 60.0
            # Skip negating zero so tzname() never renders '-0.0'.
            offset = -magnitude if negative and magnitude else magnitude
        self._offset_hours = offset
        self._offset = timedelta(hours=offset)  # type: ignore

    def utcoffset(self, dt):
        """Return the fixed offset from UTC as a `timedelta`."""
        return self._offset

    def dst(self, dt):
        """Return a zero `timedelta`; no DST adjustment is modelled."""
        return _ZERO

    def tzname(self, dt):
        """Return a label such as 'UTC-7' built from the offset in hours."""
        return 'UTC' + str(self._offset_hours)

    def __repr__(self):
        return self.tzname(None)
# binary WAMD field identifiers | |
# Maps each numeric WAMD field id to the name used as its key in the
# metadata dict built by `wamd()`.
WAMD_IDS = {
    0x00: 'version',
    0x01: 'model',
    0x02: 'serial',
    0x03: 'firmware',
    0x04: 'prefix',
    0x05: 'timestamp',
    0x06: 'gpsfirst',
    0x07: 'gpstrack',
    0x08: 'software',
    0x09: 'license',
    0x0A: 'notes',
    0x0B: 'auto_id',
    0x0C: 'manual_id',
    0x0D: 'voicenotes',
    0x0E: 'auto_id_stats',
    0x0F: 'time_expansion',
    0x10: 'program',
    0x11: 'runstate',
    0x12: 'microphone',
    0x13: 'sensitivity',
}
# fields that we exclude from our in-memory representation | |
# Field ids that `wamd()` skips instead of decoding into the metadata dict.
WAMD_DROP_IDS = (
    0x0D,    # voice note embedded .WAV
    0x10,    # program binary
    0x11,    # runstate giant binary blob
    0xFFFF,  # used for 16-bit alignment
)
def _parse_text(value): | |
"""Default coercion function which assumes text is UTF-8 encoded""" | |
return value.decode('utf-8') | |
def _parse_wamd_timestamp(timestamp): | |
"""WAMD timestamps are one of these known formats: | |
2014-04-02 22:59:14-05:00 | |
2014-04-02 22:59:14.000 | |
2014-04-02 22:59:14 | |
Produces a `datetime.datetime`. | |
""" | |
if isinstance(timestamp, bytes): | |
timestamp = timestamp.decode('utf-8') | |
if len(timestamp) == 25: | |
dt, offset = timestamp[:-6], timestamp[19:] | |
tz = tzoffset(offset) | |
return datetime.strptime(dt, '%Y-%m-%d %H:%M:%S').replace(tzinfo=tz) | |
elif len(timestamp) == 23: | |
return datetime.strptime(timestamp, '%Y-%m-%d %H:%M:%S.%f') | |
elif len(timestamp) == 19: | |
return datetime.strptime(timestamp, '%Y-%m-%d %H:%M:%S') | |
else: | |
return None | |
def _parse_wamd_gps(gpsfirst): | |
"""WAMD "GPS First" waypoints are in one of these two formats: | |
SM3, SM4, (the correct format): | |
WGS..., LAT, N|S, LON, E|W [, alt...] | |
EMTouch: | |
WGS..., [-]LAT, [-]LON[,alt...] | |
Produces (lat, lon, altitude) float tuple. | |
""" | |
if not gpsfirst: | |
return None | |
if isinstance(gpsfirst, bytes): | |
gpsfirst = gpsfirst.decode('utf-8') | |
vals = tuple(val.strip() for val in gpsfirst.split(',')) | |
datum, vals = vals[0], vals[1:] | |
del datum | |
if vals[1] in ('N', 'S'): | |
# Standard format | |
lat, lon = float(vals[0]), float(vals[2]) | |
if vals[1] == 'S': | |
lat *= -1 | |
if vals[3] == 'W': | |
lon *= -1 | |
alt = int(round(float(vals[4]))) if len(vals) > 4 else None | |
else: | |
# EMTouch format | |
lat, lon = float(vals[0]), float(vals[1]) | |
alt = int(round(float(vals[2]))) if len(vals) > 2 else None | |
return lat, lon, alt | |
# rules to coerce values from binary string to native types (default is `str`) | |
def _parse_uint16(raw):
    """Unpack *raw* as one little-endian unsigned 16-bit integer."""
    return struct.unpack('<H', raw)[0]


# Per-field converters from the raw bytes value to a native Python value.
# Fields not listed here fall back to `_parse_text` in `wamd()`.
WAMD_COERCE = {
    'version': _parse_uint16,
    'timestamp': _parse_wamd_timestamp,
    'gpsfirst': _parse_wamd_gps,
    'time_expansion': _parse_uint16,
}
def wamd(fname):
    """Extract WAMD metadata from a .WAV file as a dict.

    Walks the RIFF/WAVE sub-chunks until the `wamd` chunk is found, then
    decodes its fields. Each field is a little-endian uint16 id, a uint32
    length and `length` bytes of value. Ids listed in `WAMD_DROP_IDS` are
    skipped. Every other value is converted with its `WAMD_COERCE` entry,
    or `_parse_text` by default, and stored under its `WAMD_IDS` name, or
    under the raw id when the id is unknown.

    NOTE: relies on the stdlib `chunk` module, which is deprecated since
    Python 3.11 and removed in Python 3.13.

    Args:
        fname: Path of the .WAV file to read.

    Returns:
        dict mapping field name (or unknown numeric id) to the coerced value.

    Raises:
        Exception: if the file is not RIFF/WAVE or has no `wamd` chunk.
        struct.error: if a field's declared length runs past the chunk data.
    """
    with open(fname, 'rb') as f:
        ch = chunk.Chunk(f, bigendian=False)
        ch_name = ch.getname()
        if ch_name != b'RIFF':
            raise Exception(f'{fname} is not a RIFF file! (got {ch_name})')  # pylint: disable=W0719:broad-exception-raised
        ch_4_cc = ch.read(4)
        if ch_4_cc != b'WAVE':
            raise Exception(f'{fname} is not a WAVE file! (got {ch_4_cc})')  # pylint: disable=W0719:broad-exception-raised

        wamd_chunk = None
        while True:
            try:
                subch = chunk.Chunk(ch, bigendian=False)  # type: ignore
            except EOFError:
                break
            if subch.getname() == b'wamd':
                wamd_chunk = subch
                break
            try:
                subch.skip()
            except Exception:  # pylint: disable=broad-except
                # Best effort: a malformed chunk ends the scan. Unlike a bare
                # `except:`, this lets KeyboardInterrupt/SystemExit through.
                break

        if wamd_chunk is None:
            raise Exception(f'"wamd" WAV chunk not found in file {fname}')  # pylint: disable=W0719:broad-exception-raised
        buf = wamd_chunk.read(wamd_chunk.getsize())

    metadata = {}
    offset = 0
    # Use the bytes actually read, and stop once fewer than the 6 header
    # bytes (uint16 id + uint32 length) remain, so trailing padding is
    # ignored instead of raising struct.error.
    size = len(buf)
    while offset + 6 <= size:
        id_val, len_val = struct.unpack_from('<HI', buf, offset)
        val = struct.unpack_from('<%ds' % len_val, buf, offset + 6)[0]
        if id_val not in WAMD_DROP_IDS:
            name = WAMD_IDS.get(id_val, id_val)
            metadata[name] = WAMD_COERCE.get(name, _parse_text)(val)
        offset += 6 + len_val
    return metadata
def wamd2dict(fname):
    """Flatten a file's WAMD metadata into the columns used by the CSV export.

    Every text/number column is always present, with `''` (or `1` for `te`)
    as the default when the recording lacks that field. `timestamp` is `''`
    when the field is missing or could not be parsed. `loc_position`
    (`(lat, lon)`) and `loc_elevation` are added only when a GPS waypoint
    was parsed.

    Args:
        fname: Path of the .WAV file to read.

    Returns:
        dict of column name to value for one CSV row (without `fname`).

    Raises:
        Whatever `wamd(fname)` raises for unreadable or non-WAMD files.
    """
    wamd_md = wamd(fname)
    info_dict = {}

    # The field may be absent, or parsed to None when its format is unknown.
    timestamp = wamd_md.pop('timestamp', None)
    info_dict['timestamp'] = timestamp.isoformat() if timestamp is not None else ''
    info_dict['notes'] = wamd_md.pop('notes', '')
    info_dict['model'] = wamd_md.pop('model', '')
    info_dict['firmware_version'] = wamd_md.pop('firmware', '')
    info_dict['species_auto_id'] = wamd_md.pop('auto_id', '')
    info_dict['species_manual_id'] = wamd_md.pop('manual_id', '')
    info_dict['te'] = wamd_md.pop('time_expansion', 1)
    # info_dict['Samplerate'] = gfile.wav_params.framerate * gfile['TE']
    # info_dict['Length'] = gfile.wav_params.nframes / float(
    #     gfile.wav_params.framerate) * gfile['TE']

    # An empty GPS field is parsed to None, which cannot be unpacked.
    gps = wamd_md.pop('gpsfirst', None)
    if gps is not None:
        lat, lon, alt = gps
        info_dict['loc_position'] = lat, lon
        info_dict['loc_elevation'] = alt
    return info_dict
def process_folder(folder, results, save_freq, results_file):
    """Extract metadata from every new .wav file under *folder*.

    Files whose path is already a key of *results* are skipped. New rows are
    gathered in a separate dict and appended to *results_file* every
    *save_freq* files. Rows loaded from a previous run are never written
    again, so resuming does not duplicate them in the CSV.

    Args:
        folder: Directory searched recursively for `.wav` files
            (case-insensitive extension match).
        results: Mapping keyed by already-processed file paths; only read.
        save_freq: Save after this many new files; values <= 0 disable the
            periodic save.
        results_file: CSV path handed to `save_results`.

    Returns:
        dict of new rows (path -> metadata) that have not been saved yet.
        The caller is expected to save them.
    """
    fnames = glob(os.path.join(folder, '**'), recursive=True)
    fnames = [fname for fname in fnames if fname.lower().endswith('.wav')]
    print('Found', len(fnames), ' wav files')
    fnames_done = [fname for fname in fnames if fname in results]
    fnames = sorted(fname for fname in fnames if fname not in results)
    if fnames_done:
        print(f'{len(fnames_done)} files already in csv.')
    print(f'Processing {len(fnames)} new files.')

    pending = {}
    for i, fname in enumerate(tqdm(fnames, desc='Processing files')):
        try:
            metadata = wamd2dict(fname)
            metadata['error'] = ''
        except Exception as e:  # pylint: disable=broad-except
            # Record the failure as a row so the file is not retried forever.
            metadata = {'error': str(e)}
        pending[fname] = metadata
        if save_freq > 0 and (i + 1) % save_freq == 0:
            save_results(pending, results_file)
            pending.clear()
    return pending
def save_results(results, results_file):
    """Append *results* to the CSV at *results_file*.

    A new or empty file gets a header row first. When the file already has
    content, its existing header decides the columns, so appended rows stay
    aligned with it. Keys with no matching column are skipped instead of
    raising.

    Args:
        results: Mapping of file path -> metadata dict for that file.
        results_file: Path of the CSV file to append to.
    """
    fieldnames = [
        'fname', 'timestamp', 'notes', 'model', 'firmware_version',
        'species_auto_id', 'species_manual_id', 'te', 'error',
        # Emitted by wamd2dict for recordings with a GPS waypoint.
        'loc_position', 'loc_elevation',
    ]
    has_content = (os.path.exists(results_file)
                   and os.path.getsize(results_file) > 0)
    if has_content:
        with open(results_file, 'r', newline='', encoding='utf-8') as f:
            existing_header = next(csv.reader(f), None)
        if existing_header:
            fieldnames = existing_header

    with open(results_file, 'a', newline='', encoding='utf-8') as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames,
                                extrasaction='ignore')
        if not has_content:
            writer.writeheader()
        for fname, metadata in results.items():
            row = {'fname': fname}
            row.update(metadata)
            writer.writerow(row)
def main():
    """Command-line entry point.

    Parses `--csv_file`, `--folder` and `--save_freq`, loads rows already in
    the CSV (keyed by their `fname` column), processes the folder and saves
    the rows that `process_folder` returns.
    """
    parser = argparse.ArgumentParser(description='Process some files.')
    cli_options = (
        ('--csv_file', {
            'type': str,
            'required': True,
            'help': 'Path to the CSV file',
        }),
        ('--folder', {
            'type': str,
            'required': True,
            'help': 'folder to process',
        }),
        ('--save_freq', {
            'type': int,
            'default': 100,
            'help': 'Frequency of saving results to CSV file',
        }),
    )
    for flag, options in cli_options:
        parser.add_argument(flag, **options)
    args = parser.parse_args()
    results_file = args.csv_file

    # Pick up rows written by an earlier run, if the CSV already exists.
    results = {}
    if os.path.exists(results_file):
        with open(results_file, 'r', encoding='utf-8') as f:
            results = {row['fname']: row for row in csv.DictReader(f)}

    # Scan the folder, then write whatever is still unsaved.
    results = process_folder(args.folder, results, args.save_freq,
                             results_file)
    save_results(results, results_file)


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment