Skip to content

Instantly share code, notes, and snippets.

@PreedhiVivek
Last active March 18, 2024 16:37
Show Gist options
  • Save PreedhiVivek/5ddc71269108b32fc51dc4b0826798b5 to your computer and use it in GitHub Desktop.
Save PreedhiVivek/5ddc71269108b32fc51dc4b0826798b5 to your computer and use it in GitHub Desktop.
[Insights app] Functions common across the Insights app
"""
Script to contain the common functions, used across different microservices, in the Insights app
"""
import os
from datetime import datetime
from dateutil.relativedelta import relativedelta, MO, FR
import requests
from neo4j import GraphDatabase
def initialize_db_driver():
"Initializes and returns the Neo4j driver object"
try:
driver_obj = GraphDatabase.driver(os.environ.get("SURVEY_DB_URI", None),
auth=(os.environ.get("DB_USERNAME", None),
os.environ.get("DB_PASSWORD", None)))
except Exception as error:
print(f"Exception: {error}")
raise Exception("Unable to initialize a Neo4j driver!") from error
return driver_obj
def get_dates():
"Returns last Monday and Friday dates"
today = datetime.today()
last_monday = today + relativedelta(weekday = MO(-1))
last_friday = today + relativedelta(weekday = FR(-1))
return last_monday.strftime('%Y-%m-%d'), last_friday.strftime('%Y-%m-%d')
def get_last_month_start_end_dates():
"Returns previous month's start and end date"
today = datetime.today()
start_date = today + relativedelta(months = -1, day = 1)
end_date = today + relativedelta(months = -1, day = 31)
month_year = start_date.strftime("%B") + " " + start_date.strftime("%Y")
return start_date.strftime('%Y-%m-%d'), end_date.strftime('%Y-%m-%d'), month_year
def report_results(insights_text):
"Report the insights fetched on Skype"
try:
headers = {'Content-Type': 'application/json'}
payload = {"msg" : insights_text,
"channel": os.environ.get("SKYPE_CHANNEL_ID", None),
"API_KEY": os.environ.get("SKYPE_API_KEY", None)}
response = requests.post(url=os.environ.get("SKYPE_SENDER_URL", None),
json=payload, headers=headers,
timeout = 10)
if response.status_code == 200:
print(f'Successfully sent insights - {insights_text} to Skype!')
else:
print('Failed to send Skype message!')
except Exception as err:
raise Exception(f'Unable to post message to Skype channel, due to {err}') from err
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment