Last active
October 12, 2022 18:04
-
-
Save nishitpatel01/9846a5d0fc7016b6ae6f354edef403d0 to your computer and use it in GitHub Desktop.
Main Python code for the Timeseries Insights (TSI) Cloud Function: it appends Pub/Sub sensor events to a TSI dataset, queries the dataset for anomalies, and streams the results into BigQuery.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import base64 | |
import google.oauth2.id_token | |
import google.auth.transport.requests | |
import json | |
import os | |
import csv | |
import requests | |
import datetime | |
from datetime import date | |
import time | |
from oauth2client.client import GoogleCredentials | |
from google.auth.transport.requests import AuthorizedSession | |
from google.oauth2 import service_account | |
from google.cloud import bigquery | |
def detect_anomaly(event, context):
    """Append an IoT sensor event to a Timeseries Insights dataset and detect anomalies.

    Triggered from a message on a Cloud Pub/Sub topic. The message data is a
    base64-encoded JSON document whose "attributes" object carries the sensor
    readings (time, h2_raw, temperature, humidity, light) plus the detection
    parameters (dataset, metric, forecast_history, granularity,
    noise_threshold, anomaly_threshold).

    Args:
        event (dict): Event payload; event['data'] holds the base64 message.
        context (google.cloud.functions.Context): Metadata for the event.

    Returns:
        str: The last anomaly-query response as formatted JSON, or None when
        the target TSI dataset does not exist.
    """
    # Read the incoming message from Pub/Sub. The payload is base64-encoded
    # JSON, so it must be parsed after decoding — the original indexed the raw
    # string as a dict, which raised a TypeError at runtime.
    pubsub_message = json.loads(base64.b64decode(event['data']).decode('utf-8'))
    event_decoded = pubsub_message["attributes"]

    # Reformat the epoch timestamp to RFC3339 ("YYYY-MM-DDTHH:MM:SSZ").
    # Local names avoid shadowing the imported `time` module and `date` name.
    epoch_seconds = int(event_decoded["time"])
    stamp = datetime.datetime.fromtimestamp(epoch_seconds).strftime('%Y-%m-%d %H:%M:%S')
    final_time = stamp.replace(" ", "T") + "Z"

    # Extract event attributes (kept as strings for the BigQuery insert below).
    h2 = str(event_decoded["h2_raw"])
    temperature = str(event_decoded["temperature"])
    humidity = str(event_decoded["humidity"])
    light = str(event_decoded["light"])

    # Build out the JSON payload for the append call.
    eventdict = {}

    # Create a group id for the appended event from a hash of its readings.
    cols = (epoch_seconds, h2, humidity, light, temperature)
    groupid = hash(cols)
    eventdict["groupId"] = float(groupid)
    # Use the RFC3339 timestamp as the event time. (The original assigned
    # final_time into an undefined `request_json` — a NameError — and sent the
    # raw epoch string instead.)
    eventdict["eventTime"] = final_time
    dimensions = [
        {"name": "measure", "stringVal": "LTTH"},
        {"name": "humidity", "doubleVal": float(event_decoded["humidity"])},
        {"name": "light", "doubleVal": float(event_decoded["light"])},
        # NOTE(review): the rest of this function reads the "h2_raw" attribute;
        # confirm the payload really carries a separate "h2" key as well.
        {"name": "h2", "doubleVal": float(event_decoded["h2"])},
        {"name": "temperature", "doubleVal": float(event_decoded["temperature"])}
    ]
    eventdict["dimensions"] = dimensions
    eventdict["dataset"] = event_decoded["dataset"]
    eventdict["metric"] = event_decoded["metric"]
    eventdict["forecast_history"] = event_decoded["forecast_history"]
    eventdict["granularity"] = event_decoded["granularity"]
    eventdict["noise_threshold"] = event_decoded["noise_threshold"]
    eventdict["anomaly_threshold"] = event_decoded["anomaly_threshold"]

    # Authenticate requests to the TSI API with the bundled service account key.
    credentials = service_account.Credentials.from_service_account_file('key.json')
    scoped_credentials = credentials.with_scopes(['https://www.googleapis.com/auth/cloud-platform'])
    authed_session = AuthorizedSession(scoped_credentials)

    # Set up the API endpoint and the BigQuery destination table.
    PROJECT_ID = "<YOUR-PROJECT-ID>"  # REPLACE THIS WITH YOUR PROJECT ID
    DATASET_ID = "<YOUR-BQ-DATASET-NAME>"  # REPLACE THIS WITH YOUR BQ DATASET THAT CONTAINS THE ANOMALY RESULT TABLE
    TABLE_ID = "<YOUR-BQ-TABLE-NAME>"  # REPLACE THIS WITH BQ TABLE NAME
    ts_endpoint = "https://timeseriesinsights.googleapis.com/v1/projects/{}/datasets".format(PROJECT_ID)

    # Check that the target dataset exists before appending; bail out if not.
    listdata = query_ts(method="GET", endpoint=ts_endpoint, data="", auth_session=authed_session)
    count = 0  # number of datasets matching the requested name
    for item in listdata.items():
        print(item)
        for ds in item[1]:
            # Dataset names come back fully qualified; compare the last segment.
            if ds["name"].rsplit('/', 1)[1] == eventdict["dataset"]:
                count = count + 1
                print("found")
    if count == 0:
        print("no dataset found rip")
        return

    # Pull the metric list, dataset name and detection parameters out of the
    # payload, stripping them so eventdict matches the TSI appendEvents schema.
    atts = eventdict.pop('metric')
    dataset_name = eventdict.pop('dataset')
    forecast_history = eventdict.pop("forecast_history")
    granularity = eventdict.pop("granularity")
    noise_threshold = eventdict.pop("noise_threshold")
    anomaly_threshold = eventdict.pop("anomaly_threshold")

    # Append the event to the dataset.
    print("Appending new events...")
    url_endpoint = f'https://timeseriesinsights.googleapis.com/v1/projects/{PROJECT_ID}/datasets/{dataset_name}:appendEvents'
    request_body = {"events": [eventdict]}
    res = query_ts('POST', url_endpoint, request_body, auth_session=authed_session)
    print(f"New event has been successfully added to API dataset {dataset_name}")

    # Run an anomaly query at the event time for each requested metric and
    # stream the verdict into BigQuery.
    fulltime = str(eventdict['eventTime'])
    for metric in atts:
        print(f"Querying for anomalies for metric {metric}...")
        request_body = {
            "detectionTime": fulltime,
            "slicingParams": {
                "dimensionNames": ["measure"]
            },
            "timeseriesParams": {
                "forecastHistory": forecast_history,
                "granularity": granularity,
                "metric": f"{metric}"
            },
            "forecastParams": {
                "noiseThreshold": noise_threshold,
                "seasonalityHint": "DAILY"
            },
            "returnTimeseries": "true"
        }
        query_ds_endpt = f'https://timeseriesinsights.googleapis.com/v1/projects/{PROJECT_ID}/datasets/{dataset_name}:query'
        res = query_ts(method="POST", endpoint=query_ds_endpt, data=request_body, auth_session=authed_session)

        # BigQuery streaming insert of the anomaly result for this metric.
        anomaly_result = is_anomaly(res, anomaly_threshold)
        client = bigquery.Client()
        table_id = f"{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}"
        rows_to_insert = [
            {"Timestamp": epoch_seconds, "h2": h2, "temperature": temperature,
             "humidity": humidity, "light": light, "anomaly": int(anomaly_result)}
        ]
        errors = client.insert_rows_json(table_id, rows_to_insert)  # Make an API request.
        if errors == []:
            print("New rows have been added.")
        else:
            print("Encountered errors while inserting rows: {}".format(errors))
    return json.dumps(res, indent=3, sort_keys=True)
#API Method handler
def query_ts(method, endpoint, data, auth_session):
    """Issue an authenticated request to the TSI REST API and return parsed JSON.

    Args:
        method (str): One of "GET", "POST", "DELETE".
        endpoint (str): Full URL of the API endpoint.
        data: Request payload for POST calls (any JSON-serializable value);
            ignored for GET and DELETE.
        auth_session: An AuthorizedSession (or any object exposing a
            compatible ``request(method, endpoint, data=...)``).

    Returns:
        dict: The decoded JSON response body.

    Raises:
        ValueError: If ``method`` is not a supported verb (the original left
            ``resp`` unbound and raised UnboundLocalError instead).
    """
    if method in ("GET", "DELETE"):
        resp = auth_session.request(method, endpoint)
    elif method == "POST":
        # Serialize as real JSON: the original used str(data), which produces
        # Python-repr text (single quotes, True/False) that a JSON API rejects.
        resp = auth_session.request(method, endpoint, data=json.dumps(data))
    else:
        raise ValueError("Unsupported HTTP method: {}".format(method))
    return resp.json()
#Anomaly result handler
def is_anomaly(response, anomaly_threshold):
    """Classify a TSI query response as anomalous (1) or not (0).

    Args:
        response (dict): Parsed JSON from a ``datasets:query`` call. A bare
            ``{"name": ...}`` response means no timeseries was evaluated.
        anomaly_threshold (float): Minimum anomalyScore counted as an anomaly.

    Returns:
        int: 1 when the first slice's anomalyScore meets or exceeds the
        threshold, else 0.
    """
    # A response carrying only the dataset name has no slices to evaluate.
    if len(response) == 1 and 'name' in response:
        return 0
    # Guard against a missing or empty "slices" list — the original indexed
    # response['slices'][0] unconditionally, raising TypeError/IndexError.
    slices = response.get('slices') or []
    if not slices:
        return 0
    score = slices[0].get('anomalyScore')
    if score is not None and score >= anomaly_threshold:
        return 1
    return 0
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment