Skip to content

Instantly share code, notes, and snippets.

@vldplcd
Created January 19, 2021 17:53
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save vldplcd/9f7568af10df1a715b38c0c99f188cba to your computer and use it in GitHub Desktop.
Save vldplcd/9f7568af10df1a715b38c0c99f188cba to your computer and use it in GitHub Desktop.
Код функции записи данных из IoT Core в PostgreSQL
import os
import logging
import psycopg2
import psycopg2.errors
import datetime as dt
import json
import base64
logger = logging.getLogger()
logger.setLevel(logging.INFO)
verboseLogging = eval(os.environ['VERBOSE_LOG']) ## Convert to bool
if verboseLogging:
logger.info('Loading msgHandler function')
def getConnString():
"""
Extract env variables to connect to DB and return a db string
Raise an error if the env variables are not set
:return: string
"""
db_hostname = os.environ['DB_HOSTNAME']
db_port = os.environ['DB_PORT']
db_name = os.environ['DB_NAME']
db_user = os.environ['DB_USER']
db_password = os.environ['DB_PASSWORD']
db_connection_string = f"host={db_hostname} port={db_port} dbname={db_name} user={db_user} password={db_password} sslmode=require"
return db_connection_string
def makeInsertStatement(event_ts, payload_json, table_name):
msg = json.loads(payload_json)
msg_str = json.dumps(msg)
logger.info(msg_str)
insert= f"""INSERT INTO {table_name} (telemetry_timestamp , device_nm , payload) VALUES ('{event_ts}','iot_test_device', '{msg_str}')"""
return insert
def makeCreateTableStatement(table_name):
statement = f"""CREATE TABLE {table_name} (
( telemetry_timestamp timestamp,
device_nm varchar(200),
payload varchar(2000)
);
"""
return statement
"""
Entry-point for Serverless Function.
:param event: IoT message payload.
:param context: information about current execution context.
:return: sucessfull response statusCode: 200
"""
def msgHandler(event, context):
statusCode = 500 ## Error response by default
if verboseLogging:
logger.info(event)
logger.info(context)
connection_string = getConnString()
if verboseLogging:
logger.info(f'Connecting: {connection_string}')
conn = psycopg2.connect(connection_string)
cursor = conn.cursor()
msg_payload = json.dumps(event["messages"][0])
json_msg = json.loads(msg_payload)
event_payload = base64.b64decode(json_msg["details"]["payload"])
if verboseLogging:
logger.info(f'Event: {event_payload}')
event_ts = json_msg["event_metadata"]["created_at"]
table_name = 'telemetry_hist'
sql = makeInsertStatement(event_ts, event_payload, table_name)
if verboseLogging:
logger.info(f'Exec: {sql}')
try:
cursor.execute(sql)
statusCode = 200
except psycopg2.errors.UndefinedTable as error: ## table not exist - create and repeate insert
conn.rollback()
logger.error( error)
createTable = makeCreateTableStatement(table_name)
cursor.execute(createTable)
conn.commit()
cursor.execute(sql)
statusCode = 200
except Exception as error:
logger.error( error)
conn.commit() # <- We MUST commit to reflect the inserted data
cursor.close()
conn.close()
return {
'statusCode': statusCode,
'headers': {
'Content-Type': 'text/plain'
},
'isBase64Encoded': False
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment