Skip to content

Instantly share code, notes, and snippets.

@TaylorMutch
Created May 23, 2016 01:35
Show Gist options
  • Save TaylorMutch/c433aebd4865673e277b630a18bac465 to your computer and use it in GitHub Desktop.
Save TaylorMutch/c433aebd4865673e277b630a18bac465 to your computer and use it in GitHub Desktop.
METEK SOdar (.sdr) to CSV converter. Includes support for gzipped files.
"""
Created by Taylor Mutch on 22 May, 2016
Simple SOdar to CSV file converter.
"""
import os
import sys
import csv
import gzip
import shutil
# Record tags this converter understands. They are compared against the first
# whitespace-delimited token of each line, so trailing padding after a short
# tag such as "H" does not matter.
tags = {'DCL', 'VCL', 'SDR', 'H'}


def readSDR(filepath):
    """Parse an .sdr text file into per-timestamp measurement lists.

    Lines whose first token is ``SDR`` start a new record; the record key is
    ``int(line[4:16])`` (presumably a YYMMDDhhmmss timestamp -- confirm
    against the instrument documentation).  Within a record:

    * ``H``   lines are parsed as ints   and stored under ``'heights'``
    * ``VCL`` lines are parsed as floats and stored under ``'speeds'``
    * ``DCL`` lines are parsed as floats and stored under ``'directions'``

    Lines with any other leading token are ignored.  Data lines that appear
    before the first ``SDR`` header are attributed to the first record.

    Args:
        filepath: Path of the (uncompressed) .sdr file.

    Returns:
        ``{timestamp: {'heights': [...], 'directions': [...],
        'speeds': [...]}}`` in order of first appearance, or an empty dict
        when the file contains no ``SDR`` header.

    Raises:
        OSError: If the file cannot be opened.
        ValueError: If a header timestamp or a data value is not numeric.
    """
    # Keep only the lines we understand, pre-split into (tag, values, raw).
    records = []
    with open(os.path.abspath(filepath)) as datafile:
        for line in datafile:
            tokens = line.split()
            if tokens and tokens[0] in tags:
                records.append((tokens[0], tokens[1:], line))

    dates = [int(raw[4:16]) for tag, _, raw in records if tag == 'SDR']
    if not dates:
        # No header at all: nothing to key the data on.
        return {}

    results = {date: {'heights': [], 'directions': [], 'speeds': []}
               for date in dates}

    # tag -> (result key, converter applied to every value on the line)
    parsers = {
        'H': ('heights', int),
        'VCL': ('speeds', float),
        'DCL': ('directions', float),
    }

    current_date = dates[0]
    for tag, values, raw in records:
        if tag == 'SDR':
            current_date = int(raw[4:16])
            continue
        key, convert = parsers[tag]
        results[current_date][key] = [convert(value) for value in values]
    return results
def writeCSV(filepath):
    """Convert one .sdr file to a CSV file written next to it.

    The input is parsed with ``readSDR`` and written as rows of
    ``time, height, speed, direction`` (one row per value position, per
    timestamp) using the ``excel`` CSV dialect.  The output path is
    ``filepath`` with its final extension replaced by ``.csv``.

    A row is written only for positions where height, speed and direction
    are all present; a record whose three lists differ in length is
    truncated to the shortest one instead of aborting the conversion.

    Args:
        filepath: Path of the (uncompressed) .sdr file.
    """
    data = readSDR(filepath)

    # splitext strips only the last extension, so dots elsewhere in the path
    # (e.g. "./run.1/file.sdr") no longer corrupt the output name.
    csv_path = os.path.splitext(filepath)[0] + '.csv'
    fieldnames = ['time', 'height', 'speed', 'direction']

    with open(csv_path, 'w', newline='') as csvfile:
        csvwriter = csv.DictWriter(csvfile, fieldnames=fieldnames,
                                   dialect='excel')
        csvwriter.writeheader()
        for date, record in data.items():
            levels = zip(record['heights'],
                         record['speeds'],
                         record['directions'])
            for height, speed, direction in levels:
                csvwriter.writerow({'time': date,
                                    'height': height,
                                    'speed': speed,
                                    'direction': direction})
def processFile(filepath, directory=None):
    """Return the path of a readable data file, gunzipping it if needed.

    If ``filepath`` ends in ``<ext>.gz`` (e.g. ``name.sdr.gz``) it is
    decompressed to the same path without the ``.gz`` suffix and that path
    is returned.  Any other path is returned without touching the disk.

    Args:
        filepath: File to prepare.  A bare file name (no directory part) is
            resolved inside ``directory`` when one is given.
        directory: Optional directory that contains ``filepath``.  When
            given, an already-extracted file is reused instead of being
            overwritten; when ``None`` the archive is always re-extracted.

    Returns:
        Path of the uncompressed file.

    Raises:
        OSError: If the archive cannot be read or the target not written.
    """
    # Names coming from os.listdir() carry no directory; anchor them so the
    # file is found even when `directory` is not the working directory.
    if directory is not None and not os.path.dirname(filepath):
        filepath = os.path.join(directory, filepath)

    actual_file, extension = os.path.splitext(filepath)
    is_gzipped = extension.lower() == '.gz'
    # Require an inner extension ("name.sdr.gz"), as the original did, so a
    # bare "name.gz" is passed through untouched.
    if not is_gzipped or not os.path.splitext(actual_file)[1]:
        return filepath

    if directory is None or not os.path.exists(actual_file):
        with gzip.open(filepath, 'rb') as f_in, \
                open(actual_file, 'wb') as f_out:
            shutil.copyfileobj(f_in, f_out)
    return actual_file
def _print_usage():
    """Print the command-line usage hint."""
    print("\nNo file provided. Use local file names, and add the")
    print(" -a or -all tag to operate on all files in a directory.")
    print("Does not operate on recursive folders")


# Command-line entry point:
#   script.py FILE        -> convert FILE
#   script.py FILE -a     -> convert every file in FILE's directory that has
#   script.py FILE -all      the same extension as FILE (non-recursive)
# Anything else prints the usage hint.
if __name__ == "__main__":
    cli_args = sys.argv[1:]
    if len(cli_args) == 1:
        # convert single file to csv
        writeCSV(processFile(cli_args[0]))
    elif len(cli_args) == 2 and cli_args[1] in ('-all', '-a'):
        # convert all files sharing FILE's extension in FILE's directory
        orig_file = cli_args[0]
        directory = os.path.dirname(os.path.abspath(orig_file))
        # Split the base name only, so dots in the directory part
        # (e.g. "./x.sdr") cannot be mistaken for the extension.
        name_parts = os.path.basename(orig_file).split('.')
        if len(name_parts) < 2:
            # No extension to match against.
            _print_usage()
        else:
            extension = name_parts[1]
            # include .SDR / .sdr regardless of how the example was typed
            extensions = {extension, extension.lower(), extension.upper()}
            for entry in os.listdir(directory):
                entry_parts = entry.split('.')
                if len(entry_parts) > 1 and entry_parts[1] in extensions:
                    writeCSV(processFile(entry, directory))
    else:
        # Wrong argument count or an unrecognised flag.
        _print_usage()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment