Skip to content

Instantly share code, notes, and snippets.

@nishitpatel01
Last active October 12, 2022 18:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nishitpatel01/9846a5d0fc7016b6ae6f354edef403d0 to your computer and use it in GitHub Desktop.
Save nishitpatel01/9846a5d0fc7016b6ae6f354edef403d0 to your computer and use it in GitHub Desktop.
Main Python code for the Timeseries Insights (TSI) Cloud Function.
import base64
import google.oauth2.id_token
import google.auth.transport.requests
import json
import os
import csv
import requests
import datetime
from datetime import date
import time
from oauth2client.client import GoogleCredentials
from google.auth.transport.requests import AuthorizedSession
from google.oauth2 import service_account
from google.cloud import bigquery
def _epoch_to_rfc3339(epoch_seconds):
    """Format epoch seconds as an RFC 3339 UTC timestamp, e.g. ``2022-10-12T18:04:00Z``."""
    # fromtimestamp() without tz= converts to the host's local zone, which would
    # make the trailing "Z" (UTC) wrong on any non-UTC host; pin the zone.
    moment = datetime.datetime.fromtimestamp(epoch_seconds, tz=datetime.timezone.utc)
    return moment.strftime('%Y-%m-%dT%H:%M:%SZ')


def _stable_group_id(values):
    """Derive a reproducible signed 64-bit integer id from a tuple of reading values."""
    import hashlib

    # Built-in hash() of str values is salted per process (PYTHONHASHSEED), so
    # the same reading would get a different id on every cold start or retry.
    digest = hashlib.sha256(repr(values).encode('utf-8')).digest()
    return int.from_bytes(digest[:8], 'big', signed=True)


def _dataset_exists(list_response, dataset_name):
    """Return True if a TSI datasets-list response contains a dataset named *dataset_name*."""
    # Only the "datasets" entry holds dataset objects; other keys (or an
    # {"error": ...} reply) must not be iterated as if they were datasets.
    # NOTE(review): if the list endpoint paginates, only the first page is
    # checked here — confirm.
    for dataset in list_response.get('datasets', []):
        # Resource names end in ".../datasets/<name>"; compare the last segment.
        if dataset.get('name', '').rsplit('/', 1)[-1] == dataset_name:
            return True
    return False


def detect_anomaly(event, context):
    """Triggered from a message on a Cloud Pub/Sub topic.

    Appends the sensor reading carried by the message to a Timeseries Insights
    (TSI) dataset, queries TSI for anomalies at the reading's timestamp for each
    requested metric, and streams the reading plus a 0/1 anomaly flag into
    BigQuery.

    Args:
        event (dict): Event payload. ``event['data']`` is base64-encoded JSON
            whose ``attributes`` object carries the reading (``time`` in epoch
            seconds, ``h2``, ``h2_raw``, ``temperature``, ``humidity``,
            ``light``) and the detection settings (``dataset``, ``metric``,
            ``forecast_history``, ``granularity``, ``noise_threshold``,
            ``anomaly_threshold``).
        context (google.cloud.functions.Context): Metadata for the event (unused).

    Returns:
        str | None: The last TSI response pretty-printed as JSON, or None when
        the target dataset does not exist.
    """
    # The Pub/Sub body arrives base64-encoded; it must be decoded AND parsed as
    # JSON, otherwise the subscripting below would be applied to a plain str.
    pubsub_message = json.loads(base64.b64decode(event['data']).decode('utf-8'))
    attributes = pubsub_message['attributes']

    # --- the reading ------------------------------------------------------
    epoch_seconds = int(attributes['time'])
    event_time = _epoch_to_rfc3339(epoch_seconds)
    # NOTE(review): the BigQuery row uses "h2_raw" while the TSI dimension below
    # uses "h2"; confirm the payload really carries both keys.
    h2 = str(attributes['h2_raw'])
    temperature = str(attributes['temperature'])
    humidity = str(attributes['humidity'])
    light = str(attributes['light'])

    # Event in the shape sent to the TSI appendEvents endpoint.
    tsi_event = {
        # groupId is an int64; sending it as a decimal string (the proto3 JSON
        # form for int64) avoids the precision loss of float() on a 64-bit id.
        'groupId': str(_stable_group_id((epoch_seconds, h2, humidity, light, temperature))),
        'eventTime': event_time,
        'dimensions': [
            {'name': 'measure', 'stringVal': 'LTTH'},
            {'name': 'humidity', 'doubleVal': float(attributes['humidity'])},
            {'name': 'light', 'doubleVal': float(attributes['light'])},
            {'name': 'h2', 'doubleVal': float(attributes['h2'])},
            {'name': 'temperature', 'doubleVal': float(attributes['temperature'])},
        ],
    }

    # --- detection settings carried by the message ------------------------
    dataset_name = attributes['dataset']
    metrics = attributes['metric']
    if isinstance(metrics, str):
        # A single metric name would otherwise be iterated character by character.
        metrics = [metrics]
    forecast_history = attributes['forecast_history']
    granularity = attributes['granularity']
    noise_threshold = attributes['noise_threshold']
    # Accept numeric strings: comparing a float score with a str raises TypeError.
    anomaly_threshold = float(attributes['anomaly_threshold'])

    # --- authenticate requests to the TSI API -----------------------------
    # Resolve key.json next to this source file so the lookup does not depend
    # on the process working directory.
    key_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'key.json')
    credentials = service_account.Credentials.from_service_account_file(key_path)
    scoped_credentials = credentials.with_scopes(['https://www.googleapis.com/auth/cloud-platform'])
    authed_session = AuthorizedSession(scoped_credentials)

    # --- endpoints ----------------------------------------------------------
    PROJECT_ID = "<YOUR-PROJECT-ID>"  # REPLACE THIS WITH YOUR PROJECT ID
    DATASET_ID = "<YOUR-BQ-DATASET-NAME>"  # REPLACE THIS WITH YOUR BQ DATASET THAT CONTAINS THE ANOMALY RESULT TABLE
    TABLE_ID = "<YOUR-BQ-TABLE-NAME>"  # REPLACE THIS WITH BQ TABLE NAME
    ts_endpoint = "https://timeseriesinsights.googleapis.com/v1/projects/{}/datasets".format(PROJECT_ID)

    # --- make sure the target dataset exists --------------------------------
    listdata = query_ts(method="GET", endpoint=ts_endpoint, data="", auth_session=authed_session)
    print(listdata)
    if not _dataset_exists(listdata, dataset_name):
        print("no dataset found rip")
        return
    print("found")

    # --- append the event to the dataset ------------------------------------
    print("Appending new events...")
    url_endpoint = f'https://timeseriesinsights.googleapis.com/v1/projects/{PROJECT_ID}/datasets/{dataset_name}:appendEvents'
    res = query_ts('POST', url_endpoint, {'events': [tsi_event]}, auth_session=authed_session)
    if 'error' in res:
        # Do not report success when the API rejected the request.
        print(f"Failed to append event to API dataset {dataset_name}: {res['error']}")
    else:
        print(f"New event has been successfully added to API dataset {dataset_name}")

    # --- run an anomaly query at the event time for every metric -------------
    query_ds_endpt = f'https://timeseriesinsights.googleapis.com/v1/projects/{PROJECT_ID}/datasets/{dataset_name}:query'
    anomaly_result = 0
    for metric in metrics:
        print(f"Querying for anomalies for metric {metric}...")
        request_body = {
            "detectionTime": event_time,
            "slicingParams": {
                "dimensionNames": ["measure"]
            },
            "timeseriesParams": {
                "forecastHistory": forecast_history,
                "granularity": granularity,
                "metric": f"{metric}"
            },
            "forecastParams": {
                "noiseThreshold": noise_threshold,
                "seasonalityHint": "DAILY"
            },
            "returnTimeseries": "true"
        }
        res = query_ts(method="POST", endpoint=query_ds_endpt, data=request_body, auth_session=authed_session)
        if 'error' in res:
            print(f"Anomaly query for metric {metric} failed: {res['error']}")
            continue
        # Flag the reading if ANY metric is anomalous, not just the last one queried.
        anomaly_result = max(anomaly_result, is_anomaly(res, anomaly_threshold))

    # --- stream the reading and its anomaly flag into BigQuery ---------------
    client = bigquery.Client()
    table_id = f"{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}"
    rows_to_insert = [
        {
            "Timestamp": epoch_seconds,
            "h2": h2,
            "temperature": temperature,
            "humidity": humidity,
            "light": light,
            "anomaly": int(anomaly_result),
        }
    ]
    errors = client.insert_rows_json(table_id, rows_to_insert)  # Make an API request.
    if not errors:
        print("New rows have been added.")
    else:
        print("Encountered errors while inserting rows: {}".format(errors))
    return json.dumps(res, indent=3, sort_keys=True)
#API Method handler
def query_ts(method, endpoint, data, auth_session):
    """Send a request to the Timeseries Insights REST API and return its decoded JSON reply.

    Args:
        method: HTTP verb (case-insensitive). GET and DELETE are sent without a
            body; POST, PUT and PATCH send ``data`` as a JSON request body.
        endpoint: Fully-qualified API URL.
        data: Request body. Non-``str`` values (dicts, lists) are serialized
            with ``json.dumps``; a ``str`` is sent unchanged. Ignored for
            GET/DELETE.
        auth_session: An authorized session (e.g. google-auth
            ``AuthorizedSession``) exposing ``request(method, url, **kwargs)``.

    Returns:
        The parsed JSON response, or ``{}`` when the response body is empty.
        HTTP error replies are not raised; their JSON body is returned as-is.
        A non-JSON body makes ``resp.json()`` raise.

    Raises:
        ValueError: If ``method`` is not one of the supported verbs.
    """
    verb = method.upper()
    if verb in ("GET", "DELETE"):
        resp = auth_session.request(verb, endpoint)
    elif verb in ("POST", "PUT", "PATCH"):
        # str(dict) yields a Python repr (single quotes, True/None), which is
        # not valid JSON; serialize properly and declare the content type.
        body = data if isinstance(data, str) else json.dumps(data)
        resp = auth_session.request(
            verb, endpoint, data=body, headers={'Content-Type': 'application/json'}
        )
    else:
        # Previously an unknown verb fell through and hit an unbound `resp`.
        raise ValueError(f"Unsupported HTTP method: {method!r}")
    return resp.json() if resp.content else {}
#Anomaly result handler
def is_anomaly(response, anomaly_threshold):
    """Decide whether a TSI dataset ``:query`` response reports an anomaly.

    Args:
        response (dict): Parsed JSON returned by the dataset ``:query`` endpoint.
        anomaly_threshold (float | str): Minimum ``anomalyScore`` that counts as
            anomalous. Numeric strings are accepted and converted with float().

    Returns:
        int: 1 if any returned slice has a non-zero ``anomalyScore`` that is
        ``>= anomaly_threshold``, otherwise 0. Responses without slices (for
        example a bare ``{"name": ...}`` reply or an ``{"error": ...}`` payload)
        yield 0 instead of raising.
    """
    threshold = float(anomaly_threshold)
    # `or []` covers both a missing "slices" key and an explicit null/empty list.
    for slice_ in response.get('slices') or []:
        score = slice_.get('anomalyScore')
        # A missing or zero score is treated as "not anomalous", as before.
        if score and score >= threshold:
            return 1
    return 0
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment