Skip to content

Instantly share code, notes, and snippets.

@EnisBerk
Last active February 16, 2024 20:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save EnisBerk/0c8d987ccbb3b62aab066437d68e4b36 to your computer and use it in GitHub Desktop.
Save EnisBerk/0c8d987ccbb3b62aab066437d68e4b36 to your computer and use it in GitHub Desktop.
Export WAMD metadata from wav files to csv.
#!/usr/bin/env python3
"""
Export WAMD metadata from wav files to csv.
usage:
python3 wamd2csv.py --folder /path/to/folder --csv_file /path/to/output.csv
Adapted from https://github.com/riggsd/guano-py/blob/master/bin/wamd2guano.py
"""
from __future__ import print_function
# from math import e
import sys
import time
import os
import os.path
# from re import sub
import sys
import chunk
import struct
from datetime import datetime, tzinfo, timedelta
# from pprint import pprint
import csv
from glob import glob
import argparse
# try to import tqdm for progress bar
try:
    from tqdm import tqdm  # type: ignore
except ImportError:

    def tqdm(iterable, *args, **kwargs):
        """Minimal stand-in for ``tqdm.tqdm`` used when tqdm is not installed.

        Yields every item of `iterable` unchanged and, after each item has
        been handed back by the consumer, redraws a one-line text progress
        bar (percentage, elapsed and estimated remaining time) on stdout.

        Only the ``total`` keyword is interpreted.  Every other positional or
        keyword argument (e.g. ``desc``) is accepted and ignored so that call
        sites written for the real tqdm keep working.

        When the number of items is unknown (no ``total`` given and
        `iterable` has no ``len()``, e.g. a generator) the items are still
        yielded, but no progress bar is drawn.
        """
        total = kwargs.get('total')
        if total is None:
            try:
                total = len(iterable)
            except TypeError:
                # generators and other unsized iterables
                total = None
        start_time = time.time()
        for i, item in enumerate(iterable, start=1):
            yield item
            if not total:
                # unknown (or zero) total: percentage and ETA are undefined
                continue
            elapsed_time = time.time() - start_time
            avg_time = elapsed_time / i
            remaining_time = avg_time * (total - i)
            progress = i / total
            progress_percent = progress * 100
            filled = int(progress * 20)
            progress_bar = '#' * filled + ' ' * (20 - filled)
            # "\r" returns to the start of the line so the bar is redrawn
            # in place instead of scrolling
            sys.stdout.write(
                "\r{:.1f}% [{}] - elapsed: {:.2f}s - remaining: {:.2f}s".format(
                    progress_percent, progress_bar, elapsed_time,
                    remaining_time))
            sys.stdout.flush()
        sys.stdout.write("\n")
_ZERO = timedelta(0)
if sys.version_info[0] > 2:
unicode = str # type: ignore
basestring = str # type: ignore
# from guano import GuanoFile, tzoffset
class tzoffset(tzinfo):  # pylint: disable=C0103:invalid-name
    """
    Fixed-offset concrete timezone implementation.

    `offset` should be numeric hours or an ISO format string like '-07:00'
    or '-07'.  ``None`` (the default) means a zero offset.
    """

    def __init__(self, offset=None):
        if offset is None:
            # allow construction without arguments instead of failing in
            # timedelta(hours=None)
            offset = 0
        elif isinstance(offset, str):
            # offset as ISO string '-07:00' or '-07' format
            vals = offset.split(':')
            hours = int(vals[0])
            if len(vals) == 1:
                offset = hours
            else:
                minutes = int(vals[1]) / 60.0
                # The sign applies to the whole offset, not just the hour
                # field: '-03:30' is -3.5 h (not -2.5 h) and '-00:30' is
                # -0.5 h even though int('-00') == 0.
                if vals[0].strip().startswith('-'):
                    offset = hours - minutes
                else:
                    offset = hours + minutes
        self._offset_hours = offset
        self._offset = timedelta(hours=offset)  # type: ignore

    def utcoffset(self, dt):
        """Return the fixed offset from UTC as a `timedelta`."""
        return self._offset

    def dst(self, dt):
        """Return a zero `timedelta`; a fixed offset never observes DST."""
        return timedelta(0)

    def tzname(self, dt):
        """Return 'UTC' followed by the offset in hours, e.g. 'UTC-5'."""
        return 'UTC' + str(self._offset_hours)

    def __repr__(self):
        return self.tzname(None)
# Binary WAMD field identifiers, mapped to the field names used as metadata
# keys.  The known IDs are contiguous from 0x00, so the table is written as
# an ordered tuple of names and numbered with enumerate().
WAMD_IDS = dict(enumerate((
    'version',         # 0x00
    'model',           # 0x01
    'serial',          # 0x02
    'firmware',        # 0x03
    'prefix',          # 0x04
    'timestamp',       # 0x05
    'gpsfirst',        # 0x06
    'gpstrack',        # 0x07
    'software',        # 0x08
    'license',         # 0x09
    'notes',           # 0x0A
    'auto_id',         # 0x0B
    'manual_id',       # 0x0C
    'voicenotes',      # 0x0D
    'auto_id_stats',   # 0x0E
    'time_expansion',  # 0x0F
    'program',         # 0x10
    'runstate',        # 0x11
    'microphone',      # 0x12
    'sensitivity',     # 0x13
)))
# IDs of WAMD fields that we exclude from our in-memory representation.
# `wamd()` checks each entry's ID against this tuple and does not store the
# entry when it is listed here.
WAMD_DROP_IDS = (
    0x0D,  # voice note embedded .WAV
    0x10,  # program binary
    0x11,  # runstate giant binary blob
    0xFFFF,  # used for 16-bit alignment
)
def _parse_text(value):
"""Default coercion function which assumes text is UTF-8 encoded"""
return value.decode('utf-8')
def _parse_wamd_timestamp(timestamp):
"""WAMD timestamps are one of these known formats:
2014-04-02 22:59:14-05:00
2014-04-02 22:59:14.000
2014-04-02 22:59:14
Produces a `datetime.datetime`.
"""
if isinstance(timestamp, bytes):
timestamp = timestamp.decode('utf-8')
if len(timestamp) == 25:
dt, offset = timestamp[:-6], timestamp[19:]
tz = tzoffset(offset)
return datetime.strptime(dt, '%Y-%m-%d %H:%M:%S').replace(tzinfo=tz)
elif len(timestamp) == 23:
return datetime.strptime(timestamp, '%Y-%m-%d %H:%M:%S.%f')
elif len(timestamp) == 19:
return datetime.strptime(timestamp, '%Y-%m-%d %H:%M:%S')
else:
return None
def _parse_wamd_gps(gpsfirst):
"""WAMD "GPS First" waypoints are in one of these two formats:
SM3, SM4, (the correct format):
WGS..., LAT, N|S, LON, E|W [, alt...]
EMTouch:
WGS..., [-]LAT, [-]LON[,alt...]
Produces (lat, lon, altitude) float tuple.
"""
if not gpsfirst:
return None
if isinstance(gpsfirst, bytes):
gpsfirst = gpsfirst.decode('utf-8')
vals = tuple(val.strip() for val in gpsfirst.split(','))
datum, vals = vals[0], vals[1:]
del datum
if vals[1] in ('N', 'S'):
# Standard format
lat, lon = float(vals[0]), float(vals[2])
if vals[1] == 'S':
lat *= -1
if vals[3] == 'W':
lon *= -1
alt = int(round(float(vals[4]))) if len(vals) > 4 else None
else:
# EMTouch format
lat, lon = float(vals[0]), float(vals[1])
alt = int(round(float(vals[2]))) if len(vals) > 2 else None
return lat, lon, alt
def _parse_uint16_le(value):
    """Decode a 2-byte buffer as a little-endian unsigned 16-bit integer."""
    (number,) = struct.unpack('<H', value)
    return number


# Per-field rules to coerce raw binary values to native types, keyed by field
# name.  Fields without an entry here are decoded as text by the caller.
WAMD_COERCE = {
    'version': _parse_uint16_le,
    'timestamp': _parse_wamd_timestamp,
    'gpsfirst': _parse_wamd_gps,
    'time_expansion': _parse_uint16_le,
}
def wamd(fname):
    """Extract WAMD metadata from a .WAV file as a dict.

    The file is walked as a RIFF/WAVE container until a ``wamd`` sub-chunk is
    found.  That chunk is a sequence of entries, each made of a 2-byte
    little-endian field ID, a 4-byte little-endian length and `length` bytes
    of value.

    Returns a dict keyed by field name (see `WAMD_IDS`); an ID that is not in
    `WAMD_IDS` is kept under its integer ID.  Values are converted with the
    matching `WAMD_COERCE` rule, or decoded as UTF-8 text when there is none.
    Entries whose ID is in `WAMD_DROP_IDS` are skipped.

    Raises `Exception` when the file is not RIFF/WAVE or has no ``wamd``
    chunk, and `struct.error` when an entry declares more value bytes than
    the chunk contains.
    """
    with open(fname, 'rb') as f:
        # NOTE: the stdlib `chunk` module is deprecated since Python 3.11 and
        # removed in 3.13 (PEP 594); this function needs an interpreter that
        # still ships it.
        ch = chunk.Chunk(f, bigendian=False)
        ch_name = ch.getname()
        if ch_name != b'RIFF':
            raise Exception(f'{fname} is not a RIFF file! (got {ch_name})')  # pylint: disable=W0719:broad-exception-raised
        ch_4_cc = ch.read(4)
        if ch_4_cc != b'WAVE':
            raise Exception(f'{fname} is not a WAVE file! (got {ch_4_cc})')  # pylint: disable=W0719:broad-exception-raised

        wamd_chunk = None
        while True:
            try:
                subch = chunk.Chunk(ch, bigendian=False)  # type: ignore
            except EOFError:
                # no more sub-chunks
                break
            if subch.getname() == b'wamd':
                wamd_chunk = subch
                break
            try:
                subch.skip()
            except Exception:  # pylint: disable=broad-except
                # Damaged or truncated chunk: stop scanning.  Unlike a bare
                # `except:`, this lets KeyboardInterrupt / SystemExit through.
                break

        if wamd_chunk is None:
            raise Exception(f'"wamd" WAV chunk not found in file {fname}')  # pylint: disable=W0719:broad-exception-raised
        buf = wamd_chunk.read(wamd_chunk.getsize())

    metadata = {}
    header_size = 6  # 2-byte ID + 4-byte length
    offset = 0
    # Bound the loop by the bytes actually read (a truncated file yields fewer
    # than the declared chunk size) and stop when fewer than a full entry
    # header remains, rather than failing on trailing bytes.
    size = len(buf)
    while offset + header_size <= size:
        id_val, len_val = struct.unpack_from('<HI', buf, offset)
        val = struct.unpack_from('<%ds' % len_val, buf,
                                 offset + header_size)[0]
        if id_val not in WAMD_DROP_IDS:
            name = WAMD_IDS.get(id_val, id_val)
            metadata[name] = WAMD_COERCE.get(name, _parse_text)(val)
        offset += header_size + len_val
    return metadata
def wamd2dict(fname):
    """Read the WAMD metadata of `fname` and flatten it into a CSV-ready dict.

    Every key below is always present, with an empty string (or 1 for `te`)
    when the file does not carry the field:

        timestamp, notes, model, firmware_version, species_auto_id,
        species_manual_id, te

    `timestamp` is the ISO-8601 text of the recording time; it is '' when
    the field is missing or its format was not recognised.

    `loc_position` (a ``(lat, lon)`` tuple) and `loc_elevation` are added
    only when the file has a usable GPS waypoint.
    """
    wamd_md = wamd(fname)
    info_dict = {}

    # The timestamp may be absent, or parsed to None when its format is
    # unknown; neither case should make the whole file fail.
    timestamp = wamd_md.pop('timestamp', None)
    info_dict['timestamp'] = (timestamp.isoformat()
                              if timestamp is not None else '')
    info_dict['notes'] = wamd_md.pop('notes', '')
    info_dict['model'] = wamd_md.pop('model', '')
    info_dict['firmware_version'] = wamd_md.pop('firmware', '')
    info_dict['species_auto_id'] = wamd_md.pop('auto_id', '')
    info_dict['species_manual_id'] = wamd_md.pop('manual_id', '')
    info_dict['te'] = wamd_md.pop('time_expansion', 1)

    # An empty gpsfirst field is parsed to None, which cannot be unpacked.
    gps = wamd_md.pop('gpsfirst', None)
    if gps is not None:
        lat, lon, alt = gps
        info_dict['loc_position'] = lat, lon
        info_dict['loc_elevation'] = alt
    return info_dict
def process_folder(folder, results, save_freq, results_file):
    """Extract metadata from every not-yet-processed .wav file under `folder`.

    Args:
        folder: directory that is searched recursively for ``*.wav`` files
            (case-insensitive extension match).
        results: mapping whose keys are the file names already present in
            the CSV; those files are skipped.  It is only read, never
            modified.
        save_freq: append the collected rows to `results_file` after every
            `save_freq` processed files; a value <= 0 disables the periodic
            saves.
        results_file: path of the CSV file the rows are appended to.

    Returns:
        dict mapping file name -> metadata row for the new files that have
        NOT been written to `results_file` yet; the caller is expected to
        save them.

    A file whose metadata cannot be read does not abort the run: its row
    only carries the error message under the 'error' key.
    """
    fnames = glob(os.path.join(folder, '**'), recursive=True)
    fnames = [fname for fname in fnames if fname.lower().endswith('.wav')]
    print('Found', len(fnames), ' wav files')

    todo = sorted(fname for fname in fnames if fname not in results)
    n_done = len(fnames) - len(todo)
    if n_done:
        print(f'{n_done} files already in csv.')
        print(f'Processing {len(todo)} new files.')

    # Collect new rows separately from `results`: the rows loaded from the
    # existing CSV are already on disk and must not be appended again.
    pending = {}
    for i, fname in enumerate(tqdm(todo, desc='Processing files'), start=1):
        try:
            metadata = wamd2dict(fname)
            metadata['error'] = ''
        except Exception as e:  # pylint: disable=broad-except
            metadata = {'error': str(e)}
        pending[fname] = metadata
        # guard against save_freq == 0 (modulo by zero)
        if save_freq > 0 and i % save_freq == 0:
            save_results(pending, results_file)
            pending.clear()
    return pending
def save_results(results, results_file):
    """Append `results` to the CSV file `results_file`.

    Args:
        results: dict mapping file name -> metadata dict; each item becomes
            one CSV row whose 'fname' column is the key.
        results_file: path of the CSV file; it is created, with a header
            row, when it does not exist or is empty.

    When the file already has content, its existing header row defines the
    columns, so appended rows stay aligned with the rows already there.
    Metadata keys that have no column are ignored, and missing ones are
    written as empty cells.
    """
    fieldnames = [
        'fname', 'timestamp', 'notes', 'model', 'firmware_version',
        'species_auto_id', 'species_manual_id', 'te', 'loc_position',
        'loc_elevation', 'error'
    ]
    has_content = (os.path.exists(results_file)
                   and os.path.getsize(results_file) > 0)
    if has_content:
        with open(results_file, 'r', newline='', encoding='utf-8') as f:
            existing_header = next(csv.reader(f), None)
        if existing_header:
            fieldnames = existing_header

    with open(results_file, 'a', newline='', encoding='utf-8') as f:
        # extrasaction='ignore': a key without a column (e.g. GPS fields
        # with an older header) must not abort the run with a ValueError.
        writer = csv.DictWriter(f, fieldnames=fieldnames,
                                extrasaction='ignore')
        if not has_content:
            writer.writeheader()
        for fname, metadata in results.items():
            row = {'fname': fname}
            row.update(metadata)
            writer.writerow(row)
def main():
    """Command-line entry point.

    Parses ``--csv_file``, ``--folder`` and ``--save_freq``, loads the rows
    of an already existing CSV file (keyed by their 'fname' column), hands
    them to `process_folder` together with the folder to scan, and finally
    passes whatever `process_folder` returns to `save_results`.
    """
    parser = argparse.ArgumentParser(description='Process some files.')
    parser.add_argument(
        '--csv_file', type=str, required=True, help='Path to the CSV file')
    parser.add_argument(
        '--folder', type=str, required=True, help='folder to process')
    parser.add_argument(
        '--save_freq',
        type=int,
        default=100,
        help='Frequency of saving results to CSV file')
    args = parser.parse_args()
    csv_path = args.csv_file

    # Rows from a previous run, so files already listed are not redone
    previous = {}
    if os.path.exists(csv_path):
        with open(csv_path, 'r', encoding='utf-8') as f:
            previous = {row['fname']: row for row in csv.DictReader(f)}

    leftover = process_folder(args.folder, previous, args.save_freq, csv_path)
    save_results(leftover, csv_path)


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment