Skip to content

Instantly share code, notes, and snippets.

@mschrader15
Created February 23, 2021 16:12
Show Gist options
  • Save mschrader15/89fd40b5d03dee2eabef7916c7d8a4d8 to your computer and use it in GitHub Desktop.
Save mschrader15/89fd40b5d03dee2eabef7916c7d8a4d8 to your computer and use it in GitHub Desktop.
Pull data from a COE sql database
"""
NOTES:
Solves the Microsoft ODBC Driver 17 for SQL Server : SSL Provider ssl_choose_client_version:unsupported protocol
https://askubuntu.com/questions/1284658/how-to-fix-microsoft-odbc-driver-17-for-sql-server-ssl-provider-ssl-choose-cli
"""
import csv
import os
import sys
from configparser import ConfigParser
from datetime import datetime, timedelta
import pyodbc
def generate_signal_id_list(traffic_light_ids):
if not isinstance(traffic_light_ids, list):
sys.exit("traffic_light_ids should be in list format. If only 1 ID, please enter like: ['63069006]")
if len(list(set(traffic_light_ids))) < len(traffic_light_ids):
sys.exit("not all of the requested IDs are unique. check for duplicates")
for ID in traffic_light_ids:
if len(ID) < 8:
sys.exit("traffic_light_ids should be 8 characters in length")
try:
idz = "('" + traffic_light_ids[0] + "'"
if len(traffic_light_ids) > 1:
for ID in traffic_light_ids[1:]:
idz += ", '{ID}'".format(ID=ID)
idz = idz + ')'
else:
idz = idz + ")"
except TypeError:
sys.exit('traffic_light_ids are not the right data type')
return idz
def check_dates(start_time, end_time, max_hour_difference):
if start_time is None or end_time is None:
sys.exit('Both start and end time need to be entered')
try:
start_datetime = datetime.strptime(start_time, '%Y-%m-%d %H:%M:%S.%f')
except ValueError:
sys.exit('start_time must be entered in this format: %Y-%m-%d %H:%M:%S.%f')
try:
end_datetime = datetime.strptime(end_time, '%Y-%m-%d %H:%M:%S.%f')
except ValueError:
sys.exit('end_time must be entered in this format: %Y-%m-%d %H:%M:%S.%f')
hours = check_time_delta(start_time, end_time)
if (hours > max_hour_difference) or (hours <= 0):
sys.exit('The difference between the start and end date must be > 0 & <= {limit} hours \n'
'It is currently {diff} hours'.
format(limit=max_hour_difference, diff=hours))
def to_datetime(date_string):
return datetime.strptime(date_string, '%Y-%m-%d %H:%M:%S.%f')
def check_time_delta(start_time, end_time):
start_datetime = to_datetime(start_time)
end_datetime = to_datetime(end_time)
time_difference = end_datetime - start_datetime
return time_difference.total_seconds() / 3600
class Connection:
def __init__(self, login_file, time_offset=None):
self.MAX_HOUR_DIFFERENCE = 24
self._get_config(login_file)
self._make_connection()
self._data = self._clean_data()
self._time_offsets = time_offset
self._tl_popped = False
@staticmethod
def _clean_data():
return {'header': None, 'data': []}
def _get_config(self, login_file):
self._cp = ConfigParser()
self._cp.read(login_file)
def _make_connection(self):
self._db_conn = pyodbc.connect(f"Driver={self._cp['login']['Driver']};"
f"Server={self._cp['login']['Server']};"
f"Database={self._cp['login']['Database']};"
f"uid={self._cp['login']['uid']};"
f"pwd={self._cp['login']['pwd']};"
)
def _set_data(self, query, _offset_time=None):
cursor = self._db_conn.cursor()
cursor.execute(query)
self._data['header'] = [column[0] for column in cursor.description]
offset_obj = timedelta(hours=_offset_time) if _offset_time else None
for row in cursor:
if offset_obj:
row[1] = row[1] + offset_obj
self._data['data'].append(list(row))
@staticmethod
def _create_query(traffic_light_ids, start_time, end_time):
return (
"select *\n \
from Controller_Event_Log\n \
where SignalID in {ID}\n \
and Timestamp >= '{start_time}'\n \
and Timestamp < '{end_time}'\n \
order by Timestamp\n \
".format(ID=traffic_light_ids, start_time=start_time, end_time=end_time)
)
def _handle_corrections(self, start_time, end_time, weekends):
store_data = []
for tl, offset in self._time_offsets.items():
start_time_string = (start_time + timedelta(hours=-1 * offset)).strftime('%Y-%m-%d %H:%M:%S.%f')
end_time_string = (end_time + timedelta(hours=-1 * offset)).strftime('%Y-%m-%d %H:%M:%S.%f')
self.pull_data([tl], start_time_string, end_time_string, weekends, _offset=False, _iteration=1,
_offset_time=offset)
return store_data
def _pull_long_data(self, time_delta, traffic_light_ids, start_time, end_time, skip_days=(), weekends=False):
import time
import random
for multiplier in range(int(time_delta / self.MAX_HOUR_DIFFERENCE)):
time.sleep(random.randint(2, 5))
start_day_local = to_datetime(start_time) + timedelta(hours=multiplier * self.MAX_HOUR_DIFFERENCE)
end_day_local = to_datetime(start_time) + timedelta(hours=(multiplier + 1) * self.MAX_HOUR_DIFFERENCE)
if (start_day_local.weekday() < 5 or weekends) and (
start_day_local.strftime('%Y-%m-%d') not in [day.strftime('%Y-%m-%d') for day in skip_days]):
end_day_local_string = end_time if end_day_local > to_datetime(end_time) else end_day_local.strftime(
'%Y-%m-%d %H:%M:%S.%f')
start_day_local_string = start_day_local.strftime('%Y-%m-%d %H:%M:%S.%f')
print("Pulling: ", start_day_local, end_day_local)
self.pull_data(traffic_light_ids, start_day_local_string, end_day_local_string, weekends,
_offset=False, _iteration=1)
if self._time_offsets:
self._handle_corrections(start_time=start_day_local,
end_time=to_datetime(
end_time) if end_day_local > to_datetime(
end_time) else end_day_local,
weekends=weekends)
def pull_data(self, traffic_light_ids, start_time, end_time, skip_days=(), weekends=False, _offset=True, _offset_time=None, _iteration=0):
if _iteration < 1:
self._data = self._clean_data()
self._tl_popped = False
traffic_light_ids = self._pop_tl_w_offset(traffic_light_ids.copy())
time_delta = check_time_delta(start_time, end_time)
if time_delta > self.MAX_HOUR_DIFFERENCE:
return self._pull_long_data(time_delta, traffic_light_ids, start_time, end_time, skip_days, weekends)
elif time_delta <= 0:
sys.exit('end time is greater than start time')
query = self._create_query(traffic_light_ids=generate_signal_id_list(traffic_light_ids), start_time=start_time,
end_time=end_time)
self._set_data(query, _offset_time)
if self._time_offsets and _offset:
self._handle_corrections(to_datetime(start_time), to_datetime(end_time), weekends=weekends)
def _pop_tl_w_offset(self, traffic_light_ids):
if self._time_offsets and not self._tl_popped:
for tl, offset in self._time_offsets.items():
traffic_light_ids.pop([i for i, tl_interior in enumerate(traffic_light_ids) if tl_interior == tl][0])
self._tl_popped = True
return traffic_light_ids
def save_data(self, file_path):
extension = os.path.split(file_path)[-1].split('.')[-1]
if extension in 'csv':
with open(file_path, 'w', newline='') as csvfile:
writer = csv.writer(csvfile, delimiter=',')
writer.writerow(self._data['header'])
writer.writerows(self._data['data'])
# elif extension in '.h5':
# import h5py
# hf = h5py.File(file_path)
# hf.create_dataset()
elif extension in 'pkl':
import pickle
with open(file_path, 'wb') as f:
pickle.dump(self._data, f)
def get_data(self):
return self._data
if __name__ == "__main__":
conn = Connection('/home/max/SUMO/airport-harper-sumo/assets/sqlLogin.ini')
conn.pull_data(['63082002', '63082003', '63082004'], '2020-02-12 20:00:00.000', '2020-02-13 04:00:00.000')
conn.save_data('output.csv')
@mschrader15
Copy link
Author

mschrader15 commented Feb 23, 2021

This file requires an additional file that contains the database login information. I pass this to the Connection class in line 189.

I can share that privately

@mschrader15
Copy link
Author

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment