This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from http.server import HTTPServer, BaseHTTPRequestHandler | |
| import json | |
| from datetime import datetime | |
| class SimpleHTTPRequestHandler(BaseHTTPRequestHandler): | |
| def do_POST(self): | |
| print(f"\nNew POST received at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}. Processing...") | |
| content_length = int(self.headers["Content-Length"]) | |
| body = self.rfile.read(content_length) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from http.server import HTTPServer, BaseHTTPRequestHandler | |
| import ssl | |
| import json | |
| from datetime import datetime | |
| import logging | |
| logging.basicConfig(filename="webserver.log", level=logging.DEBUG) | |
| class SimpleHTTPRequestHandler(BaseHTTPRequestHandler): |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| JSON File Uploader | |
| 1. Receive JSON file from field device over HTTPS | |
| 2. Save raw JSON file to local file system, ./data/raw | |
| """ | |
| # import libraries | |
| import os | |
| from datetime import datetime |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| def upload_data_to_azure(json_filename, json_data, transformed_filename, transformed_data): | |
| """Upload to Azure Blob Storage""" | |
| # set upload flags to False | |
| has_json_uploaded = False | |
| has_csvgz_uploaded = False | |
| # get connection string from .env file | |
| connect_str = os.getenv("AZURE_STORAGE_CONNECTION_STRING") |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # upload data to azure | |
| has_json_uploaded, has_csvgz_uploaded = upload_data_to_azure(json_filename, json_data, transformed_filename, transformed_data) | |
| if not has_json_uploaded: | |
| # generate json path for failed uploads | |
| json_file_path_failed = f"{base_dir}\\data\\failed_upload\\raw\\{json_filename}" | |
| with open(json_file_path_failed, "w") as jsonfile: | |
| json.dump(json_data, jsonfile) | |
| if not has_csvgz_uploaded: |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| Function app to send data to Vendor | |
| - App is triggered when new raw JSON file is upload to storage account | |
| - File is read by the app | |
| - POST request created with JSON file as the body | |
| """ | |
| import logging | |
| import azure.functions as func | |
| import requests |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| //create dimension table the Site and Equipment | |
| .create table meta_site_eqmt (Site: string, EqmtIp: string, ProductType: string, EqmtId: string, Active: bool) | |
| //create dimension table the System and Tag | |
| .create table meta_system_tag (System: string, Tag: string, EngUnit: string, Min: int, Max: int) | |
| //create link to external table for exporting: factOilData | |
| .create external table factOilData (TimeStamp: datetime, TagName: string, Value: real, Site: string, System: string, EqmtIp: string, Tag: string) | |
| kind=blob | |
| partition by (HH: datetime = bin(TimeStamp, 1h)) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| //one time export of dimension tables | |
| .export async to table dimSiteEqmt <| meta_site_eqmt | |
| .export async to table dimSystemTag <| meta_system_tag | |
| //continuous export of data | |
| .create-or-alter continuous-export ParquetExport over (raw_oil_data) to table factOilData | |
| with | |
| (intervalBetweenRuns=1h, sizeLimit=104857600) | |
| <| raw_oil_data | |
| | parse TagName with Site: string '.' System: string '.' EqmtIp: string '.' Tag: string |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| def process_data_task(eqmt_ip: str, json_data: dict): | |
| """Process data tasks""" | |
| # get current time | |
| file_time = datetime.now() | |
| file_datetime = file_time.strftime("%Y-%m-%d_%H-%M-%S-%f") | |
| file_datetime_ts = file_time.strftime("%Y-%m-%d %H:%M:%S.%f") | |
| # generate json filename and path | |
| json_filename = f'data_{json_data["sn"]}_{file_datetime}.json' |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| def transform_data(eqmt_ip, file_datetime, json_body, file_datetime_ts): | |
| """Transform JSON data to columnar CSV""" | |
| # create the first row with the current timestamp and device serial number | |
| row = [ | |
| { | |
| "Timestamp": file_datetime_ts, | |
| "Tag": f"{site}.OilMon.{eqmt_ip.replace('.', '_')}.DvcSrlNmbr", | |
| "Value": json_body["sn"], | |
| } |
OlderNewer