Last active
November 23, 2016 21:51
-
-
Save TaylorMutch/05b9680e475e2b3988c0df1e5e277def to your computer and use it in GitHub Desktop.
Utilities for working with Sodar files.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Taylor Mutch, | |
Revision date - 11/22/2016 | |
Utilities for working with the VALCEX data. | |
timestamp in this context will refer to either a Sodar timestamp (e.x. 120314124500), | |
or a datetime.datetime object (e.x. datetime(2012, 3, 14, 12, 45)) | |
""" | |
import os | |
import sys | |
import numpy as np | |
import datetime | |
import sqlite3 | |
import io | |
#x = np.arange(12).reshape(2,6) | |
#con = sqlite3.connect(":memory:", detect_types=sqlite3.PARSE_DECLTYPES) | |
#cur = con.cursor() | |
#cur.execute("create table test (arr array)") | |
months = [None, "Jan", "Feb", "Mar", "Apr", "May", "Jun", | |
"Jul", "Aug", "Sep", "Oct", "Nov", "Dec"] | |
# tags for direction, speed, SDR (timestamp) and height in .sdr file | |
TAGS = {'DCL', 'VCL', 'SDR', 'H '} | |
SODAR_FIELDS = { | |
"speed": 0, | |
"direction": 1 | |
} | |
def plural_names(bands):
    """ Allow for plural names to be used.

    Returns a new dict containing every key of ``bands`` plus an
    's'-suffixed alias for each key, all mapping to the original values.
    The input dict is not modified.
    """
    aliases = {name + 's': idx for name, idx in bands.items()}
    combined = dict(bands)
    combined.update(aliases)
    return combined
# Meta information for the data structure
MAX_TIMESTAMPS = 288  # 288 5-minute periods in 1 day
NO_DATA = -1  # fill value used wherever a 5-minute reading is missing
NIGHT_START_INDEX = 18*60//5  # start at 1800 (index 216)
NIGHT_STOP_INDEX = NIGHT_START_INDEX + 12*60//5 + 1  # stop at 0600 the next day, +1 gives us 0600 included in the night reading
NIGHT_SIZE = NIGHT_STOP_INDEX - NIGHT_START_INDEX  # 145 timestamps (1800 through 0600 inclusive)
def read_sdr(filepath):
    """
    Base file reader. Returns raw Sodar data.
    @return Heights, timestamps, speeds and directions each as 1D or 2D lists
    """
    heights = []
    timestamps = []
    speeds = []
    directions = []
    with open(os.path.abspath(filepath)) as datafile:
        for line in datafile:
            # The first three characters of each line identify the record type
            tag = line[:3]
            if tag not in TAGS:
                continue
            if tag == 'SDR':
                # fixed-width timestamp field, e.g. 120314124500
                timestamps.append(int(line[4:16]))
            elif tag == 'VCL':
                speeds.append([float(v) for v in line.strip().split()[1:]])
            elif tag == 'DCL':
                directions.append([float(v) for v in line.strip().split()[1:]])
            elif tag == 'H ' and not heights:
                # heights are uniform per file; keep only the first row
                heights = [int(v) for v in line.strip().split()[1:]]
    # Every record must supply one speed and one direction row per timestamp
    assert(len(speeds) == len(directions) == len(timestamps))
    return heights, timestamps, speeds, directions
def timestamp_to_datetime(timestamp):
    """ Convert a Sodar timestamp to a datetime.datetime object.

    Per the module header, 120314124500 maps to datetime(2012, 3, 14, 12, 45):
    the layout is YYMMDDhhmmss with a two-digit year. The trailing seconds
    are ignored (records fall on 5-minute boundaries).

    @param timestamp 12-digit Sodar timestamp as an int (or str).
    @return datetime.datetime with the century (2000) added to the year.
    """
    # zfill guards against a leading-zero year being lost in the int form
    t = str(timestamp).zfill(12)
    # BUG FIX: the two-digit year was previously used verbatim (year 12,
    # not 2012), contradicting the documented example above.
    year = 2000 + int(t[:2])
    month = int(t[2:4])
    day = int(t[4:6])
    hour = int(t[6:8])
    minute = int(t[8:10])
    return datetime.datetime(year, month, day, hour, minute)
def name_to_datetime(name):
    """ Generates a datetime.datetime object based on a Sodar file name/date.

    Sodar files follow the MMDD naming convention (e.g. '0314'). A dummy
    year of 2000 is used since years aren't important in names, unless
    there are cross-year data (i.e. midnight on Dec. 31).

    @param name Either an MMDD string, or a path to an existing file whose
                basename starts with MMDD.
    @return datetime.datetime for midnight of that month/day in year 2000.
    """
    year = 2000
    if os.path.isfile(name):
        # BUG FIX: this branch referenced the undefined names 'filepath' and
        # 'data', and called .split() on a list. Take the basename and strip
        # the extension instead.
        date = name.split(os.sep)[-1].split('.')[0]
        return datetime.datetime(year, int(date[0:2]), int(date[2:4]))
    return datetime.datetime(year, int(name[0:2]), int(name[2:4]))
def datetime_to_name(timestamp):
    """ Reproduces an MMDD name for Sodar files from a datetime.datetime object.

    @param timestamp datetime.datetime (or datetime.date) to convert.
    @return 4-character zero-padded 'MMDD' string.
    """
    # Format specs handle the zero padding that was previously done by hand
    return '{0:02d}{1:02d}'.format(timestamp.month, timestamp.day)
class _FakeSource(object):
    """ A stand-in/filler class for representing missing data.

    The leading underscore marks it as internal to this module and it
    should not be used outside of it. Mimics the Sodar interface (name,
    heights, timestamps, data) but every data value is NO_DATA.
    """

    def __init__(self, name, heights, timestamps):
        self.name = str(name)
        self.heights = heights
        # One all-NO_DATA plane per band, one row per 5-minute slot
        band_count = len(SODAR_FIELDS.keys())
        self.data = np.full((band_count, MAX_TIMESTAMPS, len(self.heights)),
                            NO_DATA, dtype=float)
        # Rewrite the MMDD portion (digits 2..5) of each borrowed timestamp
        # so this fake source appears to belong to its own (missing) day
        self.timestamps = [
            int(str(t)[:2] + self.name + str(t)[6:]) for t in timestamps
        ]
class SodarCollection(object):
    """
    A collection of Sodar records from a directory of data sources.

    Daily sources (one .sdr file per day, named MMDD) are stitched together
    along the time axis. Missing days between real sources are padded with
    _FakeSource instances filled with NO_DATA so the collection exposes a
    contiguous 5-minute timeline.

    # ************* Accessing each timestamp ************** #
    each entry in a band is a timestamp
    each timestamp has values, starting from height 0 to height max_height
    Timestamp values are stored in self.timestamps (raw values in self._timestamps),
    and the index of the timestamp is used as the accessor into the larger dataset
    I.e. values at index 0 are at height[0], and values at index[33] are at height[33]
        --> data[band][timestamp_idx][value]

    E.x. Accessing a row of data
        Import everything
        >>> from sodar_utils import *
        build collection
        >>> sodars = SodarCollection('path/to/sodar/station')
        Get timestamp
        >>> timestamp = timestamp_to_datetime(120312125500)
        >>> timestamp_idx = sodars.timestamps.index(timestamp)
        ... or equivalently
        >>> timestamp = 120312125500
        >>> timestamp_idx = sodars._timestamps.index(timestamp)
        Now access the data
        >>> row_data = sodars.dataset[0][timestamp_idx]
        >>> print(row_data)
        # or as a normal python list
        >>> print(row_data.tolist())
    """

    def __init__(self, directory, make_db=False):
        """
        @param directory Path to a directory containing MMDD-named .sdr/.SDR files.
        @param make_db   If True, also write an sqlite3 database into ``directory``.
        @raise AssertionError if no .sdr files are found, or if the stitched
               timeline is not exactly 5 minutes per step.
        """
        self.name = directory.split(os.sep)[-1]
        # Build independent data sources. Since the naming convention is mmdd,
        # os.walk's name-ordered file listing yields chronological order.
        sources = list()
        for root, dirs, files in os.walk(directory):
            for path in files:
                if path.split('.')[-1] in ['sdr', 'SDR']:
                    sources.append(Sodar(os.path.join(root, path)))
        assert(len(sources) > 0)  # don't create collection if no sources were found
        self.sources = list()
        self._base_timestamp = sources[0].name
        current_day = name_to_datetime(self._base_timestamp)
        self.sources.append(sources[0])
        for i in range(1, len(sources)):
            next_day = name_to_datetime(sources[i].name)
            delta = next_day - current_day
            # Test if we need to insert sources for missing dates between actual sources
            if delta.days > 1:
                # dates missing, add fake sources up until next_day
                fake_source_date = current_day + datetime.timedelta(days=1)
                while fake_source_date < next_day:
                    fake_name = datetime_to_name(fake_source_date)
                    self.sources.append(_FakeSource(fake_name,
                                                    sources[0].heights,
                                                    sources[0].timestamps))
                    fake_source_date += datetime.timedelta(days=1)
            self.sources.append(sources[i])
            current_day = name_to_datetime(sources[i].name)
        # Now connect the data sources along the time axis
        self.dataset = None
        self._timestamps = list()  # _timestamps are the raw Sodar timestamps (ints)
        self.heights = list()
        for source in self.sources:
            if self.dataset is None:
                self.dataset = source.data
                self.heights = source.heights  # collect heights once, should be uniform across all sources
            else:
                a = self.dataset
                b = source.data
                self.dataset = np.concatenate((a, b), axis=1)  # we want to join along the time axis
            self._timestamps += source.timestamps
        # Index of the first timestamp of each day within the stitched dataset
        self._day_index = [i*MAX_TIMESTAMPS for i in range(len(self.sources))]
        self._night_index = [
            {
                'name': self.sources[i].name,
                'start': self._day_index[i] + NIGHT_START_INDEX,
                'stop': self._day_index[i] + NIGHT_STOP_INDEX
            } for i in range(len(self.sources) - 1)]  # -1 since we don't have data for the morning after the last timestamp
        # Generate datetime.datetime objects for all the timestamps
        self.timestamps = [timestamp_to_datetime(timestamp) for timestamp in self._timestamps]
        # Assert that there exists exactly 5 minutes between each record across all sources,
        # and that they are in the proper order
        for i in range(1, len(self.timestamps)):
            a = self.timestamps[i-1]
            b = self.timestamps[i] - datetime.timedelta(minutes=5)
            assert(a == b)
        if make_db:
            # BUG FIX: generate_db takes the collection AND the path; the
            # collection argument was previously missing.
            generate_db(self, os.path.join(directory, 'sodar_collection.db'))

    def source_dates(self, use_all=False):
        """ Return the dates (MMDD names) for all the valid sources.
        Excludes non-data (fake) sources if use_all = False.
        """
        return [source.name for source in self.sources if type(source) == Sodar or use_all]

    def night_array(self, band, partial=False, use_all=False, select_nights=None):
        """ Returns a 2-Tuple of a 3D array of all nights for the given band
        (shape: num_nights x NIGHT_SIZE x num_heights), and the accessor data
        associated with it as a list of dictionaries.

        @param band          Band name ('speed'/'speeds'/'direction'/'directions').
        @param partial       Flag whether to include nights where either the evening or morning readings are missing.
        @param use_all       Flag whether to include all nights, regardless of any other settings.
        @param select_nights A list of night names to return. Ignores flags if parameter is a list with valid night dates/names.
        @raise ValueError if no nights match the given criteria.
        """
        # List that will contain our night-accessor dictionaries
        night_index = list()
        # Logic for responding to parameters
        if select_nights is not None and type(select_nights) is list:  # Select only those nights asked for
            for night in self._night_index:
                for selected_night in select_nights:
                    if night['name'] == selected_night:
                        night_index.append(night)
                        break
        elif use_all:  # Get everything
            night_index = self._night_index
        else:
            for i in range(1, len(self.sources)):  # Choose every source where evening and morning are valid (partial) sources
                if (type(self.sources[i-1]) == Sodar and type(self.sources[i]) == Sodar):
                    night_index.append(self._night_index[i-1])
                elif (partial and (type(self.sources[i-1]) is Sodar or type(self.sources[i]) is Sodar)):
                    night_index.append(self._night_index[i-1])
        if (len(night_index) == 0):
            # BUG FIX: message previously read "True using"
            raise ValueError("No nights were available. Try using 'partial' or 'use_all' set to 'True' to see if values exist at all.")
        # Slice out the night data we want
        band_idx = plural_names(SODAR_FIELDS)[band]
        num_nights = len(night_index)
        result = None
        band_data = self.dataset[band_idx]
        for night in night_index:
            night_data = band_data[night['start']:night['stop']]
            if result is None:
                result = night_data
            else:
                result = np.concatenate((result, night_data), axis=0)
        # reshape to size of night and return array and night_index
        return (result.reshape(num_nights, NIGHT_SIZE, len(self.heights)), night_index)
def adapt_array(arr):
    """
    http://stackoverflow.com/a/31312102/190597 (SoulNibbler)
    Adapts a numpy array to fit into a single column entry by serialising
    it with np.save into an in-memory buffer and wrapping the bytes as a
    sqlite3 BLOB.
    """
    buffer = io.BytesIO()
    np.save(buffer, arr)
    # getvalue() returns the full buffer contents regardless of position
    return sqlite3.Binary(buffer.getvalue())
def convert_array(text):
    """ Loads a numpy array from a bytes object produced by adapt_array """
    # A fresh BytesIO starts at position 0, so no explicit seek is needed
    return np.load(io.BytesIO(text))
def register_adapters():
    """ Register numpy <-> sqlite3 conversions for the 'array' column type """
    # np.ndarray values become BLOBs on INSERT...
    sqlite3.register_adapter(np.ndarray, adapt_array)
    # ...and columns declared as 'array' come back as np.ndarray on SELECT
    sqlite3.register_converter("array", convert_array)

# Register once at import time so every connection sees the adapters
register_adapters()
# Converts np.array to TEXT when inserting | |
#sqlite3.register_adapter(np.ndarray, adapt_array) | |
# Converts TEXT to np.array when selecting | |
#sqlite3.register_converter("array", convert_array) | |
def generate_db(sodars, path):
    """ Generate an sqlite3 database from a collection.

    Writes an ``s_array`` table holding, per night, the night's date plus
    the speed and direction arrays (stored via the registered 'array'
    adapter/converter pair).

    @param sodars An object exposing night_array(band) -> (array, meta) and
                  a ``timestamps`` list of datetimes (e.g. a SodarCollection).
    @param path   Filesystem path for the sqlite3 database file.
    """
    con = sqlite3.connect(path, detect_types=sqlite3.PARSE_DECLTYPES|sqlite3.PARSE_COLNAMES)
    with con:
        cur = con.cursor()
        # we want to store the speeds, directions, date for the night
        # TODO - add a table for multiple collections
        cur.execute("CREATE TABLE s_array (id integer primary key, night date, speeds array, directions array)")
        speeds, _ = sodars.night_array('speeds')
        dirs, meta = sodars.night_array('directions')
        for i, night in enumerate(meta):
            start = night['start']
            # builtin converters are for date and time, not datetime. We only need the date anyway
            night_date = sodars.timestamps[start].date()
            # BUG FIX: the table has four columns, so the autoincrement id
            # column must be skipped by naming the inserted columns; the
            # bare "VALUES (?,?,?)" form raised an OperationalError.
            cur.execute("INSERT INTO s_array (night, speeds, directions) VALUES (?,?,?)",
                        (night_date, speeds[i], dirs[i]))
    # test the db
    with con:
        try:
            cur = con.cursor()
            cur.execute("SELECT * from s_array")
            cur.fetchone()
        except sqlite3.Error:
            print("An error occurred in fetching data from the db. Db is non-functional.")
def timestamp_to_index(timestamp):
    """ Converts a Sodar timestamp to a dataset index for a single Sodar source.

    @param timestamp 12-digit Sodar timestamp (YYMMDDhhmmss) as an int or str.
    @return Index of the 5-minute bin since midnight (0..287).
    """
    digits = str(timestamp)
    hour = int(digits[6:8])
    minute = int(digits[8:10])
    # minutes since midnight, bucketed into 5-minute bins
    return (hour * 60 + minute) // 5
class Sodar(object):
    """
    A collection of Sodar records from a single source (.sdr file).

    # ********************** NOTE *************************** #
    NO_DATA (-1) is used as the fill value for the datasets.
    This accounts for having an uneven number of height points
    across different weather conditions / events that cause
    no-data gaps to occur
    """

    def __init__(self, fp):
        """@param fp Path to the .sdr file to load."""
        self._extract_bands(fp)

    def _extract_bands(self, filepath):
        """
        Pack SDR data into numpy arrays.

        Builds a numpy array with two bands that holds velocity and
        direction. Sets self.name (MMDD, from the file name), self.heights,
        self.timestamps, and self.data with shape
        (num_bands, MAX_TIMESTAMPS, num_heights).
        """
        self.name = filepath.split(os.sep)[-1].split('.')[0]
        self.heights, self.timestamps, speeds, directions = read_sdr(filepath)
        self.data = np.empty((len(SODAR_FIELDS.keys()), MAX_TIMESTAMPS, len(self.heights)))
        self.data.fill(NO_DATA)  # band 0 is speeds, band 1 is directions
        for j in range(len(speeds)):
            time_idx = timestamp_to_index(self.timestamps[j])
            for i in range(len(speeds[j])):
                self.data[0][time_idx][i] = speeds[j][i]
                self.data[1][time_idx][i] = directions[j][i]
        # Some 5-minute records can be missing from the file; the data array
        # already holds NO_DATA for those slots, so regenerate a complete,
        # evenly spaced timestamp list covering the whole day.
        if len(self.timestamps) != MAX_TIMESTAMPS:
            base_timestamp = str(self.timestamps[0])[:6]  # YYMMDD prefix
            self.timestamps = list()
            for i in range(MAX_TIMESTAMPS):
                hours, minutes = divmod(i*5, 60)
                # seconds are always '00'; format specs do the zero padding
                # (previously done by hand with unused 'date'/'seconds' locals)
                self.timestamps.append(int('{0}{1:02d}{2:02d}00'.format(base_timestamp, hours, minutes)))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment