Skip to content

Instantly share code, notes, and snippets.

View h3xagn's full-sized avatar

h3xagn h3xagn

View GitHub Profile
@h3xagn
h3xagn / built-elt-part1a-json-server.py
Last active March 21, 2022 09:22
Build ETL from device to cloud: https://h3xagn.com
from http.server import HTTPServer, BaseHTTPRequestHandler
import json
from datetime import datetime
class SimpleHTTPRequestHandler(BaseHTTPRequestHandler):
def do_POST(self):
print(f"\nNew POST received at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}. Processing...")
content_length = int(self.headers["Content-Length"])
body = self.rfile.read(content_length)
@h3xagn
h3xagn / built-elt-part1b-json-server.py
Last active March 21, 2022 09:22
Build ETL from device to cloud: https://h3xagn.com
from http.server import HTTPServer, BaseHTTPRequestHandler
import ssl
import json
from datetime import datetime
import logging
# Send all log records at DEBUG level and above to "webserver.log"; the path is
# relative, so the file is created in the process's current working directory.
# NOTE: basicConfig does nothing if the root logger already has handlers.
logging.basicConfig(filename="webserver.log", level=logging.DEBUG)
class SimpleHTTPRequestHandler(BaseHTTPRequestHandler):
@h3xagn
h3xagn / build-etl-part2-fastapi.py
Last active April 2, 2022 14:02
Build ETL from device to cloud: https://h3xagn.com
"""
JSON File Uploader
1. Receive JSON file from field device over HTTPS
2. Save raw JSON file to local file system, ./data/raw
"""
# import libraries
import os
from datetime import datetime
@h3xagn
h3xagn / build-etl-part4-blob.py
Created April 24, 2022 08:28
Build ETL from device to cloud: https://h3xagn.com
def upload_data_to_azure(json_filename, json_data, transformed_filename, transformed_data):
"""Upload to Azure Blob Storage"""
# set upload flags to False
has_json_uploaded = False
has_csvgz_uploaded = False
# get connection string from .env file
connect_str = os.getenv("AZURE_STORAGE_CONNECTION_STRING")
@h3xagn
h3xagn / build-etl-part4-process.py
Created April 24, 2022 08:30
Build ETL from device to cloud: https://h3xagn.com
# upload data to azure
has_json_uploaded, has_csvgz_uploaded = upload_data_to_azure(json_filename, json_data, transformed_filename, transformed_data)
if not has_json_uploaded:
# generate json path for failed uploads
json_file_path_failed = f"{base_dir}\\data\\failed_upload\\raw\\{json_filename}"
with open(json_file_path_failed, "w") as jsonfile:
json.dump(json_data, jsonfile)
if not has_csvgz_uploaded:
@h3xagn
h3xagn / build-etl-part5-main.py
Created April 24, 2022 11:22
Build ETL from device to cloud: https://h3xagn.com
"""
Function app to send data to Vendor
- App is triggered when new raw JSON file is upload to storage account
- File is read by the app
- POST request created with JSON file as the body
"""
import logging
import azure.functions as func
import requests
@h3xagn
h3xagn / build-etl-part6-ext-table.py
Created May 7, 2022 12:50
Build ETL from device to cloud: https://h3xagn.com
// create the dimension table for Site and Equipment
.create table meta_site_eqmt (Site: string, EqmtIp: string, ProductType: string, EqmtId: string, Active: bool)
// create the dimension table for System and Tag
.create table meta_system_tag (System: string, Tag: string, EngUnit: string, Min: int, Max: int)
//create link to external table for exporting: factOilData
.create external table factOilData (TimeStamp: datetime, TagName: string, Value: real, Site: string, System: string, EqmtIp: string, Tag: string)
kind=blob
partition by (HH: datetime = bin(TimeStamp, 1h))
@h3xagn
h3xagn / build-etl-part6-exports.py
Created May 7, 2022 12:51
Build ETL from device to cloud: https://h3xagn.com
// one-time (async) export of the dimension tables to their external tables
.export async to table dimSiteEqmt <| meta_site_eqmt
.export async to table dimSystemTag <| meta_system_tag
//continuous export of data
.create-or-alter continuous-export ParquetExport over (raw_oil_data) to table factOilData
with
(intervalBetweenRuns=1h, sizeLimit=104857600)
<| raw_oil_data
| parse TagName with Site: string '.' System: string '.' EqmtIp: string '.' Tag: string
@h3xagn
h3xagn / build-etl-part3-process.py
Created May 7, 2022 14:37
Build ETL from device to cloud: https://h3xagn.com
def process_data_task(eqmt_ip: str, json_data: dict):
"""Process data tasks"""
# get current time
file_time = datetime.now()
file_datetime = file_time.strftime("%Y-%m-%d_%H-%M-%S-%f")
file_datetime_ts = file_time.strftime("%Y-%m-%d %H:%M:%S.%f")
# generate json filename and path
json_filename = f'data_{json_data["sn"]}_{file_datetime}.json'
@h3xagn
h3xagn / build-etl-part3-transform.py
Created May 7, 2022 14:39
Build ETL from device to cloud: https://h3xagn.com
def transform_data(eqmt_ip, file_datetime, json_body, file_datetime_ts):
"""Transform JSON data to columnar CSV"""
# create the first row with the current timestamp and device serial number
row = [
{
"Timestamp": file_datetime_ts,
"Tag": f"{site}.OilMon.{eqmt_ip.replace('.', '_')}.DvcSrlNmbr",
"Value": json_body["sn"],
}